I'm currently working on a wrapper for CoreBluetooth using Combine,
and I'm trying to wrap my head around managing side effects that depend on the creation of a Publisher.
Basically, whenever someone sinks into the publisher generated by scan() (there can be multiple subscribers), I want to trigger .scanForPeripherals on CBCentralManager as the subscriber count increases, and trigger .stopScan on CBCentralManager as the subscriber count decreases.
The signature I kinda want to achieve is similar to this:
extension DispatchQueue {
    /// The single queue shared by all Bluetooth work.
    ///
    /// This is a stored `static let` rather than a computed property so that every
    /// access returns the *same* queue instance. A computed property would build a
    /// brand-new `DispatchQueue` on each access, which defeats the purpose of
    /// funnelling work through one queue.
    static let bluetoothQueue = DispatchQueue(
        label: "com.mycompany.bluetootQueue",
        qos: .utility,
        attributes: []
    )
}
/// Wraps a `CBCentralManager` together with its delegate and exposes peripheral
/// discovery through `scan()`.
///
/// NOTE(review): this type is an unfinished sketch — `scan()` builds its upstream
/// publishers but does not return anything yet (see the QUESTION comment inside it).
final class Bluetooth {
/// Failures that `scan()` maps Bluetooth manager states onto.
enum Error: Swift.Error {
case poweredOff
case unsupported
// Carries the value of `CBCentralManager.authorization` read when the state was observed.
case unauthorized(CBManagerAuthorization)
}
/// The values delivered by one `didDiscover` delegate callback.
// NOTE(review): this type is `private` but appears in the signature of the
// non-private `scan()` below — presumably needs a wider access level; confirm.
private struct DiscoverResult {
let peripheral: CBPeripheral
let advertisementData: [String: Any]
let rssi: NSNumber
}
/// Delegate object that turns `CBCentralManagerDelegate` callbacks into Combine subjects.
private final class DelegateWrapper: NSObject, CBCentralManagerDelegate {
// Every peripheral seen so far, keyed by `peripheral.identifier`; a repeat
// discovery of the same identifier overwrites the earlier entry.
private var peripherals: [UUID: DiscoverResult] = [:]
// Latest manager state; starts at `.unknown` until the first delegate callback.
var state = CurrentValueSubject<CBManagerState, Never>(.unknown)
// Latest copy of the whole `peripherals` dictionary; starts empty.
var didDiscoverPeripheral = CurrentValueSubject<[UUID: DiscoverResult], Never>([:])
/// Forwards the central's new state to `state`.
func centralManagerDidUpdateState(_ central: CBCentralManager) {
state.send(central.state)
}
/// Records the discovered peripheral and republishes the full dictionary.
func centralManager(_ central: CBCentralManager, didDiscover peripheral: CBPeripheral, advertisementData: [String: Any], rssi RSSI: NSNumber) {
let discoverResult = DiscoverResult(
peripheral: peripheral,
advertisementData: advertisementData,
rssi: RSSI
)
peripherals[peripheral.identifier] = discoverResult
// Sends the accumulated dictionary, not just the newly discovered entry.
didDiscoverPeripheral.send(peripherals)
}
}
// NOTE(review): presumably stored so the delegate object stays alive for as long
// as `central` uses it — confirm against CBCentralManager's delegate ownership.
private var delegateWrapper: DelegateWrapper
private var central: CBCentralManager
/// Creates the delegate wrapper and a central manager that reports to it on `.bluetoothQueue`.
init() {
let delegateWrapper = DelegateWrapper()
let central = CBCentralManager(delegate: delegateWrapper, queue: .bluetoothQueue)
self.delegateWrapper = delegateWrapper
self.central = central
}
/// Intended to publish the dictionary of discovered peripherals while Bluetooth is
/// ready, failing with `Bluetooth.Error` when the manager reports a state in which
/// scanning cannot proceed.
///
/// NOTE(review): as written there is no `return` statement, and nothing here calls
/// `scanForPeripherals` or `stopScan` on `central`.
func scan() -> AnyPublisher<[UUID: DiscoverResult], Bluetooth.Error> {
// Maps each manager state to a one-shot inner publisher:
//   poweredOn                              -> true  (ready)
//   unknown / resetting / unknown default  -> false (not ready)
//   poweredOff / unsupported / unauthorized -> failure
// `switchToLatest()` then flattens to the inner publisher for the newest state.
let isBluetoothReadyUpstream = delegateWrapper.state
.map { state -> AnyPublisher<Bool, Error> in
switch state {
case .poweredOn: Just(true).setFailureType(to: Error.self).eraseToAnyPublisher()
case .unknown, .resetting: Just(false).setFailureType(to: Error.self).eraseToAnyPublisher()
case .poweredOff: Fail<Bool, Error>(error: Error.poweredOff).eraseToAnyPublisher()
case .unsupported: Fail<Bool, Error>(error: Error.unsupported).eraseToAnyPublisher()
case .unauthorized:
Fail<Bool, Error>(error: .unauthorized(CBCentralManager.authorization)).eraseToAnyPublisher()
@unknown default: Just(false).setFailureType(to: Error.self).eraseToAnyPublisher()
}
}
.switchToLatest()
// Pairs the latest discovered-peripherals dictionary with the latest readiness
// flag; emits the dictionary only when `ready` is true, otherwise emits nothing.
let didDiscoverUpstream = delegateWrapper.didDiscoverPeripheral
.setFailureType(to: Bluetooth.Error.self)
.combineLatest(isBluetoothReadyUpstream)
.map { devices, ready -> AnyPublisher<[UUID: DiscoverResult], Error> in
guard ready else { return Empty().eraseToAnyPublisher() }
// NOTE(review): `BluetoothError` is not declared anywhere in the visible source;
// presumably `Bluetooth.Error` (i.e. `Error.self`) was intended — confirm.
return Just(devices).setFailureType(to: BluetoothError.self).eraseToAnyPublisher()
}
.switchToLatest()
// QUESTION: how to return from here such as once a subscriber enters it triggers central.scanForPeripherals(...) in the central IF bluetooth is ready and central.stopScan() as we reach zero subscribers
}
}
It's been a while since I last did anything with Combine, so I'm kinda rusty on it :/
I tried .handleEvents before, but the problem is that it makes multiple calls to .scanForPeripherals, or calls .stopScan before the subscriber count has actually reached zero.
Not sure if I should use a custom publisher for this (if anyone has any guides on it)
I played with your problem last night and the code I came up with is below. I created a new application project called ScanOnDemand
and this is the source of the main application. To use it you have to enable Bluetooth and add an entry in the info.plist.
It's a lot of code, I know, but it does show a custom publisher that does what you asked about. It has not been exhaustively tested, and may not be 100% thread safe – use at your own risk.
The main player is the BluetoothPeripheralsPublisher
which is a publisher of PeripheralDiscovery
structures (those structures simply capture information about discovered peripherals).
As subscribers are added, and peripherals are discovered, the publisher's adjustScanForDemand
function is called. It looks through the list of subscribers and if any of them are still waiting for peripherals, it makes sure a scan is running. When no more peripherals are demanded, it shuts the scan down.
BluetoothCentralManager
is a wrapper for a CBCentralManager
that creates and exposes the publisher (as well as a publisher of Bluetooth state). The rest is just a sample application that waits for Bluetooth to be available, then runs a scan for 3 seconds using the publisher. When the subscriber is cancelled, and there is no demand, the scan stops.
import SwiftUI
import CoreBluetooth
import Combine
/// A value describing one peripheral discovery: the peripheral itself plus the
/// advertisement data and signal strength that accompanied it.
struct PeripheralDiscovery {
    /// The peripheral that was discovered.
    let peripheral: CBPeripheral

