Here's a simple client-server example of the pub-sub pattern with ZeroMQ, generated by ChatGPT 4. It's pretty self-explanatory and serves only to demonstrate the pattern. The problem is that the client hangs forever while receiving data from the message server. The code looks fine to me, but I'm a newcomer to ZeroMQ, so I could be wrong. Any suggestions on how to fix this would be appreciated!
Server.py
# -*- coding: utf-8 -*-
import zmq
import threading
import json
import os
import time
# Connect the publisher socket
context = zmq.Context()
publisher = context.socket(zmq.PUB)
publisher.bind("tcp://*:5556")
# Connect the router socket for receiving acknowledgements
router = context.socket(zmq.ROUTER)
router.bind("tcp://*:5557")
# Save message info to a dictionary (a stand-in for your database)
messages = {}
# Send messages
consumers = ['consumer1', 'consumer2', ]
for i in range(10):
    message_id = str(i)
    file_path = f'./content/{i}'
    # Save message info to the database
    messages[message_id] = {
        'file_path': file_path,
        'consumers': consumers.copy(),  # copy the list because we're going to modify it
        'processed_by': [],
    }
    # Send the message to all consumers
    publisher.send_json({
        'message_id': message_id,
        'text': f'This is message {i}',
        'media_path': file_path,
    })
# Cleanup process
def cleanup():
    while True:
        for message_id, message in messages.items():
            #print(message.items())
            #print(message['processed_by'])
            if set(message['consumers']) == set(message['processed_by']):
                print(f"Deleting file {message['file_path']}")
                # os.remove(message['file_path'])  # uncomment this to actually delete the file
                del messages[message_id]
        time.sleep(5)  # pause between cleanup runs
cleanup_thread = threading.Thread(target=cleanup)
cleanup_thread.start()
# Receive acknowledgements
while True:
    # Wait for next request from client
    message = router.recv_json()
    print(f"Received request: {message}")
    # Process the acknowledgement
    if message['message_id'] in messages:
        messages[message['message_id']]['processed_by'].append(message['consumer'])
    #time.sleep(5)
Client.py
import zmq
import time
# Prepare context and sockets
context = zmq.Context()
consumer_id = 'consumer1' # change this for each consumer
# Connect the subscriber socket
subscriber = context.socket(zmq.SUB)
subscriber.connect("tcp://localhost:5556")
subscriber.setsockopt_string(zmq.SUBSCRIBE, '')
# Connect the dealer socket for sending acknowledgements
dealer = context.socket(zmq.DEALER)
dealer.identity = consumer_id.encode()
dealer.connect("tcp://localhost:5557")
# Process messages
while True:
    message = subscriber.recv_json()
    print(f"Received message: {message}")
    #print(message)
    # Send acknowledgement
    dealer.send_json({
        'message_id': message['message_id'],
        'consumer': consumer_id,
    })
    time.sleep(5)  # pause between processing messages
The code looks right and should work, but for some reason it doesn't.
Running Python 3.10.5 on Windows 11.
The main problem with your code is that PUB/SUB sockets are lossy -- there is no synchronization between publishers and subscribers, and if a subscriber is not connected and subscribed when a message is sent it will never see that message.
By the time your client has connected and finished negotiating with the server, all of the messages have already been sent.
If you (a) start the subscriber first and (b) add a sleep
to your publisher to give the subscriber a chance to connect, you'll see messages being received by the subscriber as expected.
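A quick way to make this visible from the client side is to poll the SUB socket with a timeout instead of blocking forever in recv_json(). This doesn't fix anything by itself, but it turns "missed the messages" into a printed timeout instead of an apparent hang. A diagnostic sketch (the port and subscription match your example; the rest is illustrative):
import zmq

context = zmq.Context()
subscriber = context.socket(zmq.SUB)
subscriber.connect("tcp://localhost:5556")
subscriber.setsockopt_string(zmq.SUBSCRIBE, '')

poller = zmq.Poller()
poller.register(subscriber, zmq.POLLIN)

while True:
    # poll() takes a timeout in milliseconds
    events = dict(poller.poll(1000))
    if subscriber in events:
        message = subscriber.recv_json()
        print(f"Received message: {message}")
    else:
        print("No message within 1s - it was probably published before we subscribed")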
Then you'll encounter a second problem:
When you receive a message on a ROUTER socket, that message is a multipart message consisting of the client id followed by the actual message data. So when you write:
message = router.recv_json()
this blows up, because the first frame you receive is the client id rather than the JSON message data, so decoding fails with:
json.decoder.JSONDecodeError: Expecting value: line 1 column 1 (char 0)
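For illustration, this is roughly what handling the extra identity frame looks like on the ROUTER side; the reply isn't needed in your example, it's only there to show why the identity frame exists:
# A DEALER peer arrives on a ROUTER socket as [identity, payload]
identity, payload = router.recv_multipart()
message = json.loads(payload)
# To address a reply to that specific peer, send its identity frame back first
router.send_multipart([identity, b'{"status": "ok"}'])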
A working version of the server might look like this:
import json
import zmq
import threading
import time
# Cleanup process
def cleanup():
    while True:
        # iterate over a snapshot so we can safely delete entries while looping
        for message_id, message in list(messages.items()):
            # print(message.items())
            # print(message['processed_by'])
            if set(message["consumers"]) == set(message["processed_by"]):
                print(f"Deleting file {message['file_path']}")
                # os.remove(message['file_path'])  # uncomment this to actually delete the file
                del messages[message_id]
        time.sleep(5)  # pause between cleanup runs
# Connect the publisher socket
context = zmq.Context()
publisher = context.socket(zmq.PUB)
publisher.bind("tcp://*:5556")
# Connect the router socket for receiving acknowledgements
router = context.socket(zmq.ROUTER)
router.bind("tcp://*:5557")
# Save message info to a dictionary (a stand-in for your database)
messages = {}
# Send messages
consumers = [
    "consumer1",
    "consumer2",
]
# give the subscriber a chance to connect
time.sleep(2)
for i in range(10):
    message_id = str(i)
    file_path = f"./content/{i}"
    # Save message info to the database
    messages[message_id] = {
        "file_path": file_path,
        "consumers": consumers.copy(),  # copy the list because we're going to modify it
        "processed_by": [],
    }
    # Send the message to all consumers
    publisher.send_json(
        {
            "message_id": message_id,
            "text": f"This is message {i}",
            "media_path": file_path,
        }
    )
cleanup_thread = threading.Thread(target=cleanup)
cleanup_thread.start()
# Receive acknowledgements
while True:
    # Wait for next request from client
    client, data = router.recv_multipart()
    message = json.loads(data)
    print(f"Received request: {message}")
    # Process the acknowledgement
    if message["message_id"] in messages:
        messages[message["message_id"]]["processed_by"].append(message["consumer"])
    # time.sleep(5)
...but this isn't great because of the issues with pub/sub sockets I mentioned earlier. If you're looking for some sort of synchronization between the client and the server, pub/sub sockets are the wrong solution.
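If you do want to keep PUB/SUB for the data itself, a common workaround (the "node coordination" pattern from the ZeroMQ guide) is an explicit REQ/REP handshake so the publisher only starts sending once the expected number of subscribers has checked in. A minimal sketch, assuming two consumers and an extra port 5558 for the handshake (both the count and the port are assumptions, not part of your code):
sync_publisher.py (sketch)
import zmq

EXPECTED_SUBSCRIBERS = 2  # assumption: we know how many consumers to wait for

context = zmq.Context()
publisher = context.socket(zmq.PUB)
publisher.bind("tcp://*:5556")

# REP socket used only for the startup handshake (port 5558 is an assumption)
syncservice = context.socket(zmq.REP)
syncservice.bind("tcp://*:5558")

# Block until every expected subscriber has announced itself
for _ in range(EXPECTED_SUBSCRIBERS):
    syncservice.recv()      # a subscriber says it is connected and subscribed
    syncservice.send(b"")   # acknowledge

# Now it is safe to publish; no connected subscriber will miss the first message
publisher.send_json({"message_id": "0", "text": "This is message 0"})
sync_subscriber.py (sketch)
import time
import zmq

context = zmq.Context()
subscriber = context.socket(zmq.SUB)
subscriber.connect("tcp://localhost:5556")
subscriber.setsockopt_string(zmq.SUBSCRIBE, "")
time.sleep(1)  # give the subscription itself a moment to propagate

# Tell the publisher we are ready, then wait for its acknowledgement
syncclient = context.socket(zmq.REQ)
syncclient.connect("tcp://localhost:5558")
syncclient.send(b"")
syncclient.recv()

print(subscriber.recv_json())
With a handshake like this in place you can also drop the arbitrary time.sleep(2) from the publisher.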