elasticsearch apache-kafka apache-kafka-connect

Kafka Connect Sink JsonConverter DataException: Converting byte[] to Kafka Connect data failed due to serialization error


I created an Apache Kafka cluster (3 brokers, 3 controllers, 1 Connect worker for now) for a project and have multiple topics receiving data from a 3rd party. The worker is running the Confluent Elasticsearch sink connector plugin.

I would like to understand where I'm going wrong with the configuration or with my understanding of the data. I am relatively new to Apache Kafka and only a fledgling developer; I'd like to think I have a decent grasp of tech, but the Kafka ecosystem makes my head swim.

Example output from the console consumer for one topic. All topics are formatted similarly, with no schema:

{
    "location": "Battery Tester1",
    "dc": "16.20V",
    "ac": "12.01V",
    "curr": " 0.00A",
    "temperature": "32.00C",
    "status": [
        "Currently on AC power"
    ]
}
{
    "location": "Battery Tester2",
    "dc": "16.10V",
    "ac": "11.01V",
    "curr": " 2.00A",
    "temperature": "34.00C",
    "status": [
        "Currently on AC power"
    ]
}
{
    "location": "Battery Tester3",
    "status": [
        "Currently on AC power"
    ]
}

The connect-standalone.properties are:

bootstrap.servers=kafbrk01-4:9092,kafbrk01-5:9092,kafbrk01-6:9092
config.storage.topic: es-connect-kafwrk01-configs
offset.storage.topic: es-connect-kafwrk01-offsets
status.storage.topic: es-connect-kafwrk01-status
config.storage.replication.factor: -1
offset.storage.replication.factor: -1
status.storage.replication.factor: -1
key.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=false
offset.storage.file.filename=/tmp/connect.offsets
offset.flush.interval.ms=10000
plugin.path=/opt/kafka/plugins
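
For reference, with schemas.enable=false the JsonConverter treats each record's raw bytes as a bare JSON document, while schemas.enable=true would expect a schema/payload envelope instead. A minimal sketch of the two shapes, using the sample record above (plain Python approximating the wire format, not Connect's own code):

import json

record = {"location": "Battery Tester1", "dc": "16.20V"}

# schemas.enable=false: the converter parses the bare JSON document.
bare = json.dumps(record).encode("utf-8")

# schemas.enable=true: the converter would expect a schema/payload envelope.
enveloped = json.dumps({
    "schema": {"type": "struct", "fields": [
        {"field": "location", "type": "string"},
        {"field": "dc", "type": "string"},
    ]},
    "payload": record,
}).encode("utf-8")

print(bare)
print(enveloped)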

The connector is configured with the following quickstart-elasticsearch.properties:

name=elasticsearch-sink
connector.class=io.confluent.connect.elasticsearch.ElasticsearchSinkConnector
tasks.max=1
topics=Power,Router,Gateway
key.ignore=true
connection.url=https://<FQDN to es01>:9200,https://<FQDN to es02>:9200,https://<FQDN to es03>:9200
connection.username=es_sink_connector_user
connection.password=FakePasswordBecause!
type.name=kafka-connect
elastic.security.protocol = PLAINTEXT
schema.ignore=true
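
As an aside, the connection settings above can be sanity-checked outside of Connect with a plain HTTPS request. A rough sketch using the Python requests library (the hostnames and credentials are the placeholders from the file; verify=False applies only if the cluster uses self-signed certificates):

import requests

# Placeholders copied from quickstart-elasticsearch.properties above.
resp = requests.get(
    "https://<FQDN to es01>:9200",
    auth=("es_sink_connector_user", "FakePasswordBecause!"),
    verify=False,  # only if the cluster uses a self-signed certificate
)
print(resp.status_code, resp.json())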

When running with this configuration, I get the following error stack:

[2024-04-24 22:06:15,672] ERROR [elasticsearch-sink|task-0] WorkerSinkTask{id=elasticsearch-sink-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:212)
org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler
        at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:230)
        at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:156)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:533)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:513)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:349)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:250)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:219)
        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:204)
        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:259)
        at org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:236)
        at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
        at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
        at java.base/java.lang.Thread.run(Thread.java:840)
Caused by: org.apache.kafka.connect.errors.DataException: Converting byte[] to Kafka Connect data failed due to serialization error:
        at org.apache.kafka.connect.json.JsonConverter.toConnectData(JsonConverter.java:333)
        at org.apache.kafka.connect.storage.Converter.toConnectData(Converter.java:91)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$3(WorkerSinkTask.java:533)
        at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:180)
        at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:214)
        ... 14 more
Caused by: org.apache.kafka.common.errors.SerializationException: com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'key': was expecting (JSON String, Number, Array, Object or token 'null', 'true' or 'false')
 at [Source: (byte[])"key"; line: 1, column: 4]
        at org.apache.kafka.connect.json.JsonDeserializer.deserialize(JsonDeserializer.java:69)
        at org.apache.kafka.connect.json.JsonConverter.toConnectData(JsonConverter.java:331)
        ... 18 more
Caused by: com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'key': was expecting (JSON String, Number, Array, Object or token 'null', 'true' or 'false')
 at [Source: (byte[])"key"; line: 1, column: 4]
        at com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:2391)
        at com.fasterxml.jackson.core.base.ParserMinimalBase._reportError(ParserMinimalBase.java:745)
        at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._reportInvalidToken(UTF8StreamJsonParser.java:3635)
        at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._handleUnexpectedValue(UTF8StreamJsonParser.java:2734)
        at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._nextTokenNotInObject(UTF8StreamJsonParser.java:902)
        at com.fasterxml.jackson.core.json.UTF8StreamJsonParser.nextToken(UTF8StreamJsonParser.java:794)
        at com.fasterxml.jackson.databind.ObjectMapper._readTreeAndClose(ObjectMapper.java:4703)
        at com.fasterxml.jackson.databind.ObjectMapper.readTree(ObjectMapper.java:3090)
        at org.apache.kafka.connect.json.JsonDeserializer.deserialize(JsonDeserializer.java:67)
        ... 19 more

I originally wrote a small Python application, while building and testing the Kafka cluster, to write entries into it, and the sink works with that data. I'm stumped, but based on my reading and research I feel like this is perhaps a JSON formatting issue. What I'm seeing looks like valid JSON to me, but does the error indicate it isn't, and the console consumer is just showing it to me that way by design?
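
For what it's worth, the pretty-printed output above is valid JSON as far as a parser is concerned; whitespace and line breaks don't matter. A quick check in Python (mimicking what a JSON parser does, not Connect itself):

import json

# The pretty-printed record, exactly as the console consumer shows it.
value = b"""{
    "location": "Battery Tester1",
    "dc": "16.20V",
    "status": ["Currently on AC power"]
}"""
print(json.loads(value)["location"])   # parses fine -> Battery Tester1

# Jackson's "Unrecognized token" error appears when the bytes handed to the
# converter are not a JSON document at all, e.g. a bare unquoted word.
try:
    json.loads(b"not-json")
except json.JSONDecodeError as exc:
    print(exc)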

Also, I tried using org.apache.kafka.connect.storage.StringConverter for both the key and value converters, but that creates empty indices and errors out after 6 retries with errors referencing org.elasticsearch.common.compress.NotXContentException: Compressor detection can only be called on some xcontent bytes or compressed xcontent bytes.
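
Presumably that happens because StringConverter leaves the value as one opaque string rather than a parsed object, and an Elasticsearch document source has to be a JSON object. A rough illustration of the difference (plain Python, not the connector's actual code path):

import json

record_text = '{"location": "Battery Tester1", "dc": "16.20V"}'

# What JsonConverter would hand the sink: a parsed object.
as_object = json.loads(record_text)

# What StringConverter hands the sink: the same text as one opaque string.
# Serialized back out, it becomes a quoted string, not a JSON object.
print(json.dumps(as_object))
print(json.dumps(record_text))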

Update 4/25/24: As an exercise, I dumped the contents of each topic into an individual file (> /tmp/) and inserted a {"create":{"_index":"<topicname>"}} line above each entry. Attempting to bulk import these into Elasticsearch using curl failed with similar errors. When I collapsed the pretty-printed JSON into a single line per entry, the import succeeded. Does this help indicate anything? Maybe each line is being treated as string data instead of actual JSON?
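
For reference, the _bulk endpoint is newline-delimited JSON: each action line and each source document has to sit on exactly one line, so the single-line requirement is a property of the bulk API rather than of the topic data. A sketch of building such a body (plain Python; the index name power is just a placeholder):

import json

docs = [
    {"location": "Battery Tester1", "dc": "16.20V"},
    {"location": "Battery Tester2", "dc": "16.10V"},
]

# _bulk wants one action line followed by the whole document on one line.
lines = []
for doc in docs:
    lines.append(json.dumps({"create": {"_index": "power"}}))
    lines.append(json.dumps(doc))          # json.dumps emits a single line
bulk_body = "\n".join(lines) + "\n"        # the body must end with a newline
print(bulk_body)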

Any guidance, tips or feedback is appreciated. Thank you for your time regardless.


Solution

  • After research and further testing, I found that in my scenario the 3rd party was producing every record with a plain-text key (the literal string key) rather than a JSON key, which was not readily apparent to my novice eyes.

    I only realized it when I ran a local console consumer with the option to show keys.

    /opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server kafbrk01-4:9092 --topic Power --property=print.key=true --from-beginning

    This showed up as

    key     {
        "location": "Battery Tester1",
        "dc": "16.20V",
        "ac": "12.01V",
        "curr": " 0.00A",
        "temperature": "32.00C",
        "status": [
            "Currently on AC power"
        ]
    }
    key     {
        "location": "Battery Tester2",
        "dc": "16.10V",
        "ac": "11.01V",
        "curr": " 2.00A",
        "temperature": "34.00C",
        "status": [
            "Currently on AC power"
        ]
    }
    key     {
        "location": "Battery Tester3",
        "status": [
            "Currently on AC power"
        ]
    }
    

    With that in mind, and hoping the StringConverter would handle the plain-text key, I switched the key converter in the connect-standalone.properties configuration (a short sketch of why this sidesteps the parse failure is at the end of this answer):

    key.converter=org.apache.kafka.connect.storage.StringConverter
    key.converter.schemas.enable=false
    value.converter=org.apache.kafka.connect.json.JsonConverter
    value.converter.schemas.enable=false
    

    Afterwards, the Connect worker started, successfully processed the data in every topic, and pushed it to the ELK cluster.

    Hope this helps someone in the future.
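
    As referenced above, here is a rough Python illustration of why switching the key converter sidesteps the parse failure; it mimics how the two converters handle the key bytes and is not Connect's actual code:

    import json

    key_bytes = b"key"   # the literal key the 3rd party sends with every record

    # StringConverter: just decode the bytes as text; a plain-text key is fine.
    print(key_bytes.decode("utf-8"))

    # JsonConverter: parse the bytes as a JSON document; a bare word blows up,
    # matching the "Unrecognized token 'key'" error in the original stack trace.
    try:
        json.loads(key_bytes)
    except json.JSONDecodeError as exc:
        print(f"not valid JSON: {exc}")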