I am trying to load data written to a Kafka topic into a Postgres table. I can see that the topic receives new messages every second and the data looks good. However, with the JDBC sink configuration below, it is not able to load the data into the Postgres table:
{
  "name": "JdbcSinkConnectorConnector_0",
  "config": {
    "name": "JdbcSinkConnectorConnector_0",
    "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
    "topics": "sample_b",
    "connection.url": "jdbc:postgresql://postgres:5432/",
    "connection.user": "postgres",
    "connection.password": "********",
    "insert.mode": "insert",
    "table.name.format": "${topic}",
    "auto.create": "true"
  }
}
PySpark code that writes the data to the Kafka topic:
transformed_df.selectExpr("CAST(id AS STRING) AS key", "to_json(struct(*)) AS value") \
    .writeStream \
    .outputMode("append") \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("topic", "sample_b") \
    .option("checkpointLocation", "src/checkpointLoc") \
    .start() \
    .awaitTermination()
When I check the Confluent Control Center tab for the topic sample_b, the data is in this format:
{
  "id": "00037167-0894-4373-9a56-44c49d2285c9",
  "is_active": true,
  "is_deleted": false,
  "created_by": 70516,
  "created_at": "2024-10-05T13:42:25.069+05:30",
  "created_ip": "10.160.0.76",
  "created_dept_id": 4,
  "updated_by": 70516,
  "updated_at": "2024-10-05T14:55:55.218+05:30",
  "updated_ip": "10.84.0.1",
  "updated_dept_id": 4,
  "sql_id": 0,
  "ipa_no": "0",
  "pe_id": 165587147,
  "uid": "22516767",
  "mr_no": "P5942023",
  "site_id": 1,
  "entered_date": "2024-10-05"
}
Next, I tried pushing data through another Kafka topic named test01. I created that topic via ksqlDB and inserted data into it, and with the same JDBC sink configuration I was able to load the data into the Postgres table without any issues.
CREATE STREAM TEST01 (KEY_COL VARCHAR KEY, COL1 INT, COL2 VARCHAR)
  WITH (KAFKA_TOPIC='test01', PARTITIONS=1, VALUE_FORMAT='AVRO');

INSERT INTO TEST01 (KEY_COL, COL1, COL2) VALUES ('V', 4, 'EOO');
The data for test01 looks like this in the Control Center tab:
{
  "COL1": {
    "int": 4
  },
  "COL2": {
    "string": "EOO"
  }
}
I can see the schema difference between the two topics. What exactly do I need to change in the way sample_b is written so that its payload matches the test01 format? I don't see any errors in the Kafka Connect logs.
The sample_b topic is JSON, because of the to_json Spark function. The test01 topic is Avro, because of VALUE_FORMAT='AVRO' in ksqlDB.
As with any Kafka application, serialization formats must match on both ends of the topic: an Avro deserializer will not accept JSON, and vice versa.
"it is not able to load the data into the Postgres table"

The Connect server has both logs and a /status API; look at the errors, if there are any.
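For example, assuming the Connect REST listener is on its default port 8083 on the same host, this returns the connector and task state, including the stack trace of any failed task:

curl -s http://localhost:8083/connectors/JdbcSinkConnectorConnector_0/status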
My suspicion is that you have Kafka Connect configured to read Avro data instead of JSON. Configure the converters to fix it: https://www.confluent.io/blog/kafka-connect-deep-dive-converters-serialization-explained/
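As a sketch, converters can be overridden per connector by adding keys like these to the "config" block above (the classes are the standard Kafka Connect converters; which one you actually need depends on what the topic contains):

"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": "true"

Note that with schemas.enable=true, JsonConverter expects each message to be a {"schema": ..., "payload": ...} envelope, which the to_json output above does not produce, and the JDBC sink needs schema information to build the table. That is why the suggestion below is to produce Avro instead.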
"What exactly do I need to change in the way sample_b is written so that its payload matches the test01 format?"
You'd need a to_avro function, but not the one built into Spark, as that will not work with Confluent's AvroConverter (which expects the 'magic byte' wire format you've previously discovered). See https://github.com/absaoss/abris/blob/master/documentation/python-documentation.md
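A minimal sketch of what that could look like from PySpark, adapted from the linked ABRiS documentation. The Schema Registry URL, the assumption that a value schema for sample_b is already registered, and the ABRiS 4.x builder method names are assumptions to adjust for your setup:

# Requires the ABRiS jar (and its dependencies) on the Spark classpath.
from pyspark import SparkContext
from pyspark.sql.column import Column, _to_java_column
from pyspark.sql.functions import struct

def to_avro(col, config):
    # Calls ABRiS' za.co.absa.abris.avro.functions.to_avro via py4j; unlike
    # Spark's built-in to_avro, this emits Confluent wire-format Avro
    # (magic byte + schema id + payload), which AvroConverter can read.
    jvm_gateway = SparkContext._active_spark_context._gateway.jvm
    abris_avro = jvm_gateway.za.co.absa.abris.avro
    return Column(abris_avro.functions.to_avro(_to_java_column(col), config))

def to_avro_abris_config(config_map, topic, is_key):
    # Builds a ToAvroConfig that looks up the latest registered schema for the
    # topic (topic-name strategy) in Schema Registry.
    jvm_gateway = SparkContext._active_spark_context._gateway.jvm
    scala_map = jvm_gateway.PythonUtils.toScalaMap(config_map)
    return jvm_gateway.za.co.absa.abris.config \
        .AbrisConfig \
        .toConfluentAvro() \
        .downloadSchemaByLatestVersion() \
        .andTopicNameStrategy(topic, is_key) \
        .usingSchemaRegistry(scala_map)

to_avro_config = to_avro_abris_config(
    {'schema.registry.url': 'http://localhost:8081'}, 'sample_b', False)

avro_df = transformed_df.select(
    transformed_df.id.cast("string").alias("key"),
    to_avro(struct(*transformed_df.columns), to_avro_config).alias("value"))

# avro_df can then be written with the same writeStream options as before.

With the value serialized this way, the sink side would use io.confluent.connect.avro.AvroConverter (plus value.converter.schema.registry.url), just as ksqlDB's test01 topic does.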