javamultiplexing

Is there a way for a thread to listen to multiple BlockingQueues in Java?


In Java, is there a way for a thread to listen to multiple java.util.concurrent.BlockingQueue? Just like java.nio.channels.Selector listen to multiple java.nio.channels.SocketChannel.


Solution

  • If you come across a situation like this, I would recommend you to first think about why you need to listen to multiple BlockingQueues. If possible, try inserting to the same BlockingQueue in all of the producer tasks. This should be applicable if all consumer tasks are want to listen to all of the queues.


    You can create another (master) BlockingQueue that gets entries from all other BlockingQueue and one Thread for each BlockingQueue you want to listen to that moves data to the master BlockingQueue:

    //data
    private BlockingQueue<T> masterQueue = new LinkedBlockingQueue<T>(1);
    private List<BlockingQueue<T>> toListenFrom = new ArrayList<>();//assume that has elements
    
    private ExecutorService queueDistributorExecutor = Executors.newVirtualThreadPerTaskExecutor();//use virtual threads if they are available - if not, use a different executor
    
    // distribute elements
    for(BlockingQueue queue : toListenFrom){
        queueDistributorExecutor.execute(()->{
            try {
                while(true) {
                    masterQueue.put(queue.take());
                }
            } catch(InterruptedException e){
                Thread.currentThread().interrupt();
            }
        });
    }
    

    Then, you can read from all queues by reading from masterQueue.

    This code makes the assumptions that it's no problem if elements from toListenFrom queues are removed for masterQueue even if masterQueue is full (these will be inserted later).
    Due to that, my implementation is not "fair". The threads reading from the masterQueue would "reserve" one element from each other queue.