pythonmultithreadingqueuecommunicationjanus

How do i comunicate between Threads Using Queues(janus)?? In Python


I'm building a multi-threaded system where:

  1. One thread connects to a WebSocket, processes live kline data, and queues symbol-interval keys for processing.
  2. Another thread runs multiple async workers that await those keys from the queue and generate signals.

Each thread has its own asyncio event loop, and I use threads to isolate the heavy workflows.

I'm using janus.Queue to bridge async <-> sync code between threads. But every time I try to await queue.async_.get() in the worker thread (different loop than where it was created), I get:

RuntimeError: <janus.Queue object at 0x000002619212E920> is bound to a different event loop

What I tried:

I read that you cannot create a janus.Queue in one thread and use it in another thread's event loop — which matches what I'm seeing.

So I updated my logic like this:

This is the wrapper I use: import janus

class SignalQueues:

    def __init__(self):
        self._queue = janus.Queue()

    @property
    def sync(self):
        return self._queue.sync_q

    @property
    def async_(self):
        return self._queue.async_q

Still stuck:

Even after creating the queues inside the correct thread/loop, when I pass that reference to another thread and try to await queue.async_.get() in its event loop, I still get the RuntimeError.

I understand that each janus.Queue must stay tied to the loop where it's created, but how do I cleanly share queues between threads that each have their own event loops?

Ask:

Can anyone show a clean and safe pattern to share queues across asyncio threads (each with its own loop), using janus or another method? If janus is not appropriate, what’s the alternative to fan out messages from a WebSocket thread to async worker threads?

I’ll post my root ServiceActivator class if needed, but this is the core issue.

Thanks.

MAIN CODE

class ServiceActivator:

def __init__(self):

    #Queues and events for inter-thread communication
    self.closing_klines_queue = None
    self.entry_signals_queue = None

    #This event is set when the WebSocket is ready to start processing signals from the queues
    self.websocket_ready_event = AsyncCompatibleEvent()

    #These events are set when the WebSocket is shutting down, signaling other components to stop processing and cancel tasks
    self.websocket_shutdown_event = AsyncCompatibleEvent()
    self.worker_shutdown_event = AsyncCompatibleEvent()
    self.entry_consumer_shutdown_event = AsyncCompatibleEvent()
    self.order_monitor_shutdown_event = AsyncCompatibleEvent()
    
    # Main Event to set all threads to exit gracefully
    self._exit_flag = threading.Event()

    # Threads for each component
    self._websocket_thread = None
    self._worker_threads = None
    self._entry_thread = None
    self._order_thread = None
    
def start(self):
    """
        Starts all service threads and initializes the event loop for each component.
    """
    self._websocket_thread = threading.Thread(target=self._run_websocket_loop)
    self._websocket_thread.start()

    self._worker_threads = threading.Thread(target=self._run_worker_manager_loop)
    self._worker_threads.start()

def _run_websocket_loop(self):
    """
    Initializes the 3rd-party WebSocket stream and starts listening for kline data.
    """
    from domains.signal_service.runtime.thirdparty_stream_startup import ThirdPartyStreamStartup
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    self.closing_klines_queue = SignalQueues()
    ws_stream = ThirdPartyStreamStartup(
        self.closing_klines_queue,
        self.websocket_ready_event,
        self.websocket_shutdown_event,
    )
    loop.run_until_complete(ws_stream.launch_streaming_app())

def _run_worker_manager_loop(self):
    """
    Initializes the worker manager that processes signals from the entry queue.
    """
    from domains.signal_service.runtime.worker_manager_startup import WorkerManagerStartup
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    self.entry_signals_queue = SignalQueues()
    worker_manager = WorkerManagerStartup(
        self.entry_signals_queue,
        self.closing_klines_queue,
        self.websocket_ready_event,
        self.worker_shutdown_event,
    )
    loop.run_until_complete(worker_manager.launch_worker_app())

THIS IS HOW I RUN IT MAINLY

activator = ServiceActivator()
activator.start()

Solution

  • Thanks for the help. After banging my head against janus.Queue and running into the same event loop binding errors no matter how I structured it, I decided to scrap the whole threaded approach.

    Here’s the problem in short: janus.Queue is tightly bound to the event loop it was created in. Even if you create it inside the correct thread's loop and try to pass the reference around, as soon as you await queue.async_.put() or .get() in a different thread with a different loop, it blows up with RuntimeError: bound to a different event loop. That made it a nightmare to coordinate multiple threads, each with their own event loop — especially when I had to pass queues between four separate threads.

    I actually managed to get it working for a bit by carefully initializing each queue inside the correct thread and using .sync on one side and .async_ on the other, depending on the direction of communication. But once I needed to pass another queue across threads (e.g., worker → entry → order monitor), the complexity started snowballing. At that point, the whole thing became too brittle and hard to maintain.

    So I switched to using AnyIO and went fully async — no threads at all. Everything now runs as cleanly isolated tasks inside a single event loop using TaskGroup, and communication is dead simple using AnyIO’s native queues. No more event loop headaches, no more threading edge cases.

    Appreciate the suggestion about culsans — looks promising and probably would’ve solved the issue technically, but going all-in on async with AnyIO ended up being the cleaner, more scalable solution in my case.