I have deployed an IoT Edge device with 2 modules, one instance of Simulated Temperature Sensor and another of Node-RED (both from the Marketplace). I can forward the messages generated from the first one to one of the inputs of Node-RED but the received message doesn't include any properties, nor system's neither user-defined.
The list of routes configured for this IoT Edge device includes 2:
Although the message is received by Node-RED, I don't have access to the properties (iothub-connection-device-id, etc.). This is the message received:
{
"payload": {
"machine": {
"temperature": 78.45809454307097,
"pressure": 7.545858871995427
},
"ambient": {
"temperature": 21.225943549765738,
"humidity": 26
},
"timeCreated": "2024-05-10T12:40:41.4387644Z"
},
"topic": "input",
"input": "input1",
"_msgid": "ac58dc85f99232e7"
}
Following the recommendations I found in another thread of this site, I added "WITH $includeProperties" at the end of the second route, but still the same. I've also tried to add "WITH $systemProperties" to see at least the system's, just for testing, and yet nothing.
Is there any way to forward them? I might need them in some scenarios.
Just in case it's relevant, the Node-RED version available in the Marketplace uses node-red-contrib-azure-iot-edge-module (1.0.4). I know there's an alternative and slightly more updated version available (node-red-contrib-azure-iot-edge-kpm) but it looks like it won't be maintained in the future.
So I've entered to see the source code of this module and the main file azureiotedge.js looks like this:
module.exports = function (RED) {
'use strict'
var Transport = require('azure-iot-device-mqtt').Mqtt;
var Client = require('azure-iot-device').ModuleClient;
var Message = require('azure-iot-device').Message;
var statusEnum = {
disconnected: { color: "red", text: "Disconnected" },
connected: { color: "green", text: "Connected" },
sent: { color: "blue", text: "Sending message" },
received: { color: "yellow", text: "Receiving message" },
reported: { color: "blue", text: "Sending reported properties" },
desired: { color: "yellow", text: "Receiving desired properties" },
method: { color: "yellow", text: "Receiving direct method" },
response: { color: "blue", text: "Sending method response" },
error: { color: "grey", text: "Error" }
};
var edgeClient;
var moduleTwin;
var methodResponses = [];
// Function to create the IoT Edge Client
function IoTEdgeClient(config) {
// Store node for further use
var node = this;
node.connected = false;
// Create the Node-RED node
RED.nodes.createNode(this, config);
// Create the IoT Edge client
Client.fromEnvironment(Transport, function (err, client) {
if (err) {
node.log('Module Client creation error:' + err);
}
else {
client.on('error', function (err) {
node.log('Module Client error:' + err);
});
node.log('Module Client created.');
// connect to the Edge instance
client.open(function (err) {
if (err) {
node.log('Module Client open error:' + err);
throw err;
} else {
node.log('Module Client connected.');
edgeClient = client;
client.getTwin(function(err, twin) {
if (err) {
node.error('Could not get the module twin: ' + err);
throw err;
} else {
node.log('Module twin created.');
node.log('Twin contents:');
node.log(JSON.stringify(twin.properties));
node.on('close', function() {
node.log('Azure IoT Edge Module Client closed.');
edgeClient = null;
moduleTwin = null;
twin.removeAllListeners();
client.removeAllListeners();
client.close();
});
moduleTwin = twin;
}
});
}
});
}
});
}
// Function to create the Module Twin
function ModuleTwin(config) {
// Store node for further use
var node = this;
// Create the Node-RED node
RED.nodes.createNode(this, config);
setStatus(node, statusEnum.disconnected);
// Get the twin
getTwin().then(function(twin){
setStatus(node, statusEnum.connected);
// Register for changes
twin.on('properties.desired', function(delta) {
setStatus(node, statusEnum.desired);
node.log('New desired properties received:');
node.log(JSON.stringify(delta));
node.send({payload: delta, topic: "desired"})
setStatus(node, statusEnum.connected);
});
node.on('input', function (msg) {
setStatus(node, statusEnum.reported);
var messageJSON = null;
if (typeof (msg.payload) != "string") {
messageJSON = msg.payload;
} else {
//Converting string to JSON Object
messageJSON = JSON.parse(msg.payload);
}
twin.properties.reported.update(messageJSON, function(err) {
if (err) throw err;
node.log('Twin state reported.');
setStatus(node, statusEnum.connected);
});
});
})
.catch(function(err){
node.log('Module Twin error:' + err);
});
node.on('close', function(done) {
setStatus(node, statusEnum.disconnected);
done();
});
}
// Module input to receive input from edgeHub
function ModuleInput(config) {
// Store node for further use
var node = this;
node.input = config.input;
// Create the Node-RED node
RED.nodes.createNode(this, config);
setStatus(node, statusEnum.disconnected);
getClient().then(function(client){
setStatus(node, statusEnum.connected);
// Act on module input messages
node.log("Module Input created: " + node.input);
client.on('inputMessage', function (inputName, msg) {
outputMessage(client, node, inputName, msg);
});
})
.catch(function(err){
node.log("Module Input can't be loaded: " + err);
});
node.on('close', function(done) {
setStatus(node, statusEnum.disconnected);
done();
});
}
// Module output to send output to edgeHub
function ModuleOutput(config) {
// Store node for further use
var node = this;
node.output = config.output;
// Create the Node-RED node
RED.nodes.createNode(this, config);
setStatus(node, statusEnum.disconnected);
getClient().then(function(client){
setStatus(node, statusEnum.connected);
// React on input from node-red
node.log("Module Output created: " + node.output);
node.on('input', function (msg) {
setStatus(node, statusEnum.sent);
var messageJSON = null;
if (typeof (msg.payload) != "string") {
messageJSON = msg.payload;
} else {
//Converting string to JSON Object
messageJSON = JSON.parse(msg.payload);
}
var messageOutput = node.output;
sendMessageToEdgeHub(client, node, messageJSON, messageOutput);
});
})
.catch(function(err){
node.error("Module Output can't be loaded: " + err);
});
node.on('close', function(done) {
setStatus(node, statusEnum.disconnected);
done();
});
}
// Module method to receive methods from IoT Hub
function ModuleMethod(config) {
// Store node for further use
var node = this;
node.method = config.method;
// Create the Node-RED node
RED.nodes.createNode(this, config);
setStatus(node, statusEnum.disconnected);
getClient().then(function(client){
setStatus(node, statusEnum.connected);
var mthd = node.method;
node.log('Direct Method created: ' + mthd);
client.onMethod(mthd, function(request, response) {
// Set status
setStatus(node, statusEnum.method);
node.log('Direct Method called: ' + request.methodName);
if(request.payload) {
node.log('Method Payload:' + JSON.stringify(request.payload));
node.send({payload: request.payload,topic: "method", method: request.methodName});
}
else {
node.send({payload: null,topic: "method", method: request.methodName});
}
getResponse(node).then(function(rspns){
var responseBody;
if (typeof (rspns.response) != "string") {
// Turn message object into string
responseBody = JSON.stringify(rspns.response);
} else {
responseBody = rspns.response;
}
response.send(rspns.status, responseBody, function(err) {
if (err) {
node.log('Failed sending method response: ' + err);
} else {
node.log('Successfully sent method response.');
}
});
})
.catch(function(err){
node.error("Failed sending method response: response not received.");
});
// reset response
node.response = null;
setStatus(node, statusEnum.connected);
});
// Set method response on input
node.on('input', function (msg) {
var method = node.method;
methodResponses.push(
{method: method, response: msg.payload, status: msg.status}
);
node.log("Module Method response set through node input: " + JSON.stringify(methodResponses.find(function(m){return m.method === method})));
});
})
.catch(function(err){
node.error("Module Method can't be loaded: " + err);
});
node.on('close', function(done) {
setStatus(node, statusEnum.disconnected);
done();
});
}
// Get module client using promise, and retry, and slow backoff
function getClient(){
var retries = 20;
var timeOut = 1000;
// Retrieve client using progressive promise to wait for module client to be opened
var promise = Promise.reject();
for(var i=1; i <= retries; i++) {
promise = promise.catch( function(){
if (edgeClient){
return edgeClient;
}
else {
throw new Error("Module Client not initiated..");
}
})
.catch(function rejectDelay(reason) {
retries++;
return new Promise(function(resolve, reject) {
setTimeout(reject.bind(null, reason), timeOut * ((retries % 10) + 1));
});
});
}
return promise;
}
// Get module twin using promise, and retry, and slow backoff
function getTwin(){
var retries = 10;
var timeOut = 1000;
// Retrieve twin using progressive promise to wait for module twin to be opened
var promise = Promise.reject();
for(var i=1; i <= retries; i++) {
promise = promise.catch( function(){
if (moduleTwin){
return moduleTwin;
}
else {
throw new Error("Module Client not initiated..");
}
})
.catch(function rejectDelay(reason) {
return new Promise(function(resolve, reject) {
setTimeout(reject.bind(null, reason), timeOut * i);
});
});
}
return promise;
}
// Get module method response using promise, and retry, and slow backoff
function getResponse(node){
var retries = 20;
var timeOut = 1000;
var m = {};
node.log("Module Method node method: " + node.method);
// Retrieve client using progressive promise to wait for module client to be opened
var promise = Promise.reject();
for(var i=1; i <= retries; i++) {
promise = promise.catch( function(){
var methodResponse = methodResponses.find(function(m){return m.method === node.method});
if (methodResponse){
// get the response and clean the array
var response = methodResponse;
node.log("Module Method response object found: " + JSON.stringify(response));
methodResponses.splice(methodResponses.findIndex(function(m){return m.method === node.method}),1);
return response;
}
else {
throw new Error("Module Method Response not initiated..");
}
})
.catch(function rejectDelay(reason) {
retries++;
return new Promise(function(resolve, reject) {
setTimeout(reject.bind(null, reason), timeOut * ((retries % 10) + 1));
});
});
}
return promise;
}
// This function just sends the incoming message to the node output adding the topic "input" and the input name.
var outputMessage = function(client, node, inputName, msg) {
client.complete(msg, function (err) {
if (err) {
node.error('Failed sending message to node output:' + err);
setStatus(node, statusEnum.error);
}
});
if (inputName === node.input){
setStatus(node, statusEnum.received);
var message = JSON.parse(msg.getBytes().toString('utf8'));
if (message) {
node.log('Processed input message:' + inputName)
// send to node output
node.send({payload: message, topic: "input", input: inputName});
}
setStatus(node, statusEnum.connected);
}
}
var setStatus = function (node, status) {
node.status({ fill: status.color, shape: "dot", text: status.text });
}
var sendMessageToEdgeHub = function (client, node, message, output) {
// Send the message to IoT Edge
if (!output)
{
output = "output";
}
node.log('Sending Message to Azure IoT Edge: ' + output + '\n Payload: ' + JSON.stringify(message));
var msg = new Message(JSON.stringify(message));
client.sendOutputEvent(output, msg, function (err, res) {
if (err) {
node.error('Error while trying to send message:' + err.toString());
setStatus(node, statusEnum.error);
} else {
node.log('Message sent.');
setStatus(node, statusEnum.connected);
}
});
}
// Registration of the client into Node-RED
RED.nodes.registerType("edgeclient", IoTEdgeClient, {
defaults: {
module: {value: ""}
}
});
// Registration of the node into Node-RED
RED.nodes.registerType("moduletwin", ModuleTwin, {
defaults: {
name: { value: "Module Twin" }
}
});
// Registration of the node into Node-RED
RED.nodes.registerType("moduleinput", ModuleInput, {
defaults: {
input: { value: "input1"}
}
});
// Registration of the node into Node-RED
RED.nodes.registerType("moduleoutput", ModuleOutput, {
defaults: {
output: { value: "output1"}
}
});
// Registration of the node into Node-RED
RED.nodes.registerType("modulemethod", ModuleMethod, {
defaults: {
method: { value: "method1"},
response: { value: "{}"}
}
});
}
And going more in detail, we have 2 relevant things:
function ModuleInput(config) {
// Store node for further use
var node = this;
node.input = config.input;
// Create the Node-RED node
RED.nodes.createNode(this, config);
setStatus(node, statusEnum.disconnected);
getClient().then(function(client){
setStatus(node, statusEnum.connected);
// Act on module input messages
node.log("Module Input created: " + node.input);
client.on('inputMessage', function (inputName, msg) {
outputMessage(client, node, inputName, msg);
});
})
.catch(function(err){
node.log("Module Input can't be loaded: " + err);
});
node.on('close', function(done) {
setStatus(node, statusEnum.disconnected);
done();
});
}
So, every time an inputMessage event occurs, the code executed is:
var outputMessage = function(client, node, inputName, msg) {
client.complete(msg, function (err) {
if (err) {
node.error('Failed sending message to node output:' + err);
setStatus(node, statusEnum.error);
}
});
if (inputName === node.input){
setStatus(node, statusEnum.received);
var message = JSON.parse(msg.getBytes().toString('utf8'));
if (message) {
node.log('Processed input message:' + inputName)
// send to node output
node.send({payload: message, topic: "input", input: inputName});
}
setStatus(node, statusEnum.connected);
}
}
What's doing there is pretty much a JSON.parse of the received message without any change. Obviously Node-RED will internally add the standard stuff (payload, _msgid, etc.), but as long as the properties are included in the message, they should be forwarded.
And I guess that's the big question here, are the properties part of the message itself? If they're not, that would explain why I can't see then in Node-RED while I can see them with Azure IoT Explorer in case I just let the messages flow to the IoT Hub.
UPDATE The answer provided by @Sampath works fine, at least I can see the deviceid and the moduleid now:
{
"payload": {
"machine": {
"temperature": 107.36094975136817,
"pressure": 10.838589212181184
},
"ambient": {
"temperature": 21.03510694838928,
"humidity": 25
},
"timeCreated": "2024-05-15T13:58:58.0540963Z"
},
"properties": {
"propertyList": [
{
"key": "sequenceNumber",
"value": "464"
},
{
"key": "batchId",
"value": "56554fbc-4a74-4dc3-81fa-834d46bc232b"
},
{
"key": "$.cdid",
"value": "my-edge-device-1"
},
{
"key": "$.cmid",
"value": "SimulatedTemperatureSensor"
}
]
},
"topic": "input",
"input": "input1",
"_msgid": "36c79982b8df928e"
}
Regards
To forward messages between modules in Azure IoT Edge while including properties, you need to ensure that the properties are preserved and forwarded along with the message payload. Looking at your current setup and code, it seems that the properties are not explicitly included in the message payload being forwarded to Node-RED.
To include properties in the forwarded messages, you can modify the outputMessage
function in the azureiotedge.js
file. You need to extract the properties from the incoming message and include them along with the payload when forwarding the message to Node-RED.
Modify the outputMessage
function to include properties:
var outputMessage = function(client, node, inputName, msg) {
client.complete(msg, function (err) {
if (err) {
node.error('Failed sending message to node output:' + err);
setStatus(node, statusEnum.error);
}
});
if (inputName === node.input){
setStatus(node, statusEnum.received);
var message = JSON.parse(msg.getBytes().toString('utf8'));
if (message) {
// Extract properties from the incoming message
var properties = msg.properties;
// Include properties along with the payload
var forwardedMessage = {
payload: message,
properties: properties, // Include properties
topic: "input",
input: inputName
};
node.log('Processed input message:' + inputName)
// send to node output
node.send(forwardedMessage);
}
setStatus(node, statusEnum.connected);
}
}
Output: