I’m building a Go backend to control IoT motors via MQTT. Each device activation is handled asynchronously by a goroutine. Here’s the simplified flow:
A client POSTs /activate-device with device_id and duration.
The handler sends the request into an unbuffered channel:
    deviceQueue <- &DeviceRequest{UserID: userID, DeviceID: id, Duration: duration}
A worker goroutine consumes the channel and processes one request at a time:
    for req := range deviceQueue {
        // Publish ON to MQTT
        time.Sleep(req.Duration)
        // Publish OFF to MQTT
    }
Problem: I want an admin endpoint /force-shutdown to immediately stop any ongoing device activation—even if the goroutine is sleeping.
I initially tried context.WithCancel per device, but:
Questions:
I’ve tried:
For context, the full repo is available here. Feel free to explore the implementation and experiment with it. I’m looking for guidance on best practices for allowing an admin to cancel long-running MQTT device activations in Go.
I don't know if this is the idiomatic Go way, but here is my proposal. Create a thread-safe map that stores the cancel function for each currently active device.
When a request to activate comes in:
create a new context with context.WithCancel
store the returned cancel func in the map (you can use device_id as the key)
launch a new goroutine for this activation, passing it the new context
The worker goroutine then uses select to wait on two channels:
the timer channel from time.After(duration)
the context's cancellation channel, ctx.Done()
Finally, when an admin calls the /force-shutdown endpoint:
look up the device_id in the map
if found, call its cancel() func; this closes ctx.Done() in the worker goroutine
the worker goroutine's select then takes the ctx.Done() case and runs the shutdown logic
EDIT: Maybe something like this?
package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// DevManager tracks the cancel function of every currently active device.
type DevManager struct {
	mu            sync.Mutex
	activeDevices map[string]context.CancelFunc // keyed by device ID
}

func NewDeviceManager() *DevManager {
	return &DevManager{
		activeDevices: make(map[string]context.CancelFunc),
	}
}

// ActivateDevice registers a cancel func for the device and starts its worker.
func (m *DevManager) ActivateDevice(deviceID string, duration time.Duration) error {
	m.mu.Lock()
	defer m.mu.Unlock()

	// Refuse a second activation while one is still running.
	if _, exists := m.activeDevices[deviceID]; exists {
		return fmt.Errorf("dev %s is already active", deviceID)
	}

	ctx, cancel := context.WithCancel(context.Background())
	m.activeDevices[deviceID] = cancel
	go m.runDeviceWorker(ctx, deviceID, duration)

	fmt.Printf("dev %s activated for %s\n", deviceID, duration)
	return nil
}
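
// removeDevice unregisters the device once its worker has finished.
func (m *DevManager) removeDevice(deviceID string) {
	m.mu.Lock()
	defer m.mu.Unlock()
	if cancel, ok := m.activeDevices[deviceID]; ok {
		cancel() // release the context even when the timer fired first; calling cancel twice is safe
	}
	delete(m.activeDevices, deviceID)
}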

func (m *DevManager) runDeviceWorker(ctx context.Context, deviceID string, duration time.Duration) {
	defer m.removeDevice(deviceID)
	defer fmt.Printf("Worker for dev %s finished.\n", deviceID)

	// Publish ON to MQTT here.

	select {
	case <-time.After(duration):
		// The requested duration elapsed normally.
	case <-ctx.Done():
		// Force-cancelled through ForceShutdown.
	}

	// Publish OFF to MQTT here so the motor stops on both paths.
}
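
// ForceShutdown cancels the running activation for deviceID, if there is one.
func (m *DevManager) ForceShutdown(deviceID string) error {
	m.mu.Lock()
	defer m.mu.Unlock()

	cancel, exists := m.activeDevices[deviceID]
	if !exists {
		return fmt.Errorf("no active operation found for dev %s", deviceID)
	}

	// cancel only signals the worker; the worker removes its own map entry when it exits.
	cancel()
	fmt.Printf("Shutdown signal sent to dev %s\n", deviceID)
	return nil
}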
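
func main() {
	// Usage might look like this.
	manager := NewDeviceManager()
	manager.ActivateDevice("dev1", 2*time.Second)
	manager.ActivateDevice("dev2", 10*time.Second)

	if err := manager.ForceShutdown("dev2"); err != nil { // can be dev1
		fmt.Println(err)
	}

	// Give the workers time to finish before main returns; a real server keeps running instead.
	time.Sleep(3 * time.Second)
}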