python tornado blocking process-pool

How to use Process Pool Executor in tornado with synchronous code?


I am new to Tornado and have an API that makes a blocking database call. Because of this blocking call, Tornado cannot serve all the requests when several arrive at the same time.

I looked into it and found that it can be solved using two approaches: making the code asynchronous and/or using a ProcessPoolExecutor. My assumption here is that a process pool with several workers is like having multiple Tornado processes serving multiple requests. However, every example I found that uses ProcessPoolExecutor also makes the code asynchronous.

I cannot make the code asynchronous for the time being because that would require more code changes, so I am looking for a simple fix using ProcessPoolExecutor.

What I have currently

import tornado.ioloop
import tornado.web


def blocking_call():
    import time
    time.sleep(60)
    return "Done"


class MainHandler(tornado.web.RequestHandler):
    def get(self):
        val = blocking_call()
        self.write(val)


if __name__ == "__main__":
    app = tornado.web.Application([(r"/", MainHandler)])
    app.listen(8888)
    tornado.ioloop.IOLoop.current().start()

What I tried

import tornado.ioloop
import tornado.web
from concurrent.futures import ProcessPoolExecutor


def blocking_call():
    import time
    time.sleep(60)
    return "Done"


class MainHandler(tornado.web.RequestHandler):
    def initialize(self, executor):
        self.executor = executor

    def get(self):
        val = self.executor.submit(blocking_call)
        self.write(val)


if __name__ == "__main__":
    executor = ProcessPoolExecutor(5)

    app = tornado.web.Application(
        [(r"/", MainHandler, dict(executor=executor))])
    app.listen(8888)
    tornado.ioloop.IOLoop.current().start()

My problem with this approach is that I now get a Future object instead of the actual response. How do I make the GET request wait for the task submitted to self.executor to complete before sending back the response?


Solution

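  • executor.submit() returns a concurrent.futures.Future, which cannot be awaited directly and cannot be passed to self.write().

    I suggest using Tornado's IOLoop.run_in_executor method (available since Tornado 5.0) to execute the blocking task. It returns an awaitable, so the handler method has to be declared with async def: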

    async def get(self):
        loop = tornado.ioloop.IOLoop.current()
        val = await loop.run_in_executor(self.executor, blocking_call)
        self.write(val)
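
Putting the pieces together, a minimal sketch of the full application might look like the following. It assumes Tornado 5.0 or later. The seconds parameter and the make_app and main helpers are hypothetical names introduced here for illustration; they are not part of the original code.

```python
import time
from concurrent.futures import ProcessPoolExecutor

import tornado.ioloop
import tornado.web


def blocking_call(seconds=60):
    # Stand-in for the blocking database call. It must be a module-level
    # function so it can be pickled and sent to a worker process.
    # The `seconds` parameter is a hypothetical addition for quick testing.
    time.sleep(seconds)
    return "Done"


class MainHandler(tornado.web.RequestHandler):
    def initialize(self, executor):
        self.executor = executor

    async def get(self):
        loop = tornado.ioloop.IOLoop.current()
        # The blocking call runs in a worker process; the IOLoop stays free
        # to accept other requests while this coroutine is suspended.
        val = await loop.run_in_executor(self.executor, blocking_call)
        self.write(val)


def make_app(executor):
    # Hypothetical helper: builds the application with a shared executor.
    return tornado.web.Application(
        [(r"/", MainHandler, dict(executor=executor))])


def main():
    # Hypothetical entry point: creates the pool and starts the server.
    executor = ProcessPoolExecutor(5)
    app = make_app(executor)
    app.listen(8888)
    tornado.ioloop.IOLoop.current().start()
```

The sketch does not start the server by itself; call main() under an if __name__ == "__main__": guard, as in the original script. With a pool of 5 workers, up to five blocking calls run at once and further calls wait in the executor's queue. Arguments and return values of blocking_call are pickled between processes, so they have to be picklable.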