azureazure-iot-edge

Azure IoT Edge forwared messages between modules don't include properties (destination Node-RED) (SOLVED)


I have deployed an IoT Edge device with 2 modules, one instance of Simulated Temperature Sensor and another of Node-RED (both from the Marketplace). I can forward the messages generated from the first one to one of the inputs of Node-RED but the received message doesn't include any properties, nor system's neither user-defined.

The list of routes configured for this IoT Edge device includes 2:

  1. NoderedToIotHub = FROM /messages/modules/nodered/outputs/* INTO $upstream
  2. SimulatedTemperatureSensorToNodered = FROM /messages/modules/SimulatedTemperatureSensor/* INTO BrokeredEndpoint("/modules/nodered/inputs/input1")

Although the message is received by Node-RED, I don't have access to the properties (iothub-connection-device-id, etc.). This is the message received:

{
    "payload": {
        "machine": {
            "temperature": 78.45809454307097,
            "pressure": 7.545858871995427
        },
        "ambient": {
            "temperature": 21.225943549765738,
            "humidity": 26
        },
        "timeCreated": "2024-05-10T12:40:41.4387644Z"
    },
    "topic": "input",
    "input": "input1",
    "_msgid": "ac58dc85f99232e7"
}

Following the recommendations I found in another thread of this site, I added "WITH $includeProperties" at the end of the second route, but still the same. I've also tried to add "WITH $systemProperties" to see at least the system's, just for testing, and yet nothing.

Is there any way to forward them? I might need them in some scenarios.

Just in case it's relevant, the Node-RED version available in the Marketplace uses node-red-contrib-azure-iot-edge-module (1.0.4). I know there's an alternative and slightly more updated version available (node-red-contrib-azure-iot-edge-kpm) but it looks like it won't be maintained in the future.

So I've entered to see the source code of this module and the main file azureiotedge.js looks like this:

module.exports = function (RED) {
    'use strict'

    var Transport = require('azure-iot-device-mqtt').Mqtt;
    var Client = require('azure-iot-device').ModuleClient;
    var Message = require('azure-iot-device').Message;

    var statusEnum = {
        disconnected: { color: "red", text: "Disconnected" },
        connected: { color: "green", text: "Connected" },
        sent: { color: "blue", text: "Sending message" },
        received: { color: "yellow", text: "Receiving message" },
        reported: { color: "blue", text: "Sending reported properties" },
        desired: { color: "yellow", text: "Receiving desired properties" },
        method: { color: "yellow", text: "Receiving direct method" },
        response: { color: "blue", text: "Sending method response" },
        error: { color: "grey", text: "Error" }
    };

    var edgeClient;
    var moduleTwin;
    var methodResponses = [];

    // Function to create the IoT Edge Client 
    function IoTEdgeClient(config) {
        // Store node for further use
        var node = this;
        node.connected = false;

        // Create the Node-RED node
        RED.nodes.createNode(this, config);

        // Create the IoT Edge client
        Client.fromEnvironment(Transport, function (err, client) {
            if (err) {
                node.log('Module Client creation error:' + err);
            }
            else {
                client.on('error', function (err) {
                    node.log('Module Client error:' + err);
                });
                node.log('Module Client created.');
                // connect to the Edge instance
                client.open(function (err) {
                    if (err) {
                        node.log('Module Client open error:' + err);
                        throw err;
                    } else {
                        node.log('Module Client connected.');
                        edgeClient = client;
                        client.getTwin(function(err, twin) {
                            if (err) {
                                node.error('Could not get the module twin: ' + err);
                                throw err;
                            } else {
                                node.log('Module twin created.');
                                node.log('Twin contents:');
                                node.log(JSON.stringify(twin.properties));

                                node.on('close', function() {
                                    node.log('Azure IoT Edge Module Client closed.');
                                    edgeClient = null;
                                    moduleTwin = null;
                                    twin.removeAllListeners();
                                    client.removeAllListeners();
                                    client.close();
                                });
                                moduleTwin = twin;
                            }
                        });
                    }
                });
            }
        });
    }

    // Function to create the Module Twin 
    function ModuleTwin(config) {
        // Store node for further use
        var node = this;

        // Create the Node-RED node
        RED.nodes.createNode(this, config);
        setStatus(node, statusEnum.disconnected);  

        // Get the twin
        getTwin().then(function(twin){
            setStatus(node, statusEnum.connected);
            // Register for changes
            twin.on('properties.desired', function(delta) {
                setStatus(node, statusEnum.desired);
                node.log('New desired properties received:');
                node.log(JSON.stringify(delta));
                node.send({payload: delta, topic: "desired"})
                setStatus(node, statusEnum.connected);
            });

            node.on('input', function (msg) {
                setStatus(node, statusEnum.reported);
                var messageJSON = null;
    
                if (typeof (msg.payload) != "string") {
                    messageJSON = msg.payload;
                } else {
                    //Converting string to JSON Object
                    messageJSON = JSON.parse(msg.payload);
                }

                twin.properties.reported.update(messageJSON, function(err) {
                    if (err) throw err;
                    node.log('Twin state reported.');
                    setStatus(node, statusEnum.connected);
                });
            });
        })
        .catch(function(err){
            node.log('Module Twin error:' + err);
        });
                        
        node.on('close', function(done) {
            setStatus(node, statusEnum.disconnected);
            done();
        });
    }

    // Module input to receive input from edgeHub
    function ModuleInput(config) {
        // Store node for further use
        var node = this;
        node.input = config.input;

        // Create the Node-RED node
        RED.nodes.createNode(this, config);
        setStatus(node, statusEnum.disconnected);
        getClient().then(function(client){
            setStatus(node, statusEnum.connected);
            // Act on module input messages
            node.log("Module Input created: " + node.input);
            client.on('inputMessage', function (inputName, msg) {
                outputMessage(client, node, inputName, msg);
            });
        })
        .catch(function(err){
            node.log("Module Input can't be loaded: " + err);
        });
       
        node.on('close', function(done) {
            setStatus(node, statusEnum.disconnected);
            done();
        });
    }

    // Module output to send output to edgeHub 
    function ModuleOutput(config) {
        // Store node for further use
        var node = this;
        node.output = config.output;

        // Create the Node-RED node
        RED.nodes.createNode(this, config);
        setStatus(node, statusEnum.disconnected);
        getClient().then(function(client){
            setStatus(node, statusEnum.connected);
            // React on input from node-red
            node.log("Module Output created: " + node.output);
            node.on('input', function (msg) {
                setStatus(node, statusEnum.sent);
                var messageJSON = null;

                if (typeof (msg.payload) != "string") {
                    messageJSON = msg.payload;
                } else {
                    //Converting string to JSON Object
                    messageJSON = JSON.parse(msg.payload);
                }

                var messageOutput = node.output;
                sendMessageToEdgeHub(client, node, messageJSON, messageOutput);
            });
        })
        .catch(function(err){
            node.error("Module Output can't be loaded: " + err);
        });

        node.on('close', function(done) {
            setStatus(node, statusEnum.disconnected);
            done();
        });
    }

    // Module method to receive methods from IoT Hub 
    function ModuleMethod(config) {
        // Store node for further use
        var node = this;
        node.method = config.method;

        // Create the Node-RED node
        RED.nodes.createNode(this, config);
        setStatus(node, statusEnum.disconnected);
        getClient().then(function(client){
            setStatus(node, statusEnum.connected);
            var mthd = node.method;
            node.log('Direct Method created: ' + mthd);
            client.onMethod(mthd, function(request, response) {
                // Set status
                setStatus(node, statusEnum.method);
                node.log('Direct Method called: ' + request.methodName);

                if(request.payload) {
                    node.log('Method Payload:' + JSON.stringify(request.payload));
                    node.send({payload: request.payload,topic: "method", method: request.methodName});
                }
                else {
                    node.send({payload: null,topic: "method", method: request.methodName});
                }

                getResponse(node).then(function(rspns){
                    var responseBody;
                    if (typeof (rspns.response) != "string") {
                        // Turn message object into string 
                        responseBody = JSON.stringify(rspns.response);
                    } else {
                        responseBody = rspns.response;
                    }
                    response.send(rspns.status, responseBody, function(err) {
                        if (err) {
                        node.log('Failed sending method response: ' + err);
                        } else {
                        node.log('Successfully sent method response.');
                        }
                    });
                })
                .catch(function(err){
                    node.error("Failed sending method response: response not received.");
                });
                // reset response
                node.response = null;

                setStatus(node, statusEnum.connected);
            }); 
            
            // Set method response on input
            node.on('input', function (msg) {
                var method = node.method;
                methodResponses.push(
                    {method: method, response: msg.payload, status: msg.status}
                );
                node.log("Module Method response set through node input: " + JSON.stringify(methodResponses.find(function(m){return m.method === method}))); 
            });
        })
        .catch(function(err){
            node.error("Module Method can't be loaded: " + err);
        });

        node.on('close', function(done) {
            setStatus(node, statusEnum.disconnected);
            done();
        });
    }

    // Get module client using promise, and retry, and slow backoff
    function getClient(){
        var retries = 20;
        var timeOut = 1000;
        // Retrieve client using progressive promise to wait for module client to be opened
        var promise = Promise.reject();
        for(var i=1; i <= retries; i++) {
            promise = promise.catch( function(){
                    if (edgeClient){
                        return edgeClient;
                    }
                    else {
                        throw new Error("Module Client not initiated..");
                    }
                })
                .catch(function rejectDelay(reason) {
                    retries++;
                    return new Promise(function(resolve, reject) {
                        setTimeout(reject.bind(null, reason), timeOut * ((retries % 10) + 1));
                    });
                });
        }
        return promise;
    }

    // Get module twin using promise, and retry, and slow backoff
    function getTwin(){
        var retries = 10;
        var timeOut = 1000;
        // Retrieve twin using progressive promise to wait for module twin to be opened
        var promise = Promise.reject();
        for(var i=1; i <= retries; i++) {
            promise = promise.catch( function(){
                    if (moduleTwin){
                        return moduleTwin;
                    }
                    else {
                        throw new Error("Module Client not initiated..");
                    }
                })
                .catch(function rejectDelay(reason) {
                    return new Promise(function(resolve, reject) {
                        setTimeout(reject.bind(null, reason), timeOut * i);
                });
            });
        }
        return promise;
    }

    // Get module method response using promise, and retry, and slow backoff
    function getResponse(node){
        var retries = 20;
        var timeOut = 1000;
        var m = {};
        node.log("Module Method node method: " + node.method);
        // Retrieve client using progressive promise to wait for module client to be opened
        var promise = Promise.reject();
        for(var i=1; i <= retries; i++) {
            promise = promise.catch( function(){
                    var methodResponse = methodResponses.find(function(m){return m.method === node.method});
                    if (methodResponse){
                        // get the response and clean the array
                        var response = methodResponse;
                        node.log("Module Method response object found: " + JSON.stringify(response));
                        methodResponses.splice(methodResponses.findIndex(function(m){return m.method === node.method}),1);
                        return response;
                    }
                    else {
                        throw new Error("Module Method Response not initiated..");
                    }
                })
                .catch(function rejectDelay(reason) {
                    retries++;
                    return new Promise(function(resolve, reject) {
                        setTimeout(reject.bind(null, reason), timeOut * ((retries % 10) + 1));
                    });
                });
        }
        return promise;
    }

    // This function just sends the incoming message to the node output adding the topic "input" and the input name.
    var outputMessage = function(client, node, inputName, msg) {

        client.complete(msg, function (err) {
            if (err) {
                node.error('Failed sending message to node output:' + err);
                setStatus(node, statusEnum.error);
            }
        });

        if (inputName === node.input){
            setStatus(node, statusEnum.received);
            var message = JSON.parse(msg.getBytes().toString('utf8'));
            if (message) {
                node.log('Processed input message:' + inputName)
                // send to node output
                node.send({payload: message, topic: "input", input: inputName});
            }
            setStatus(node, statusEnum.connected);
        }   
    }

    var setStatus = function (node, status) {
        node.status({ fill: status.color, shape: "dot", text: status.text });
    }

    var sendMessageToEdgeHub = function (client, node, message, output) {

        // Send the message to IoT Edge
        if (!output)
        {
            output = "output";
        }
        node.log('Sending Message to Azure IoT Edge: ' + output + '\n   Payload: ' + JSON.stringify(message));
        var msg = new Message(JSON.stringify(message));
        client.sendOutputEvent(output, msg, function (err, res) {
            if (err) {
                node.error('Error while trying to send message:' + err.toString());
                setStatus(node, statusEnum.error);
            } else {
                node.log('Message sent.');
                setStatus(node, statusEnum.connected);
            }
        });
    }

    // Registration of the client into Node-RED
    RED.nodes.registerType("edgeclient", IoTEdgeClient, {
        defaults: {
            module: {value: ""}
        }
    });
    
    // Registration of the node into Node-RED
    RED.nodes.registerType("moduletwin", ModuleTwin, {
        defaults: {
            name: { value: "Module Twin" }
        }
    });

    // Registration of the node into Node-RED
    RED.nodes.registerType("moduleinput", ModuleInput, {
        defaults: {
            input: { value: "input1"}
        }
    });

    // Registration of the node into Node-RED
    RED.nodes.registerType("moduleoutput", ModuleOutput, {
        defaults: {
             output: { value: "output1"}
        }
    });

    // Registration of the node into Node-RED
    RED.nodes.registerType("modulemethod", ModuleMethod, {
        defaults: {
            method: { value: "method1"},
            response: { value: "{}"}
        }
    });

}

And going more in detail, we have 2 relevant things:

    function ModuleInput(config) {
        // Store node for further use
        var node = this;
        node.input = config.input;

        // Create the Node-RED node
        RED.nodes.createNode(this, config);
        setStatus(node, statusEnum.disconnected);
        getClient().then(function(client){
            setStatus(node, statusEnum.connected);
            // Act on module input messages
            node.log("Module Input created: " + node.input);
            client.on('inputMessage', function (inputName, msg) {
                outputMessage(client, node, inputName, msg);
            });
        })
        .catch(function(err){
            node.log("Module Input can't be loaded: " + err);
        });
       
        node.on('close', function(done) {
            setStatus(node, statusEnum.disconnected);
            done();
        });
    }

So, every time an inputMessage event occurs, the code executed is:

    var outputMessage = function(client, node, inputName, msg) {

        client.complete(msg, function (err) {
            if (err) {
                node.error('Failed sending message to node output:' + err);
                setStatus(node, statusEnum.error);
            }
        });

        if (inputName === node.input){
            setStatus(node, statusEnum.received);
            var message = JSON.parse(msg.getBytes().toString('utf8'));
            if (message) {
                node.log('Processed input message:' + inputName)
                // send to node output
                node.send({payload: message, topic: "input", input: inputName});
            }
            setStatus(node, statusEnum.connected);
        }   
    }

What's doing there is pretty much a JSON.parse of the received message without any change. Obviously Node-RED will internally add the standard stuff (payload, _msgid, etc.), but as long as the properties are included in the message, they should be forwarded.

And I guess that's the big question here, are the properties part of the message itself? If they're not, that would explain why I can't see then in Node-RED while I can see them with Azure IoT Explorer in case I just let the messages flow to the IoT Hub.

UPDATE The answer provided by @Sampath works fine, at least I can see the deviceid and the moduleid now:

{
    "payload": {
        "machine": {
            "temperature": 107.36094975136817,
            "pressure": 10.838589212181184
        },
        "ambient": {
            "temperature": 21.03510694838928,
            "humidity": 25
        },
        "timeCreated": "2024-05-15T13:58:58.0540963Z"
    },
    "properties": {
        "propertyList": [
            {
                "key": "sequenceNumber",
                "value": "464"
            },
            {
                "key": "batchId",
                "value": "56554fbc-4a74-4dc3-81fa-834d46bc232b"
            },
            {
                "key": "$.cdid",
                "value": "my-edge-device-1"
            },
            {
                "key": "$.cmid",
                "value": "SimulatedTemperatureSensor"
            }
        ]
    },
    "topic": "input",
    "input": "input1",
    "_msgid": "36c79982b8df928e"
}

Regards


Solution

  • To forward messages between modules in Azure IoT Edge while including properties, you need to ensure that the properties are preserved and forwarded along with the message payload. Looking at your current setup and code, it seems that the properties are not explicitly included in the message payload being forwarded to Node-RED.

    enter image description here

    To include properties in the forwarded messages, you can modify the outputMessage function in the azureiotedge.js file. You need to extract the properties from the incoming message and include them along with the payload when forwarding the message to Node-RED.

    Modify the outputMessage function to include properties:

    var outputMessage = function(client, node, inputName, msg) {
    
        client.complete(msg, function (err) {
            if (err) {
                node.error('Failed sending message to node output:' + err);
                setStatus(node, statusEnum.error);
            }
        });
    
        if (inputName === node.input){
            setStatus(node, statusEnum.received);
            var message = JSON.parse(msg.getBytes().toString('utf8'));
            if (message) {
                // Extract properties from the incoming message
                var properties = msg.properties;
                // Include properties along with the payload
                var forwardedMessage = {
                    payload: message,
                    properties: properties, // Include properties
                    topic: "input",
                    input: inputName
                };
                node.log('Processed input message:' + inputName)
                // send to node output
                node.send(forwardedMessage);
            }
            setStatus(node, statusEnum.connected);
        }   
    }
    

    Output:

    enter image description here