I'm looking for a message queue implementation for Python 3.6 (this exact version) that can be used to communicate between multiprocessing.Process instances. Specifically, it should be a multi-producer, single-consumer FIFO with priority receive for application-specific message types. For example, if there is a system message (in Erlang terms) in the middle of the queue and a normal message at the head of the queue, the next receive should return the system message rather than the normal one.
I doubt such a library exists, so the question becomes: is there any stdlib or third-party library that gives me a chunk of shared memory or, better, a list, so that I can read from and write to a buffer backed by that memory/list and guard the ordering with something like mp.Lock?
multiprocessing.Manager uses TCP and starts a new process.
I'm not very familiar with Erlang, but based on how you described your needs, I think you could use multiprocessing.Queue and sort your messages before handling them.
The idea is to have a multiprocessing.Queue (FIFO message queue) for each process. When process A sends a message to process B, process A puts the message, along with its priority, into process B's message queue. When a process reads its messages, it transfers them from the FIFO queue into a list and sorts the list before handling them. Messages are sorted first by priority and then by the order in which they arrived in the message queue.
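The ordering rule on its own can be sketched as a pure function, independent of any queue. The name delivery_order is hypothetical, and the sketch assumes that a larger priority number means a more urgent message:

```python
def delivery_order(arrivals):
    """Return message contents in the order the receiver would handle them.

    `arrivals` is a list of (priority, contents) pairs in arrival order.
    Assumption of this sketch: a larger priority number is more urgent.
    """
    # Negate the priority so that urgent messages sort first, and use the
    # arrival index as a tie-breaker so equal priorities stay FIFO.
    keyed = [(-priority, seq, contents)
             for seq, (priority, contents) in enumerate(arrivals)]
    return [contents for _, _, contents in sorted(keyed)]


print(delivery_order([(1, "normal A"), (2, "system"), (1, "normal B")]))
# ['system', 'normal A', 'normal B']
```

Because the arrival index is unique, the message contents themselves are never compared, so they do not need to be orderable.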
Here's an example that has been tested with Python 3.6 on Windows.
from multiprocessing import Process, Queue
import queue
import time


def handle_messages(process_id, message_queue):
    # Keep track of the message number to ensure messages with the same priority
    # are read in a FIFO fashion.
    message_no = 0
    messages = []
    while True:
        try:
            priority, contents = message_queue.get_nowait()
            messages.append((priority, message_no, contents))
            message_no += 1
        except queue.Empty:
            break
    # Handle messages in correct order.
    for message in sorted(messages):
        print("{}: {}".format(process_id, message[-1]))


def send_message_with_priority(destination_queue, message, priority):
    # Send a message to a destination queue with a specified priority.
    # The priority is negated so that higher priorities sort first.
    destination_queue.put((-priority, message))


def process_0(my_id, queues):
    while True:
        # Do work
        print("Doing work...")
        time.sleep(5)
        # Receive messages
        handle_messages(my_id, queues[my_id])


def process_1(my_id, queues):
    message_no = 0
    while True:
        # Do work
        time.sleep(1)
        # Receive messages
        handle_messages(my_id, queues[my_id])
        send_message_with_priority(queues[0], "This is message {} from process {}".format(message_no, my_id), 1)
        message_no += 1


def process_2(my_id, queues):
    message_no = 0
    while True:
        # Do work
        time.sleep(3)
        # Receive messages
        handle_messages(my_id, queues[my_id])
        send_message_with_priority(queues[0], "This is urgent message {} from process {}".format(message_no, my_id), 2)
        message_no += 1


if __name__ == "__main__":
    qs = {i: Queue() for i in range(3)}
    processes = [Process(target=p, args=(i, qs)) for i, p in enumerate([process_0, process_1, process_2])]
    for p in processes:
        p.start()
    for p in processes:
        p.join()
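If you would rather receive one message at a time, as the question describes, the same idea can be wrapped in a small consumer-side buffer. This is only a sketch: the class name PriorityMailbox and its receive method are hypothetical, and it assumes producers put (-priority, message) tuples on the queue, as send_message_with_priority does above. It uses only the stdlib (heapq, itertools, queue), all available in Python 3.6.

```python
import heapq
import itertools
import queue


class PriorityMailbox:
    """Consumer-side buffer that hands out the most urgent pending message first.

    `source` is any object whose get()/get_nowait() raise queue.Empty,
    for example a multiprocessing.Queue shared with the producers.
    Items are expected to be (sort_key, contents) with smaller keys first.
    """

    def __init__(self, source):
        self._source = source
        self._heap = []                    # pending (sort_key, arrival_no, contents)
        self._arrival = itertools.count()  # tie-breaker: FIFO within one priority

    def _stash(self, item):
        sort_key, contents = item
        heapq.heappush(self._heap, (sort_key, next(self._arrival), contents))

    def receive(self, timeout=None):
        """Return the next message; raise queue.Empty if none arrives in time."""
        if not self._heap:
            # Nothing buffered locally, so block until a producer sends something.
            self._stash(self._source.get(timeout=timeout))
        # Pull in everything else that has already arrived, without blocking.
        while True:
            try:
                self._stash(self._source.get_nowait())
            except queue.Empty:
                break
        return heapq.heappop(self._heap)[2]
```

A consumer would create `PriorityMailbox(queues[my_id])` once and call `receive()` in its loop. Note that only messages that have already reached the consumer's queue can jump ahead; an urgent message still in flight cannot overtake one that has already been returned.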