    /// The advertisement dictionary supplied alongside the peripheral.
    let advertisementData: [String: Any]

    /// The signal strength supplied alongside the peripheral, as an integer.
    let rssi: Int
}
extension CBManagerState: CustomDebugStringConvertible {
    /// A short, human-readable name for the state, for use in debug output.
    ///
    /// Switches over the enum itself (rather than comparing raw values) so the
    /// compiler checks that every known case is handled; `@unknown default`
    /// covers cases added by future SDK versions.
    public var debugDescription: String {
        switch self {
        case .unknown:
            return "unknown"
        case .resetting:
            return "resetting"
        case .unsupported:
            return "unsupported"
        case .unauthorized:
            return "unauthorized"
        case .poweredOff:
            return "poweredOff"
        case .poweredOn:
            return "poweredOn"
        @unknown default:
            return "Totally Unrecognized"
        }
    }
}
/// A publisher of `PeripheralDiscovery` values that keeps a Bluetooth scan running
/// only while at least one of its subscribers has outstanding demand.
///
/// Each subscriber receives its own `AskForPeripherals` subscription, which tracks
/// how many more values that subscriber has asked for. Whenever demand changes
/// (a request, a delivery, or a cancellation) `adjustScanForDemand()` starts or
/// stops the scan on the wrapped `CBCentralManager`.
///
/// This type performs no synchronization of its own: `subscriptions` and each
/// subscription's demand counter are read and written without locking.
class BluetoothPeripheralsPublisher: Publisher {
    typealias Output = PeripheralDiscovery
    typealias Failure = Error

