Tags: python, websocket, download, aiohttp, simultaneous

Process websocket stream while downloading multiple files using aiohttp


I'm following the instructions (here) to mirror multiple order books from the Binance exchange on my local machine.

Suppose, for simplicity, that I want to mirror the order books for two symbols, ETHBTC and DOGEBTC (in reality there are 350+).

First I have to buffer the websocket order-update streams:
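A minimal sketch of that buffering step, assuming each websocket message has already been decoded and the per-symbol event is the `data` payload of the message. `buffers` and `buffer_update` are hypothetical names. The return value flags the first update seen for a symbol, which is the natural moment to start that symbol's snapshot download.

```python
# Hypothetical per-symbol buffer: decoded depth-update events wait here
# until the symbol's snapshot has arrived and been applied.
buffers: dict[str, list[dict]] = {}


def buffer_update(symbol: str, event: dict) -> bool:
    """Queue one event; return True if it is the first one seen for `symbol`."""
    first = symbol not in buffers
    buffers.setdefault(symbol, []).append(event)
    return first
```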

Now I have to download snapshots:

As soon as I have the snapshots, I apply the buffer (which is ongoing) to them, yielding a STATE.

After that, all order-updates can simply be applied to the state.
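A sketch of turning a snapshot plus its buffer into a state and then applying live updates with the same function. The field names (`lastUpdateId`, `bids`, `asks` on the snapshot; `U`, `u`, `b`, `a` on each event) and the synchronisation rules (drop events with `u <= lastUpdateId`, treat quantities as absolute, delete a level whose quantity is 0, treat a jump in update ids as a gap) are assumptions based on Binance's published diff-depth procedure; check them against the instructions you are following. `build_state` and `apply_update` are hypothetical helpers.

```python
from decimal import Decimal


def apply_update(state: dict, event: dict) -> None:
    """Apply one diff-depth event to `state` in place (hypothetical helper)."""
    if event["u"] <= state["last_id"]:
        return  # already reflected in the state; ignore it
    if event["U"] > state["last_id"] + 1:
        # Some updates were missed, so the mirror can no longer be trusted.
        raise ValueError("gap in update ids; re-fetch the snapshot")
    for side, key in (("bids", "b"), ("asks", "a")):
        book = state[side]
        for price, qty in event[key]:
            p, q = Decimal(price), Decimal(qty)
            if q == 0:
                book.pop(p, None)  # quantity 0 removes the price level
            else:
                book[p] = q  # quantities are absolute, not deltas
    state["last_id"] = event["u"]


def build_state(snapshot: dict, buffered: list[dict]) -> dict:
    """Seed a state from a REST snapshot, then replay the buffered events."""
    state = {
        "last_id": snapshot["lastUpdateId"],
        "bids": {Decimal(p): Decimal(q) for p, q in snapshot["bids"]},
        "asks": {Decimal(p): Decimal(q) for p, q in snapshot["asks"]},
    }
    for event in buffered:
        apply_update(state, event)
    return state
```

Once `build_state` has run, every later event for that symbol goes straight to `apply_update`.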

For the updates stream I can do:

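        import json

        import aiohttp

        # stream_updates is an illustrative wrapper; URL and process_update
        # are defined elsewhere.
        async def stream_updates():
            async with aiohttp.ClientSession() as session:
                async with session.ws_connect(URL) as wsock:
                    async for msg in wsock:
                        # Only TEXT frames carry the JSON order-book updates.
                        if msg.type == aiohttp.WSMsgType.TEXT:
                            J = json.loads(msg.data)
                            symbol = J['data']['s']
                            process_update(symbol, J)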

But how can I, as soon as the first update for a symbol arrives, start downloading that symbol's snapshot, with a completion handler that processes it, without interrupting the stream?
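One way to get a completion handler in plain asyncio is `asyncio.create_task` plus `Task.add_done_callback`: the download runs concurrently and the websocket loop never awaits it. In this sketch `consume` is a hypothetical name, and `stream`, `fetch_snapshot` and `on_snapshot` are hypothetical stand-ins for the decoded websocket messages, the REST call and your snapshot handler.

```python
import asyncio


async def consume(stream, fetch_snapshot, on_snapshot):
    """Read (symbol, update) pairs; start one snapshot download per symbol."""
    tasks: dict[str, asyncio.Task] = {}

    def make_handler(symbol):
        def handler(task: asyncio.Task) -> None:
            # Completion handler: runs on the event loop once the download ends.
            if not task.cancelled() and task.exception() is None:
                on_snapshot(symbol, task.result())
        return handler

    async for symbol, update in stream:
        if symbol not in tasks:  # first update for this symbol
            task = asyncio.create_task(fetch_snapshot(symbol))
            task.add_done_callback(make_handler(symbol))
            tasks[symbol] = task
        # ... buffer or apply `update` here; nothing above waits for a download.

    # If the stream ever ends, wait for downloads that are still in flight.
    await asyncio.gather(*tasks.values(), return_exceptions=True)
    return tasks
```

A failed download is silently skipped here; real code would probably log `task.exception()` and schedule a retry.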

If I'm tracking 300 symbols, that's 300 downloads happening at the same time.
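Starting 300 downloads at once is fine for asyncio itself, but if you would rather cap how many are in flight (for example to stay within the exchange's request limits), an `asyncio.Semaphore` does it. A sketch with a hypothetical `fetch_snapshot` coroutine and an arbitrary default limit of 10; `fetch_all` is a hypothetical name, and it can be launched with `asyncio.create_task(...)` so the websocket loop keeps running alongside it.

```python
import asyncio


async def fetch_all(symbols, fetch_snapshot, limit=10):
    """Download snapshots for all symbols, at most `limit` at a time."""
    sem = asyncio.Semaphore(limit)

    async def one(symbol):
        async with sem:  # waits here while `limit` downloads are running
            return symbol, await fetch_snapshot(symbol)

    pairs = await asyncio.gather(*(one(s) for s in symbols))
    return dict(pairs)
```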

I found resources on downloading multiple files asynchronously, but I can't see how to combine that with continuously processing the stream.

I could always do the downloads in a separate thread, but wouldn't that work against aiohttp's async design?



Solution

  • Answer thanks to graingert on IRC Freenode #python 🙏

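    import json

    import aiohttp
    import anyio
    import anyio.to_thread

    # URL (websocket endpoint), url (snapshot endpoint), do_something and
    # process_update are placeholders for your own definitions.


    async def foo():
        async def download(symbol):
            # Runs as a background task, so the websocket loop keeps reading.
            async with session.get(f"{url}/{symbol}") as resp:
                await do_something(resp)

        requested = set()  # symbols whose snapshot download has started

        async with aiohttp.ClientSession() as session, session.ws_connect(
            URL
        ) as wsock, anyio.create_task_group() as tg:
            async for msg in wsock:
                if msg.type == aiohttp.WSMsgType.TEXT:
                    J = json.loads(msg.data)
                    symbol = J["data"]["s"]

                    if symbol not in requested:  # first update for this symbol
                        requested.add(symbol)
                        tg.start_soon(download, symbol)
                    # Run the blocking update handler in a worker thread.
                    await anyio.to_thread.run_sync(process_update, symbol, J)


    anyio.run(foo)  # default backend is asyncio, which aiohttp requires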