pythonapache-kafkawebsocketeclipse-dittoditto

Websocket to apache kafka in eclipse Ditto


I am having a problem with eclipse ditto. I want to send a command to update the features of a digital twins using a websocket (in python) and I want to read the new features in an apache kafka topic. This is my websocket:

import asyncio 
import random
import time
from websockets import connect
import json

async def func(uri):
  async with connect(uri) as websocket:
    await websocket.send("START-SEND-EVENTS")
    #await websocket.send("START-SEND-MESSAGES")
    message = await websocket.recv()
    print(message)
    while(True):
        
        msg = {
            "topic": "org.eclipse.ditto/camera01/things/twin/commands/modify",
            "headers": {
                "content-type": "text/plain"
            },
            "path": "features/coordinates/properties",
            "value": {"x": random.randrange(0,1000), "y": random.randrange(0,1000), "z": random.randrange(0,1000), "x_rotation": 0.0, "y_rotation": 0.0, "z_rotation": 0.0, "w_rotation": 1.0, "thingId": "org.eclipse.ditto:camera01"}
        }
        to_send = json.dumps(msg)
        time.sleep(1)
        await websocket.send(to_send)
        msg_recv = await websocket.recv()
        print(msg_recv)
uri = "ws://ditto:ditto@localhost:8080/ws/2"
asyncio.run(func(uri))

When I send a message, ditto updates the digital twin and a second websocket gets the new features, but kafka's topic doesn't receive it.

I thought the problem may be the target connection, but it doesn't seem like there are any errors. This is how I set it up:

{
"targetActorSelection": "/system/sharding/connection",
"headers": {
    "aggregate": false
},
"piggybackCommand": {
    "type": "connectivity.commands:modifyConnection",
    "connection": {
        "id": "kafka-connection-target",
        "connectionType": "kafka",
        "connectionStatus": "open",
        "failoverEnabled": true,
        "uri": "tcp://localhost:9092",
        "specificConfig":{
       "bootstrapServers":"localhost:9092"
         },
        "targets": [{
            "address": "topic_ditto",
            "topics": [
                "_/_/things/twin/events",
                "_/_/things/live/messages"
            ],
            "authorizationContext": ["ditto:unity"],
            "qos": 0
        }],
        "mappingContext": {
            "mappingEngine": "JavaScript",
            "options": {
                "incomingScript": "function mapToDittoProtocolMsg(headers, textPayload, bytePayload, contentType) {return null;}",
                "outgoingScript": "function mapFromDittoProtocolMsg(namespace, id, group, channel, criterion, action, path, dittoHeaders, value, status, extra) {let textPayload = '{\"x\":' + value.coordinates.properties.x + ',\"y\":' + value.coordinates.properties.y + ',\"z\":' + value.coordinates.properties.z + ',\"x_rotation\":' + value.coordinates.properties.x_rotation + ',\"y_rotation\": ' + value.coordinates.properties.y_rotation + ', \"z_rotation\": ' + value.coordinates.properties.z_rotation + ',\"w_rotation\":' + value.coordinates.properties.w_rotation + ',\"idCamera\":\"' + id + '\"}'; let bytePayload = null; let contentType = 'text/plain; charset=UTF-8'; return  Ditto.buildExternalMsg(dittoHeaders, textPayload, bytePayload, contentType);}",
                "loadBytebufferJS": "false",
                 "loadLongJS": "false"
            }
        }
    }
}
}

Note: if I update the digital twin using a topic (specified in a source connection), the topic of the target connection receives the new features (also the second websocket ..)


Solution

  • Solved.

    the path of the message to be sent with websocket was not the correct path to pass to the mapping function of the target connection. This is why I was able to update the digital twin without being able to update the "ditto_topic" topic.

    I passed path "features/coordinates/properties" but the correct path is "/features" for how I set up the mapping function.

    The form of the message I sent is "{" x ": random.randrange (0,1000), .." but the correct form in this case is {"coordinates": {"properties": {"x": random.randrange (0,1000), ... "