    private let cbCentralManager: CBCentralManager

    /// Live subscriptions, keyed by their Combine identifier.
    private var subscriptions: [CombineIdentifier: AskForPeripherals] = [:]

    /// Creates a publisher that drives scanning on `manager`.
    ///
    /// - Parameter manager: The central manager whose scan is started and stopped.
    init(manager: CBCentralManager) {
        cbCentralManager = manager
    }

    /// Starts the scan if any subscriber still wants values and no scan is running;
    /// stops it if nobody wants values and a scan is running.
    func adjustScanForDemand() {
        let totalDemand = subscriptions.values.reduce(Subscribers.Demand.none) { $0 + $1.awaiting }
        debugPrint("Total demand is \(String(describing: totalDemand))")

        if totalDemand == .none && cbCentralManager.isScanning {
            debugPrint("Stopping Scan")
            cbCentralManager.stopScan()
        }

        if totalDemand > .none && !cbCentralManager.isScanning {
            // Probably want this publisher to take an array of services to scan for instead
            // of using nil here.
            debugPrint("Starting Scan")
            cbCentralManager.scanForPeripherals(withServices: nil)
        }
    }

    /// The subscription handed to each subscriber; tracks that subscriber's demand.
    class AskForPeripherals: Subscription {
        let combineIdentifier = CombineIdentifier()

        // Both references are optional so `cancel()` can release them; a `nil`
        // publisher doubles as the "already cancelled" marker.
        private var publisher: BluetoothPeripheralsPublisher?
        private var subscriber: (any Subscriber<PeripheralDiscovery, any Error>)?

        /// Number of values the subscriber has requested but not yet received.
        fileprivate var awaiting: Subscribers.Demand = .none

        /// - Parameters:
        ///   - publisher: The publisher to notify when demand changes or on cancel.
        ///   - subscriber: The downstream subscriber that discoveries are delivered to.
        init(publisher: BluetoothPeripheralsPublisher,
             subscriber: any Subscriber<PeripheralDiscovery, any Error>) {
            self.publisher = publisher
            self.subscriber = subscriber
        }

        /// Adds `demand` to the outstanding demand and lets the publisher re-evaluate
        /// whether a scan should be running. Ignored after cancellation.
        func request(_ demand: Subscribers.Demand) {
            guard let publisher else { return }
            awaiting += demand
            publisher.adjustScanForDemand()
        }

        /// Detaches from the publisher and releases the subscriber.
        ///
        /// No completion is sent: a cancelled subscription must stop signalling its
        /// subscriber. Calling this more than once is harmless.
        func cancel() {
            guard let publisher else { return }
            self.publisher = nil
            subscriber = nil
            awaiting = .none
            publisher.remove(subscriber: self)
        }

        /// Delivers `discovery` to the subscriber if it has outstanding demand.
        ///
        /// - Parameter discovery: The discovery to deliver.
        func reportPeripheral(discovery: PeripheralDiscovery) {
            guard let subscriber, awaiting > .none else { return }
            awaiting -= 1
            // `receive(_:)` returns any *additional* demand the subscriber wants;
            // it has to be added to the running total or that demand is lost.
            let additional = subscriber.receive(discovery)
            // The subscriber may have cancelled from inside `receive(_:)`; in that
            // case leave the demand at zero.
            guard publisher != nil else { return }
            awaiting += additional
        }
    }

    /// Attaches `subscriber` by creating, registering and handing over a new subscription.
    func receive<S>(subscriber: S) where S : Subscriber, any Failure == S.Failure, PeripheralDiscovery == S.Input {
        let askForPeripherals = AskForPeripherals(publisher: self, subscriber: subscriber)
        subscriptions[askForPeripherals.combineIdentifier] = askForPeripherals
        subscriber.receive(subscription: askForPeripherals)
        adjustScanForDemand()
    }

