I'm working on an asynchronous server using asyncio, but I need to monitor a multiprocessing.Event (used to signal termination from another process) inside my async event loop. Here's the simplified version of my server loop:
self.__terminateExchange: Event = multiprocessing.Event()
server = await asyncio.start_server(
self.handle_client,
self.ipAddress,
self.port,
backlog=self.maxSocketConnections
)
async with server:
while not self.__terminateExchange.is_set():
await asyncio.sleep(0.01)
self.__terminateExchange is a multiprocessing.Event. The idea is that another process can call .set() on it, and I want the server to shut down gracefully when that happens.
Problem: This setup is unreliable — sometimes the server exits as expected when the event is set, and sometimes it doesn't. I suspect that checking .is_set() inside the asyncio loop is the issue, possibly due to it being a blocking or non-async-safe call.
What is the correct way to wait for or monitor a multiprocessing.Event in an asyncio event loop?
Is there a non-blocking or async-compatible way to integrate multiprocessing.Event with asyncio?
Would using a thread to bridge the multiprocessing.Event to an asyncio.Event be a better approach?
Any help would be deeply appreciated.
I tried stopping an async code using multiprocessing.Event. Here, when I set the event, it sometimes stops and sometimes doesn't. I also used process.is_alive() to check if the process is still alive and kill it, but it still runs sometimes.
I want to be able to stop the code once the event is set.
I think the issue you're experiencing stems from multiprocessing.Event.is_set() not being truly async-safe, the problem is that you're polling a blocking call in your asyncio loop, which can create timing issues.
For that you can asyncio.to_thread():
import asyncio
import multiprocessing
async def wait_for_termination(terminate_event):
"""I believe this approach properly isolates the blocking call"""
loop = asyncio.get_running_loop()
await loop.run_in_executor(None, terminate_event.wait)
# In your server code:
async with server:
termination_task = asyncio.create_task(
wait_for_termination(self.__terminateExchange)
)
await termination_task
Or also you could also bridge to an asyncio.Event:
class EventBridge:
def __init__(self, mp_event):
self.mp_event = mp_event
self.async_event = asyncio.Event()
def start_monitoring(self):
def monitor():
self.mp_event.wait() # Block in thread
asyncio.run_coroutine_threadsafe(
self._signal_async(), asyncio.get_running_loop()
)
threading.Thread(target=monitor, daemon=True).start()
async def wait(self):
await self.async_event.wait()
The key insight is: never call blocking multiprocessing primitives directly in asyncio. I think your original tight polling loop with await asyncio.sleep(0.01) creates race conditions because is_set() isn't guaranteed to be non-blocking.