pythonmessage-queuepriority-queuemessage-passing

Python3.6 is there a erlang style message queue?


I'm looking for a message queue implementation for python 3.6 (this exact version) that can be used to communicate between multiprocess.Processes, specifically, It should be a multiproducer, single consumer, fifo with priority receive of messages of application specific types (e.g. if there is an system message (in erlang terms) in the middle of the queue, and an normal message in the head of the queue, the next receive should return the system message rather than the normal one)

But I doubt there will be such an library, so the question becomes, is there any stdlib or third party lib that gives me a chunk of shared memory or better a list, so I can read write to a buffer backed but the memory/list and guard the order with something like mp.Lock?

multiprocessing.Manager uses tcp, and starts a new process


Solution

  • I'm not so familiar with Erlang, but, based on how you described your needs, I think you might be able to take the approach of using multiprocessing.Queue and sorting your messages before reading them.

    The idea is to have a multiprocessing.Queue (FIFO message queue) for each process. When process A sends a message to process B, process A puts its message along with the message's priority into process B's message queue. When a process reads its messages, it transfers the messages from the FIFO queue into a list and then sorts the list before handling the messages. Messages are sorted first by their priority followed by the time at which they arrived in the message queue.

    Here's an example that has been tested with Python 3.6 on Windows.

    from multiprocessing import Process, Queue
    import queue
    import time
    
    def handle_messages(process_id, message_queue):
        # Keep track of the message number to ensure messages with the same priority
        # are read in a FIFO fashion.
        message_no = 0
        messages = []
        while True:
            try:
                priority, contents = message_queue.get_nowait()
                messages.append((priority, message_no, contents))
                message_no+=1
            except queue.Empty:
                break
        # Handle messages in correct order.
        for message in sorted(messages):
            print("{}: {}".format(process_id, message[-1]))
        
    def send_message_with_priority(destination_queue, message, priority):
        # Send a message to a destination queue with a specified priority.
        destination_queue.put((-priority,message))
    
    def process_0(my_id, queues):
        while True:
            # Do work
            print("Doing work...")
            time.sleep(5)
            # Receive messages
            handle_messages(my_id, queues[my_id])
                
    def process_1(my_id, queues):
        message_no = 0
        while True:
            # Do work
            time.sleep(1)
            # Receive messages
            handle_messages(my_id, queues[my_id])
            send_message_with_priority(queues[0], "This is message {} from process {}".format(message_no, my_id), 1)
            message_no+=1
            
    def process_2(my_id, queues):
        message_no = 0
        while True:
            # Do work
            time.sleep(3)
            # Receive messages
            handle_messages(my_id, queues[my_id])
            send_message_with_priority(queues[0], "This is urgent message {} from process {}".format(message_no, my_id), 2)
            message_no+=1
    
    if __name__ == "__main__":
        qs = {i: Queue() for i in range(3)}
        processes = [Process(target=p, args=(i, qs)) for i, p in enumerate([process_0, process_1, process_2])]
        for p in processes:
            p.start()
        for p in processes:
            p.join()