python python-asyncio streaming psycopg3

Async call of conn.execute seems to block other async calls from executing


I'm using two libraries that are supposed to handle all operations asynchronously: one for receiving streaming data from the server and another one, psycopg, for saving data into TimescaleDB. However, for some reason, conn.commit() is blocking the main event loop, and because of this, the time lag between receiving and saving data to the database increases with each iteration. How can I fix this issue without moving database calls into a separate thread?

async def exec_insert_trade(conn, trade, current_time):
    trade_values = await generate_trade_values(trade)
    columns = ', '.join(trade_values.keys())
    placeholders = ', '.join(['%s'] * len(trade_values))
    query = f"INSERT INTO trades ({columns}) VALUES ({placeholders})"
    time_difference = current_time - trade.time  # Calculate the difference
    print(f"Time difference trade: {time_difference}")
    await conn.execute(query, list(trade_values.values()))
    await conn.commit()



async def main():
    async with AsyncRetryingClient(TINKOFF_READONLY_TOKEN, settings=retry_settings) as client, \
            AsyncConnectionPool(POSTGRES_DATABASE_URL, min_size=2) as pool:
        async for marketdata in client.market_data_stream.market_data_stream(request_iterator()):
            current_time = datetime.now(timezone.utc)
            async with pool.connection() as conn:
                if marketdata.trade is not None:
                    await exec_insert_trade(conn, marketdata.trade, current_time)



if __name__ == "__main__":
    asyncio.run(main())
....
Time difference trade: 0:00:00.233646
Time difference trade: 0:00:00.952377
Time difference trade: 0:00:01.187182
Time difference trade: 0:00:01.042835
Time difference trade: 0:00:03.101548
Time difference trade: 0:00:06.067422
Time difference trade: 0:00:07.025047

...


Solution

  • Your code is using coroutines, but it only ever runs a single task, the one created by asyncio.run. Thus, nothing in your code actually runs concurrently. Each await has to finish before the next line runs, so everything executes sequentially.

    See my answer here for some more details on understanding the difference between pure coroutines and tasks. The short of it is that you need tasks to execute things concurrently.

    When you call await exec_insert_trade(conn, marketdata.trade, current_time), the current task is suspended until that call completes, so the loop cannot pick up the next stream message in the meantime. Unless the libraries used inside exec_insert_trade create tasks themselves, the inserts run strictly one after another. If you don't need a return value from exec_insert_trade, launch it as a task.
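
    A minimal sketch of launching each insert as a task, assuming nothing about your real libraries: fake_insert and stream_records are hypothetical stand-ins for exec_insert_trade and the market data stream, and the sleep only simulates a database round trip. The set of task references keeps the event loop from garbage-collecting tasks that are still running.

```python
import asyncio
import time


async def fake_insert(record: int, log: list) -> None:
    # Hypothetical stand-in for exec_insert_trade: the sleep simulates
    # the time spent waiting on the database.
    await asyncio.sleep(0.1)
    log.append(record)


async def stream_records(count: int):
    # Hypothetical stand-in for the streaming client.
    for record in range(count):
        yield record


async def consume(count: int):
    log = []
    background = set()  # strong references to tasks that are still running
    started = time.perf_counter()
    async for record in stream_records(count):
        # Schedule the insert and move on to the next message immediately.
        task = asyncio.create_task(fake_insert(record, log))
        background.add(task)
        task.add_done_callback(background.discard)
    # Wait for whatever is still in flight before shutting down.
    await asyncio.gather(*background)
    return log, time.perf_counter() - started


if __name__ == "__main__":
    log, elapsed = asyncio.run(consume(5))
    print(f"{len(log)} inserts finished in {elapsed:.2f}s")
```

    With awaited calls the five simulated inserts would take about half a second in total; as tasks they overlap. In the real program each task would presumably need to acquire its own connection from the pool inside the task, since the async with pool.connection() block in main would otherwise hand the connection back before the task has run.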


    Another way to go about this is realizing that this is a classic producer-consumer architecture. You have one piece of code producing data (the part that receives streaming data), and the other part consuming that data (the database writer).

    Here is a simple example of how to do this using asyncio queues.

    import asyncio
    from datetime import datetime
    
    async def write_to_database(queue: asyncio.Queue) -> None:
        while True:
            message = await queue.get()
            await asyncio.sleep(1)
            print(f"Wrote {message} to database")
    
    async def receive_data_from_external_source(queue: asyncio.Queue) -> None:
        while True:
            now = datetime.now()
            current_time = now.strftime("%H:%M:%S")
            queue.put_nowait(current_time)
            await asyncio.sleep(1)
    
    async def main() -> None:
        queue = asyncio.Queue()
        await asyncio.gather(
            write_to_database(queue),
            receive_data_from_external_source(queue),
        )
    
    asyncio.run(main())
    

    This program prints out something like the following to the console:

    Wrote 11:14:39 to database
    Wrote 11:14:40 to database
    Wrote 11:14:41 to database
    Wrote 11:14:42 to database
    Wrote 11:14:43 to database
    Wrote 11:14:44 to database
    Wrote 11:14:45 to database
    Wrote 11:14:46 to database
    Wrote 11:14:47 to database
    Wrote 11:14:48 to database
    Wrote 11:14:49 to database
    Wrote 11:14:50 to database
    

    Note that while each task spends a second per iteration, the data is still written out at a cadence of once per second rather than once every two seconds. That's because the coroutines are launched as tasks, in this case using asyncio.gather.

    In your real program, you of course wouldn't be calling asyncio.sleep. You will also need to be aware of the rate at which data is coming in and the rate at which you are able to write it, but that goes for any solution.
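
    One way to keep the incoming rate from outrunning the writer is a bounded queue. The sketch below is an illustration only: the STOP sentinel, the function names, and the sleep that simulates a slow write are all hypothetical. With maxsize set, await queue.put(...) suspends the producer whenever the queue is full, so the backlog cannot grow without limit.

```python
import asyncio

STOP = object()  # hypothetical sentinel marking the end of the stream


async def producer(queue: asyncio.Queue, count: int) -> int:
    peak = 0
    for item in range(count):
        # Suspends here while the queue is full, which throttles the producer.
        await queue.put(item)
        peak = max(peak, queue.qsize())
    await queue.put(STOP)
    return peak


async def consumer(queue: asyncio.Queue) -> list:
    written = []
    while True:
        item = await queue.get()
        if item is STOP:
            break
        await asyncio.sleep(0.01)  # simulated slow database write
        written.append(item)
    return written


async def run(count: int, maxsize: int):
    queue = asyncio.Queue(maxsize=maxsize)
    # gather returns the results in the order the awaitables were passed.
    peak, written = await asyncio.gather(producer(queue, count), consumer(queue))
    return written, peak


if __name__ == "__main__":
    written, peak = asyncio.run(run(count=20, maxsize=5))
    print(f"wrote {len(written)} items, queue never held more than {peak}")
```

    Whether throttling the producer is acceptable depends on the source. For a live stream that cannot be paused, the alternatives are writing in batches or running several consumers, each with its own connection.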