apache-kafka, couchbase, apache-kafka-streams

Unknown Magic Byte - Kafka Json


I've been struggling with some problems working with Kafka Streams and the way my consumer expects the data. My pipeline works as follows:

  1. I configured a Couchbase source connector that adds the following message to a topic:
{
    "name": "couchbase-source-connector",
    "config": {
        "connector.class": "com.couchbase.connect.kafka.CouchbaseSourceConnector",
        "couchbase.persistence.polling.interval": "100ms",
        "tasks.max": "2",
        "couchbase.seed.nodes": "couchbasedb",
        "couchbase.collections": "art-school-scope.student-record-collection",
        "couchbase.bucket": "student-bucket",
        "couchbase.username": "username",
        "couchbase.password": "password",
        "couchbase.stream.from": "NOW",
        "couchbase.source.handler": "com.couchbase.connect.kafka.handler.source.RawJsonSourceHandler",
        "key.converter": "org.apache.kafka.connect.storage.StringConverter",
        "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
        "couchbase.topic": "topicstream"
    }
}
{"xyz": "Test from couchbase"}

My message always gets written to the topic successfully. Then I need to send it to a database.

  2. I tried to implement the transformation with Kafka Streams:
 void startStreaming() {
            Properties props = new Properties();
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, "kafkastreamtopic");
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
            props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
            props.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, "org.apache.kafka.streams.processor.FailOnInvalidTimestamp");

            StreamsBuilder builder = new StreamsBuilder();
            builder.<String, String>stream("topicstream")
                    .mapValues(value -> {
                        try {
                            JSONObject jsonObject = new JSONObject(value);
                            // Extract the "xyz" field
                            String xyz = jsonObject.optString("xyz", null);
                            // Create a new JSON object with the correct format
                            JSONObject transformedObject = new JSONObject();
                            transformedObject.put("xyz", xyz);
                            return transformedObject.toString();
                        } catch (Exception e) {
                            // Log and handle the error appropriately
                            logger.error("Error transforming value: {}", value, e);
                            return value; // Return the original value in case of error
                        }
                    })
                    .to("jdbcsinktopic2", Produced.valueSerde(Serdes.String()));

            KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), props);
            kafkaStreams.setStateListener((newState, oldState) -> {
                logger.info("State changed from " + oldState + " to " + newState);
            });

            kafkaStreams.start();

            Runtime.getRuntime().addShutdownHook(new Thread(kafkaStreams::close));
 }
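
For completeness, these are the imports and a wrapper class this snippet assumes (the class name and entry point are illustrative, not from the original post; org.json and SLF4J are inferred from the use of JSONObject and logger above):

import java.util.Properties;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.Produced;
import org.json.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// Hypothetical host class; only the startStreaming() body above comes from the original post.
public class CouchbaseToJdbcStream {

    private static final Logger logger = LoggerFactory.getLogger(CouchbaseToJdbcStream.class);

    public static void main(String[] args) {
        new CouchbaseToJdbcStream().startStreaming();
    }

    void startStreaming() {
        // ... paste the method body shown above here ...
    }
}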

The code above successfully writes the message onto jdbcsinktopic2, and I can see it like this:

{"xyz":"Test from couchbase"}
  3. I also added a schema to the Schema Registry and set up my sink configuration:
kafka-json-schema-console-producer --bootstrap-server localhost:9092 --property schema.registry.url=http://localhost:8081 --topic jdbcsinktopic2 --property value.schema='{"type":"object", "properties":{"xyz":{"type":"string"}}}'

When I tested it by typing a message into that console producer (which also registers the schema), the data got inserted into my database without any problem.

{
    "name": "mysql-sink-2",
    "config": {
        "connector.class": "io.debezium.connector.jdbc.JdbcSinkConnector",
        "tasks.max": "1",
        "topics": "jdbcsinktopic2",
        "connection.url": "jdbc:mysql://mysql:3306/inventory",
        "connection.username": "debeziumexample",
        "connection.password": "example",
        "auto.create": "true",
        "insert.mode": "insert",
        "pk.mode": "none",
        "value.converter.schemas.enable": "true",
        "value.converter": "io.confluent.connect.json.JsonSchemaConverter",
        "value.converter.schema.registry.url": "http://schema-registry:8081",
        "key.converter": "org.apache.kafka.connect.storage.StringConverter",
        "key.converter.schemas.enable": "true",
        "errors.tolerance": "all",
        "errors.log.enable": true,
        "errors.log.include.messages": true
    }
}

The problem comes when I transform the data with my Kafka Streams application: the sink no longer receives it in the format it expects (the "Unknown Magic Byte" error from the title). I'm quite new to this topic. How can I send the transformed data from my Kafka Streams application to my database through my JDBC sink?
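
To make the format mismatch concrete: converters like io.confluent.connect.json.JsonSchemaConverter expect every value to start with the Schema Registry wire-format prefix, a magic byte 0x00 followed by a 4-byte schema id, whereas the Streams app writes plain JSON strings. A small sketch that peeks at the raw bytes on the sink topic (class name and consumer settings are illustrative; the topic and broker address are the ones used above):

import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.List;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

public class MagicByteCheck {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "magic-byte-check");          // throwaway consumer group
        props.put("auto.offset.reset", "earliest");
        props.put("key.deserializer", ByteArrayDeserializer.class.getName());
        props.put("value.deserializer", ByteArrayDeserializer.class.getName());

        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(List.of("jdbcsinktopic2"));
            for (ConsumerRecord<byte[], byte[]> record : consumer.poll(Duration.ofSeconds(5))) {
                byte[] value = record.value();
                // Schema-Registry-framed values start with 0x00 followed by a 4-byte schema id;
                // plain JSON strings (like the ones the Streams app writes) do not.
                boolean framed = value != null && value.length > 5 && value[0] == 0x00;
                System.out.printf("offset=%d framed=%b preview=%s%n",
                        record.offset(), framed,
                        value == null ? "null"
                                : new String(value, 0, Math.min(value.length, 30), StandardCharsets.UTF_8));
            }
        }
    }
}

Values produced by kafka-json-schema-console-producer should come back with framed=true, while the ones written by the Streams app above will not.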

Any advice, correction or help would be amazing.

Regards, Rigo


Solution

  • I resumed the project after some time and finally I succeeded in solving my problem.

    First, I needed to configure my Couchbase Source Connector; no changes were required (you can see the configuration above).

    I started to use Schema Registry and ksqlDB. The messages coming from Couchbase were simple JSON objects (schema-less JSON), as follows:

    {"xyz":"This is a value"}
    

    Then, using ksqlDB, I executed the following statements:

    CREATE STREAM smjson (xyz VARCHAR) WITH (KAFKA_TOPIC='topicstream', VALUE_FORMAT='JSON');
    
    SELECT * FROM smjson EMIT CHANGES;
    
    CREATE STREAM SOME_JSON_AS_AVRO WITH (VALUE_FORMAT='AVRO') AS SELECT * FROM smjson;
    

    It's necessary to configure your ksqlDB server to connect to the Schema Registry (the ksql.schema.registry.url property). After executing the statements above, you will see the auto-generated schemas at "http://schema-registry:8081/schemas".
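
    If you prefer verifying from code instead of the browser, here is a small sketch using the Schema Registry Java client (the localhost URL and the <topic>-value subject naming are assumptions based on the setup above):

    import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
    import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;

    public class SchemaCheck {
        public static void main(String[] args) throws Exception {
            // Host-side address of the Schema Registry above; 100 is just the client's cache size.
            SchemaRegistryClient client = new CachedSchemaRegistryClient("http://localhost:8081", 100);

            // The CSAS statement above should have registered an auto-generated Avro schema
            // under a subject like "SOME_JSON_AS_AVRO-value".
            for (String subject : client.getAllSubjects()) {
                System.out.println(subject + " -> " + client.getLatestSchemaMetadata(subject).getSchema());
            }
        }
    }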

    After this, you can configure your JdbcSinkConnector as in the following example:

    {
        "name": "mysql-sink",
        "config": {
            "connector.class": "io.debezium.connector.jdbc.JdbcSinkConnector",
            "tasks.max": "1",
            "topics": "SOME_JSON_AS_AVRO",
            "connection.url": "jdbc:mysql://mysql:3306/inventory",
            "connection.username": "*",
            "connection.password": "*",
            "auto.create": "true",
            "auto.evolve": "true",
            "insert.mode": "insert",
            "pk.mode": "none",
            "key.converter": "io.confluent.connect.avro.AvroConverter",
            "key.converter.schema.registry.url": "http://schema-registry:8081",
            "value.converter": "io.confluent.connect.avro.AvroConverter",
            "value.converter.schema.registry.url": "http://schema-registry:8081",
            "quote.identifiers": "true",
            "errors.tolerance": "all",
            "errors.log.enable": true,
            "errors.log.include.messages": true
        }
    }
    

    After this, your implementation to move data from Couchbase to any Relational Database should be working.
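
    Once the sink connector is running, a quick sanity check from Java (a sketch that needs the MySQL JDBC driver on the classpath; the table and column names assume the connector's default topic-based table naming and ksqlDB's upper-casing of unquoted identifiers, so adjust them to what actually got created):

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.ResultSet;
    import java.sql.Statement;

    public class SinkCheck {
        public static void main(String[] args) throws Exception {
            // Host-side connection to the MySQL instance from the sink config; credentials are placeholders.
            try (Connection conn = DriverManager.getConnection(
                    "jdbc:mysql://localhost:3306/inventory", "user", "password");
                 Statement stmt = conn.createStatement();
                 ResultSet rs = stmt.executeQuery("SELECT * FROM `SOME_JSON_AS_AVRO`")) {
                while (rs.next()) {
                    System.out.println(rs.getString("XYZ")); // ksqlDB upper-cases the xyz column
                }
            }
        }
    }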

    Also, I highly recommend following the Kafka Connect in Action: JDBC Sink tutorial from Robin Moffatt's channel.