ZeroMQ: Same context for multiple sockets


I am trying to use ZeroMQ's pub-sub sockets. However, I don't clearly understand the role of context (zmq::context_t) while creating sockets (zmq::socket_t).

Assuming that I want to create 5 subscriber sockets (zmq::socket_t using ZMQ_SUB), do I need 5 contexts, one for each of the subscriber sockets? Or can I use a single context for all 5 sockets?


Solution

  • Assuming that I want to create 5 subscriber sockets ( zmq::socket_t using ZMQ_SUB ), do I need 5 contexts, one for each of the subscriber sockets? Or can I use a single context for all 5 sockets?

    You need only one context for this lightweight use case. The excerpt from the ZeroMQ documentation below explains how a context is used, and the example at the end of this post shows it in practice.

    ZeroMQ applications always start by creating a context, and then using that for creating sockets. In C, it's the zmq_ctx_new() call. You should create and use exactly one context in your process. Technically, the context is the container for all sockets in a single process, and acts as the transport for inproc sockets, which are the fastest way to connect threads in one process. If at runtime a process has two contexts, these are like separate ZeroMQ instances.
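
    To make the "one context, many sockets" point concrete, here is a minimal single-process sketch in pyzmq. It is not part of the documentation quoted above: the endpoint name `inproc://updates` and the function name are made up for illustration, and the `inproc` transport is used only so that everything runs in one process without opening TCP ports. Because subscriptions reach the publisher asynchronously, the sketch keeps publishing until each subscriber has received something.

```python
import time
import zmq


def fan_out_with_one_context(n_subscribers=5, timeout_s=5.0):
    """Attach one PUB and several SUB sockets to a single context and
    return how many subscribers received at least one message."""
    context = zmq.Context()                 # the only context in this process
    publisher = context.socket(zmq.PUB)
    publisher.bind("inproc://updates")      # hypothetical endpoint name

    subscribers = []
    for _ in range(n_subscribers):
        sub = context.socket(zmq.SUB)       # every socket comes from the same context
        sub.connect("inproc://updates")
        sub.setsockopt(zmq.SUBSCRIBE, b"")  # empty prefix: accept every message
        subscribers.append(sub)

    # Subscriptions propagate asynchronously, so keep publishing until
    # every subscriber has seen a message or the timeout expires.
    served = set()
    deadline = time.monotonic() + timeout_s
    while len(served) < n_subscribers and time.monotonic() < deadline:
        publisher.send(b"tick")
        for index, sub in enumerate(subscribers):
            if index not in served and sub.poll(50):  # wait up to 50 ms
                sub.recv()
                served.add(index)

    context.destroy(linger=0)               # close all sockets, then terminate
    return len(served)


if __name__ == "__main__":
    print(fan_out_with_one_context())
```

    All six sockets here are created from the same `context` object. With `inproc` this is a requirement rather than a convenience, since an `inproc` endpoint is only visible to sockets that share the context it was bound in.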

    The example below should help you understand the ZMQ context and the PUB-SUB pattern. Creating 5 subscriber sockets is fine as long as you have 5 publishing services. However, if a single source publishes all the notifications, I would recommend using one SUB socket and relying on its subscription filters instead. The communication between publisher #1 and the subscriber in my code shows how to set that up.

    Publisher #1 sends temperature and humidity updates:

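    import zmq
    from time import sleep
    
    # Publisher socket: binds and waits for subscribers to connect
    context = zmq.Context()
    socket  = context.socket( zmq.PUB )
    socket.bind( "tcp://*:5556" )
    
    while True:
        # The first frame is the topic that SUB sockets filter on;
        # frames must be bytes under Python 3
        socket.send_multipart( [ b"TEMP", b"25.40" ] )
        socket.send_multipart( [ b"HUMD", b"48.90" ] )
        sleep( 1 )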
    

    Publisher #2 sends pressure updates:

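    import zmq
    from time import sleep
    
    # Publisher socket: runs as a separate process, so it has its own context
    context = zmq.Context()
    socket2 = context.socket( zmq.PUB )
    socket2.bind( "tcp://*:5557" )
    
    while True:
        # Topic frame first, then the value; frames must be bytes under Python 3
        socket2.send_multipart( [ b"PRSS", b"10000.00" ] )
        sleep( 1 )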
    

    Subscriber registered for temperature, humidity and pressure updates from two different servers:

    import zmq
    
    # One context is shared by both SUB sockets in this process
    context = zmq.Context()
    socket  = context.socket( zmq.SUB )
    socket.connect(  "tcp://localhost:5556" )
    socket2 = context.socket( zmq.SUB )
    socket2.connect( "tcp://localhost:5557" )
    
    # Set filters: a message is delivered only if it starts with one of these prefixes
    socket.setsockopt_string(  zmq.SUBSCRIBE, "TEMP" )
    socket.setsockopt_string(  zmq.SUBSCRIBE, "HUMD" )
    socket2.setsockopt_string( zmq.SUBSCRIBE, "PRSS" )
    
    # Wait on both sockets at once
    poller = zmq.Poller()
    poller.register( socket,  zmq.POLLIN )
    poller.register( socket2, zmq.POLLIN )
    
    while True:
        # poll() blocks until a socket is readable, so no sleep is needed;
        # sleeping here would let messages pile up faster than they are read
        socks = dict( poller.poll() )
        if socket in socks and socks[socket] == zmq.POLLIN:
            [ measurement, value ] = socket.recv_multipart()
            print( measurement.decode( 'ascii' ) )
            print( value.decode( 'ascii' ) )
    
        if socket2 in socks and socks[socket2] == zmq.POLLIN:
            [ measurement, value ] = socket2.recv_multipart()
            print( measurement.decode( 'ascii' ) )
            print( value.decode( 'ascii' ) )
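
    The prefix filtering used by the subscriber above can also be checked without starting three separate programs. The following is a rough single-process sketch, assuming pyzmq under Python 3: the `inproc://sensors` endpoint and the function name are invented for this illustration, and the `inproc` transport replaces the TCP ports so that publisher and subscriber share one context. The publisher sends all three topics, but the SUB socket subscribes only to `TEMP` and `HUMD`.

```python
import time
import zmq


def topics_seen_by_filtered_sub(timeout_s=5.0):
    """Publish TEMP, HUMD and PRSS messages and return the sorted list of
    topics that reach a SUB socket subscribed only to TEMP and HUMD."""
    context = zmq.Context()
    publisher = context.socket(zmq.PUB)
    publisher.bind("inproc://sensors")      # hypothetical endpoint name

    subscriber = context.socket(zmq.SUB)
    subscriber.connect("inproc://sensors")
    # Filters match on the leading bytes of the first frame.
    subscriber.setsockopt(zmq.SUBSCRIBE, b"TEMP")
    subscriber.setsockopt(zmq.SUBSCRIBE, b"HUMD")

    seen = set()
    deadline = time.monotonic() + timeout_s
    # Keep publishing until both subscribed topics have arrived, because
    # the first messages may be sent before the subscriptions take effect.
    while seen != {b"TEMP", b"HUMD"} and time.monotonic() < deadline:
        publisher.send_multipart([b"PRSS", b"10000.00"])
        publisher.send_multipart([b"TEMP", b"25.40"])
        publisher.send_multipart([b"HUMD", b"48.90"])
        while subscriber.poll(50):          # drain whatever has arrived
            topic, _value = subscriber.recv_multipart()
            seen.add(topic)

    context.destroy(linger=0)               # close all sockets, then terminate
    return sorted(seen)


if __name__ == "__main__":
    print(topics_seen_by_filtered_sub())
```

    The `PRSS` messages never show up in the result because no subscription prefix matches them, which is why a single SUB socket with several filters is enough when one publisher carries several kinds of update.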