Tags: c++, zeromq, distributed-system, pyzmq, multi-agent

How to monitor whether a ZeroMQ server exists?


I want to check the existence ( state ) of a server before I send a ZeroMQ request, but I have no idea how to do it.


Solution

  • Q : I want to check the existence ( state ) of a server before I send a ZeroMQ request

    The solution is to set up a socket monitor with zmq_socket_monitor() and read the connection-state events it reports:

    // Read one event off the monitor socket.
    //
    // A monitor event arrives as a two-frame message:
    //   frame #1 : 16-bit event number followed by a 32-bit event value
    //   frame #2 : the endpoint address the event refers to ( not NUL-terminated )
    //
    // monitor : socket connected to the monitor endpoint
    // value   : if not NULL, receives the 32-bit event value
    // address : if not NULL, receives a malloc()-ed, NUL-terminated copy of the
    //           endpoint address; the caller owns it and must free() it
    //
    // Returns the event number, or -1 on error ( receive failed, frame #1 was
    // too short, or the address copy could not be allocated ).

    static int
    get_monitor_event ( void  *monitor,
                        int   *value,
                        char **address
                        )
    {
        zmq_msg_t msg;
        uint16_t  event;
        uint32_t  event_value;

     // ---- Frame #1: event number and value ------------------------------
        zmq_msg_init ( &msg );
        if ( zmq_msg_recv ( &msg, monitor, 0 ) == -1 ) {            // Interrupted, presumably
            zmq_msg_close ( &msg );
            return -1;
        }
        assert ( zmq_msg_more ( &msg )              && "REASON: Frame #1 FAILED TO SIG 2nd, EXPECTED, FRAME TO COME" );

        if ( zmq_msg_size ( &msg ) < sizeof event + sizeof event_value ) {
            zmq_msg_close ( &msg );                                 // Frame too short to hold number + value
            return -1;
        }

     // memcpy() instead of pointer casts: the value sits at byte offset 2,
     // which is not suitably aligned for a direct uint32_t load
        const uint8_t *data = ( const uint8_t * ) zmq_msg_data ( &msg );
        memcpy ( &event,       data,                sizeof event );
        memcpy ( &event_value, data + sizeof event, sizeof event_value );

        if ( value )
            *value = ( int ) event_value;

        zmq_msg_close ( &msg );                                     // Release frame #1 before re-using msg

     // ---- Frame #2: event address ---------------------------------------
        zmq_msg_init ( &msg );
        if ( zmq_msg_recv ( &msg, monitor, 0 ) == -1 ) {            // Interrupted, presumably
            zmq_msg_close ( &msg );
            return -1;
        }
        assert ( !zmq_msg_more ( &msg )             && "REASON: Frame #2 FAILED TO SIG more, NOT EXPECTED, FRAMEs TO COME" );

        if ( address ) {
            const uint8_t *addr = ( const uint8_t * ) zmq_msg_data ( &msg );
            size_t         size =                     zmq_msg_size ( &msg );

         // Cast kept so the snippet also builds as C++
            *address = ( char * ) malloc ( size + 1 );
            if ( *address == NULL ) {
                zmq_msg_close ( &msg );
                return -1;
            }
            memcpy ( *address, addr, size );
            ( *address )[size] = 0;                                 // Frame payload carries no terminator
        }

        zmq_msg_close ( &msg );
        return event;
    }
    
    // Example: attach a socket monitor to a DEALER socket and read its
    // connection-state events through a PAIR socket over inproc://.
    //
    // The "..." comments mark places where application code is elided.
    // close_zero_linger() is not defined in this snippet; it is presumably
    // the libzmq test-suite helper that sets ZMQ_LINGER to 0 and closes the
    // socket -- confirm, or supply an equivalent.
    int main ( void )
    {
        void    *ctx = zmq_ctx_new ();
        assert ( ctx                                && "REASON: Context FAILED to instantiate" );

        void    *client = zmq_socket ( ctx, ZMQ_DEALER );
        assert ( client                             && "REASON: Socket FAILED to instantiate" );

     // Socket monitoring only works over inproc://
        int      rc = zmq_socket_monitor ( client, "inproc://monitor-client-side", ZMQ_EVENT_ALL );
        assert ( rc == 0                            && "REASON: socket_monitor FAILED to instantiate over INPROC:// transport-class" );

     // Create socket for collecting monitor events
        void    *client_side_mon = zmq_socket ( ctx, ZMQ_PAIR );
        assert ( client_side_mon                    && "REASON: socket_monitor receiving Socket FAILED to instantiate " );

     // Connect it to the inproc endpoint so it will get the events
                 rc = zmq_connect ( client_side_mon, "inproc://monitor-client-side" );
        assert ( rc == 0                            && "REASON: .connect()-method FAILED to get connected" );

     // Now do whatever you need
     // ... ( application code, e.g. connecting [client] to the server )

     // Close client
        close_zero_linger ( client );

     // --------------------------------------------------------------------
     // How to collect and check events from socket_monitor:
        int  event =  get_monitor_event ( client_side_mon, NULL, NULL );

     // A CONNECT_DELAYED event may precede CONNECTED; skip over it
        if ( event == ZMQ_EVENT_CONNECT_DELAYED )
             event =  get_monitor_event ( client_side_mon, NULL, NULL );

        assert ( event == ZMQ_EVENT_CONNECTED       && "REASON: [client]-socket still not in an expected, .connect()-ed, state" );
     // ...

     // ...
        event = get_monitor_event ( client_side_mon, NULL, NULL );
        assert ( event == ZMQ_EVENT_MONITOR_STOPPED && "REASON: [client]-socket not in an expected, .close()-ed, state" );

     // --------------------------------------------------------------------
     // FINALLY:
     // --------------------------------------------------------------------
     // Close down the sockets
        close_zero_linger ( client_side_mon );

        zmq_ctx_term ( ctx );

        return 0;
    }
    

    ( zmq_socket_monitor() has been part of the ZeroMQ API since v3.2 )