python mqtt iot influxdb dataflow

Output data from InfluxDB to an MQTT broker/server


This is my first time working with MQTT, and I want to get data from InfluxDB into Snowflake, but before doing that I must first output the data from InfluxDB to an MQTT broker.

What I have tried so far is saving data from MQTT to InfluxDB, using the following script:

"""A MQTT to InfluxDB Bridge
This script receives MQTT data and saves those to InfluxDB.
"""

import re
from typing import NamedTuple

import paho.mqtt.client as mqtt
from influxdb import InfluxDBClient

# Connection settings for the InfluxDB server that stores the readings.
INFLUXDB_ADDRESS = '10.10.10.247'
INFLUXDB_USER = 'iotuser'
INFLUXDB_PASSWORD = 'iotpassword'
INFLUXDB_DATABASE = 'homeiot_db'

# Connection settings for the MQTT broker the readings are received from.
MQTT_ADDRESS = '10.10.10.247'
MQTT_USER = 'iotuser'
MQTT_PASSWORD = 'iotpassword'
# Subscription filter: home/[room]/[temperature|humidity|light|status]
MQTT_TOPIC = 'home/+/+'  # [room]/[temperature|humidity|light|status]
# Group 1 captures the room (location), group 2 the measurement name.
MQTT_REGEX = 'home/([^/]+)/([^/]+)'
MQTT_CLIENT_ID = 'MQTTInfluxDBBridge'

# Shared client, created at import time on port 8086. The last argument
# (None) leaves the database unset; _init_influxdb_database() selects it.
influxdb_client = InfluxDBClient(INFLUXDB_ADDRESS, 8086, INFLUXDB_USER, INFLUXDB_PASSWORD, None)


class SensorData(NamedTuple):
    """One sensor reading parsed from an MQTT message."""

    # Topic segment captured by group 1 of MQTT_REGEX.
    location: str
    # Topic segment captured by group 2 of MQTT_REGEX.
    measurement: str
    # Message payload converted with float().
    value: float


def on_connect(client, userdata, flags, rc):
    """Report the CONNACK result code and subscribe to MQTT_TOPIC.

    The subscription is issued from this callback, so it is repeated every
    time the callback fires.
    """
    status_line = 'Connected with result code {}'.format(str(rc))
    print(status_line)
    client.subscribe(MQTT_TOPIC)


def on_message(client, userdata, msg):
    """Handle a PUBLISH message received from the server.

    Logs the raw message, parses it into a SensorData and, when parsing
    yields a reading, forwards it to InfluxDB. A payload that is not valid
    UTF-8 is logged and dropped instead of raising out of the callback.

    Args:
        client: The MQTT client instance that received the message (unused).
        userdata: User data attached to the client (unused).
        msg: The received message; its ``topic`` and ``payload`` are read.
    """
    print(msg.topic + ' ' + str(msg.payload))
    try:
        payload = msg.payload.decode('utf-8')
    except UnicodeDecodeError as err:
        # A single binary/garbled payload must not take the whole bridge
        # down, so skip it and keep processing later messages.
        print('Ignoring undecodable payload on ' + msg.topic + ': ' + str(err))
        return
    sensor_data = _parse_mqtt_message(msg.topic, payload)
    if sensor_data is not None:
        _send_sensor_data_to_influxdb(sensor_data)


def _parse_mqtt_message(topic, payload):
    """Convert an MQTT topic and text payload into a SensorData.

    Args:
        topic: Topic the message was published on.
        payload: Message payload, already decoded to text.

    Returns:
        A SensorData, or None when the topic does not match MQTT_REGEX, the
        measurement is 'status', or the payload cannot be converted to float.
    """
    match = re.match(MQTT_REGEX, topic)
    if match is None:
        return None

    location = match.group(1)
    measurement = match.group(2)
    if measurement == 'status':
        # Status messages are deliberately not stored.
        return None

    try:
        value = float(payload)
    except ValueError:
        # A non-numeric payload would otherwise raise out of the MQTT
        # callback; report it and skip the message instead.
        print('Ignoring non-numeric payload on ' + topic + ': ' + repr(payload))
        return None
    return SensorData(location, measurement, value)


