I'm following instructions (here) to mirror multiple orderbooks on Binance Exchange on my local machine.
Suppose for simplicity, I wish to mirror orderbooks for 2 symbols: ETHBTC and DOGEBTC (in reality it's 350+).
First I have to buffer the websocket order-update streams:
Now I have to download snapshots:
As soon as I have the snapshot for a symbol, I apply that symbol's buffered updates to it (the buffer keeps filling while the download runs), yielding a STATE.
After that, all order-updates can simply be applied to the state.
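A minimal sketch of that buffer-then-snapshot flow for a single symbol, with no networking involved. The class name `Book` and its methods are hypothetical, and the field names (`lastUpdateId`, `u`, `b`, `a`) are an assumption about the shape of the snapshot and update messages, so check them against the exchange documentation before relying on them.

```python
from dataclasses import dataclass, field
from typing import Optional


@dataclass
class Book:
    """Hypothetical per-symbol mirror: buffers updates until a snapshot arrives."""
    last_id: Optional[int] = None        # None means "no snapshot yet"
    bids: dict = field(default_factory=dict)
    asks: dict = field(default_factory=dict)
    buffer: list = field(default_factory=list)

    def on_update(self, upd):
        if self.last_id is None:
            self.buffer.append(upd)      # still waiting for the snapshot
        else:
            self._apply(upd)

    def on_snapshot(self, snap):
        self.last_id = snap["lastUpdateId"]
        self.bids = dict(snap["bids"])   # [price, qty] pairs -> {price: qty}
        self.asks = dict(snap["asks"])
        pending, self.buffer = self.buffer, []
        for upd in pending:              # replay what arrived in the meantime
            self._apply(upd)

    def _apply(self, upd):
        if upd["u"] <= self.last_id:     # assumed: already covered by the snapshot
            return
        for side, levels in ((self.bids, upd["b"]), (self.asks, upd["a"])):
            for price, qty in levels:
                if float(qty) == 0:
                    side.pop(price, None)    # zero quantity removes the level
                else:
                    side[price] = qty
        self.last_id = upd["u"]
```

With one `Book` per symbol, the stream handler only ever calls `on_update`, and whatever fetches the snapshot calls `on_snapshot` when it completes.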
For the updates stream I can do:
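    import json
    import aiohttp

    # (inside a coroutine; URL and process_update are defined elsewhere)
    async with aiohttp.ClientSession() as session:
        async with session.ws_connect(URL) as wsock:
            async for msg in wsock:
                if msg.type != aiohttp.WSMsgType.TEXT:
                    continue  # skip non-text frames
                J = json.loads(msg.data)
                symbol = J['data']['s']
                process_update(symbol, J)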
But once the first update has come in, how can I start downloading the snapshot, with a completion handler to process it, without interrupting the stream?
If I'm tracking 300 symbols, that's 300 downloads happening at the same time.
I found resources on async downloading of multiple files, but I cannot see how to integrate this with the requirement of processing the stream.
I could always do the downloads in a separate thread, but doesn't that work against the architectural goals of aiohttp?
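On the worry about hundreds of simultaneous downloads: one common way to keep them on the event loop while capping how many are in flight is `asyncio.Semaphore`. The sketch below is only an illustration under stated assumptions: `fetch_snapshot`, `fetch_all` and `MAX_IN_FLIGHT` are hypothetical names, and `asyncio.sleep` stands in for the real HTTP request.

```python
import asyncio

MAX_IN_FLIGHT = 10  # hypothetical cap; tune it to the exchange's rate limits


async def fetch_snapshot(symbol, sem, stats):
    """Stand-in for an HTTP GET; asyncio.sleep simulates network latency."""
    async with sem:                      # at most MAX_IN_FLIGHT bodies run at once
        stats["now"] += 1
        stats["peak"] = max(stats["peak"], stats["now"])
        await asyncio.sleep(0.01)
        stats["now"] -= 1
    # Placeholder payload, not real exchange data.
    return symbol, {"lastUpdateId": 0, "bids": [], "asks": []}


async def fetch_all(symbols):
    sem = asyncio.Semaphore(MAX_IN_FLIGHT)
    stats = {"now": 0, "peak": 0}        # only used to observe concurrency
    results = await asyncio.gather(
        *(fetch_snapshot(s, sem, stats) for s in symbols)
    )
    return dict(results), stats["peak"]
```

All 300 coroutines are scheduled immediately, but only `MAX_IN_FLIGHT` of them are inside the `async with sem` block at any moment, so no thread is needed.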
Answer thanks to graingert on IRC Freenode #python 🙏
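    import json

    import aiohttp
    import anyio
    import anyio.to_thread

    async def foo():
        async def download(symbol):
            # `session` is bound below, before any download task starts
            async with session.get(f"{url}/{symbol}") as resp:
                await do_something(resp)

        async with aiohttp.ClientSession() as session, session.ws_connect(
            URL
        ) as wsock, anyio.create_task_group() as tg:
            async for msg in wsock:
                if msg.type != aiohttp.WSMsgType.TEXT:
                    continue  # skip non-text frames
                J = json.loads(msg.data)
                symbol = J["data"]["s"]
                # schedule the download without waiting for it to finish
                tg.start_soon(download, symbol)
                # run the blocking handler in a worker thread
                await anyio.to_thread.run_sync(process_update, symbol, J)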
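As written, the code above calls `tg.start_soon(download, symbol)` for every text message. A variant that starts the download only on the first update per symbol can be sketched with plain `asyncio`; everything here (`fake_stream`, `fake_download`, the snapshot strings) is a hypothetical stand-in for the websocket and the HTTP request, not part of the original answer.

```python
import asyncio


async def fake_stream():
    """Stand-in for the websocket: yields (symbol, update_id) pairs."""
    feed = ["ETHBTC", "DOGEBTC", "ETHBTC", "DOGEBTC", "ETHBTC"]
    for i, symbol in enumerate(feed):
        await asyncio.sleep(0)           # yield to the loop, as real I/O would
        yield symbol, i


async def fake_download(symbol, snapshots):
    """Stand-in for the HTTP snapshot request."""
    await asyncio.sleep(0.01)
    snapshots[symbol] = f"snapshot-of-{symbol}"


async def main():
    tasks = {}         # symbol -> download task; also records "already started"
    snapshots = {}
    seen_updates = []
    async for symbol, update_id in fake_stream():
        if symbol not in tasks:          # first update for this symbol
            tasks[symbol] = asyncio.create_task(
                fake_download(symbol, snapshots)
            )
        seen_updates.append((symbol, update_id))  # the stream is never blocked
    await asyncio.gather(*tasks.values())         # wait for outstanding downloads
    return tasks, snapshots, seen_updates
```

Keeping the task objects in the dictionary also holds a reference to each task, which `asyncio.create_task` needs so that a running task is not garbage-collected.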