multithreading scala zeromq jeromq

JeroMQ Subscriber in a Runnable


I'm trying to embed a ZMQ subscriber in a Runnable. The first time I start the Runnable, everything works. The problem is that when I interrupt the Thread and start a new Thread, the new subscriber does not receive any messages. For example:

  1. I have a publisher Runnable:

    class ZMQPublisherRunnable() extends Runnable {
    
     override def run() {
       val ZMQcontext = ZMQ.context(1)
       val publisher = ZMQcontext.socket(ZMQ.PUB)
       var count = 0
    
       publisher.connect(s"tcp://127.0.0.1:16666")
    
       while (!Thread.currentThread().isInterrupted) {
         try {
           println(s"PUBLISHER -> $count")
           publisher.send(s"PUBLISHER -> $count")
           count += 1
           Thread.sleep(1000)
         }
         catch {
           case e: Exception =>
             println(e.getMessage)
             publisher.disconnect(s"tcp://127.0.0.1:16666")
             ZMQcontext.close()
         }
       }
     }
    }
    
  2. I have a Subscriber Runnable:

    class ZMQSubscriberRunnable1() extends Runnable {
    
      override def run() {
    
        println("STARTING SUBSCRIBER")
    
        val ZMQcontext = ZMQ.context(1)
        val subscriber = ZMQcontext.socket(ZMQ.SUB)
        subscriber.subscribe("".getBytes)
    
        subscriber.bind(s"tcp://127.0.0.1:16666")
    
        while (!Thread.currentThread().isInterrupted) {
          try {
            println("waiting")
            val mesg = new String(subscriber.recv(0))
            println(s"SUBSCRIBER -> $mesg")
          }
          catch {
            case e: Exception =>
              println(e.getMessage)
              subscriber.unbind("tcp://127.0.0.1:16666")
              subscriber.close()
              ZMQcontext.close()
          }
        }
      }
    }
    
  3. My main code looks like this:

    object Application extends App {
      val zmqPUB = new ZMQPublisherRunnable
      val zmqThreadPUB = new Thread(zmqPUB, "MY_PUB")
    
      zmqThreadPUB.setDaemon(true)
      zmqThreadPUB.start()
    
      val zmqRunnable = new ZMQSubscriberRunnable1
      val zmqThread = new Thread(zmqRunnable, "MY_TEST")
    
      zmqThread.setDaemon(true)
      zmqThread.start()
    
      Thread.sleep(10000)
    
      zmqThread.interrupt()
      zmqThread.join()
    
      Thread.sleep(2000)
    
      val zmqRunnable_2 = new ZMQSubscriberRunnable1
      val zmqThread_2 = new Thread(zmqRunnable_2, "MY_TEST_2")
    
      zmqThread_2.setDaemon(true)
      zmqThread_2.start()
    
      Thread.sleep(10000)
    
      zmqThread_2.interrupt()
      zmqThread_2.join()
    }
    

The first time I start the Subscriber, I'm able to receive all messages:

    STARTING SUBSCRIBER
    PUBLISHER -> 0
    waiting
    PUBLISHER -> 1
    SUBSCRIBER -> PUBLISHER -> 1
    waiting
    PUBLISHER -> 2
    SUBSCRIBER -> PUBLISHER -> 2
    waiting
    PUBLISHER -> 3
    SUBSCRIBER -> PUBLISHER -> 3
    waiting
    ...

Once I interrupt the Thread and start a new one with a new instance of the same Runnable class, I can no longer read messages. The subscriber waits forever:

    STARTING SUBSCRIBER
    waiting
    PUBLISHER -> 13
    PUBLISHER -> 14
    PUBLISHER -> 15
    PUBLISHER -> 16
    PUBLISHER -> 17
    ...

Any insights about what I'm doing wrong?

Thanks


Solution

  • JeroMQ is not Thread.interrupt safe.

    To work around this, terminate the ZMQ context before you call Thread.interrupt:

    1. Instantiate the ZMQ context outside the Runnable.
    2. Pass the context as an argument to the ZMQ Runnable (you can also use it as a global variable).
    3. Call zmqContext.term()
    4. Call zmqSubThread.interrupt()
    5. Call zmqSubThread.join()

    For more details, see https://github.com/zeromq/jeromq/issues/116

    My subscriber Runnable looks like:

    import org.zeromq.{ZMQ, ZMQException}

    class ZMQSubscriberRunnable(zmqContext: ZMQ.Context, port: Int, ip: String, topic: String) extends Runnable {

      override def run(): Unit = {

        var running = true
        val subscriber = zmqContext.socket(ZMQ.SUB)
        subscriber.subscribe(topic.getBytes)

        subscriber.bind(s"tcp://$ip:$port")

        while (running && !Thread.currentThread().isInterrupted) {
          try {
            println(new String(subscriber.recv(0)))
          }
          catch {
            // zmqContext.term() was called from another thread: stop receiving.
            case e: ZMQException if e.getErrorCode == ZMQ.Error.ETERM.getCode =>
              running = false
            // Any other failure: stop as well. term() is not called here because
            // it would block until this socket is closed.
            case e: Exception =>
              println(e.getMessage)
              running = false
          }
        }

        // Closing the socket lets the pending zmqContext.term() return.
        subscriber.close()
      }
    }
    

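    To stop the subscriber Thread:

    // Makes the blocked recv() fail with ETERM; returns once the subscriber has closed its socket.
    zmqContext.term()
    zmqSubThread.interrupt()
    zmqSubThread.join()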
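
The shutdown order above can be put together into a start/stop pair, which is what the original question needs in order to restart a subscriber on the same port. This is only a minimal sketch, not the definitive way to do it: the names SubscriberLifecycle, startSubscriber and stopSubscriber are hypothetical, and it assumes a fresh ZMQ context is created for every subscriber, since a terminated context cannot be reused.

```scala
import org.zeromq.{ZMQ, ZMQException}

// Hypothetical helper object, not part of JeroMQ.
object SubscriberLifecycle {

  // Starts a SUB socket on its own daemon thread and returns the context
  // together with the thread so the caller can stop both later.
  def startSubscriber(endpoint: String, name: String): (ZMQ.Context, Thread) = {
    val context = ZMQ.context(1) // assumption: one fresh context per subscriber
    val worker = new Thread(new Runnable {
      override def run(): Unit = {
        val subscriber = context.socket(ZMQ.SUB)
        try {
          subscriber.subscribe("".getBytes) // empty prefix: receive every message
          subscriber.bind(endpoint)
          var running = true
          while (running) {
            try {
              val data = subscriber.recv(0) // blocks until a message or ETERM
              if (data != null) println(s"$name -> ${new String(data)}")
            } catch {
              // The context was terminated from another thread: leave the loop.
              case e: ZMQException if e.getErrorCode == ZMQ.Error.ETERM.getCode =>
                running = false
            }
          }
        } finally {
          // Closing the socket lets the pending context.term() return.
          subscriber.close()
        }
      }
    }, name)
    worker.setDaemon(true)
    worker.start()
    (context, worker)
  }

  // Same order as in the answer: terminate the context first, then interrupt and join.
  def stopSubscriber(context: ZMQ.Context, worker: Thread): Unit = {
    context.term()
    worker.interrupt()
    worker.join()
  }
}
```

With this sketch, the main code from the question would call startSubscriber("tcp://127.0.0.1:16666", "MY_TEST"), sleep, call stopSubscriber with the returned pair, and then repeat the same two calls for "MY_TEST_2".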