    /// Unregisters a subscription and re-evaluates whether the scan should keep running.
    ///
    /// - Parameter subscriber: The subscription to remove.
    func remove(subscriber: AskForPeripherals) {
        subscriptions.removeValue(forKey: subscriber.combineIdentifier)
        adjustScanForDemand()
    }

    /// Offers `discovery` to every registered subscription, then re-evaluates the scan.
    ///
    /// - Parameter discovery: The discovery to fan out.
    func reportPeripheral(discovery: PeripheralDiscovery) {
        for subscription in subscriptions.values {
            subscription.reportPeripheral(discovery: discovery)
        }
        adjustScanForDemand()
    }
}
/// Owns a `CBCentralManager` and exposes two publishers built on it: the current
/// manager state and the stream of peripheral discoveries.
class BluetoothCentralManager: NSObject {
    /// The most recent manager state; `.unknown` until a state is observed.
    let bluetoothState = CurrentValueSubject<CBManagerState, Never>(.unknown)

    /// Publisher of peripheral discoveries backed by this object's central manager.
    let peripherals: BluetoothPeripheralsPublisher

    private let bluetoothQueue = DispatchQueue(label: "Bluetooth queue")
    private let centralManager: CBCentralManager

    /// Creates the central manager on the private queue, builds the discovery
    /// publisher around it, installs `self` as delegate, and seeds `bluetoothState`
    /// with the manager's state at that moment.
    override init() {
        let manager = CBCentralManager(delegate: nil, queue: bluetoothQueue)
        centralManager = manager
        peripherals = BluetoothPeripheralsPublisher(manager: manager)

        super.init()

        // The delegate can only be assigned once `self` is fully initialised.
        manager.delegate = self
        bluetoothState.send(manager.state)
    }
}
extension BluetoothCentralManager: CBCentralManagerDelegate {
    /// Republishes the central's new state through `bluetoothState`.
    func centralManagerDidUpdateState(_ central: CBCentralManager) {
        let newState = central.state
        bluetoothState.send(newState)
    }

    /// Packages the discovery callback's arguments into a `PeripheralDiscovery`
    /// (converting the RSSI to `Int`) and passes it to the `peripherals` publisher.
    func centralManager(
        _ central: CBCentralManager,
        didDiscover peripheral: CBPeripheral,
        advertisementData: [String : Any],
        rssi RSSI: NSNumber
    ) {
        peripherals.reportPeripheral(
            discovery: PeripheralDiscovery(
                peripheral: peripheral,
                advertisementData: advertisementData,
                rssi: RSSI.intValue
            )
        )
    }
}
/// Sample application: observes the Bluetooth state and, each time it becomes
/// `.poweredOn`, subscribes to the discovery publisher for three seconds.
@main
class ScanOnDemandApp: App {
    let bluetoothManager = BluetoothCentralManager()
    var subscriptions = Set<AnyCancellable>()

    /// Starts observing the Bluetooth state and keeps the observation alive in
    /// `subscriptions`.
    required init() {
        let stateObserver = bluetoothManager.bluetoothState.sink { [weak self] managerState in
            debugPrint("Manager State is \(managerState)")
            guard managerState == .poweredOn else { return }
            self?.tryScan()
        }
        subscriptions.insert(stateObserver)
    }

    var body: some Scene {
        WindowGroup {
            ContentView()
        }
    }

    /// Subscribes to peripheral discoveries, logging each one, and schedules the
    /// subscription to be cancelled on the main queue three seconds later.
    func tryScan() {
        let scanSession = bluetoothManager.peripherals.sink(
            receiveCompletion: { completion in
                if case .failure(let error) = completion {
                    debugPrint("Stopped receiving discovery with error \(error)")
                } else {
                    debugPrint("Stopped receiving discovery")
                }
            },
            receiveValue: { discovery in
                debugPrint("Discovered peripheral \(discovery.peripheral.name ?? "Unknown name")")
            }
        )

        // The closure keeps `scanSession` alive until it fires and cancels it.
        DispatchQueue.main.asyncAfter(deadline: .now() + .seconds(3)) {
            scanSession.cancel()
        }
    }
}