swiftcore-bluetoothcombine

Combine/CoreBluetooth: How to make `.scanForPeripherals`/`.stop` run as a side effect as subscribers accumulate?


I'm working right now with a wrapper for CoreBluetooth using Combine So I'm trying to wrap my head around managing side effects dependent on the creation of a Publisher. Basically whenever someone sinks into the publisher generated by scan() (it can be multiple subscribers) I want to be able to trigger .scanForPeripherals on CBCentralManager as the subscriber count increases and trigger .stopScan on CBCentralManager as the subscriber count decreases

The signature I kinda want to achieve is similar to this:

extension DispatchQueue {
  static var bluetoothQueue: DispatchQueue {
     return DispatchQueue(label: "com.mycompany.bluetootQueue", qos: .utility, attributes: [])
  }
}

final class Bluetooth {
  enum Error: Swift.Error {
    case poweredOff
    case unsupported
    case unauthorized(CBManagerAuthorization)
  }

  private struct DiscoverResult {
    let peripheral: CBPeripheral
    let advertisementData: [String: Any]
    let rssi: NSNumber
  }

  private final class DelegateWrapper: NSObject, CBCentralManagerDelegate {
     private var peripherals: [UUID: DiscoverResult] = [:]
     
     var state = CurrentValueSubject<CBManagerState, Never>(.unknown)
     var didDiscoverPeripheral = CurrentValueSubject<[UUID: DiscoverResult], Never>([:])
     
     func centralManagerDidUpdateState(_ central: CBCentralManager) {
       state.send(central.state)
     }

     func centralManager(_ central: CBCentralManager, didDiscover peripheral: CBPeripheral, advertisementData: [String: Any], rssi RSSI: NSNumber) {
       let discoverResult = DiscoverResult(
         peripheral: peripheral,
         advertisementData: advertisementData,
         rssi: RSSI
       )
       peripherals[peripheral.identifier] = discoverResult
       
       didDiscoverPeripheral.send(peripherals)
     }
  }

  private var delegateWrapper: DelegateWrapper
  private var central: CBCentralManager

  init() {
     let delegateWrapper = DelegateWrapper()
     let central = CBCentralManager(delegate: delegateWrapper, queue: .bluetoothQueue)

     self.delegateWrapper = delegateWrapper
     self.central = central
  }

  func scan() -> AnyPublisher<[UUID: DiscoverResult], Bluetooth.Error> {
     let isBluetoothReadyUpstream = delegateWrapper.state
       .map { state -> AnyPublisher<Bool, Error> in
         switch state {
           case .poweredOn: Just(true).setFailureType(to: Error.self).eraseToAnyPublisher()
           case .unknown, .resetting: Just(false).setFailureType(to: Error.self).eraseToAnyPublisher()
           case .poweredOff: Fail<Bool, Error>(error: Error.poweredOff).eraseToAnyPublisher()
           case .unsupported: Fail<Bool, Error>(error: Error.unsupported).eraseToAnyPublisher()
           case .unauthorized:
             Fail<Bool, Error>(error: .unauthorized(CBCentralManager.authorization)).eraseToAnyPublisher()
           @unknown default: Just(false).setFailureType(to: Error.self).eraseToAnyPublisher()
         }
       }
       .switchToLatest()
     
     let didDiscoverUpstream = delegateWrapper.didDiscoverPeripheral
       .setFailureType(to: Bluetooth.Error.self)
       .combineLatest(isBluetoothReadyUpstream)
       .map { devices, ready -> AnyPublisher<[UUID: DiscoverResult], Error> in 
         guard ready else { return Empty().eraseToAnyPublisher() }
         return Just(devices).setFailureType(to: BluetoothError.self).eraseToAnyPublisher()
       }
       .switchToLatest()

     // QUESTION: how to return from here such as once a subscriber enters it triggers central.scanForPeripherals(...) in the central IF bluetooth is ready and central.stopScan() as we reach zero subscribers 
  }
}

It's been a while since I last done anything Combine so I'm kinda rusty on it :/

I used .handleEvents before but the problem is the multiple calls to .scanForPeripherals or .stop earlier than the subscription count.

Not sure if I should use a custom publisher for this (if anyone has any guides on it)


