I wrote a Python script using the bleak Bluetooth Low Energy client that works from a command line. I want to split the script into REST endpoints.
To that end, I want to replace async with BleakClient(d, timeout=10) as client:
with something that allows me to make client
global so that I can use it in another REST endpoint. In other words, I want client
to hang around beyond the call that connects to the device. Any ideas?
I tried client = await BleakClient(d, timeout=10)
but this line throws an exception: object BleakClient can't be used in 'await' expression
.
EDIT
I am including the entire (truncated) Python script. Call to ConnectAndReadChar() works. Call to ConnectUUT() does not work. Exception on line 'client = await BleakClient(d, timeout=10)', <class 'TypeError'>.
import asyncio
import traceback
from bleak import BleakScanner, BleakClient
import sys
import time
global client
async def ConnectAndReadChar(MAC_ID):
d = await BleakScanner.find_device_by_address(MAC_ID, timeout=20)
time.sleep(0.5)
while True:
try:
if d.address == MAC_ID:
async with BleakClient(d, timeout=10) as client:
time.sleep(0.5)
res = await client.pair(protection_level=None)
print(res)
x = client.is_connected
print("Connected: {0}".format(x))
svc = client.services.get_service("Service ID")
thisCharacteristic = svc.get_characteristic("Char ID")
value = bytes(await client.read_gatt_char(thisCharacteristic))
print("\tValue: {0} ".format(value))
await client.disconnect()
await client.unpair()
return
except:
e = sys.exc_info()[0]
print(e)
async def ConnectUUT(MAC_ID):
global client
d = await BleakScanner.find_device_by_address(MAC_ID, timeout=20)
while True:
try:
if d.address == MAC_ID:
# async with BleakClient(d, timeout=10) as client:
client = await BleakClient(d, timeout=10)
if client is not None:
time.sleep(0.5)
res = await client.pair(protection_level=None)
print(res)
x = client.is_connected
print("Connected: {0}".format(x))
return
except:
e = sys.exc_info()[0]
print(e)
async def ReadFrames():
global client
try:
svc = client.services.get_service("Service ID")
thisCharacteristic = svc.get_characteristic("Char ID")
return
except:
e = sys.exc_info()[0]
print(e)
async def DisconnectUUT():
global client
try:
await client.disconnect()
await client.unpair()
except:
e = sys.exc_info()[0]
print(e)
return
async def main():
await ConnectAndReadChar("MAC ID goes here")
await ConnectUUT("MAC ID goes here")
await ReadFrames()
await DisconnectUUT()
if __name__ == "__main__":
asyncio.run(main())
If you use a BleakClient as an async context manager, it implicitly performs a connect
. You can also do this explicitly:
client = BleakClient(d, timeout=10)
await client.connect()