
Running Faust Agent Synchronously


Consider the following code:

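import faust

app = faust.App('example-app', broker='kafka://localhost:9092')  # hypothetical app id and broker

@app.agent()
async def process(stream):
    async for values in stream.take(5000, within=5):  # values is a list of up to 5000 records
        process_batch(values)  # hypothetical helper; calling process() here would invoke the agent itself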

The agent takes up to 5000 records (or whatever arrives within 5 seconds) asynchronously and processes them. I don't want the agent to pick up the next 5000 records until processing of the previous batch has finished. Basically, I want the agent to run synchronously. Is there a way to do this?
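A rough model of what stream.take(5000, within=5) hands to the agent: batches of at most 5000 values, flushed early when the time limit runs out. The sketch below is a hypothetical plain-asyncio approximation, not Faust's implementation: `take_batches` is an illustrative name, it measures the time limit from the first value of each batch (Faust's exact timer behaviour may differ), and a `None` sentinel marks the end of the stream.

```python
import asyncio
import time
from typing import AsyncIterator, List


async def take_batches(queue: asyncio.Queue, max_: int, within: float) -> AsyncIterator[List[int]]:
    """Hypothetical approximation of stream.take(max_, within=...).

    Yields lists of at most `max_` items; a batch is flushed early once
    `within` seconds have passed since its first item. None ends the stream.
    """
    while True:
        first = await queue.get()
        if first is None:  # end of stream, nothing buffered
            return
        batch = [first]
        deadline = time.monotonic() + within
        while len(batch) < max_:
            remaining = deadline - time.monotonic()
            if remaining <= 0:
                break
            try:
                item = await asyncio.wait_for(queue.get(), timeout=remaining)
            except asyncio.TimeoutError:
                break  # time limit reached: flush a partial batch
            if item is None:  # end of stream: flush what we have, then stop
                yield batch
                return
            batch.append(item)
        yield batch


async def demo() -> List[List[int]]:
    queue: asyncio.Queue = asyncio.Queue()
    for i in range(12):
        queue.put_nowait(i)
    queue.put_nowait(None)
    return [batch async for batch in take_batches(queue, max_=5, within=1.0)]


print(asyncio.run(demo()))  # [[0, 1, 2, 3, 4], [5, 6, 7, 8, 9], [10, 11]]
```

Because the consumer pulls the next batch only when its `async for` body asks for one, a batch is never handed out while the previous one is still being processed by the same loop.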


Solution

  • I tried the following code to check whether the worker starts on a second batch of records before it has finished processing the first one:

    import asyncio

    @app.agent()
    async def process(stream):
        async for values in stream.take(5000, within=5):
            print(1)
            await asyncio.sleep(30)  # stand-in for 30 s of non-blocking batch processing
    

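    The worker printed 1 and then waited the full 30 seconds before printing again for the second batch. The await statement hands control back to the event loop, but the agent's async for loop does not request the next batch until the current loop body has finished, so the batches are processed one after another. In other words, the agent already behaves synchronously with respect to its batches (an agent runs with concurrency=1 unless configured otherwise).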

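    P.S. Committing offsets, rebalancing, monitoring, etc. are asynchronous operations handled by the event loop. They keep running while the agent is awaiting, but only as long as the batch processing itself does not block the event loop.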
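The behaviour described above can be reproduced without Kafka. The sketch below is a plain-asyncio simulation, not Faust itself: `fake_take` is a hypothetical stand-in for stream.take(), `process_batch` is a hypothetical blocking helper, and a `ticker` coroutine stands in for the background work mentioned in the P.S. It checks that batches never overlap in any mode, that awaiting inside the loop lets the ticker keep running, and that calling blocking code directly starves it, while offloading it with `loop.run_in_executor` keeps batches sequential and leaves the event loop free.

```python
import asyncio
import time
from typing import AsyncIterator, List, Tuple


async def fake_take(n_batches: int, size: int) -> AsyncIterator[List[int]]:
    """Hypothetical stand-in for stream.take(): yields ready-made batches."""
    for b in range(n_batches):
        yield list(range(b * size, (b + 1) * size))


def process_batch(values: List[int]) -> int:
    """Hypothetical blocking batch work, simulated with time.sleep."""
    time.sleep(0.1)
    return len(values)


async def run_agent(mode: str) -> Tuple[List[Tuple[float, float]], int]:
    """Run a simulated agent loop and return (per-batch time spans, ticker count).

    mode "await":   non-blocking work, like the asyncio sleep in the test above
    mode "block":   blocking helper called directly on the event loop
    mode "offload": blocking helper run in the default thread pool
    """
    ticks = 0

    async def ticker() -> None:
        # Stands in for background work (offset commits, heartbeats, etc.).
        nonlocal ticks
        while True:
            ticks += 1
            await asyncio.sleep(0.01)

    background = asyncio.create_task(ticker())
    await asyncio.sleep(0)  # let the ticker take its first step
    loop = asyncio.get_running_loop()
    spans = []
    async for batch in fake_take(3, 5000):
        start = time.monotonic()
        if mode == "await":
            await asyncio.sleep(0.1)
        elif mode == "block":
            process_batch(batch)  # freezes the whole event loop for 0.1 s
        else:
            await loop.run_in_executor(None, process_batch, batch)
        spans.append((start, time.monotonic()))  # next batch is pulled only after this
    background.cancel()
    return spans, ticks


for mode in ("await", "block", "offload"):
    spans, ticks = asyncio.run(run_agent(mode))
    sequential = all(end <= nxt for (_, end), (nxt, _) in zip(spans, spans[1:]))
    # sequential is True in every mode; ticks stays at 1 only in "block" mode
    print(mode, sequential, ticks)
```

A thread pool only helps when the work is I/O-bound or releases the GIL; for CPU-heavy pure-Python processing, a `concurrent.futures.ProcessPoolExecutor` passed to `run_in_executor` would be the analogous choice.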