Tags: activemq-classic, mqtt, raspberry-pi2, kura, wso2-das

WSO2 - DAS consuming MQTT messages


I am playing around with Eclipse Kura 1.2.2, WSO2 DAS 3.0.0 and ActiveMQ 5.12.1 to experiment in the world of IoT. So far I have managed to set up DAS as the M2M middleware server, Kura on a Raspberry Pi 2 as the IoT gateway, and ActiveMQ as the MQTT server.

I also wrote a very basic MQTT message producer that periodically sends a very simple MQTT message to the MQTT server, simulating an actual device. The idea is to replace this application later with a Bluetooth device that sends data periodically.

When I use MQTTSpy to monitor incoming messages, I notice that the MQTT messages are binary. This matches the docs, which state that Kura encodes its MQTT payloads using Google protocol buffers. Since DAS does not support this type of MQTT message, I assume this is why the server does not respond to any incoming message.

I configured a DAS stream using the following definition:

{
  "streamId": "mqtt_sample_01:1.0.0",
  "name": "mqtt_sample_01",
  "version": "1.0.0",
  "nickName": "mqtt_sample_01",
  "description": "mqtt_sample_01",
  "metaData": [],
  "correlationData": [],
  "payloadData": [
    {
      "name": "temperature",
      "type": "FLOAT"
    }
  ]
}

I also created a receiver for the incoming MQTT message using the following code:

<?xml version="1.0" encoding="UTF-8"?>
<eventReceiver name="mqtt_sample_receiver_protobuf" statistics="disable" trace="disable" xmlns="http://wso2.org/carbon/eventreceiver">
    <from eventAdapterType="mqtt-protobuf">
        <property name="topic">mqtt-sender-topic/mqtt-client-01/MQTT_APP_V1/mydata</property>
        <property name="clientId">mqtt-client-01</property>
        <property name="url">tcp://192.168.1.42:1883</property>
        <property name="cleanSession">false</property>
    </from>
    <mapping customMapping="disable" type="map"/>
    <to streamName="mqtt_sample_01" version="1.0.0"/>
</eventReceiver>

Note: I have also tried JSON and XML as the mapping type.

To display everything at the DAS console, I added a publisher using:

<?xml version="1.0" encoding="UTF-8"?>
<eventPublisher name="mqtt_sample_logger_01" statistics="disable" trace="disable" xmlns="http://wso2.org/carbon/eventpublisher">
    <from streamName="mqtt_sample_01" version="1.0.0"/>
    <mapping customMapping="disable" type="text"/>
    <to eventAdapterType="logger">
        <property name="uniqueId">mqtt_sample_logger_01</property>
    </to>
</eventPublisher>  

Kura formats the MQTT message using Google protocol buffers, which WSO2 DAS does not understand. A few possible solutions exist:

  1. Change the message formatting in Kura so that it does not use Google protocol buffers for encoding. I found a post on SO describing a more or less similar approach, but it means losing all the advantages offered by the CloudClient class.
  2. Write your own DAS receiver as described in this article or this article.
  3. Browse through the Kura code and write your own implementation of CloudService/CloudClient.

In my opinion, the best solution is the second option: writing a custom event receiver that understands and decodes the Google protocol buffer format produced by Kura. Other, perhaps even better, solutions are more than welcome.

Important notice:
ActiveMQ shows the topic name in dot notation in the GUI (mqtt-sender-topic.mqtt-client-01.MQTT_APP_V1.mydata), but the real name of the topic on the MQTT side uses slash notation (mqtt-sender-topic/mqtt-client-01/MQTT_APP_V1/mydata).
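
To avoid mixing up the two notations when copying a topic name from the ActiveMQ console into a receiver configuration, the conversion can be sketched as follows. The class and method names are hypothetical, and the sketch assumes that no topic level contains a literal dot and that no wildcards are involved.

```java
// Hypothetical helper: converts between the dot notation shown in the
// ActiveMQ console and the slash notation used by MQTT clients.
// Assumes topic levels contain no literal '.' and no wildcards.
final class TopicNames {
    private TopicNames() {}

    /** Console name (dot separated) to the name an MQTT client subscribes to. */
    static String toMqttName(String consoleName) {
        return consoleName.replace('.', '/');
    }

    /** MQTT topic name (slash separated) to the name shown in the console. */
    static String toConsoleName(String mqttTopic) {
        return mqttTopic.replace('/', '.');
    }

    public static void main(String[] args) {
        System.out.println(
            toMqttName("mqtt-sender-topic.mqtt-client-01.MQTT_APP_V1.mydata"));
    }
}
```

The value to put in the receiver's topic property is the slash form.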

To build the custom receiver, I decided to copy the code of the original MQTT receiver and alter it to process the protobuf format and translate it into XML (at least that's the idea). After some struggling to get all dependencies set up correctly, I managed to build a working custom receiver.
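
One detail such a receiver may have to handle before protobuf decoding: as far as I know, Kura's cloud service can optionally gzip the encoded payload. The following is a minimal sketch of a pre-processing step, not the code of the actual receiver; the class and method names are hypothetical. It checks for the gzip magic bytes and decompresses only when they are present.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

// Hypothetical helper for a custom receiver: unwraps an optionally
// gzip-compressed MQTT payload before it is handed to a protobuf parser.
final class PayloadBytes {
    private PayloadBytes() {}

    /** True if the data starts with the gzip magic bytes 0x1F 0x8B. */
    static boolean looksGzipped(byte[] data) {
        return data != null && data.length >= 2
                && (data[0] & 0xFF) == 0x1F
                && (data[1] & 0xFF) == 0x8B;
    }

    /** Returns the decompressed bytes, or the input itself if it is not gzipped. */
    static byte[] gunzipIfNeeded(byte[] data) {
        if (!looksGzipped(data)) {
            return data;
        }
        try (GZIPInputStream in = new GZIPInputStream(new ByteArrayInputStream(data));
             ByteArrayOutputStream out = new ByteArrayOutputStream()) {
            byte[] buffer = new byte[4096];
            int n;
            while ((n = in.read(buffer)) != -1) {
                out.write(buffer, 0, n);
            }
            return out.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Compresses data with gzip; only used here to demonstrate a round trip. */
    static byte[] gzip(byte[] data) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        try (GZIPOutputStream gz = new GZIPOutputStream(out)) {
            gz.write(data);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return out.toByteArray();
    }
}
```

The bytes returned by gunzipIfNeeded would then go to the class generated from Kura's KuraPayload protobuf definition, which is not shown here.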

Unfortunately, I am not quite where I wanted to be. There seems to be a problem with the connection to the MQTT broker: the receiver starts up but frequently loses its connection, writing the following message to the log.

DEBUG {test.wso2.mqtt.receiver.internal.util.MQTTProtobufAdapterListener} -  MQTT client subscribed to : mqtt-sender-topic/mqtt-client-01/MQTT_APP_V1/mydata
INFO {test.wso2.mqtt.receiver.internal.util.MQTTProtobufAdapterListener} -  MQTT Connection successful
WARN {test.wso2.mqtt.receiver.internal.util.MQTTProtobufAdapterListener} -  MQTT connection not reachable
Connection lost (32109) - java.io.EOFException
at org.eclipse.paho.client.mqttv3.internal.CommsReceiver.run(CommsReceiver.java:138)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.EOFException
at java.io.DataInputStream.readByte(DataInputStream.java:267)
at org.eclipse.paho.client.mqttv3.internal.wire.MqttInputStream.readMqttWireMessage(MqttInputStream.java:56)
at org.eclipse.paho.client.mqttv3.internal.CommsReceiver.run(CommsReceiver.java:100)
... 1 more

For what it's worth, the broker (ActiveMQ) complains with a warning stating:

WARN Stealing link for clientId mqtt-client-01 From Connection Transport Connection to: tcp://192.168.1.42:4594

My code must be doing something wrong that causes the connection to be dropped; the question is what. Any suggestions, ideas or solutions are again more than welcome!
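
One thing worth checking, given the broker warning: ActiveMQ logs "Stealing link" when a new connection presents a client ID that is already in use, and it then closes the older connection, which would show up as an EOFException on the other client. In Kura's usual topic layout the second topic level is the gateway's own client ID, so if the gateway connects as mqtt-client-01 (an assumption about this setup), a receiver configured with the same clientId would keep colliding with it. A minimal sketch for producing a distinct ID; the class name and the prefix are hypothetical:

```java
import java.util.UUID;

// Hypothetical helper: builds a client ID that does not clash with the
// ID used by the gateway, and stays within the 23-character limit of MQTT 3.1.
final class ClientIds {
    static final int MAX_LENGTH = 23;
    static final int SUFFIX_LENGTH = 8;

    private ClientIds() {}

    /** Returns prefix + "-" + 8 random hex characters, truncating the prefix if needed. */
    static String unique(String prefix) {
        String suffix = UUID.randomUUID().toString()
                .replace("-", "")
                .substring(0, SUFFIX_LENGTH);
        int maxPrefix = MAX_LENGTH - SUFFIX_LENGTH - 1;
        String head = prefix.length() > maxPrefix ? prefix.substring(0, maxPrefix) : prefix;
        return head + "-" + suffix;
    }

    public static void main(String[] args) {
        System.out.println(unique("das-receiver"));
    }
}
```

Note that with cleanSession set to false the broker ties the stored session to the client ID, so a fixed ID that simply differs from the gateway's (set directly in the receiver's clientId property) may be the better choice than a random one.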

HINT:
Start DAS with the -DosgiConsole option, which lets you inspect the status of your deployed bundle. After successful deployment of the receiver, the command diag [bundle_number] should output something like:
osgi> diag 473
reference:file:../dropins/test.wso2.mqtt.receiver.MqttProtobufReceiver->1.0.0.jar [473]
No unresolved constraints.


Solution

  • An example of an input receiver for WSO2 products (e.g. Data Analytics Server) capable of processing Google protocol buffer formatted messages as created by Eclipse Kura (KuraPayload format) can be downloaded from Google Drive.

    The Kura sample application that sends the messages can also be downloaded from Google Drive.

    The receiver takes the binary KuraPayload message and converts it into XML. Check the sample application for the XML format.

    Please share any improvements or modifications you make to the receiver to help others.