I'm trying to read data into my topic from a RabbitMQ queue using the Kafka connector with the configuration below:
{
"name" : "RabbitMQSourceConnector1",
"config" : {
"connector.class" : "io.confluent.connect.rabbitmq.RabbitMQSourceConnector",
"tasks.max" : "1",
"kafka.topic" : "rabbitmqtest3",
"rabbitmq.queue" : "taskqueue",
"rabbitmq.host" : "localhost",
"rabbitmq.username" : "guest",
"rabbitmq.password" : "guest",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": "true",
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"key.converter.schemas.enable": "true"
}
}
But I´m having troubles when converting the source stream to JSON format as I´m losing the original message
Original:
{'id': 0, 'body': '010101010101010101010101010101010101010101010101010101010101010101010'}
Received:
{"schema":{"type":"bytes","optional":false},"payload":"eyJpZCI6IDEsICJib2R5IjogIjAxMDEwMTAxMDEwMTAxMDEwMTAxMDEwMTAxMDEwMTAxMDEwMTAxMDEwMTAxMDEwMTAxMDEwMTAxMDEwMTAxMDEwMTAxMCJ9"}
Does anyone have an idea why this is happening?
EDIT: I tried to convert the message to String using the "value.converter": "org.apache.kafka.connect.storage.StringConverter", but the result is the same:
11/27/19 4:07:37 PM CET , 0 , [B@1583a488
EDIT2:
I´m now receiving the JSON file but the content is still encoded in BASE64
Any idea on how to convert it back to UTF8 directly?
{
"name": "adls-gen2-sink",
"config": {
"connector.class":"io.confluent.connect.azure.datalake.gen2.AzureDataLakeGen2SinkConnector",
"tasks.max":"1",
"topics":"rabbitmqtest3",
"flush.size":"3",
"format.class":"io.confluent.connect.azure.storage.format.json.JsonFormat",
"value.converter":"org.apache.kafka.connect.converters.ByteArrayConverter",
"internal.value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
"topics.dir":"sw66jsoningest",
"confluent.topic.bootstrap.servers":"localhost:9092",
"confluent.topic.replication.factor":"1",
"partitioner.class" : "io.confluent.connect.storage.partitioner.DefaultPartitioner"
}
}
UPDATE:
I got the solution, considering this flow:
Message (JSON) --> RabbitMq (ByteArray) --> Kafka (ByteArray) -->ADLS (JSON)
I used this converter on the RabbitMQ to Kafka connector to decode the message from Base64 to UTF8.
"value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter"
Afterwards I treated the message as a String and saved it as a JSON.
"value.converter":"org.apache.kafka.connect.storage.StringConverter",
"format.class":"io.confluent.connect.azure.storage.format.json.JsonFormat",
If you set schemas.enable": "false"
, you shouldn't be getting the schema and payload fields
If you want no translation to happen at all, use ByteArrayConverter
If your data is just a plain string (which includes JSON), use StringConverter
It's not clear how you're printing the resulting message, but looks like you're printing the byte array and not decoding it to a String