This is my first time working with MQTT, and I want to get data from InfluxDB into Snowflake, but before doing that I must:
What I have tried so far is saving data from MQTT to InfluxDB, using the following script:
"""A MQTT to InfluxDB Bridge
This script receives MQTT data and saves those to InfluxDB.
"""
import re
from typing import NamedTuple
import paho.mqtt.client as mqtt
from influxdb import InfluxDBClient
# --- InfluxDB connection settings ---
INFLUXDB_ADDRESS = '10.10.10.247'
INFLUXDB_USER = 'iotuser'
# NOTE(review): credentials are hard-coded in source; consider loading them from the environment.
INFLUXDB_PASSWORD = 'iotpassword'
INFLUXDB_DATABASE = 'homeiot_db'
# --- MQTT broker settings ---
MQTT_ADDRESS = '10.10.10.247'
MQTT_USER = 'iotuser'
MQTT_PASSWORD = 'iotpassword'
# Subscription filter; each '+' is an MQTT single-level wildcard.
MQTT_TOPIC = 'home/+/+' # [room]/[temperature|humidity|light|status]
# Captures the two topic levels after 'home/' as (location, measurement).
MQTT_REGEX = 'home/([^/]+)/([^/]+)'
MQTT_CLIENT_ID = 'MQTTInfluxDBBridge'
# Shared InfluxDB client, created at import time on port 8086. The database
# argument is None here; the database is selected later with switch_database().
influxdb_client = InfluxDBClient(INFLUXDB_ADDRESS, 8086, INFLUXDB_USER, INFLUXDB_PASSWORD, None)
class SensorData(NamedTuple):
    """A single sensor reading extracted from one MQTT message.

    Instances are immutable tuples of ``(location, measurement, value)``.
    """

    # First captured topic level (the room, per the topic layout comment).
    location: str
    # Second captured topic level, e.g. 'temperature' or 'humidity'.
    measurement: str
    # Numeric reading parsed from the message payload.
    value: float
def on_connect(client, userdata, flags, rc):
    """Handle the broker's CONNACK response.

    Prints the result code and subscribes ``client`` to ``MQTT_TOPIC``.

    Args:
        client: The MQTT client instance that received the CONNACK.
        userdata: User data passed through by the client (unused).
        flags: Connection flags passed by the client (unused).
        rc: Connection result code; printed as-is.
    """
    print(f'Connected with result code {rc!s}')
    client.subscribe(MQTT_TOPIC)
def on_message(client, userdata, msg):
    """Handle a PUBLISH message received from the broker.

    Prints the raw message, decodes the payload as UTF-8, parses it into a
    ``SensorData`` and, when parsing yields a reading, writes it to InfluxDB.

    Args:
        client: The MQTT client instance that received the message (unused).
        userdata: User data passed through by the client (unused).
        msg: The received message; ``msg.topic`` and ``msg.payload`` are read.
    """
    print(msg.topic + ' ' + str(msg.payload))
    try:
        payload = msg.payload.decode('utf-8')
    except UnicodeDecodeError:
        # A binary or otherwise non-UTF-8 payload must not escape the
        # callback: an uncaught exception here would propagate into the
        # client's network loop and stop the bridge.
        print('Ignoring non-UTF-8 payload on ' + msg.topic)
        return
    sensor_data = _parse_mqtt_message(msg.topic, payload)
    if sensor_data is not None:
        _send_sensor_data_to_influxdb(sensor_data)
def _parse_mqtt_message(topic, payload):
    """Convert an MQTT topic and payload into a ``SensorData`` reading.

    Args:
        topic: Topic string, matched against ``MQTT_REGEX``.
        payload: Message payload already decoded to ``str``.

    Returns:
        A ``SensorData`` built from the two captured topic levels and the
        payload converted to ``float``, or ``None`` when the topic does not
        match, the measurement is ``'status'``, or the payload is not a
        valid number.
    """
    match = re.match(MQTT_REGEX, topic)
    if match is None:
        return None

    location, measurement = match.group(1, 2)
    if measurement == 'status':
        # Status messages are not numeric readings and are not stored.
        return None

    try:
        value = float(payload)
    except ValueError:
        # A non-numeric payload (e.g. 'on' or an empty string) is skipped
        # rather than raised, so one bad message cannot crash the caller.
        print('Ignoring non-numeric payload on ' + topic + ': ' + repr(payload))
        return None
    return SensorData(location, measurement, value)
