I have an existing (working) setup that uses InfluxDB's Native Subscriptions to transfer data from the AWS IoT-Core (as the MQTT broker) to our InfluxDB-Cloud instance. Native Subscriptions are being removed as a feature in InfluxDB, and while I can still use them at the moment, there's no guarantee in the long run. The setup used port 8883.
Solution: use InfluxDB's telegraf to achieve the same functionality. The data is already sent in 'influx' line protocol, so no additional parsing is necessary. My setup uses the telegraf:1.27.1 docker image to deploy telegraf in a container. I'm volume-mapping the certificates and the telegraf configuration into the container.
Here's the telegraf.conf file:
# Global settings
[agent]
interval = "5s"
round_interval = true
metric_batch_size = 1000
metric_buffer_limit = 10000
collection_jitter = "0s"
flush_interval = "10s"
flush_jitter = "0s"
precision = "1ms"
debug = true
quiet = false
[[inputs.mqtt_consumer]]
servers = ["ssl://<AWS-ID>.iot.<LOCATION>.amazonaws.com:8883"]
qos = 0
connection_timeout = "30s"
topics = ["374174.016/data"]
client_id = "27-telegraf-e9cabc28-6a2b-4423-be74-31fb3dd231cd"
data_format = "influx"
# SSL configuration
tls_ca = "/etc/telegraf/ca.pem"
tls_cert = "/etc/telegraf/cert.pem"
tls_key = "/etc/telegraf/key.pem"
# insecure_skip_verify = false
[[outputs.influxdb_v2]]
urls = ["https://<LOCATION>.aws.cloud2.influxdata.com/"]
token = "[REDACTED]"
organization = "[REDACTED]"
bucket = "TELEGRAF_DEV"
Here are the startup (and failure) logs for telegraf:
2023-07-06T12:54:11Z I! Loading config: /etc/telegraf/telegraf.conf
2023-07-06T12:54:11Z I! Starting Telegraf 1.27.1
2023-07-06T12:54:11Z I! Available plugins: 237 inputs, 9 aggregators, 28 processors, 23 parsers, 59 outputs, 4 secret-stores
2023-07-06T12:54:11Z I! Loaded inputs: mqtt_consumer
2023-07-06T12:54:11Z I! Loaded aggregators:
2023-07-06T12:54:11Z I! Loaded processors:
2023-07-06T12:54:11Z I! Loaded secretstores:
2023-07-06T12:54:11Z I! Loaded outputs: influxdb_v2
2023-07-06T12:54:11Z I! Tags enabled: host=d847c648021c
2023-07-06T12:54:11Z I! [agent] Config: Interval:5s, Quiet:false, Hostname:"d847c648021c", Flush Interval:10s
2023-07-06T12:54:11Z D! [agent] Initializing plugins
2023-07-06T12:54:11Z D! [agent] Connecting outputs
2023-07-06T12:54:11Z D! [agent] Attempting connection to [outputs.influxdb_v2]
2023-07-06T12:54:11Z D! [agent] Successfully connected to outputs.influxdb_v2
2023-07-06T12:54:11Z D! [agent] Starting service inputs
2023-07-06T12:55:11Z E! [telegraf] Error running agent: starting input inputs.mqtt_consumer: network Error : read tcp 172.17.0.2:51268->52.29.126.248:8443: i/o timeout
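The timeout line names the remote address and port that the agent was actually trying to reach, which is worth comparing against the `servers` entry in the config (the line above shows 8443, so it evidently comes from one of the runs with a different port than the 8883 in the posted config). A minimal sketch for pulling that endpoint out of such a line; the helper name and the regular expression are illustrative assumptions, not part of telegraf:

```python
import re
from typing import Optional, Tuple

# Matches the "->REMOTE_IP:PORT: i/o timeout" tail of the error line shown above.
_REMOTE_RE = re.compile(
    r"->(?P<host>\d{1,3}(?:\.\d{1,3}){3}):(?P<port>\d+): i/o timeout"
)


def remote_endpoint_from_timeout(line: str) -> Optional[Tuple[str, int]]:
    """Return (remote_ip, remote_port) from an i/o timeout log line, or None."""
    match = _REMOTE_RE.search(line)
    if match is None:
        return None
    return match.group("host"), int(match.group("port"))


if __name__ == "__main__":
    log_line = (
        "2023-07-06T12:55:11Z E! [telegraf] Error running agent: starting input "
        "inputs.mqtt_consumer: network Error : read tcp "
        "172.17.0.2:51268->52.29.126.248:8443: i/o timeout"
    )
    print(remote_endpoint_from_timeout(log_line))
```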
I've also written a Python script to just read a couple of messages off the IoT-Core from the same topic using the same certificate files (Python 3.9.6, awscrt==0.14.7, awsiot==0.1.3):
import json
import os.path
import time
import uuid
from dataclasses import dataclass
from typing import Callable, List, Any, Dict
from awscrt import io
from awscrt.mqtt import QoS
from awsiot import mqtt_connection_builder # type: ignore
class AWSMQTTClient:
    def __init__(
        self,
        host_name: str,
        client_id: str,
        cert_dir: str = "."
    ):
        # write to files (compatibility with AWS library)
        self.cert_file = f"{cert_dir}/cert.pem"
        self.priv_file = f"{cert_dir}/key.pem"
        self.root_file = f"{cert_dir}/ca.pem"
        # check if files exist
        if not os.path.exists(self.cert_file):
            raise FileNotFoundError(f"cert file not found: {self.cert_file}")
        if not os.path.exists(self.priv_file):
            raise FileNotFoundError(f"priv file not found: {self.priv_file}")
        if not os.path.exists(self.root_file):
            raise FileNotFoundError(f"root file not found: {self.root_file}")
        # set up connection object
        event_loop_group = io.EventLoopGroup(1)
        host_resolver = io.DefaultHostResolver(event_loop_group)
        client_bootstrap = io.ClientBootstrap(event_loop_group, host_resolver)
        self.mqtt_connection = mqtt_connection_builder.mtls_from_path(
            endpoint=host_name,
            cert_filepath=self.cert_file,
            pri_key_filepath=self.priv_file,
            client_bootstrap=client_bootstrap,
            ca_filepath=self.root_file,
            client_id=f"{client_id}_{uuid.uuid4()}",
            clean_session=False,
            keep_alive_secs=6,
        )

    def subscribe_to_topic(
        self,
        topic_name: str,
        duration: float,
        on_receive: Callable,
    ) -> None:
        _connect_future = self.mqtt_connection.connect()
        sub = self.mqtt_connection.subscribe(topic=topic_name, qos=QoS(1), callback=on_receive)
        time.sleep(duration)
        _connect_future.result()
        _disconnect_future = self.mqtt_connection.disconnect()
        _disconnect_future.result()
        if sub[0].running():
            raise ConnectionError("connection not closed")
@dataclass
class TopicInfo:
    topic: str
    message: str


@dataclass
class TopicResponse:
    topic_infos: List[TopicInfo]

    def add_topic(self, topic_info: TopicInfo):
        self.topic_infos.append(topic_info)

    def on_topic_received(
        self,
        topic: str,
        payload: bytes,
        dup: Any, qos: Any, retain: Any,
        **kwargs: Dict[Any, Any],
    ) -> None:
        try:
            message_json = json.loads(payload)
            if "message" in message_json.keys():
                message = message_json["message"]
            else:
                message = f"{message_json}"
        except json.decoder.JSONDecodeError:
            message = f"{payload.decode('ascii')}"
        topic_info = TopicInfo(
            topic=topic,
            message=message,
        )
        if topic_info not in self.topic_infos:
            self.add_topic(topic_info=topic_info)
if __name__ == '__main__':
    # settings
    _host_name = "<AWS ID>.iot.<LOCATION>.amazonaws.com"
    _client_id = f"{uuid.uuid4()}"
    _topic = "374174.016/data"
    aws_mqtt_client = AWSMQTTClient(
        host_name=_host_name,
        client_id=_client_id,
    )
    response = TopicResponse(topic_infos=[])
    aws_mqtt_client.subscribe_to_topic(
        topic_name=_topic,
        on_receive=response.on_topic_received,
        duration=7.0,
    )
    print("")
    print("----RECEIVED:----")
    print(response.topic_infos)
    print("-----------------")
    print("")
    assert len(response.topic_infos) != 0
The Python script works as expected - it reads (and logs) a couple of messages. From what I can tell it uses Port 443 though instead of 8883.
Here's the Makefile I'm using to deploy both telegraf and the Python script listen_to_mqtt.py via docker:
###########################
# Telegraf test setup #
###########################
TELEGRAF_IMAGE=telegraf-test-image
TELEGRAF_CONTAINER=telegraf-test-container
TELEGRAF_LOCAL_DIR := $(shell pwd)
TELEGRAF_INTERNAL_CONFIG_DIR = /etc/telegraf
deploy_telegraf:
	docker run \
		--detach \
		--name $(TELEGRAF_CONTAINER) \
		-v $(TELEGRAF_LOCAL_DIR)/ca.pem:$(TELEGRAF_INTERNAL_CONFIG_DIR)/ca.pem \
		-v $(TELEGRAF_LOCAL_DIR)/cert.pem:$(TELEGRAF_INTERNAL_CONFIG_DIR)/cert.pem \
		-v $(TELEGRAF_LOCAL_DIR)/key.pem:$(TELEGRAF_INTERNAL_CONFIG_DIR)/key.pem \
		-v $(TELEGRAF_LOCAL_DIR)/telegraf.conf:$(TELEGRAF_INTERNAL_CONFIG_DIR)/telegraf.conf:ro \
		telegraf:1.27.1
remove_telegraf:
	-docker rm -f $(TELEGRAF_CONTAINER)
watch_logs_telegraf:
	docker logs -f $(TELEGRAF_CONTAINER)
redeploy_telegraf:
	$(MAKE) remove_telegraf
	$(MAKE) deploy_telegraf
	$(MAKE) watch_logs_telegraf
access_container:
	docker exec -it $(TELEGRAF_CONTAINER) bash
# replace this with your Docker network interface
# (comment kept on its own line so the value has no trailing whitespace)
DOCKER_NETWORK_INTERFACE = docker0
analyze_traffic:
	sudo tcpdump -i $(DOCKER_NETWORK_INTERFACE) -U -w ./tcpdump.pcap #'port 8883'
analyze_traffic_file:
	wireshark ./tcpdump.pcap
analyze_traffic_file_python:
	wireshark ./tcpdump-py.pcap
perform_connection_check:
	docker exec -it $(TELEGRAF_CONTAINER) ping a1q5qvt4b4ldkh-ats.iot.eu-central-1.amazonaws.com
###########################
# Python equivalent #
###########################
TELEGRAF_PY_IMAGE=telegraf-test-image-py
TELEGRAF_PY_CONTAINER=telegraf-test-container-py
TELEGRAF_PY_DOCKERFILE=telegraf-py.Dockerfile
build_python:
	docker build -t $(TELEGRAF_PY_IMAGE) -f $(TELEGRAF_PY_DOCKERFILE) .
deploy_python:
	docker run \
		--detach --rm \
		--name $(TELEGRAF_PY_CONTAINER) \
		$(TELEGRAF_PY_IMAGE)
watch_logs_python:
	docker logs -f $(TELEGRAF_PY_CONTAINER)
remove_python:
	-docker rm -f $(TELEGRAF_PY_CONTAINER)
redeploy_python:
	$(MAKE) remove_python
	$(MAKE) build_python
	$(MAKE) deploy_python
	$(MAKE) watch_logs_python
And here's the Dockerfile for the Python script:
FROM python:3.9.6 as py3106
ENV PYTHONDONTWRITEBYTECODE 1
ENV PYTHONUNBUFFERED 1
WORKDIR /app
# copy requirement files
COPY ./requirements.txt ./requirements.txt
# install awscli
RUN curl "https://awscli.amazonaws.com/awscli-exe-linux-x86_64.zip" -o "awscliv2.zip"
RUN unzip awscliv2.zip
RUN ./aws/install
# install general dependencies
RUN apt install gcc
RUN python -m pip install --upgrade pip && pip install --no-cache-dir -r requirements.txt
# copy script & certificates
COPY ./listen_to_mqtt.py ./listen_to_mqtt.py
COPY ./ca.pem ./ca.pem
COPY ./cert.pem ./cert.pem
COPY ./key.pem ./key.pem
# run the script
CMD python listen_to_mqtt.py
I used wireshark and tcpdump to analyze the network traffic from the container, but I'm admittedly a newbie at that. When setting telegraf up with Port 8883 it fails within ~30s; I don't see any key exchange, and the [SYN] packet never receives a [SYN, ACK]. When setting it up with Port 443 I see certificate exchanges, and it takes ~60s for essentially the same error message to appear: i/o timeout.
So the things I tried changing are:
- the telegraf version (with 1.18.3 the container doesn't crash, but it fails to receive messages; later versions crash after the mentioned delay)
- the port (443, 8883, 8443)
- AWS IoT-Core's security policy (i.e. TLS13_1_3_2022_10, TLS12_1_0_2015_01, etc.): I see via wireshark that telegraf then uses TLS1.3 or TLS1.2, but apart from that no major change in behavior occurs
I'm expecting this to be about some tiny setting or configuration in either telegraf or AWS that I just don't know about. I'm not sure how to proceed after spending close to a day on this already.
Turns out the solution was quite simple, but a bit confusing due to the Python script: the outgoing port was blocked in our network. The Python script (as mentioned) used Port 443, so it got through. Port 443 wouldn't work for telegraf, though, and Port 8883 was blocked, leading to the timeout error.
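A quick way to spot a blocked outgoing port before digging into TLS or broker settings is to try a plain TCP connection to each candidate port from the same host or container. The following is a minimal sketch using only the standard library; the function name `probe_tcp_port` is made up for illustration, and the host is the same placeholder as in the config above and has to be replaced with the real endpoint. A result of "timeout" corresponds to the [SYN] without [SYN, ACK] pattern described in the question and hints at a firewall silently dropping packets.

```python
import socket


def probe_tcp_port(host: str, port: int, timeout: float = 5.0) -> str:
    """Classify an outgoing TCP connection attempt.

    Returns "open", "refused", "timeout", or "error: <details>".
    """
    try:
        # Only the TCP handshake is attempted; no TLS or MQTT traffic is sent.
        with socket.create_connection((host, port), timeout=timeout):
            return "open"
    except ConnectionRefusedError:
        return "refused"
    except socket.timeout:
        return "timeout"
    except OSError as exc:
        # Covers DNS failures, unreachable networks, and similar problems.
        return f"error: {exc}"


if __name__ == "__main__":
    # Placeholder endpoint (assumption): replace with the real IoT-Core host.
    endpoint = "<AWS-ID>.iot.<LOCATION>.amazonaws.com"
    for candidate_port in (443, 8883, 8443):
        print(candidate_port, probe_tcp_port(endpoint, candidate_port))
```

An "open" result only shows that the TCP handshake succeeded; it says nothing about whether the broker accepts MQTT on that port.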