I'm building a multi-threaded system where:
Each thread has its own asyncio event loop, and I use threads to isolate the heavy workflows.
I'm using janus.Queue to bridge async <-> sync code between threads. But every time I try to await queue.async_.get() in the worker thread (different loop than where it was created), I get:
RuntimeError: <janus.Queue object at 0x000002619212E920> is bound to a different event loop
What I tried:
I read that you cannot create a janus.Queue in one thread and use it in another thread's event loop — which matches what I'm seeing.
So I updated my logic like this:
This is the wrapper I use: import janus
class SignalQueues:
def __init__(self):
self._queue = janus.Queue()
@property
def sync(self):
return self._queue.sync_q
@property
def async_(self):
return self._queue.async_q
Still stuck:
Even after creating the queues inside the correct thread/loop, when I pass that reference to another thread and try to await queue.async_.get() in its event loop, I still get the RuntimeError.
I understand that each janus.Queue must stay tied to the loop where it's created, but how do I cleanly share queues between threads that each have their own event loops?
Ask:
Can anyone show a clean and safe pattern to share queues across asyncio threads (each with its own loop), using janus or another method? If janus is not appropriate, what’s the alternative to fan out messages from a WebSocket thread to async worker threads?
I’ll post my root ServiceActivator class if needed, but this is the core issue.
Thanks.
MAIN CODE
class ServiceActivator:
def __init__(self):
#Queues and events for inter-thread communication
self.closing_klines_queue = None
self.entry_signals_queue = None
#This event is set when the WebSocket is ready to start processing signals from the queues
self.websocket_ready_event = AsyncCompatibleEvent()
#These events are set when the WebSocket is shutting down, signaling other components to stop processing and cancel tasks
self.websocket_shutdown_event = AsyncCompatibleEvent()
self.worker_shutdown_event = AsyncCompatibleEvent()
self.entry_consumer_shutdown_event = AsyncCompatibleEvent()
self.order_monitor_shutdown_event = AsyncCompatibleEvent()
# Main Event to set all threads to exit gracefully
self._exit_flag = threading.Event()
# Threads for each component
self._websocket_thread = None
self._worker_threads = None
self._entry_thread = None
self._order_thread = None
def start(self):
"""
Starts all service threads and initializes the event loop for each component.
"""
self._websocket_thread = threading.Thread(target=self._run_websocket_loop)
self._websocket_thread.start()
self._worker_threads = threading.Thread(target=self._run_worker_manager_loop)
self._worker_threads.start()
def _run_websocket_loop(self):
"""
Initializes the 3rd-party WebSocket stream and starts listening for kline data.
"""
from domains.signal_service.runtime.thirdparty_stream_startup import ThirdPartyStreamStartup
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
self.closing_klines_queue = SignalQueues()
ws_stream = ThirdPartyStreamStartup(
self.closing_klines_queue,
self.websocket_ready_event,
self.websocket_shutdown_event,
)
loop.run_until_complete(ws_stream.launch_streaming_app())
def _run_worker_manager_loop(self):
"""
Initializes the worker manager that processes signals from the entry queue.
"""
from domains.signal_service.runtime.worker_manager_startup import WorkerManagerStartup
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
self.entry_signals_queue = SignalQueues()
worker_manager = WorkerManagerStartup(
self.entry_signals_queue,
self.closing_klines_queue,
self.websocket_ready_event,
self.worker_shutdown_event,
)
loop.run_until_complete(worker_manager.launch_worker_app())
THIS IS HOW I RUN IT MAINLY
activator = ServiceActivator()
activator.start()
Thanks for the help. After banging my head against janus.Queue
and running into the same event loop binding errors no matter how I structured it, I decided to scrap the whole threaded approach.
Here’s the problem in short: janus.Queue
is tightly bound to the event loop it was created in. Even if you create it inside the correct thread's loop and try to pass the reference around, as soon as you await queue.async_.put()
or .get()
in a different thread with a different loop, it blows up with RuntimeError: bound to a different event loop
. That made it a nightmare to coordinate multiple threads, each with their own event loop — especially when I had to pass queues between four separate threads.
I actually managed to get it working for a bit by carefully initializing each queue inside the correct thread and using .sync
on one side and .async_
on the other, depending on the direction of communication. But once I needed to pass another queue across threads (e.g., worker → entry → order monitor), the complexity started snowballing. At that point, the whole thing became too brittle and hard to maintain.
So I switched to using AnyIO and went fully async — no threads at all. Everything now runs as cleanly isolated tasks inside a single event loop using TaskGroup
, and communication is dead simple using AnyIO’s native queues. No more event loop headaches, no more threading edge cases.
Appreciate the suggestion about culsans
— looks promising and probably would’ve solved the issue technically, but going all-in on async with AnyIO ended up being the cleaner, more scalable solution in my case.