def _send_sensor_data_to_influxdb(sensor_data):
    """Write one SensorData to InfluxDB as a single point.

    The reading's measurement name is used as the InfluxDB measurement, its
    location is stored as a tag and its value as the 'value' field. The
    point list is printed before it is written.
    """
    point = {
        'measurement': sensor_data.measurement,
        'tags': {'location': sensor_data.location},
        'fields': {'value': sensor_data.value},
    }
    points = [point]
    print(points)
    influxdb_client.write_points(points)


def _init_influxdb_database():
    """Create INFLUXDB_DATABASE if it does not exist, then switch to it."""
    existing_names = [entry['name'] for entry in influxdb_client.get_list_database()]
    if INFLUXDB_DATABASE not in existing_names:
        influxdb_client.create_database(INFLUXDB_DATABASE)
    # Always select the database, whether it was just created or not.
    influxdb_client.switch_database(INFLUXDB_DATABASE)


def main():
    """Prepare InfluxDB, connect to the MQTT broker and process messages.

    Does not return under normal operation: ``loop_forever()`` blocks while
    the client handles network traffic and dispatches the callbacks.
    """
    _init_influxdb_database()

    # paho-mqtt 2.x expects a callback API version as the first positional
    # argument and rejects a client id in that position; 1.x has no such
    # enum. VERSION1 keeps the callback signatures used by on_connect and
    # on_message in this file.
    callback_api = getattr(mqtt, 'CallbackAPIVersion', None)
    if callback_api is None:
        mqtt_client = mqtt.Client(MQTT_CLIENT_ID)
    else:
        mqtt_client = mqtt.Client(callback_api.VERSION1, MQTT_CLIENT_ID)

    mqtt_client.username_pw_set(MQTT_USER, MQTT_PASSWORD)
    mqtt_client.on_connect = on_connect
    mqtt_client.on_message = on_message

    mqtt_client.connect(MQTT_ADDRESS, 1883)
    mqtt_client.loop_forever()


# Start the bridge only when this file is executed as a script.
if __name__ == '__main__':
    print('MQTT to InfluxDB bridge')
    main()

If anyone has done this before or has an idea of how to tackle this, I would really appreciate any help.


Solution

  • If you just need to pipe the data from InfluxDB to Snowflake, you could make use of the existing Flux API below:

    // Write a stream of tables to a Snowflake table using sql.to().
    import "sql"
      
    // NOTE: `data` is not defined in this snippet; presumably it stands for
    // the result of your own from() |> range() |> filter() query.
    data
        |> sql.to(
            driverName: "snowflake",
            // Placeholder connection string: replace user, password, account,
            // database, schema and warehouse with real values.
            dataSourceName: "user:password@account/db/exampleschema?warehouse=wh",
            table: "example_table",
        )
    

    See the Flux `sql.to()` documentation for more details.

    If you insist on using MQTT, there is an official and easy way to complete this task with Flux (you can configure it in the InfluxDB 2.0 UI) instead of Python:

    // Publish the latest total system CPU usage to an MQTT topic.
    // mqtt.to() lives in the experimental/mqtt package; it is still
    // referenced as `mqtt` after the import.
    import "experimental/mqtt"
    from(bucket: "telegraf")
        // task.every is only available when this script runs as an InfluxDB
        // task, i.e. with an `option task = {...}` definition.
        |> range(start: -task.every)
        |> filter(fn: (r) =>
            (r._measurement == "cpu"))
        |> filter(fn: (r) =>
            (r._field == "usage_system"))
        |> filter(fn: (r) =>
            (r.cpu == "cpu-total"))
        // Keep only the most recent point of each series.
        |> last()
        |> mqtt.to(
            broker: "tcp://davidgs.com:8883",
            topic: "cpu",
            clientid: "cpu-flux",
            valueColumns: ["_value"],
            tagColumns: ["cpu", "host"],
        )