python, multiprocessing, python-asyncio

How to safely check a multiprocessing.Event in an asyncio loop without blocking?


I'm working on an asynchronous server using asyncio, but I need to monitor a multiprocessing.Event (used to signal termination from another process) inside my async event loop. Here's the simplified version of my server loop:

self.__terminateExchange: Event = multiprocessing.Event()
server = await asyncio.start_server(
    self.handle_client,
    self.ipAddress,
    self.port,
    backlog=self.maxSocketConnections
)

async with server:
    while not self.__terminateExchange.is_set():
        await asyncio.sleep(0.01)

self.__terminateExchange is a multiprocessing.Event. The idea is that another process can call .set() on it, and I want the server to shut down gracefully when that happens.

Problem: This setup is unreliable — sometimes the server exits as expected when the event is set, and sometimes it doesn't. I suspect that checking .is_set() inside the asyncio loop is the issue, possibly due to it being a blocking or non-async-safe call.

  1. What is the correct way to wait for or monitor a multiprocessing.Event in an asyncio event loop?

  2. Is there a non-blocking or async-compatible way to integrate multiprocessing.Event with asyncio?

Would using a thread to bridge the multiprocessing.Event to an asyncio.Event be a better approach?

Any help would be deeply appreciated.

I tried stopping async code using a multiprocessing.Event. When I set the event, the code sometimes stops and sometimes doesn't. I also used process.is_alive() to check whether the process is still alive and kill it, but it sometimes keeps running anyway.

I want to be able to stop the code once the event is set.


Solution

  • I think the issue you're experiencing stems from multiprocessing.Event.is_set() not being truly async-safe: you're polling a blocking, cross-process primitive from inside your asyncio loop, which can create timing issues.

    For that you can use loop.run_in_executor() (or asyncio.to_thread() on Python 3.9+, shown after the block below):

    import asyncio
    import multiprocessing
    
    async def wait_for_termination(terminate_event):
        """Wait for the multiprocessing.Event without blocking the event loop."""
        loop = asyncio.get_running_loop()
        # Event.wait() blocks, so run it in the default thread-pool executor.
        await loop.run_in_executor(None, terminate_event.wait)
    
    # In your server code:
    async with server:
        termination_task = asyncio.create_task(
            wait_for_termination(self.__terminateExchange)
        )
        await termination_task
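
    If you're on Python 3.9 or newer, asyncio.to_thread() expresses the same idea a bit more compactly; a minimal sketch, reusing the wait_for_termination name from above:

    import asyncio

    async def wait_for_termination(terminate_event):
        # Run the blocking Event.wait() in a worker thread so the asyncio
        # event loop stays responsive until the other process sets the event.
        await asyncio.to_thread(terminate_event.wait)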
    

    Alternatively, you could bridge it to an asyncio.Event (a usage sketch follows the class below):

    import asyncio
    import threading

    class EventBridge:
        """Mirror a multiprocessing.Event into an asyncio.Event."""

        def __init__(self, mp_event):
            self.mp_event = mp_event
            self.async_event = asyncio.Event()

        def start_monitoring(self):
            # Capture the loop while we're still on the event-loop thread;
            # the worker thread below has no running loop of its own.
            loop = asyncio.get_running_loop()

            def monitor():
                self.mp_event.wait()  # block in the worker thread
                # Hand the signal back to the loop's thread safely.
                loop.call_soon_threadsafe(self.async_event.set)

            threading.Thread(target=monitor, daemon=True).start()

        async def wait(self):
            await self.async_event.wait()
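
    A sketch of how the bridge might be wired into your server loop (the variable names follow your question; the surrounding structure is an assumption):

    # Inside the coroutine that owns the running event loop:
    bridge = EventBridge(self.__terminateExchange)
    bridge.start_monitoring()

    async with server:
        # Suspends without polling; wakes as soon as the other process
        # calls .set() on the multiprocessing.Event.
        await bridge.wait()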
    

    The key insight is: never call blocking multiprocessing primitives directly in asyncio. I think your original tight polling loop with await asyncio.sleep(0.01) creates race conditions because is_set() isn't guaranteed to be non-blocking.
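
    For completeness, here is one way the pieces could fit together end to end (a sketch, not your exact code: the run_server signature and parameter names are assumptions, and asyncio.to_thread() needs Python 3.9+). It races serve_forever() against the termination wait and shuts the server down once the event fires:

    import asyncio

    async def run_server(handle_client, terminate_event, host, port, backlog):
        server = await asyncio.start_server(handle_client, host, port, backlog=backlog)
        async with server:
            serve = asyncio.create_task(server.serve_forever())
            stop = asyncio.create_task(asyncio.to_thread(terminate_event.wait))
            # Finish as soon as either task completes; normally that's `stop`,
            # triggered when the other process calls terminate_event.set().
            await asyncio.wait({serve, stop}, return_when=asyncio.FIRST_COMPLETED)
            serve.cancel()  # stop serving; `async with` then closes the server cleanly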