Neo4j CDC Events in Unexpected Format for Property Values


I have enabled CDC in my Neo4j instance and set up a Kafka topic to listen for data changes in the Neo4j database. According to the Neo4j documentation, the event data should be in a key-value pair format, but I am receiving a more complex structure with additional fields for each property.

Here is a sample of the data I receive from the Kafka topic:

{
  "id": "CJUg4WrNW0Y7ttlh8lbkxfwAAAAAAAAEMAAAAAAAAAACAAABkh21o5c=",
  "txId": 1072,
  "seq": 2,
  "event": {
    "elementId": "4:9520e16a-cd5b-463b-b6d9-61f256e4c5fc:2073",
    "eventType": "NODE",
    "operation": "CREATE",
    "labels": [
      "Environment"
    ],
    "keys": {},
    "state": {
      "before": null,
      "after": {
        "labels": [
          "Environment"
        ],
        "properties": {
          "name": {
            "B": null,
            "I64": null,
            "F64": null,
            "S": "Dev",
            "BA": null,
            "TLD": null,
            "TLDT": null,
            "TLT": null,
            "TZDT": null,
            "TOT": null,
            "TD": null,
            "SP": null,
            "LB": null,
            "LI64": null,
            "LF64": null,
            "LS": null,
            "LTLD": null,
            "LTLDT": null,
            "LTLT": null,
            "LZDT": null,
            "LTOT": null,
            "LTD": null,
            "LSP": null
          },
          "id": {
            "B": null,
            "I64": null,
            "F64": null,
            "S": "78b90e78-9b79-4330-9d02-7895f349964b",
            "BA": null,
            "TLD": null,
            "TLDT": null,
            "TLT": null,
            "TZDT": null,
            "TOT": null,
            "TD": null,
            "SP": null,
            "LB": null,
            "LI64": null,
            "LF64": null,
            "LS": null,
            "LTLD": null,
            "LTLDT": null,
            "LTLT": null,
            "LZDT": null,
            "LTOT": null,
            "LTD": null,
            "LSP": null
          }
        }
      }
    }
  }
}

My Neo4j server version is 5.22 and the Kafka connector version is 5.1.1.

Kafka connector config:

{
  "name": "neo4j-source-connector",
  "config": {
    "connector.class": "org.neo4j.connectors.kafka.source.Neo4jConnector",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "key.converter.schemas.enable": false,
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": false,
    "neo4j.uri": "bolt://localhost:7687",
    "neo4j.streaming.from": "ALL",
    "neo4j.authentication.basic.username": "neo4j",
    "neo4j.authentication.basic.password": "password",
    "neo4j.source-strategy": "CDC",
    "neo4j.start-from": "NOW",
    "neo4j.cdc.poll-interval": "500ms",
    "neo4j.cdc.poll-duration": "5s",
    "neo4j.cdc.topic.neo4j-node-topic.patterns.0.pattern": "(:Environment)"
  }
}

Notice that for the properties name and id, instead of receiving a simple key-value format, I am seeing a structure with fields like B, I64, S, etc. I expected something like this:

"id": "78b90e78-9b79-4330-9d02-7895f349964b"

Is there a configuration I am missing or something else that could explain why I'm receiving the data in this format? How can I simplify it to just key-value pairs?


Solution

  • Update: the simple-but-flawed format, named COMPACT, returned with the release of 5.1.5. It is not the default format, for the reasons explained below. Please read the dedicated documentation section on this topic (no pun intended).


    Original answer:

    What you are seeing was introduced in version 5.1.0-rc02 of the connector, which has since been promoted to general availability (5.1.0 and later).

    In short, this makes sure that the change events published to a Kafka topic always adhere to the same registered message schema, even if a particular property has changed its type on the Neo4j side (for instance: :Environment(id) changes from an integer to a string).

    Basically, every node/relationship property value is encoded as a struct with one field per supported Neo4j type. The field matching the value's actual type holds the value, and every other field is null ("B" for boolean, "I64" for int64/long, "F64" for float64/double, "S" for String...).
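
If you only need plain key-value pairs on the consumer side, the struct can be unwrapped after the message is read. Below is a minimal Python sketch of that idea, assuming the message value has already been parsed from JSON into a dict. The helper names unwrap_property and flatten_properties are hypothetical (they are not part of the connector), and the sample struct is abbreviated to a few of the type fields shown in the question.

```python
from typing import Any, Optional


def unwrap_property(typed_value: Optional[dict]) -> Any:
    """Return the single non-null entry of a typed property struct.

    Assumes at most one type field (e.g. "S", "I64") is set, as in the
    sample event from the question.
    """
    if typed_value is None:
        return None
    for value in typed_value.values():
        if value is not None:
            return value
    return None


def flatten_properties(state: Optional[dict]) -> dict:
    """Map each property name of a 'before'/'after' state to its plain value."""
    if not state:
        return {}
    properties = state.get("properties", {})
    return {name: unwrap_property(struct) for name, struct in properties.items()}


# Abbreviated copy of event["state"]["after"] from the question
# (only a few of the type fields are reproduced here).
sample_after = {
    "labels": ["Environment"],
    "properties": {
        "name": {"B": None, "I64": None, "F64": None, "S": "Dev", "LS": None},
        "id": {
            "B": None,
            "I64": None,
            "F64": None,
            "S": "78b90e78-9b79-4330-9d02-7895f349964b",
            "LS": None,
        },
    },
}

print(flatten_properties(sample_after))
```

The check is "is not None" rather than truthiness, so values such as false or 0 are kept. This discards the type information, so it reintroduces the very ambiguity the struct was designed to remove.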

    With earlier releases of the source connector, such a property type change results in an error when the schema is enforced on topic messages (at publishing time and/or consumption time, depending on your configuration).
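
To illustrate why the struct encoding keeps the schema stable, here is a rough Python sketch. It is not the connector's code: encode_extended and shape are hypothetical helpers, TYPE_TAGS is only a subset of the real type fields, and shape is a crude stand-in for a registered schema (it compares only the value's type name, or the field names of a struct).

```python
# Illustrative subset of the type fields seen in the question's payload.
TYPE_TAGS = ("B", "I64", "F64", "S")


def encode_extended(value):
    """Wrap a plain value in a struct with one field per type tag."""
    # bool must be tested before int because bool is a subclass of int.
    if isinstance(value, bool):
        tag = "B"
    elif isinstance(value, int):
        tag = "I64"
    elif isinstance(value, float):
        tag = "F64"
    elif isinstance(value, str):
        tag = "S"
    else:
        raise TypeError(f"type not covered by this sketch: {type(value).__name__}")
    return {t: (value if t == tag else None) for t in TYPE_TAGS}


def shape(encoded):
    """Crude stand-in for a schema: field names for structs, type name otherwise."""
    if isinstance(encoded, dict):
        return tuple(sorted(encoded))
    return type(encoded).__name__


# The same property changes from an integer to a string on the Neo4j side.
before, after = 42, "42"

plain_stable = shape(before) == shape(after)
extended_stable = shape(encode_extended(before)) == shape(encode_extended(after))

print(plain_stable, extended_stable)  # prints: False True
```

With plain values, the field's type changes from one message to the next, whereas the struct always exposes the same set of fields and only the populated one differs.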

    Moreover, the earlier format would not work well with the sink connector (think of a sync scenario between two Neo4j instances, one as a source, one as a sink).