def _send_sensor_data_to_influxdb(sensor_data):
    """Write one reading to InfluxDB as a single point.

    The point uses the reading's measurement name, tags it with its location
    and stores the number under the ``value`` field. The point list is
    printed before it is written.

    Args:
        sensor_data: Reading with ``location``, ``measurement`` and ``value``
            attributes.
    """
    point = {
        'measurement': sensor_data.measurement,
        'tags': {'location': sensor_data.location},
        'fields': {'value': sensor_data.value},
    }
    points = [point]
    print(points)
    influxdb_client.write_points(points)
def _init_influxdb_database():
    """Ensure the target database exists, then select it on the shared client.

    Creates ``INFLUXDB_DATABASE`` when no entry returned by
    ``get_list_database()`` carries that name, and afterwards switches the
    module-level client to it.
    """
    existing_names = [entry['name'] for entry in influxdb_client.get_list_database()]
    if INFLUXDB_DATABASE not in existing_names:
        influxdb_client.create_database(INFLUXDB_DATABASE)
    influxdb_client.switch_database(INFLUXDB_DATABASE)
def main():
    """Prepare InfluxDB, connect to the MQTT broker and process messages.

    Blocks in the client's ``loop_forever()`` call once connected.
    """
    _init_influxdb_database()

    client = mqtt.Client(MQTT_CLIENT_ID)
    # Callbacks and credentials must be in place before connecting.
    client.on_connect = on_connect
    client.on_message = on_message
    client.username_pw_set(MQTT_USER, MQTT_PASSWORD)

    client.connect(MQTT_ADDRESS, 1883)
    client.loop_forever()
# Run the bridge only when executed as a script, not when imported.
if __name__ == '__main__':
    print('MQTT to InfluxDB bridge')
    main()
If anyone has done this before or has an idea how to tackle this, I would really appreciate any help.
If you just need to pipe the data from InfluxDB to Snowflake, you could make use of the existing Flux `sql.to()` API shown below:
// Write a stream of tables to a Snowflake table using the Flux "sql" package.
import "sql"
// NOTE(review): `data` is not defined in this snippet; presumably it is the
// result of an earlier from(...) query - confirm before use.
// The dataSourceName below appears to be a placeholder connection string;
// substitute real user, password, account, database, schema and warehouse.
data
|> sql.to(
driverName: "snowflake",
dataSourceName: "user:password@account/db/exampleschema?warehouse=wh",
table: "example_table",
)
See more details here.
Well, if you insist on using MQTT, there is an official and easy way to complete this task via Flux (you can configure this in the InfluxDB 2.0 UI) instead of Python:
// Publish the most recent matching record from an InfluxDB bucket to an MQTT
// broker. Data flows from InfluxDB out to MQTT here, not the other way round.
import "mqtt"
// NOTE(review): `task.every` is not defined in this snippet; presumably the
// script is meant to run as an InfluxDB task with an `option task = {...}`
// block - confirm.
// The pipeline reads the "telegraf" bucket, keeps measurement "cpu", field
// "usage_system" and tag cpu == "cpu-total", takes the last record, and sends
// its _value column plus the cpu and host tags to topic "cpu".
from(bucket: "telegraf")
|> range(start: -task.every)
|> filter(fn: (r) =>
(r._measurement == "cpu"))
|> filter(fn: (r) =>
(r._field == "usage_system"))
|> filter(fn: (r) =>
(r.cpu == "cpu-total"))
|> last()
|> mqtt.to(
broker: "tcp://davidgs.com:8883",
topic: "cpu",
clientid: "cpu-flux",
valueColumns: ["_value"],
tagColumns: ["cpu", "host"],
)