python, django, mqtt, django-channels, asgi

Is it possible to listen to an MQTT server and publish over a WebSocket endpoint from our own server?


I have an MQTT consumer that listens to a topic and, based on what it receives, sends a response on another topic. Now I would like to create a secure WebSocket (wss) endpoint where I could stream this processed information. Could you tell me whether that is possible with the mqttasgi library and, if so, how?

Here is the code of my consumer.

from mqttasgi.consumers import MqttConsumer
from mqtt_handler.tasks import processmqttmessage

import json

class MyMqttConsumer(MqttConsumer):
    """MQTT consumer that forwards uplink events for processing and publishes results.

    Incoming messages on ``application/+/device/+/event/up`` are handed to
    ``processmqttmessage``; events delivered to ``publish_results`` are
    published back to MQTT under ``stracontech/procesed/<device_id>/result``.
    """

    async def connect(self):
        """Subscribe to the uplink topic (QoS 2) and join the "stracontech" group."""
        await self.subscribe('application/+/device/+/event/up', 2)
        await self.channel_layer.group_add("stracontech", self.channel_name)

    async def receive(self, mqtt_message):
        """Log one MQTT message and queue its JSON payload for processing.

        Args:
            mqtt_message: Mapping with at least the keys ``'topic'``,
                ``'payload'`` and ``'qos'``.

        A payload that is not valid JSON is reported and dropped, so a single
        malformed message cannot raise out of the handler.
        """
        print('Received a message at topic:', mqtt_message['topic'])
        print('With payload', mqtt_message['payload'])
        print('And QOS:', mqtt_message['qos'])
        try:
            parsed = json.loads(mqtt_message['payload'])
        except ValueError as exc:
            # json.JSONDecodeError and UnicodeDecodeError both derive from ValueError.
            print('Discarding payload that is not valid JSON:', exc)
            return
        # Re-serialise so the task always receives a JSON string.
        # NOTE(review): processmqttmessage is presumably a Celery task - confirm.
        processmqttmessage.delay(json.dumps(parsed), mqtt_message['topic'])

    async def publish_results(self, event):
        """Publish ``event['result']`` to the result topic of its device.

        Args:
            event: Mapping whose ``'result'`` entry is a JSON-serialisable
                dict containing a ``'device_id'`` key.

        NOTE(review): presumably invoked for channel-layer messages sent to
        the "stracontech" group with type ``publish.results`` - confirm
        against the sender.
        """
        data = event['result']
        topic = f"stracontech/procesed/{data['device_id']}/result"
        await self.publish(topic, json.dumps(data).encode('utf-8'), qos=2, retain=False)

    async def disconnect(self):
        """Unsubscribe from the uplink topic and leave the "stracontech" group."""
        await self.unsubscribe('application/+/device/+/event/up')
        # Mirror the group_add() in connect() so no stale membership is left behind.
        await self.channel_layer.group_discard("stracontech", self.channel_name)

P.S.: @Santiago Ivulich, maybe you can help me with this.


Solution

  • For the WebSocket consumer, follow the consumers guide in the Django Channels documentation.

    An example:

    from channels.generic.websocket import AsyncWebsocketConsumer
    
    class MyConsumer(AsyncWebsocketConsumer):
        """Illustrative WebSocket consumer listing the calls available in each handler.

        The handlers enumerate the alternatives one after another; a real
        consumer keeps only the call it actually needs in each place.
        """

        # Group names declared for this consumer.
        groups = ["broadcast"]

        async def connect(self):
            """Handle a new connection: accept it (optionally with a subprotocol) or reject it."""
            # Called on connection.
            # To accept the connection call:
            await self.accept()
            # Or accept the connection and specify a chosen subprotocol.
            # A list of subprotocols specified by the connecting client
            # will be available in self.scope['subprotocols']
            await self.accept("subprotocol")
            # To reject the connection, call:
            await self.close()

        async def receive(self, text_data=None, bytes_data=None):
            """Handle one incoming frame, delivered as either text_data or bytes_data."""
            # Called with either text_data or bytes_data for each frame
            # You can call:
            await self.send(text_data="Hello world!")
            # Or, to send a binary frame (the payload must be bytes, not str):
            await self.send(bytes_data=b"Hello world!")
            # Want to force-close the connection? Call:
            await self.close()
            # Or add a custom WebSocket error code!
            await self.close(code=4123)

        async def disconnect(self, close_code):
            """Called when the socket closes; close_code is the WebSocket close code."""
            # Nothing to clean up in this example.
    

    Then in your asgi.py:

    import os

    import django
    from channels.routing import ProtocolTypeRouter
    from django.core.asgi import get_asgi_application

    os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'integrator_mqtt.settings')

    # Populate the app registry before importing the consumers, which may
    # (directly or through their tasks) import ORM models.
    django.setup()

    # Both consumers are routed below, so both must be imported.
    from my_application.consumers import MyConsumer, MyMqttConsumer  # noqa: E402

    # Dispatch each incoming connection to the handler for its protocol type.
    application = ProtocolTypeRouter({
        'http': get_asgi_application(),
        'websocket': MyConsumer.as_asgi(),
        'mqtt': MyMqttConsumer.as_asgi(),
    })
    

    I would recommend using Daphne as the protocol server for the service that listens for and streams to WebSockets.