Solution

  • I played with your problem last night and the code I came up with is below. I created a new application project called ScanOnDemand and this is the source of the main application. To use it you have to enable Bluetooth and add an entry in the info.plist.

    It's a lot of code, I know, but it does show a custom publisher that does what you asked about. It has not been exhaustively tested, may and not be 100% thread safe – use at your own risk.

    The main player is the BluetoothPeripheralsPublisher which is a publisher of PeripheralDiscovery structures (those structures simply capture information about discovered peripherals).

    As subscribers are added, and peripherals are discovered, the publisher's adjustScanForDemand function is called. It looks through the list of subscribers and if any of them are still waiting for peripherals, it makes sure a scan is running. When no more peripherals are demanded, it shuts the scan down.

    BluetoothCentralManager is a wrapper for a CBCentralManager that creates and exposes the publisher (as well as a publisher of Bluetooth state). The rest is just a sample application that waits for Bluetooth to be available, then runs a scan for 3 seconds using the publisher. When the subscriber is cancelled, and there is no demand, the scan stops.

    import SwiftUI
    import CoreBluetooth
    import Combine
    
    struct PeripheralDiscovery {
      let peripheral: CBPeripheral
      let advertisementData: [String : Any]
      let rssi: Int
    }
    
    extension CBManagerState: CustomDebugStringConvertible {
      public var debugDescription: String {
        switch self.rawValue {
        case CBManagerState.unknown.rawValue: "unknowmn"
        case CBManagerState.resetting.rawValue: "resetting"
        case CBManagerState.unsupported.rawValue: "unsupported"
        case CBManagerState.unauthorized.rawValue: "unauthorized"
        case CBManagerState.poweredOff.rawValue: "poweredOff"
        case CBManagerState.poweredOn.rawValue: "poweredOn"
        default: "Totally Unrecognzied"
        }
      }
    }
    
    class BluetoothPeripheralsPublisher: Publisher {
      typealias Output = PeripheralDiscovery
      typealias Failure = Error
    
      private let cbCentralManager: CBCentralManager
      private var subscriptions: [CombineIdentifier: AskForPeripherals] = [:]
    
      func adjustScanForDemand() {
        let totalDemand = subscriptions.reduce(Subscribers.Demand.none) { totalDemand, pair in
          let subscription = pair.value
          return totalDemand + subscription.awaiting
        }
    
        debugPrint("Total demand is \(String(describing: totalDemand))")
    
        if (totalDemand == .none) && cbCentralManager.isScanning {
          debugPrint("Stopping Scan")
          cbCentralManager.stopScan()
        }
    
        if (totalDemand > .none) && !cbCentralManager.isScanning {
          // Probably want this publisher to take an array of services to scan for instead
          // of using nil here.
          debugPrint("Starting Scan")
          cbCentralManager.scanForPeripherals(withServices: nil)
        }
      }
    
      init(manager: CBCentralManager) {
        cbCentralManager = manager
      }
    
      class AskForPeripherals: Subscription {
        let combineIdentifier = CombineIdentifier()
        private let publisher: BluetoothPeripheralsPublisher
        private let subscriber: any Subscriber<PeripheralDiscovery, any Error>
        fileprivate var awaiting: Subscribers.Demand = .none
    
        init(publisher: BluetoothPeripheralsPublisher,
             subscriber: any Subscriber<PeripheralDiscovery, any Error>) {
          self.publisher = publisher
          self.subscriber = subscriber
        }
    
        func request(_ demand: Subscribers.Demand) {
          self.awaiting += demand
          publisher.adjustScanForDemand()
        }
    
        func cancel() {
          publisher.remove(subscriber: self)
          subscriber.receive(completion: .finished)
        }
    
        func reportPeripheral(discovery: PeripheralDiscovery) {
          if awaiting != .none {
            self.awaiting -= 1
            subscriber.receive(discovery)
          }
        }
      }
    
      func receive<S>(subscriber: S) where S : Subscriber, any Failure == S.Failure, PeripheralDiscovery == S.Input {
    
        let askForPeripherals = AskForPeripherals(publisher: self, subscriber: subscriber)
        subscriptions[askForPeripherals.combineIdentifier] = askForPeripherals
        subscriber.receive(subscription: askForPeripherals)
    
        adjustScanForDemand()
      }
    
      func remove(subscriber: AskForPeripherals) {
        subscriptions.removeValue(forKey: subscriber.combineIdentifier)
        adjustScanForDemand()
      }
    
      func reportPeripheral(discovery: PeripheralDiscovery) {
        subscriptions.forEach { pair in
          let subscription = pair.value
          subscription.reportPeripheral(discovery: discovery)
        }
    
        adjustScanForDemand()
      }
    }
    
    class BluetoothCentralManager: NSObject {
      let bluetoothState = CurrentValueSubject<CBManagerState, Never>(.unknown)
      let peripherals: BluetoothPeripheralsPublisher
    
      private let bluetoothQueue = DispatchQueue(label: "Bluetooth queue")
      private let centralManager: CBCentralManager
    
      override init() {
        centralManager = CBCentralManager(delegate: nil, queue: bluetoothQueue)
        peripherals = BluetoothPeripheralsPublisher(manager: centralManager)
    
        super.init()
    
        centralManager.delegate = self
        bluetoothState.send(centralManager.state)
      }
    }
    
    extension BluetoothCentralManager: CBCentralManagerDelegate {
      func centralManagerDidUpdateState(_ central: CBCentralManager) {
        bluetoothState.send(central.state)
      }
    
      func centralManager(_ central: CBCentralManager,
                          didDiscover peripheral: CBPeripheral,
                          advertisementData: [String : Any],
                          rssi RSSI: NSNumber) {
    
        let discovery = PeripheralDiscovery(
          peripheral: peripheral,
          advertisementData: advertisementData,
          rssi: RSSI.intValue)
    
        peripherals.reportPeripheral(discovery: discovery)
      }
    }
    
    @main
    class ScanOnDemandApp: App {
      let bluetoothManager = BluetoothCentralManager()
      var subscriptions = Set<AnyCancellable>()
    
      required init() {
        bluetoothManager
          .bluetoothState
          .sink { [weak self] managerState in
            debugPrint("Manager State is \(managerState)")
            if managerState == .poweredOn {
              self?.tryScan()
            }
          }
          .store(in: &subscriptions)
    
      }
      var body: some Scene {
        WindowGroup {
          ContentView()
        }
      }
    
      func tryScan() {
        let cancellable = bluetoothManager
          .peripherals
          .sink { completion in
            switch completion {
            case .finished:
              debugPrint("Stopped receiving discovery")
            case .failure(let error):
              debugPrint("Stopped receiving discovery with error \(error)")
            }
          } receiveValue: { discovery in
            debugPrint("Discovered peripheral \(discovery.peripheral.name ?? "Unknown name")")
          }
    
        DispatchQueue.main.asyncAfter(deadline: .now() + .seconds(3)) {
          cancellable.cancel()
        }
      }
    }