influxdbtelegraftelegraf-inputs-plugin

Telegraf cannot correctly read JSON data in mqtt protocol


When I use telegraf inputs.mqtt_consumer cannot parse the data correctly when subscribing to mqtt

  1. This is the raw data I read from the client using emqx:
{
  "air": "air",
  "humidity": 35,
  "region": "shanghai",
  "temperature": 34,
  "time": 1655371268
}
  1. This is the configuration of telegraf
[[inputs.mqtt_consumer]]
  servers = ["tcp://127.0.0.1:1883"]
  topics = [
        "test/#"
  ]
  data_format = "json"
  [[inputs.mqtt_consumer.topic_parsing]]
    topic = "air/humidity/region/temperature/time"
    measurement = "measurement/_/_/_/_"
    tags = "_/_/region/_/_"
    fields = "_/_/_/temperature/_"
  1. But the data I got in the incluxdb is like this
> select * from mqtt_consumer
name: mqtt_consumer
time                           humidity temperature topic
----                           -------- ----------- -----
2022-06-16T09:28:45.663600586Z  41       41          test
2022-06-16T09:28:45.764128509Z  38       48          test
2022-06-16T09:28:46.664330569Z  43       47          test
2022-06-16T09:28:46.764848563Z. 41       34          test
2022-06-16T09:28:47.665094746Z. 49       50          test
2022-06-16T09:28:47.765611758Z. 5       50          test
2022-06-16T09:28:48.66629661Z   42       32          test
  1. I expect the data to be like this
name: mqtt_consumer
time                           region   temperature topic
----                           ----     ----------- -----
2022-06-16T09:28:45.663600586Z shanghai 41          test
2022-06-16T09:28:45.764128509Z shanghai 48          test
2022-06-16T09:28:46.664330569Z shanghai 47          test
2022-06-16T09:28:46.764848563Z shanghai 34          test
2022-06-16T09:28:47.665094746Z shanghai 50          test
2022-06-16T09:28:47.765611758Z shanghai 50          test
2022-06-16T09:28:48.66629661Z  shanghai 32          test
  1. The configuration of telegraf cannot be changed to this
  [[inputs.mqtt_consumer.topic_parsing]]
    topic = "air/humidity/region/temperature/time"
    measurement = "measurement/_/_/_/_"
    tags = "_/_/tag/_/_"
    fields = "_/_/_/field/_"

Solution

  • [[inputs.mqtt_consumer]]
    data_format = "json"
    tag_keys = [
        "region"
    ]
    

    see doc https://docs.influxdata.com/telegraf/v1.22/data_formats/input/json/