kubernetesapache-kafkamqtteclipse-dittoeclipse-digital-twin

Eclipse Ditto overloads when reading and sending large numbers of messages


We are integrating Eclipse Ditto into a digital twin platform, but we have encountered a problem while testing and we don't really know how to fix it.

We made a question related to this one time ago and it worked. Here you have the link to that question: Eclipse Ditto does not send all things events over target connection

Unfortunately it started failling again but we dont think that the problem is the same as before.

We are in the same scenario, the goal is to receive in 593 twins (Ditto Thing) the result of a simulation. The idea is to be able to do several simulation runs simultaneously and that each simulation run sends 593 messages to a Kafka topic. For example, for 6 runs we will have 3558 messages in the topic.

We upgraded all fields and values that were given to us deleted javascript mapping and tested with the maximun amount of simulations, 360. It worked with 360 simulations that send a total of 213480 messages. No messages were droped in any of the tests that we carried out. Perfect!.

So we decided to make some test over all the platform to measure latency. The workflow of the data is the following:

Simulation --> Kafka --> Ditto --> MQTT (Mosquitto) --> Database

We made a script that sended 1 simulation, waited the data to be stored into the database and then retrieved timestamps. When al 593 messages arrived, the script sended 2 simulations, waited the all 1186 messages to arrive to the db and then sended a run with 3 simulations. The script should stop when it reached 360 simulations simulataneously.

We found that ditto was not capable of processing data from 200 simulations even when it was previously capable of supporting 360. We tried giving Ditto and its components more resources, don't worry we are still free resources, but nothing changed. It even got worse.

We decided to reinstall every component with the configuration that worked previously but now we found some problems:

The funny thing is that all those unread/unsended messages sometimes are sent after 1 or 2 hours to the MQTT broker, even though we delete all messages from the Kafka topic. We though that Ditto stores some data in a cache, but we dont know how to clear it or stop it.

Furthermore, despite all these problems, we have 5 twins receiving data every 15 minutes and sending it over MQTT via other connections. These twins are working properly at all times.

On the other hand we are a little bit confused about resources management cause we are using Kubernetes. We dont know exactly the amount of resources that Ditto need for a specific amount of connections, things, etc, or even if we need to give arguments to the JVM. Sometimes connections pods are restarted due to an AskTimeoutException error.

Here are the connections we have established, their logs and metrics, along with the Helm's values.yaml.

When executing just one simulation at the begining of the morning it works correctly, but when executing simulations after that, it start failing.

Source connection:

{
    "name": "connection-for-pivot-simulation-with-idSimulationRun",
    "connectionType": "kafka",
    "connectionStatus": "open",
    "uri": "tcp://KAFKAIP",
    "sources": [
        {
            "addresses": [
                "riego"
            ],
            "consumerCount": 1,
            "qos": 1,
            "authorizationContext": [
                "nginx:ditto"
            ],
            "headerMapping": {
                "correlation-id": "{{header:correlation-id}}",
                "namespace": "{{ entity:namespace }}",
                "content-type": "{{header:content-type}}",
                "connection": "{{ connection:id }}",
                "id": "{{ entity:id }}",
                "reply-to": "{{header:reply-to}}"
            },
            "replyTarget": {
                "address": "{{header:reply-to}}",
                "headerMapping": {
                    "content-type": "{{header:content-type}}",
                    "correlation-id": "{{header:correlation-id}}"
                },
                "expectedResponseTypes": [
                    "response",
                    "error"
                ],
                "enabled": true
            }
        }
    ],
    "targets": [],
    "clientCount": 5,
    "failoverEnabled": true,
    "validateCertificates": true,
    "processorPoolSize": 1,
    "specificConfig": {
        "saslMechanism": "plain",
        "bootstrapServers": "KAFKAIP"
    },
    "tags": []
}

Target connection:

{
    "name": "mqtt-connection-for-telegraf-pivot",
    "connectionType": "mqtt-5",
    "connectionStatus": "open",
    "uri": "tcp://MQTTIP",
    "sources": [],
    "targets": [
        {
            "address": "opentwins/{{ topic:channel }}/{{ topic:criterion }}/{{ thing:namespace }}/{{ thing:name }}",
            "topics": [
                "_/_/things/twin/events?namespaces=pivot&extraFields=thingId,attributes/_parents,features/idSimulationRun/properties/value",
                "_/_/things/live/messages",
                "_/_/things/live/commands"
            ],
            "qos": 1,
            "authorizationContext": [
                "nginx:ditto"
            ],
            "headerMapping": {}
        }
    ],
    "clientCount": 5,
    "failoverEnabled": true,
    "validateCertificates": true,
    "processorPoolSize": 1,
    "tags": []
}

Values:


swaggerui:
  enabled: false

mongodb:
  enabled: false

global:
  prometheus:
    enabled: true

dbconfig:
  connectivity:
    uri: mongodb://dt-mongodb:27017/connectivity
  things:
    uri: mongodb://dt-mongodb:27017/things
  searchDB:
    uri: mongodb://dt-mongodb:27017/search
  policies:
    uri: mongodb://dt-mongodb:27017/policies

connectivity:
  replicaCount: 5
  extraEnv:
    - name: MQTT_CONSUMER_THROTTLING_ENABLED
      value: "false"
    - name: MQTT_CONSUMER_THROTTLING_LIMIT
      value: "100000"
    - name: KAFKA_CONSUMER_THROTTLING_ENABLED
      value: "false"
    - name: KAFKA_CONSUMER_THROTTLING_LIMIT
      value: "100000"
    - name: KAFKA_SESSION_TIMEOUT_MS
      value: "60000"
    - name: CONNECTIVITY_MQTT_MAX_QUEUE_SIZE
      value: "100000"
    - name: CONNECTIVITY_KAFKA_MAX_QUEUE_SIZE       
      value: "100000"
    - name: CONNECTIVITY_SIGNAL_ENRICHMENT_BUFFER_SIZE
      value: "300000"
    - name: CONNECTIVITY_MESSAGE_MAPPING_MAX_POOL_SIZE
      value: "10"
  resources:
    requests:
      cpu: 2000m
    limits:
      memory: 3Gi

gateway:
  resources:
    requests:
      cpu: 1000m
    limits:
      memory: 768Mi

nginx:
  replicaCount: 2
  service:
    type: NodePort
    nodePort: 30525
  resources:
    requests:
      cpu: 500m
    limits:
      cpu: 1000m
      memory: 768Mi

policies:
  resources:
    requests:
      cpu: 1000m
    limits:
      memory: 768Mi

things:
  replicaCount: 1
  resources:
    requests:
      cpu: 1000m
    limits:
      memory: 8192Mi

thingsSearch:
  resources:
    requests:
      cpu: 1000m
    limits:
      memory: 768Mi


Solution

  • The behaviour described on OP was a product of multiple errors on Ditto 3.1.2

    We believe version 3.3.0 has solved most of our issues, but further testing is needed.