I've been struggling with some problems working with Kafka Streams and with the format my consumer expects the data in. My pipeline works as follows. First, my Couchbase source connector configuration:
{
  "name": "couchbase-source-connector",
  "config": {
    "connector.class": "com.couchbase.connect.kafka.CouchbaseSourceConnector",
    "couchbase.persistence.polling.interval": "100ms",
    "tasks.max": "2",
    "couchbase.seed.nodes": "couchbasedb",
    "couchbase.collections": "art-school-scope.student-record-collection",
    "couchbase.bucket": "student-bucket",
    "couchbase.username": "username",
    "couchbase.password": "password",
    "couchbase.stream.from": "NOW",
    "couchbase.source.handler": "com.couchbase.connect.kafka.handler.source.RawJsonSourceHandler",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
    "couchbase.topic": "topicstream"
  }
}
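(For reference, a connector configuration like this can be registered through the Kafka Connect REST API. The sketch below assumes Connect is listening on its default port 8083 and that the JSON above is saved as couchbase-source.json:)

curl -X POST -H "Content-Type: application/json" --data @couchbase-source.json http://localhost:8083/connectors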
{"xyz": "Test from couchbase"}
My message ever adds succesfully to the topic. Then I need to send to a database.
import java.util.Properties;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.Produced;
import org.json.JSONObject;

// "logger" is an org.slf4j.Logger field on the enclosing class.
void startStreaming() {
    Properties props = new Properties();
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "kafkastreamtopic");
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
    props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
    props.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG,
            "org.apache.kafka.streams.processor.FailOnInvalidTimestamp");

    StreamsBuilder builder = new StreamsBuilder();
    builder.<String, String>stream("topicstream")
            .mapValues(value -> {
                try {
                    JSONObject jsonObject = new JSONObject(value);
                    // Extract the "xyz" field
                    String xyz = jsonObject.optString("xyz", null);
                    // Create a new JSON object with the expected format
                    JSONObject transformedObject = new JSONObject();
                    transformedObject.put("xyz", xyz);
                    return transformedObject.toString();
                } catch (Exception e) {
                    // Log and handle the error appropriately
                    logger.error("Error transforming value: {}", value, e);
                    return value; // Return the original value in case of error
                }
            })
            .to("jdbcsinktopic2", Produced.valueSerde(Serdes.String()));

    KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), props);
    kafkaStreams.setStateListener((newState, oldState) ->
            logger.info("State changed from " + oldState + " to " + newState));
    kafkaStreams.start();
    Runtime.getRuntime().addShutdownHook(new Thread(kafkaStreams::close));
}
The code above successfully writes the message to jdbcsinktopic2, and I can see it like this:

{"xyz":"Test from couchbase"}
To create a schema for the topic, I used the JSON Schema console producer:

kafka-json-schema-console-producer --bootstrap-server localhost:9092 --property schema.registry.url=http://localhost:8081 --topic jdbcsinktopic2 --property value.schema='{"type":"object", "properties":{"xyz":{"type":"string"}}}'

When I tested by producing a message this way, after the schema had been created, the data was inserted into my database without any problem.
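For example, a record like this typed into the producer (the value here is just a placeholder):

{"xyz": "manual test"}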
This is my JDBC sink connector configuration:

{
  "name": "mysql-sink-2",
  "config": {
    "connector.class": "io.debezium.connector.jdbc.JdbcSinkConnector",
    "tasks.max": "1",
    "topics": "jdbcsinktopic2",
    "connection.url": "jdbc:mysql://mysql:3306/inventory",
    "connection.username": "debeziumexample",
    "connection.password": "example",
    "auto.create": "true",
    "insert.mode": "insert",
    "pk.mode": "none",
    "value.converter.schemas.enable": "true",
    "value.converter": "io.confluent.connect.json.JsonSchemaConverter",
    "value.converter.schema.registry.url": "http://schema-registry:8081",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "key.converter.schemas.enable": "true",
    "errors.tolerance": "all",
    "errors.log.enable": true,
    "errors.log.include.messages": true
  }
}
The problem comes when I transform the data: my consumer does not receive it in the format it expects. I'm quite new to this topic. How can I get the data transformed by my Kafka Streams application into my database through my JDBC sink?

Any advice, correction, or help would be amazing.

Regards, Rigo
I resumed the project after some time and finally succeeded in solving my problem.

First, I needed to configure my Couchbase Source Connector; no changes were required there (you can see the configuration above).

Then I started to use Schema Registry and ksqlDB. The key point is that the JDBC sink needs a schema for each record in order to map fields to table columns, while the messages coming from Couchbase were plain, schemaless JSON objects like this one:
{"xyz":"This is a value"}
Then, using ksqlDB, I executed the following statements to re-publish the data as Avro with a registered schema:
CREATE STREAM smjson (xyz VARCHAR) WITH (KAFKA_TOPIC='topicstream', VALUE_FORMAT='JSON');
SELECT * FROM smjson EMIT CHANGES;
CREATE STREAM SOME_JSON_AS_AVRO WITH (VALUE_FORMAT='AVRO') AS SELECT * FROM smjson;
Note that you must configure your ksqlDB server to connect to Schema Registry before executing the statements above. Afterwards, you will see the autogenerated schemas at "http://schema-registry:8081/schemas".
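A minimal sketch of that configuration, using the same Schema Registry URL as the rest of this setup (adjust the host if yours differs). In ksql-server.properties it is one line:

ksql.schema.registry.url=http://schema-registry:8081

If you run ksqlDB with the Confluent Docker image, the same setting is usually passed as the environment variable KSQL_KSQL_SCHEMA_REGISTRY_URL. You can then verify the registered subjects with curl http://schema-registry:8081/subjects.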
After this, you can configure your JdbcSinkConnector as in the following example:
{
  "name": "mysql-sink",
  "config": {
    "connector.class": "io.debezium.connector.jdbc.JdbcSinkConnector",
    "tasks.max": "1",
    "topics": "SOME_JSON_AS_AVRO",
    "connection.url": "jdbc:mysql://mysql:3306/inventory",
    "connection.username": "*",
    "connection.password": "*",
    "auto.create": "true",
    "auto.evolve": "true",
    "insert.mode": "insert",
    "pk.mode": "none",
    "key.converter": "io.confluent.connect.avro.AvroConverter",
    "key.converter.schema.registry.url": "http://schema-registry:8081",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url": "http://schema-registry:8081",
    "quote.identifiers": "true",
    "errors.tolerance": "all",
    "errors.log.enable": true,
    "errors.log.include.messages": true
  }
}
After this, your setup for moving data from Couchbase to any relational database should be working.
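(As a quick sanity check, you can ask the Kafka Connect REST API for the connector state; this assumes Connect on its default port 8083:)

curl http://localhost:8083/connectors/mysql-sink/status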
Also, I highly recommend the tutorial Kafka Connect in Action: JDBC Sink from Robin Moffatt's channel.