apache-kafka, apache-kafka-connect, confluent-platform, s3-kafka-connector

Unable to use sink connector inside Kafka Connect


I am trying to use the S3 sink connector inside Kafka Connect. It starts, then fails shortly afterwards.

My config looks like:

{
  "name": "my-s3-sink3",
  "config": {
    "connector.class": "io.confluent.connect.s3.S3SinkConnector",
    "tasks.max": "1",
    "topics": "mysource.topic",
    "s3.region": "us-east-1",
    "s3.bucket.name": "topicbucket001",
    "s3.part.size": "5242880",
    "flush.size": "1",
    "storage.class": "io.confluent.connect.s3.storage.S3Storage",
    "format.class": "io.confluent.connect.s3.format.json.JsonFormat",
    "partitioner.class": "io.confluent.connect.storage.partitioner.DefaultPartitioner",
    "schema.compatibility": "NONE"
  }
}
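
For reference, I create the connector by POSTing this JSON to the Connect REST API (saved as a file I'll call s3-sink.json here; the worker listens on the default REST port 8083):

    curl -s -X POST \
         -H "Content-Type: application/json" \
         --data @s3-sink.json \
         http://localhost:8083/connectors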

My connect-distributed.properties looks like:

key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter

key.converter.schemas.enable=false
value.converter.schemas.enable=false
errors.tolerance = all
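
Note that schemas.enable=false matters here: with it set to true, the JsonConverter expects every record to be wrapped in a schema/payload envelope, roughly like the sketch below (field list illustrative), which my plain JSON messages do not have.

    {
      "schema": {
        "type": "struct",
        "fields": [
          { "field": "userid", "type": "string", "optional": false }
        ],
        "optional": false
      },
      "payload": { "userid": "User_6" }
    }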

Complete error log:

[2021-04-06 10:59:04,398] INFO [Consumer clientId=connector-consumer-s3connect12-0, groupId=connect-s3connect12] Member connector-consumer-s3connect12-0-f1e48df8-76ba-49f9-9080-e10b0a34202b sending LeaveGroup request to coordinator **********.kafka.us-east-1.amazonaws.com:9092 (id: 2147483645 rack: null) due to the consumer is being closed (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)

[2021-04-06 10:59:04,397] ERROR WorkerSinkTask{id=s3connect12-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask)

[2021-04-06 10:59:04,396] ERROR WorkerSinkTask{id=s3connect12-0} Error converting message key in topic 'quickstart-status' partition 3 at offset 0 and timestamp 1617706740956: Converting byte[] to Kafka Connect data failed due to serialization error: (org.apache.kafka.connect.runtime.WorkerSinkTask)

[2021-04-06 10:59:04,393] INFO [Consumer clientId=connector-consumer-s3connect12-0, groupId=connect-s3connect12] Resetting offset for partition quickstart-status-3 to position FetchPosition{offset=0, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[***************.kafka.us-east-1.amazonaws.com:9092 (id: 1 rack: use1-az2)], epoch=absent}}. (org.apache.kafka.clients.consumer.internals.SubscriptionState)

A sample message value:

{
   "registertime": 1511985752912,
   "userid": "User_6",
   "regionid": "Region_8",
   "gender": "FEMALE"
}
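
To reproduce, I produce records with a string key and a JSON value like the one above using the console producer (a sketch; the bootstrap address is a placeholder for my MSK endpoint):

    kafka-console-producer.sh \
      --bootstrap-server my-cluster.kafka.us-east-1.amazonaws.com:9092 \
      --topic mysource.topic \
      --property parse.key=true \
      --property key.separator=:

    # example input line (key before the first separator):
    User_6:{"registertime":1511985752912,"userid":"User_6","regionid":"Region_8","gender":"FEMALE"}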


New error log: [screenshot; it shows the message keys are plain strings such as User_2 and User_9]


Solution

  • The problem is the key SerDe. Per your screenshot, the key data is a non-JSON string:

    User_2
    User_9
    etc
    

    So instead of

    key.converter=org.apache.kafka.connect.json.JsonConverter
    

    use

    key.converter=org.apache.kafka.connect.storage.StringConverter
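
    You can confirm what the keys actually look like by printing them with the console consumer (a sketch; substitute your own bootstrap server and topic):

        kafka-console-consumer.sh \
          --bootstrap-server localhost:9092 \
          --topic mysource.topic \
          --from-beginning \
          --property print.key=true \
          --property key.separator=' | '

    If the keys print as bare strings like User_6 rather than as JSON, StringConverter is the right choice.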
    

    Edit:

    Try this for your connector config, specifying the converters explicitly (as suggested by @OneCricketeer):

    {
        "name": "my-s3-sink3",
         "config": {
             "connector.class"               : "io.confluent.connect.s3.S3SinkConnector",
             "tasks.max"                     : "1",
             "topics"                        : "mysource.topic",
             "s3.region"                     : "us-east-1",
             "s3.bucket.name"                : "topicbucket001",
             "s3.part.size"                  : "5242880",
             "flush.size"                    : "1",
             "key.converter"                 : "org.apache.kafka.connect.storage.StringConverter",
             "value.converter"               : "org.apache.kafka.connect.json.JsonConverter",
             "value.converter.schemas.enable": "false",
             "storage.class"                 : "io.confluent.connect.s3.storage.S3Storage",
             "format.class"                  : "io.confluent.connect.s3.format.json.JsonFormat",
             "partitioner.class"             : "io.confluent.connect.storage.partitioner.DefaultPartitioner",
             "schema.compatibility"          : "NONE"
            }
        }
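
    Once the connector exists, you can apply the corrected config in place with a PUT to its config endpoint and then restart the failed task. Note that PUT takes only the inner "config" object (not the {"name": ..., "config": ...} wrapper); the port and file name below are assumptions:

        curl -s -X PUT \
             -H "Content-Type: application/json" \
             --data @s3-sink-config.json \
             http://localhost:8083/connectors/my-s3-sink3/config

        # restart the killed task (task ids start at 0)
        curl -s -X POST http://localhost:8083/connectors/my-s3-sink3/tasks/0/restart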