python macos python-asyncio kqueue

How to use kqueue for file monitoring in asyncio?


I want to use kqueue to monitor files for changes. I can see how to use select.kqueue() in a threaded way.

I'm searching for a way to use it with asyncio. I may have missed something really obvious here. I know that Python uses kqueue for asyncio on macOS. I'm happy for any solution to work only when the kqueue selector is in use.

So far the only way I can see to do this is to create a thread that continually calls kqueue.control() and then injects the events into the event loop with loop.call_soon_threadsafe(). I feel like there should be a better way.


Solution

  • You can add the FD from the kqueue object as a reader to the event loop using loop.add_reader(). The event loop will then notify you when events are ready to collect.

    There are two features of doing this which might be odd to those familiar with kqueue:

    There are more efficient ways to write this, but here's an example of how to completely replace select.kqueue.control with an async method (here named kqueue_control):

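    import asyncio
    import select
    from typing import Iterable, List, Optional

    async def kqueue_control(kqueue: select.kqueue,
                             changes: Optional[Iterable[select.kevent]],
                             max_events: int,
                             timeout: Optional[float]) -> List[select.kevent]:

        def receive_result():
            # Called by the event loop when the kqueue FD becomes readable
            if future.done():
                return  # The waiter already timed out or was cancelled
            try:
                # Events are ready to collect; fetch them but do not block
                results = kqueue.control(None, max_events, 0)
            except Exception as ex:
                future.set_exception(ex)
            else:
                future.set_result(results)

        # If this call is non-blocking then just execute it
        if timeout == 0 or max_events == 0:
            return kqueue.control(changes, max_events, 0)

        # Apply the changes, but DON'T wait for events
        kqueue.control(changes, 0)
        loop = asyncio.get_running_loop()
        future = loop.create_future()
        loop.add_reader(kqueue.fileno(), receive_result)
        try:
            if timeout is None:
                return await future
            # Note: raises asyncio.TimeoutError on timeout, whereas
            # select.kqueue.control would return an empty list
            return await asyncio.wait_for(future, timeout)
        finally:
            # Always unregister the reader, including on timeout or cancellation
            loop.remove_reader(kqueue.fileno())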
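
To tie this back to the original question about file monitoring, here is a minimal self-contained sketch of the same add_reader() technique applied to a vnode watch on a single file. The helper name wait_for_vnode_events, the choice of fflags, and the limit of 8 events per wake-up are my own assumptions for illustration, not part of any library. It assumes a macOS/BSD system where select.kqueue exists and an event loop whose selector accepts a kqueue FD as a reader.

```python
import asyncio
import os
import select
from typing import List, Optional


async def wait_for_vnode_events(path: str,
                                timeout: Optional[float] = None
                                ) -> List["select.kevent"]:
    """Hypothetical helper: wait until `path` is written, extended,
    renamed or deleted, then return the pending kevents."""
    if not hasattr(select, "kqueue"):
        raise RuntimeError("select.kqueue is only available on macOS/BSD")

    loop = asyncio.get_running_loop()
    fd = os.open(path, os.O_RDONLY)
    kq = select.kqueue()
    try:
        watch = select.kevent(
            fd,
            filter=select.KQ_FILTER_VNODE,
            flags=select.KQ_EV_ADD | select.KQ_EV_CLEAR,
            fflags=(select.KQ_NOTE_WRITE | select.KQ_NOTE_EXTEND
                    | select.KQ_NOTE_RENAME | select.KQ_NOTE_DELETE),
        )
        # Register the watch without waiting for events (max_events=0).
        kq.control([watch], 0)

        future = loop.create_future()

        def on_readable() -> None:
            # The kqueue FD is readable, so events are pending;
            # collect up to 8 of them (arbitrary limit) without blocking.
            if not future.done():
                future.set_result(kq.control(None, 8, 0))

        loop.add_reader(kq.fileno(), on_readable)
        try:
            # Raises asyncio.TimeoutError if nothing happens in time.
            return await asyncio.wait_for(future, timeout)
        finally:
            loop.remove_reader(kq.fileno())
    finally:
        kq.close()
        os.close(fd)
```

From a coroutine you would call it as events = await wait_for_vnode_events("some/file", timeout=10) and inspect each event's fflags to see what changed. A long-running monitor would more likely keep one kqueue open and loop over an awaitable like kqueue_control rather than creating a kqueue per wait, as this sketch does for brevity.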