oauth-2.0wso2mqttwso2-message-brokerwso2-identity-server

MQTT Oauth2 authentication against WSO2 Message broker


I want to get a MQTT broker to authenticate with an OAUTH2 server.

I am coding the MQTT client in Python 3 using Paho.

I have succesfullly connected to the IS to obtain the access token.

I have succeesfully sent json messages through the MQTT Message Broker.

I have configured WSO2 Message Broker and Identity Server to work together following the oficial instructions.

When I try to connect with the authentication information I cannot get to send correctly the acces token to be processed.

client = mqtt.Client("oauthmqttcli") 
client.connect(broker_address, port=1883) #connect to broker
client.username_pw_set(access_token)
client.publish("test","{'a':'b'}")

In the logs I see this:

TID: [] [] [2018-09-21 10:56:33,019] ERROR {org.dna.mqtt.wso2.MqttLogExceptionHandler} -  ValueEvent exception occurred on disruptor. {org.dna.mqtt.wso2.MqttLogExceptionHandler}
java.lang.NullPointerException
        at org.dna.mqtt.moquette.messaging.spi.impl.ProtocolProcessor.processPublish(ProtocolProcessor.java:390)
        at org.dna.mqtt.moquette.messaging.spi.impl.SimpleMessaging.onEvent(SimpleMessaging.java:171)
        at org.dna.mqtt.moquette.messaging.spi.impl.SimpleMessaging.onEvent(SimpleMessaging.java:47)
        at com.lmax.disruptor.BatchEventProcessor.run(BatchEventProcessor.java:128)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
TID: [] [] [2018-09-21 10:56:33,026] ERROR {org.dna.mqtt.moquette.server.netty.metrics.MessageMetricsHandler} -  Connection reset by peer {org.dna.mqtt.moquette.server.netty.metrics.MessageMetricsHandler}
java.io.IOException: Connection reset by peer
        at sun.nio.ch.FileDispatcherImpl.read0(Native Method)
        at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
        at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
        at sun.nio.ch.IOUtil.read(IOUtil.java:192)
        at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:380)
        at io.netty.buffer.UnpooledUnsafeDirectByteBuf.setBytes(UnpooledUnsafeDirectByteBuf.java:446)
        at io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:881)
        at io.netty.channel.socket.nio.NioSocketChannel.doReadBytes(NioSocketChannel.java:225)
        at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:119)
        at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
        at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
        at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
        at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
        at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:116)
        at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:137)
        at java.lang.Thread.run(Thread.java:748)
TID: [] [] [2018-09-21 10:56:33,027]  INFO {org.dna.mqtt.moquette.messaging.spi.impl.ProtocolProcessor} -  Lost connection with client oauthmqttcli {org.dna.mqtt.moquette.messaging.spi.impl.ProtocolProcessor}
TID: [] [] [2018-09-21 10:56:33,027]  WARN {org.dna.mqtt.moquette.messaging.spi.impl.ProtocolProcessor} -  MQTTAuthorizationSubject for client ID oauthmqttcli is not removed since the entry does not exist {org.dna.mqtt.moquette.messaging.spi.impl.ProtocolProcessor}

How can I send the access token to the server to be accessed? Where is the Message Broker searching for the access token?


Solution

  • I finally found it. The access token must be sent as a username with a blank password.

    client = mqtt.Client("dev7") 
    client.username_pw_set(username=access_token,password="")
    client.connect(broker_address, port=broker_port) #connect to broker
    print("Connected")
    client.publish("test","{'c':'Payload!'}")
    print("Sending message")