apache-flink flink-statefun

Flink Stateful Functions: async calls with the Python SDK


I'm trying out the Stateful Functions 2.1 API with the Python SDK, and I can't see a clear way to make async calls to external APIs without blocking the application.

Is this possible, or can somebody point me in the right direction?


Solution

  • As of StateFun 2.2.0, you can use the AsyncRequestReplyHandler.

    From the docs:

    The Python SDK ships with an additional handler, AsyncRequestReplyHandler, that supports Python’s awaitable functions (coroutines). This handler can be used with asynchronous Python frameworks, for example aiohttp.

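    from aiohttp import web
    from statefun import AsyncRequestReplyHandler, StatefulFunctions
    
    functions = StatefulFunctions()
    
    @functions.bind("example/hello")
    async def hello(context, message):
        # compute_greeting is a placeholder for any coroutine, e.g. an external API call
        response = await compute_greeting(message)
        context.reply(response)
    
    handler = AsyncRequestReplyHandler(functions)
    
    async def handle(request):
        # Pass the raw request body to the StateFun handler and return its serialized reply
        req = await request.read()
        res = await handler(req)
        return web.Response(body=res, content_type="application/octet-stream")
    
    app = web.Application()
    app.add_routes([web.post('/statefun', handle)])
    
    if __name__ == '__main__':
        web.run_app(app, port=5000)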
    

    You can see a complete example here.
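
The docs snippet leaves compute_greeting undefined. If the client library you need to call is blocking rather than awaitable, one possible approach is to run the call on a worker thread and await the result, so the event loop that serves StateFun requests stays free. The sketch below uses only the standard library; fetch_greeting_blocking is a hypothetical stand-in for a real blocking API call, and compute_greeting takes a plain string here rather than a StateFun message, purely for illustration.

```python
import asyncio
import time


def fetch_greeting_blocking(name: str) -> str:
    """Hypothetical stand-in for a blocking client call (e.g. an HTTP request)."""
    time.sleep(0.1)  # simulates network latency
    return f"Hello, {name}!"


async def compute_greeting(name: str) -> str:
    """Await the blocking call on a worker thread so the event loop stays free."""
    loop = asyncio.get_running_loop()
    # Passing None selects the loop's default ThreadPoolExecutor.
    return await loop.run_in_executor(None, fetch_greeting_blocking, name)


async def main() -> list:
    # The three calls run on separate worker threads; gather keeps input order.
    names = ("Ada", "Bob", "Cy")
    return await asyncio.gather(*(compute_greeting(n) for n in names))


if __name__ == "__main__":
    print(asyncio.run(main()))
```

If the external API already has an awaitable client (an aiohttp client session, for example), you can await it directly inside the function and skip the executor.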