go concurrency mqtt channel goroutine

How can I let an admin cancel a long-running MQTT device activation in Go when using unbuffered channels?


I’m building a Go backend to control IoT motors via MQTT. Each device activation is handled asynchronously by a goroutine. Here’s the simplified flow:

  1. A client POSTs /activate-device with device_id and duration.

  2. The request is sent into an unbuffered channel:

         deviceQueue <- &DeviceRequest{UserID: userID, DeviceID: id, Duration: duration}

  3. A goroutine reads from the channel, turns the device ON, sleeps for duration, then turns it OFF:

         for req := range deviceQueue {
             // Publish ON to MQTT
             time.Sleep(req.Duration)
             // Publish OFF to MQTT
         }

Problem: I want an admin endpoint /force-shutdown to immediately stop any ongoing device activation—even if the goroutine is sleeping.

I initially tried context.WithCancel per device, but:

Questions:

  1. What is a clean, idiomatic Go approach to allow an admin to cancel an ongoing device activation when using unbuffered channels?
  2. Should I restructure this as an actor per device pattern, a message bus, or something else?
  3. Any references, blogs, or patterns that handle real-time cancellation of long-running operations in Go while interacting with external systems (MQTT, hardware)?

I’ve tried:

  1. Using context.WithCancel in the handler.
  2. Considering buffered channels for queueing but want to understand the cancellation problem first.

For context, the full repo is available here. Feel free to explore the implementation and experiment with it. I’m looking for guidance on best practices for allowing an admin to cancel long-running MQTT device activations in Go.


Solution

  • I don't know if this is the idiomatic Go way, but here is my proposal: create a thread-safe map that stores the cancel function for each currently active device.

    1. When an activation request comes in:

      • create a new cancellable context with context.WithCancel

      • store the returned cancel func in map (you can use device_id as key)

      • launch a new goroutine for this activation passing the new context

    2. In the worker goroutine, use select to wait on two channels:

      • timer channel time.After(duration)

      • context's cancellation channel ctx.Done()

    3. Finally, when an admin calls the /force-shutdown endpoint:

      • look up the device_id in the map

      • if found, call its cancel() func; this closes the ctx.Done() channel that the worker goroutine is waiting on

      • the worker goroutine's select returns immediately, and the worker runs its shutdown logic (publish OFF)

    EDIT: Maybe something like this?

    package main
    
    import (
        "context"
        "fmt"
        "sync"
        "time"
    )
    
    type DevManager struct {
        mu            sync.Mutex
        activeDevices map[string]context.CancelFunc
    }
    
    func NewDeviceManager() *DevManager {
        return &DevManager{
            activeDevices: make(map[string]context.CancelFunc),
        }
    }
    
    func (m *DevManager) ActivateDevice(deviceID string, duration time.Duration) error {
        m.mu.Lock()
        defer m.mu.Unlock()
    
        if _, exists := m.activeDevices[deviceID]; exists {
            return fmt.Errorf("dev %s is already active", deviceID)
        }
    
        ctx, cancel := context.WithCancel(context.Background())
        m.activeDevices[deviceID] = cancel
    
        go m.runDeviceWorker(ctx, deviceID, duration)
    
        fmt.Printf("dev %s activated for %s\n", deviceID, duration)
        return nil
    }
    
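    // removeDevice drops the device from the map and releases its context.
    func (m *DevManager) removeDevice(deviceID string) {
        m.mu.Lock()
        defer m.mu.Unlock()
        if cancel, ok := m.activeDevices[deviceID]; ok {
            cancel() // safe to call twice; frees the context when the timer ran out normally
            delete(m.activeDevices, deviceID)
        }
    }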
    
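    func (m *DevManager) runDeviceWorker(ctx context.Context, deviceID string, duration time.Duration) {
        defer m.removeDevice(deviceID)
        defer fmt.Printf("Worker for dev %s finished.\n", deviceID)
    
        // Publish ON to MQTT here.
    
        timer := time.NewTimer(duration)
        defer timer.Stop() // release the timer early if we are cancelled
    
        select {
        case <-timer.C:
            // Duration elapsed normally.
        case <-ctx.Done():
            // Force-cancelled by an admin.
        }
    
        // Publish OFF to MQTT here, so the device is turned off on both paths.
    }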
    
    func (m *DevManager) ForceShutdown(deviceID string) error {
        m.mu.Lock()
        defer m.mu.Unlock()
    
        cancel, exists := m.activeDevices[deviceID]
        if !exists {
            return fmt.Errorf("no active operation found for dev %s", deviceID)
        }
    
        cancel()
    
        fmt.Printf("Shutdown signal sent to dev %s\n", deviceID)
        return nil
    }
    
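    func main() {
        // Usage might look like this.
        manager := NewDeviceManager()
        manager.ActivateDevice("dev1", 2*time.Second)
        manager.ActivateDevice("dev2", 10*time.Second)
        manager.ForceShutdown("dev2") // can be dev1
    
        // Demo only: keep main alive long enough for both workers to finish.
        // A real server keeps running, so this sleep would not be needed.
        time.Sleep(3 * time.Second)
    }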