Tags: python, python-asyncio

Calling a coroutine from asyncio.Protocol.data_received


This is similar to "Calling coroutines in asyncio.Protocol.data_received", but I think it warrants a new question.

I have a simple server set up like this:

loop.create_unix_server(lambda: protocol, path=serverSocket)

It works fine if I do this:

def data_received(self, data):
    data = b'data reply'
    self.send(data)

My client gets the reply. But I can't get it to work with any sort of asyncio call. I tried all of the following, and none of them worked.

@asyncio.coroutine
def go(self):
    yield from asyncio.sleep(1, result = b'data reply')

def data_received(self, data):
    print('Data Received', flush=True)

    task = asyncio.get_event_loop().create_task(self.go())
    data = yield from asyncio.wait_for(task,10)
    self.send(data)

That one hung and printed nothing. (If I decorate data_received with @asyncio.coroutine, I get an error saying that it was never yielded from.) OK, I get that using yield in data_received isn't right.

If I try a new event loop, as below, it hangs in run_until_complete:

    loop = asyncio.new_event_loop()
    task = loop.create_task(self.go())
    loop.run_until_complete(task)
    data = task.result()
    self.send(data)

If I use a Future, that also hangs in run_until_complete:

@asyncio.coroutine
def go(self, future):
    yield from asyncio.sleep(1)
    future.set_result(b'data reply')

def data_received(self, data):
    print('Data Received', flush=True)

    loop = asyncio.new_event_loop()
    future = asyncio.Future(loop=loop)
    asyncio.async(self.go(future))
    loop.run_until_complete(future)
    data = future.result()
    self.send(data)

The following gets close, but data_received returns immediately and the value passed to send is of type asyncio.coroutines.CoroWrapper. Does that imply that the wait_for line returned immediately with the unfinished task?

@asyncio.coroutine
def go(self):
    return(yield from asyncio.sleep(3, result = b'data reply'))

@asyncio.coroutine
def go2(self):
    task = asyncio.get_event_loop().create_task(self.go())
    res = yield from asyncio.wait_for(task, 10)
    return res

def data_received(self, data):
    print('Data Received', flush=True)

    data = self.go2()
    self.send(data)

I'm a bit stuck really, and would appreciate some pointers about what to look at.


Solution

  • You need to add your coroutine to the event loop, and then use Future.add_done_callback to handle the result when the coroutine completes:

    @asyncio.coroutine
    def go(self):
        return(yield from asyncio.sleep(3, result = b'data reply'))
    
    def data_received(self, data):
        print('Data Received', flush=True)
    
        # asyncio.ensure_future() was spelled asyncio.async() before Python 3.4.4;
        # asyncio.get_event_loop().create_task() also works.
        task = asyncio.ensure_future(self.go())
        task.add_done_callback(self.handle_go_result)
    
    def handle_go_result(self, task):
        data = task.result()
        self.send(data)
    

    Calling a coroutine directly in data_received doesn't work: the event loop invokes data_received as an ordinary callback and never yields from whatever it returns, so a coroutine created there is never driven to completion. Creating and running a new event loop inside data_received doesn't help either, since it will always end up blocking the main event loop until the inner event loop finishes its work.
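
To illustrate why a direct call returns immediately with an unfinished object, here is a minimal sketch. It assumes Python 3.7+ and uses async/await instead of the @asyncio.coroutine/yield from spelling in the question (that decorator was removed in Python 3.11). The names go, plain_callback and awaiting_caller are hypothetical and exist only for this illustration.

```python
import asyncio
import inspect


async def go():
    # Stand-in for real asynchronous work; the delay is arbitrary.
    await asyncio.sleep(0.01)
    return b'data reply'


def plain_callback():
    # A regular function, like data_received: it cannot await anything.
    result = go()  # only creates a coroutine object; the body of go() has not started
    is_coroutine = inspect.iscoroutine(result)
    result.close()  # discard it explicitly so Python does not warn "never awaited"
    return is_coroutine


async def awaiting_caller():
    # A coroutine only runs when something awaits it or wraps it in a task.
    return await go()


if __name__ == '__main__':
    print(plain_callback())                # the call produced a coroutine object, not bytes
    print(asyncio.run(awaiting_caller()))  # here the coroutine actually ran
```

This is the same situation as the last attempt in the question: the value handed to send is the coroutine object itself, not its eventual result.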

    Instead, schedule the work on the main event loop (asyncio.ensure_future, which was named asyncio.async before Python 3.4.4, or loop.create_task()), and register a callback to run when the work is done (add_done_callback).
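
The schedule-then-callback pattern described above could look roughly like this in current Python (3.7+), written with async/await. This is a sketch under assumptions, not the asker's actual server: the question never shows its send method, so the reply is written with transport.write, and FakeTransport and demo are hypothetical helpers that let the example run without a real Unix socket.

```python
import asyncio


class ReplyProtocol(asyncio.Protocol):
    """Answers each incoming message once an asynchronous job has finished."""

    def connection_made(self, transport):
        self.transport = transport

    async def go(self):
        # Stand-in for real asynchronous work; the delay is arbitrary.
        await asyncio.sleep(0.01)
        return b'data reply'

    def data_received(self, data):
        # data_received is a plain callback, so it cannot await.
        # Schedule the coroutine on the running loop and return right away.
        task = asyncio.ensure_future(self.go())
        task.add_done_callback(self.handle_go_result)

    def handle_go_result(self, task):
        # Called by the event loop after go() has completed.
        self.transport.write(task.result())


class FakeTransport:
    """Hypothetical in-memory transport that records what was written."""

    def __init__(self):
        self.written = []

    def write(self, data):
        self.written.append(data)


async def demo():
    protocol = ReplyProtocol()
    transport = FakeTransport()
    protocol.connection_made(transport)
    protocol.data_received(b'ping')
    immediately = list(transport.written)  # snapshot taken before the task has run
    await asyncio.sleep(0.2)               # give the scheduled task time to finish
    return immediately, transport.written


if __name__ == '__main__':
    print(asyncio.run(demo()))
```

With a real server, the same ReplyProtocol class would be passed as the protocol factory to loop.create_unix_server, and the transport would be supplied by asyncio.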