I am trying to use ZeroMQ's pub-sub sockets. However, I don't clearly understand the role of the context (zmq::context_t) when creating sockets (zmq::socket_t).
Assuming that I want to create 5 subscriber sockets (zmq::socket_t using ZMQ_SUB), do I need 5 contexts, one for each of the subscriber sockets? Or can I use a single context for all 5 sockets?
You need only one Context instance for this lightweight use case. The passage from the documentation quoted below explains 0MQ context usage, and an example I made for you follows at the end of this post.
ZeroMQ applications always start by creating a context, and then using that for creating sockets. In C, it's the zmq_ctx_new() call. You should create and use exactly one context in your process. Technically, the context is the container for all sockets in a single process, and acts as the transport for inproc sockets, which are the fastest way to connect threads in one process. If at runtime a process has two contexts, these are like separate ZeroMQ instances.
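Applied to the five-subscriber question, the one-context rule could look roughly like the sketch below. It assumes pyzmq and five hypothetical publishers on localhost ports 5556 to 5560; the helper name make_subscribers and the ENDPOINTS list are made up for illustration and are not part of any ZeroMQ API.

```python
import zmq

# Hypothetical endpoints: five publishers assumed to listen on ports 5556-5560.
ENDPOINTS = ["tcp://localhost:%d" % port for port in range(5556, 5561)]


def make_subscribers(context, endpoints):
    """Create one SUB socket per endpoint, all from the same context."""
    subscribers = []
    for endpoint in endpoints:
        sub = context.socket(zmq.SUB)
        sub.connect(endpoint)               # asynchronous: the peer need not be up yet
        sub.setsockopt(zmq.SUBSCRIBE, b"")  # empty prefix: receive every message
        subscribers.append(sub)
    return subscribers


context = zmq.Context()                     # the only context in this process
subscribers = make_subscribers(context, ENDPOINTS)
print(len(subscribers), "SUB sockets share one context")

# Clean shutdown: close the sockets first, then terminate the context.
for sub in subscribers:
    sub.close(linger=0)
context.term()
```

The sockets are closed before context.term() is called because term() blocks until every socket created from that context has been closed.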
I made the example below to help you understand the ZMQ context and the ZMQ PUB-SUB pattern. Creating 5 subscriber sockets is fine as long as you have 5 producing services. However, if a single source publishes all the notifications, I would recommend one SUB socket that relies on the filtering property of ZMQ SUB sockets instead of several sockets. You can check how to set that up below in my code, in the communication between publisher #1 and the subscriber.
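The filtering rule itself is simple: a SUB socket delivers a message when its first frame starts with one of the subscribed prefixes. The sketch below only mimics that rule in plain Python to make it visible; it is not ZeroMQ's implementation, and the helper name matches is hypothetical.

```python
def matches(subscriptions, frames):
    """Return True if the first frame starts with any subscribed prefix."""
    topic = frames[0]
    return any(topic.startswith(prefix) for prefix in subscriptions)


# Prefixes subscribed on one SUB socket, as in the temperature/humidity case.
subscriptions = [b"TEMP", b"HUMD"]

for frames in ([b"TEMP", b"25.40"], [b"HUMD", b"48.90"], [b"PRSS", b"10000.00"]):
    verdict = "delivered" if matches(subscriptions, frames) else "dropped"
    print(frames[0].decode("ascii"), verdict)
# prints:
# TEMP delivered
# HUMD delivered
# PRSS dropped
```

Two edge cases follow from the same rule: an empty prefix matches every message, and a SUB socket with no subscriptions at all receives nothing.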
Publisher #1 sends temperature and humidity updates:

    import zmq
    from time import sleep

    # Server socket: publishes on TCP port 5556
    context = zmq.Context()
    socket = context.socket( zmq.PUB )
    socket.bind( "tcp://*:5556" )

    while True:
        # The first frame is the topic that subscribers filter on.
        # Frames must be bytes under Python 3.
        socket.send_multipart( [ b"TEMP", b"25.40" ] )
        socket.send_multipart( [ b"HUMD", b"48.90" ] )
        sleep( 1 )
Publisher #2 sends pressure updates:

    import zmq
    from time import sleep

    # Server socket: publishes on TCP port 5557
    context = zmq.Context()
    socket2 = context.socket( zmq.PUB )
    socket2.bind( "tcp://*:5557" )

    while True:
        # Frames must be bytes under Python 3.
        socket2.send_multipart( [ b"PRSS", b"10000.00" ] )
        sleep( 1 )
Subscriber registered to temperature, humidity and pressure updates on two different servers:

    import zmq

    # Sockets to talk to the servers, both created from the same context
    context = zmq.Context()
    socket = context.socket( zmq.SUB )
    socket.connect( "tcp://localhost:5556" )
    socket2 = context.socket( zmq.SUB )
    socket2.connect( "tcp://localhost:5557" )

    # Set filters (prefix match on the first frame of each message)
    socket.setsockopt_string( zmq.SUBSCRIBE, "TEMP" )
    socket.setsockopt_string( zmq.SUBSCRIBE, "HUMD" )
    socket2.setsockopt_string( zmq.SUBSCRIBE, "PRSS" )

    # Poll both sockets so that neither one blocks the other
    poller = zmq.Poller()
    poller.register( socket, zmq.POLLIN )
    poller.register( socket2, zmq.POLLIN )

    while True:
        # poll() blocks until a socket is readable, so no sleep is needed here;
        # sleeping would let publisher #1's two messages per second pile up.
        socks = dict( poller.poll() )
        if socket in socks and socks[socket] == zmq.POLLIN:
            measurement, value = socket.recv_multipart()
            print( measurement.decode( 'ascii' ) )
            print( value.decode( 'ascii' ) )
        if socket2 in socks and socks[socket2] == zmq.POLLIN:
            measurement, value = socket2.recv_multipart()
            print( measurement.decode( 'ascii' ) )
            print( value.decode( 'ascii' ) )