ios swift bluetooth-lowenergy micropython raspberry-pi-pico

Bluetooth BLE implementation between Swift app and Raspberry Pi Pico W


I have an iOS app using CoreBluetooth to connect to a Raspberry Pi Pico W. I am trying to send data to the board, but the data is not received, even though the connection is established properly. The same Central Manager code works with another iOS app using Peripheral Manager.

I have the feeling that there might be some issue with the way the UUIDs are advertised between the two devices. This is because I have another issue (maybe related): if I scan for peripherals from the Central while filtering for the specific UUID ( centralManager.scanForPeripherals(withServices: [picoServiceUUID], options: nil) ), the Pico W is not found, while if I leave the filter out ( centralManager.scanForPeripherals(withServices: nil, options: nil) ), the Central detects the Pico W and can connect. Below is my MicroPython code and the Swift code for the Central. Thanks in advance to anyone who can help.

MicroPython for Pico W

import bluetooth
import time

# Define the service and characteristic UUIDs
SERVICE_UUID = bluetooth.UUID("2035166A-756E-3021-98A1-15242BAAB874")
CHAR_UUID_TX = bluetooth.UUID("AF0BADB1-5B99-43CD-917A-A77BC549E3CC")
CHAR_UUID_RX = bluetooth.UUID("AF0BADB1-5B99-43CD-917A-A77BC549E3CD")

# Initialize BLE
ble = bluetooth.BLE()
ble.active(True)

# Set the device name
device_name = "PicoW_BLE_Device"
ble.config(gap_name=device_name)

# Create a BLE service
service = (SERVICE_UUID, [
    (CHAR_UUID_TX, bluetooth.FLAG_READ | bluetooth.FLAG_NOTIFY),
    (CHAR_UUID_RX, bluetooth.FLAG_WRITE | bluetooth.FLAG_WRITE_NO_RESPONSE),
])
services = ble.gatts_register_services([service])

# Extract handles for the characteristics
tx_handle = services[0][0]
rx_handle = services[0][1]

# Function to reverse the UUID bytes
def reverse_bytes(uuid):
    return bytes(reversed(bytes(uuid)))

# Function to construct advertisement payload
def adv_payload(name, services):
    payload = bytearray()
    # Add Flags
    payload.extend(bytearray([0x02, 0x01, 0x06]))  # General Discoverable Mode
    # Add Complete List of 128-bit Service Class UUIDs
    for uuid in services:
        b = reverse_bytes(uuid)
        payload.extend(bytearray([len(b) + 1, 0x07]) + b)
    # Add Complete Local Name
    payload.extend(bytearray([len(name) + 1, 0x09]) + name.encode())
    return payload

# Construct the advertisement data
adv_data = adv_payload(device_name, [SERVICE_UUID])

# Log the advertisement data for debugging
print("Advertisement Data (hex):", adv_data.hex())

# Callback for handling BLE events
def irq(event, data):
    print(data)
    if event == 1:  # Central connected
        conn_handle, addr_type, addr = data
        addr_str = ':'.join('{:02x}'.format(b) for b in addr)
        print(f"Connected to central: {addr_str}")

    elif event == 2:  # Central disconnected
        conn_handle, addr_type, addr = data
        addr_str = ':'.join('{:02x}'.format(b) for b in addr)
        print(f"Disconnected from central: {addr_str}")
        # Restart advertising
        ble.gap_advertise(advertisement_interval_ms, adv_data, connectable=True)

    elif event == 3:  # GATT server write event
        conn_handle, attr_handle = data
        if attr_handle == rx_handle:
            request = ble.gatts_read(rx_handle)
            data = bytes(request)
            print(f"Received data: {data.decode()}")
            # Respond to the central device
            response = "Data received"
            ble.gatts_notify(conn_handle, tx_handle, response.encode())

# Set the IRQ handler
ble.irq(irq)

# Set the advertisement interval in milliseconds
advertisement_interval_ms = 100  # 100ms interval

# Main loop for advertising
while True:
    print(f"Advertising as '{device_name}' with interval {advertisement_interval_ms}ms...")
    
    # Start advertising with the constructed payload and interval
    ble.gap_advertise(advertisement_interval_ms, adv_data, connectable=True)
    
    # Wait before advertising again
    time.sleep(1)

Here is the Bluetooth Manager in Swift

import Foundation
import CoreBluetooth

class BluetoothManager: NSObject, ObservableObject, CBCentralManagerDelegate, CBPeripheralDelegate {
    var centralManager: CBCentralManager!
    var connectedPeripheral: CBPeripheral?
    var txCharacteristic: CBCharacteristic?
    
    let picoServiceUUID = CBUUID(string: "2035166A-756E-3021-98A1-15242BAAB874")
    let txCharacteristicUUID = CBUUID(string: "AF0BADB1-5B99-43CD-917A-A77BC549E3CC")
    let rxCharacteristicUUID = CBUUID(string: "AF0BADB1-5B99-43CD-917A-A77BC549E3CD")
    
    @Published var peripherals: [CBPeripheral] = []
    @Published var isConnected = false
    @Published var statusMessage = "Scanning for devices..."
    
    override init() {
        super.init()
        centralManager = CBCentralManager(delegate: self, queue: nil)
    }
    
    func centralManagerDidUpdateState(_ central: CBCentralManager) {
        if central.state == .poweredOn {
            centralManager.scanForPeripherals(withServices: nil, options: nil)
        } else {
            statusMessage = "Bluetooth not available."
        }
    }
    
    func centralManager(_ central: CBCentralManager, didDiscover peripheral: CBPeripheral, advertisementData: [String : Any], rssi RSSI: NSNumber) {
        if peripheral.name == "PicoW_BLE_Device" {
            print("payload \(advertisementData)")
        }
        if !peripherals.contains(where: { $0.identifier == peripheral.identifier }) {
            peripherals.append(peripheral)
        }
    }
    
    func connectToPeripheral(peripheral: CBPeripheral) {
        connectedPeripheral = peripheral
        connectedPeripheral?.delegate = self
        centralManager.stopScan()
        centralManager.connect(peripheral, options: nil)
        statusMessage = "Connecting to \(peripheral.name ?? "device")..."
    }
    
    func centralManager(_ central: CBCentralManager, didConnect peripheral: CBPeripheral) {
        peripheral.discoverServices(nil)
        statusMessage = "Connected to \(peripheral.name ?? "device")."
        isConnected = true
    }
    
    func peripheral(_ peripheral: CBPeripheral, didDiscoverServices error: Error?) {
        guard let services = peripheral.services else { return }
        for service in services {
            peripheral.discoverCharacteristics(nil, for: service)
        }
    }
    
    func centralManager(_ central: CBCentralManager, didDisconnectPeripheral peripheral: CBPeripheral, error: Error?) {
        statusMessage = "Disconnected from \(peripheral.name ?? "device")."
        isConnected = false
        if let error = error {
            statusMessage += " Error: \(error.localizedDescription)"
        }
        // Optionally, you can restart scanning here
        centralManager.scanForPeripherals(withServices: nil, options: nil)
    }
    
    func peripheral(_ peripheral: CBPeripheral, didDiscoverCharacteristicsFor service: CBService, error: Error?) {
        guard let characteristics = service.characteristics else { return }
        for characteristic in characteristics {
            if characteristic.properties.contains(.write) || characteristic.properties.contains(.writeWithoutResponse) {
                // RX Characteristic
                print("Found RX Characteristic: \(characteristic.uuid)")
            }
            if characteristic.properties.contains(.notify) {
                // TX Characteristic
                print("Found TX Characteristic: \(characteristic.uuid)")
            }
            if characteristic.uuid == txCharacteristicUUID {
                txCharacteristic = characteristic
                statusMessage = "Ready to send data."
            }
        }
    }
    
    func sendData(_ dictionary: [String: Any]) {
        guard let txCharacteristic = txCharacteristic, let peripheral = connectedPeripheral else { return }
        do {
            let data = try JSONSerialization.data(withJSONObject: dictionary, options: [])
            peripheral.writeValue(data, for: txCharacteristic, type: .withResponse)
            statusMessage = "Sent data: \(dictionary)"
        } catch {
            statusMessage = "Failed to send data: \(error.localizedDescription)"
            print("Failed to send data: \(error.localizedDescription)")
        }
    }
}

Additional info: here is the advertisement data from the Pico W as detected by the app. It looks like the service UUID is missing: ["kCBAdvDataTimestamp": 740772445.177747, "kCBAdvDataRxSecondaryPHY": 0, "kCBAdvDataRxPrimaryPHY": 129, "kCBAdvDataIsConnectable": 1]


Solution

  • The most likely reason is that your advertising payload is too large. The maximum payload in "normal" (non-extended) advertising is 31 bytes. Your payload is 39 bytes: 3 for the flags, 18 for the 128-bit service UUID, and 18 for the 16-character name. You need to change your local name to something shorter, or send a shortened local name (0x08). There is only room for an 8-character name if you advertise a 128-bit service.
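
To make the byte count concrete, here is a minimal sketch in plain Python (no `bluetooth` import, so it also runs on a desktop) that adds up the payload and falls back to a shortened local name when the complete name does not fit. The helper names `ad_field` and `build_adv_payload` are made up for illustration, the UUID is passed in as 16 bytes already in the little-endian order used on air, and the truncation assumes an ASCII name.

```python
MAX_LEGACY_ADV_LEN = 31  # payload limit for non-extended advertising

AD_FLAGS = 0x01
AD_UUID128_COMPLETE = 0x07
AD_NAME_SHORTENED = 0x08
AD_NAME_COMPLETE = 0x09


def ad_field(ad_type, value):
    # Each AD structure is: length byte (covers type + value), type byte, value.
    return bytes([len(value) + 1, ad_type]) + value


def build_adv_payload(name, uuid128_le):
    # Flags and the service UUID go first so they are never squeezed out.
    payload = ad_field(AD_FLAGS, b"\x06") + ad_field(AD_UUID128_COMPLETE, uuid128_le)
    room = MAX_LEGACY_ADV_LEN - len(payload) - 2  # 2 = length byte + type byte
    encoded = name.encode()
    if len(encoded) <= room:
        payload += ad_field(AD_NAME_COMPLETE, encoded)
    elif room > 0:
        # Assumption: ASCII name, so cutting at a byte boundary is safe.
        payload += ad_field(AD_NAME_SHORTENED, encoded[:room])
    return payload


# Service UUID from the question, converted to little-endian bytes.
uuid_le = bytes(reversed(bytes.fromhex("2035166A756E302198A115242BAAB874")))

# Size of the layout in the question: flags + UUID field + complete-name field.
original_len = 3 + (2 + 16) + (2 + len("PicoW_BLE_Device"))
fitted = build_adv_payload("PicoW_BLE_Device", uuid_le)
print(original_len, len(fitted), fitted[-8:])
```

With the name from the question this prints 39 for the original layout and 31 for the fitted payload, which ends in the 8 bytes `PicoW_BL`.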

    While it's probably not the issue, the interval parameter of gap_advertise is in microseconds, not milliseconds. Passing 100 requests a 100µs advertising interval, which is too fast. I expect it's either being rounded up to 625µs or it's being assigned a default value.
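
If you prefer to keep thinking in milliseconds, a small conversion helper avoids the unit mix-up. This is a sketch, not part of MicroPython: `adv_interval_us` is a hypothetical name, and the 20 ms to 10.24 s range in 0.625 ms steps is the range the Bluetooth specification allows for advertising intervals, applied here as an assumption about what the stack will accept.

```python
ADV_UNIT_US = 625               # advertising intervals are multiples of 0.625 ms
MIN_ADV_INTERVAL_US = 20000     # 20 ms
MAX_ADV_INTERVAL_US = 10240000  # 10.24 s


def adv_interval_us(interval_ms):
    # Convert milliseconds to microseconds, clamp to the allowed range,
    # then round down to a whole number of 0.625 ms units.
    us = int(interval_ms * 1000)
    us = max(MIN_ADV_INTERVAL_US, min(MAX_ADV_INTERVAL_US, us))
    return (us // ADV_UNIT_US) * ADV_UNIT_US


print(adv_interval_us(100))

# On the board (not run here), the call from the question would become:
# ble.gap_advertise(adv_interval_us(100), adv_data, connectable=True)
```

For the 100 ms the question intended, the value passed to gap_advertise would be 100000.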

    A similar setup will work between two iPhones for two reasons: