java multithreading mqtt paho

Process multiple messages with MqttClient asynchronously and concurrently


I am developing a program that consumes messages from an MQTT topic, and my goal is to consume and process several messages asynchronously.

I am using the Eclipse Paho clients: https://www.eclipse.org/paho/files/javadoc/org/eclipse/paho/client/mqttv3/MqttClient.html https://www.eclipse.org/paho/files/javadoc/org/eclipse/paho/client/mqttv3/MqttAsyncClient.html

The problem is that the messages are not processed at the same time; they are all handled in the same thread. I also do not fully understand the difference between MqttClient and MqttAsyncClient. The javadoc says:

MqttClient

Lightweight client for talking to an MQTT server using methods that block until an operation completes.

MqttAsyncClient

Lightweight client for talking to an MQTT server using non-blocking methods that allow an operation to run in the background.

I am also unclear about the difference between the "subscribe" and "setCallback" methods. Only "subscribe" lets you declare several listeners:

setCallback

Sets a callback listener to use for events that happen asynchronously.

subscribe

Subscribe to a topic...

I tried sending ten messages at the same time. My tests are the following:

public class FooListener implements IMqttMessageListener {
    @Override
    public void messageArrived(String topic, MqttMessage message) throws Exception {
        System.out.println("Thread [ " + Thread.currentThread().getName() + 
                "], Topic[ "+ topic + "],  Message [" + message +"] ");
    }
}

public class FooCallbackListener implements MqttCallback {

    @Override
    public void connectionLost(Throwable e) {
        e.printStackTrace();
    }

    @Override
    public void deliveryComplete(IMqttDeliveryToken token) {
        // TODO: empty
    }

    @Override
    public void messageArrived(String topic, MqttMessage message) throws Exception {
        System.out.println("Thread [ " + Thread.currentThread().getName() + 
                "], Topic[ "+ topic + "],  Message [" + message +"] ");
    }

}

MqttClient and subscribe:

public class FooMqttClient {

    public static void main(String[] args) {
        MqttConnectOptions connOpt = new MqttConnectOptions();
        connOpt.setCleanSession(true);
        connOpt.setKeepAliveInterval(30);
        String serverUri = "tcp://iot.eclipse.org:1883";
        String clientId = UUID.randomUUID().toString();

        try {
            MqttClient myClient = new MqttClient(serverUri, clientId);
            myClient.connect(connOpt);
            myClient.subscribe("topic/foo", new FooListener());
        } catch (MqttException e) {
            e.printStackTrace();
        }
    }
}

Results:

Thread [ MQTT Call: 7bb52a01-03db-4870-8921-8d6432f2fe27], Topic[topic/foo],  Message [Foo 0]  
Thread [ MQTT Call:7bb52a01-03db-4870-8921-8d6432f2fe27], Topic[ topic/foo],  Message [Foo 1]  
Thread [ MQTT Call: 7bb52a01-03db-4870-8921-8d6432f2fe27], Topic[ topic/foo],  Message [Foo 2]  
Thread [ MQTT Call:7bb52a01-03db-4870-8921-8d6432f2fe27], Topic[ topic/foo],  Message [Foo 3]  
Thread [ MQTT Call: 7bb52a01-03db-4870-8921-8d6432f2fe27], Topic[ topic/foo],  Message [Foo 4]  
Thread [ MQTT Call:7bb52a01-03db-4870-8921-8d6432f2fe27], Topic[ topic/foo],  Message [Foo 5]  
Thread [ MQTT Call: 7bb52a01-03db-4870-8921-8d6432f2fe27], Topic[ topic/foo],  Message [Foo 6]  
Thread [ MQTT Call:7bb52a01-03db-4870-8921-8d6432f2fe27], Topic[ topic/foo],  Message [Foo 7]  
Thread [ MQTT Call: 7bb52a01-03db-4870-8921-8d6432f2fe27], Topic[ topic/foo],  Message [Foo 8]  
Thread [ MQTT Call:7bb52a01-03db-4870-8921-8d6432f2fe27], Topic[ topic/foo],  Message [Foo 9]

MqttClient and setCallback:

public class FooMqttCallbackClient {

    public static void main(String[] args) {
        MqttConnectOptions connOpt = new MqttConnectOptions();
        connOpt.setCleanSession(true);
        connOpt.setKeepAliveInterval(30);
        String serverUri = "tcp://iot.eclipse.org:1883";
        String clientId = UUID.randomUUID().toString();

        try {
            MqttClient myClient = new MqttClient(serverUri, clientId);
            myClient.connect(connOpt);
            myClient.subscribe("topic/foo");
            myClient.setCallback(new FooCallbackListener());
        } catch (MqttException e) {
            e.printStackTrace();
        }
    }
}

Results:

Thread [ MQTT Call: 3d571fd4-906d-4a1f-b06f-a2f7d43b95b2], Topic[ topic/foo],  Message [Foo 0]  
Thread [ MQTT Call: 3d571fd4-906d-4a1f-b06f-a2f7d43b95b2], Topic[ topic/foo],  Message [Foo 1]  
Thread [ MQTT Call: 3d571fd4-906d-4a1f-b06f-a2f7d43b95b2], Topic[ topic/foo],  Message [Foo 2]  
Thread [ MQTT Call: 3d571fd4-906d-4a1f-b06f-a2f7d43b95b2], Topic[ topic/foo],  Message [Foo 3]  
Thread [ MQTT Call: 3d571fd4-906d-4a1f-b06f-a2f7d43b95b2], Topic[ topic/foo],  Message [Foo 4]  
Thread [ MQTT Call: 3d571fd4-906d-4a1f-b06f-a2f7d43b95b2], Topic[ topic/foo],  Message [Foo 5]  
Thread [ MQTT Call: 3d571fd4-906d-4a1f-b06f-a2f7d43b95b2], Topic[ topic/foo],  Message [Foo 6]  
Thread [ MQTT Call: 3d571fd4-906d-4a1f-b06f-a2f7d43b95b2], Topic[ topic/foo],  Message [Foo 7]  
Thread [ MQTT Call: 3d571fd4-906d-4a1f-b06f-a2f7d43b95b2], Topic[ topic/foo],  Message [Foo 8]  
Thread [ MQTT Call: 3d571fd4-906d-4a1f-b06f-a2f7d43b95b2], Topic[ topic/foo],  Message [Foo 9]

MqttAsyncClient and subscribe:

public class FooAsyncMqttClient {
    public static void main(String[] args) {
        MqttConnectOptions connOpt = new MqttConnectOptions();
        connOpt.setCleanSession(true);
        connOpt.setKeepAliveInterval(30);
        String serverUri = "tcp://iot.eclipse.org:1883";
        String clientId = UUID.randomUUID().toString();

        try {
            MqttAsyncClient myClient = new MqttAsyncClient(serverUri, clientId);
            myClient.connect(connOpt);
            Thread.sleep(1000);
            myClient.subscribe("topic/foo", 1, new FooListener());
        } catch (MqttException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    }
}

Results:

Thread [ MQTT Call: bbc21897-3c38-4f96-8dea-74522250afee], Topic[ topic/foo],  Message [Foo 0]  
Thread [ MQTT Call: bbc21897-3c38-4f96-8dea-74522250afee], Topic[ topic/foo],  Message [Foo 1]  
Thread [ MQTT Call: bbc21897-3c38-4f96-8dea-74522250afee], Topic[ topic/foo],  Message [Foo 2]  
Thread [ MQTT Call: bbc21897-3c38-4f96-8dea-74522250afee], Topic[ topic/foo],  Message [Foo 3]  
Thread [ MQTT Call: bbc21897-3c38-4f96-8dea-74522250afee], Topic[ topic/foo],  Message [Foo 4]  
Thread [ MQTT Call: bbc21897-3c38-4f96-8dea-74522250afee], Topic[ topic/foo],  Message [Foo 5]  
Thread [ MQTT Call: bbc21897-3c38-4f96-8dea-74522250afee], Topic[ topic/foo],  Message [Foo 6]  
Thread [ MQTT Call: bbc21897-3c38-4f96-8dea-74522250afee], Topic[ topic/foo],  Message [Foo 7]  
Thread [ MQTT Call: bbc21897-3c38-4f96-8dea-74522250afee], Topic[ topic/foo],  Message [Foo 8]  
Thread [ MQTT Call: bbc21897-3c38-4f96-8dea-74522250afee], Topic[ topic/foo],  Message [Foo 9]

MqttAsyncClient and setCallback:

public class FooAsyncMqttCallbackClient {

    public static void main(String[] args) {
        MqttConnectOptions connOpt = new MqttConnectOptions();
        connOpt.setCleanSession(true);
        connOpt.setKeepAliveInterval(30);
        String serverUri = "tcp://iot.eclipse.org:1883";
        String clientId = UUID.randomUUID().toString();

        try {
            MqttAsyncClient myClient = new MqttAsyncClient(serverUri, clientId);
            myClient.connect(connOpt);
            Thread.sleep(1000);
            myClient.subscribe("topic/foo", 1);
            myClient.setCallback(new FooCallbackListener());
        } catch (MqttException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }

    }
}

Results:

Thread [ MQTT Call: 75b827c9-34fe-4e2f-a723-cf0f277b91d6], Topic[ topic/foo],  Message [Foo 0]  
Thread [ MQTT Call: 75b827c9-34fe-4e2f-a723-cf0f277b91d6], Topic[ topic/foo],  Message [Foo 1]  
Thread [ MQTT Call: 75b827c9-34fe-4e2f-a723-cf0f277b91d6], Topic[ topic/foo],  Message [Foo 2]  
Thread [ MQTT Call: 75b827c9-34fe-4e2f-a723-cf0f277b91d6], Topic[ topic/foo],  Message [Foo 3]  
Thread [ MQTT Call: 75b827c9-34fe-4e2f-a723-cf0f277b91d6], Topic[ topic/foo],  Message [Foo 4]  
Thread [ MQTT Call: 75b827c9-34fe-4e2f-a723-cf0f277b91d6], Topic[ topic/foo],  Message [Foo 5]  
Thread [ MQTT Call: 75b827c9-34fe-4e2f-a723-cf0f277b91d6], Topic[ topic/foo],  Message [Foo 6]  
Thread [ MQTT Call: 75b827c9-34fe-4e2f-a723-cf0f277b91d6], Topic[ topic/foo],  Message [Foo 7]  
Thread [ MQTT Call: 75b827c9-34fe-4e2f-a723-cf0f277b91d6], Topic[ topic/foo],  Message [Foo 8]  
Thread [ MQTT Call: 75b827c9-34fe-4e2f-a723-cf0f277b91d6], Topic[ topic/foo],  Message [Foo 9]

In all my tests, the listeners run in the same thread, not concurrently. How can I process the messages concurrently? And what is the difference between MqttClient and MqttAsyncClient?

Solution:

public class FooExecutorListener implements IMqttMessageListener {

    private ExecutorService pool = Executors.newFixedThreadPool(10);

    class MessageHandler implements Runnable {
        MqttMessage message;
        String topic;

        public MessageHandler(String topic, MqttMessage message) {
            this.message = message;
            this.topic = topic;
        }

        public void run() {
            System.out.println("Thread [ " + Thread.currentThread().getName() + 
                    "], Topic[ "+ topic + "],  Message [" + message +"] ");
        }
    }

    @Override
    public void messageArrived(String topic, MqttMessage message) throws Exception {
        pool.execute(new MessageHandler(topic, message));
    }

}

Results:

Thread [ pool-2-thread-1], Topic[ topic/foo],  Message [Foo 0] 
Thread [ pool-2-thread-2], Topic[ topic/foo],  Message [Foo 1] 
Thread [ pool-2-thread-5], Topic[ topic/foo],  Message [Foo 4] 
Thread [ pool-2-thread-4], Topic[ topic/foo],  Message [Foo 3] 
Thread [ pool-2-thread-7], Topic[ topic/foo],  Message [Foo 6] 
Thread [ pool-2-thread-6], Topic[ topic/foo],  Message [Foo 5] 
Thread [ pool-2-thread-8], Topic[ topic/foo],  Message [Foo 7] 
Thread [ pool-2-thread-3], Topic[ topic/foo],  Message [Foo 2] 
Thread [ pool-2-thread-1], Topic[ topic/foo],  Message [Foo 10] 
Thread [ pool-2-thread-2], Topic[ topic/foo],  Message [Foo 11] 
Thread [ pool-2-thread-5], Topic[ topic/foo],  Message [Foo 12] 
Thread [ pool-2-thread-5], Topic[ topic/foo],  Message [Foo 13] 
Thread [ pool-2-thread-5], Topic[ topic/foo],  Message [Foo 14] 
Thread [ pool-2-thread-9], Topic[ topic/foo],  Message [Foo 8] 
Thread [ pool-2-thread-10], Topic[ topic/foo],  Message [Foo 9] 

Solution

  • The difference between the two versions of the client has to do with connecting and publishing, not subscribing. The async version connects and publishes without blocking.

    Subscription handling in both cases happens on the background network thread.

    If you want to handle incoming messages in parallel, you need to implement your own thread pool and distribute the incoming messages to it.

    The easiest way to do this is with Java's ExecutorService, e.g.:

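    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;

    import org.eclipse.paho.client.mqttv3.IMqttMessageListener;
    import org.eclipse.paho.client.mqttv3.MqttMessage;

    public class FooListener implements IMqttMessageListener {

      // Worker pool that processes messages off the MQTT callback thread
      private final ExecutorService pool = Executors.newFixedThreadPool(10);

      class MessageHandler implements Runnable {
        private final MqttMessage message;
        private final String topic;

        public MessageHandler(String topic, MqttMessage message) {
          this.topic = topic;
          this.message = message;
        }

        @Override
        public void run() {
          // process message
        }
      }

      @Override
      public void messageArrived(String topic, MqttMessage message) throws Exception {
        // Hand the message to the pool and return immediately
        pool.execute(new MessageHandler(topic, message));
      }
    }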
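
To see the hand-off pattern in isolation, here is a minimal sketch that uses only the JDK and no Paho classes. The class name PoolDispatchSketch and the methods messageArrived and deliver are hypothetical stand-ins, not part of any library: the loop in deliver plays the role of the single MQTT callback thread, and each simulated message is passed to a fixed thread pool. A CountDownLatch holds every handler until all of them have started, so the handlers demonstrably overlap in time.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class PoolDispatchSketch {

    // Hypothetical stand-in for the MQTT callback: it is always called from one
    // thread, and it only queues the work on the pool before returning.
    static void messageArrived(ExecutorService pool, String topic, String payload,
                               Set<String> workerNames, CountDownLatch allStarted) {
        pool.execute(() -> {
            String name = Thread.currentThread().getName();
            workerNames.add(name);
            System.out.println("Thread [" + name + "], Topic [" + topic
                    + "], Message [" + payload + "]");
            allStarted.countDown();
            try {
                // Block until every handler has started, so all of them run at once.
                allStarted.await(5, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
    }

    // Delivers `count` simulated messages from the calling thread and returns
    // the names of the pool threads that handled them.
    static Set<String> deliver(int count) {
        ExecutorService pool = Executors.newFixedThreadPool(count);
        Set<String> workerNames = ConcurrentHashMap.newKeySet();
        CountDownLatch allStarted = new CountDownLatch(count);
        for (int i = 0; i < count; i++) {
            messageArrived(pool, "topic/foo", "Foo " + i, workerNames, allStarted);
        }
        // Stop accepting new work and wait for the queued handlers to finish.
        pool.shutdown();
        try {
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return workerNames;
    }

    public static void main(String[] args) {
        System.out.println("Handler threads used: " + deliver(4).size());
    }
}
```

Two things to keep in mind with this approach: messages handed to a pool may finish out of order, as the pool results in the question show, and the pool should be shut down when the client disconnects so its threads do not keep the JVM alive.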