pythonpython-3.xasynchronouspython-asynciopyaudio

Converting a Python function with a callback to an asyncio awaitable


I want to use the PyAudio library in an async context, but the main entry point for the library only has a callback-based API:

import pyaudio

def callback(in_data, frame_count, time_info, status):
    # Do something with data

pa = pyaudio.PyAudio()
self.stream = self.pa.open(
    stream_callback=callback
)

How I'm hoping to use it is something like this:

pa = SOME_ASYNC_COROUTINE()
async def listen():
    async for block in pa:
        # Do something with block

The problem is, I'm not sure how to convert this callback syntax to a future that completes when the callback fires. In JavaScript I would use promise.promisify(), but Python doesn't seem to have anything like that.


Solution

  • An equivalent of promisify wouldn't work for this use case for two reasons:

    Here is one possible implementation:

    def make_iter():
        loop = asyncio.get_event_loop()
        queue = asyncio.Queue()
        def put(*args):
            loop.call_soon_threadsafe(queue.put_nowait, args)
        async def get():
            while True:
                yield await queue.get()
        return get(), put
    

    make_iter returns a pair of <async iterator, put-callback>. The returned objects hold the property that invoking the callback causes the iterator to produce its next value (the arguments passed to the callback). The callback may be called from an arbitrary thread and is thus safe to pass to pyaudio.open. The async iterator should be used with async for in an asyncio coroutine, which will correctly suspend while waiting for the callback to supply the next value:

    async def main():
        stream_get, stream_put = make_iter()
        stream = pa.open(stream_callback=stream_put)
        stream.start_stream()
        async for in_data, frame_count, time_info, status in stream_get:
            # ...
    
    asyncio.get_event_loop().run_until_complete(main())
    

    Note that, according to the documentation, the callback must also return a meaningful value, a tuple of frames and a Boolean flag. This can be incorporated in the design by changing the fill function to also receive the data from the asyncio side. The implementation is not included because it might not make much sense without an understanding of the domain.