apache-spark pyspark apache-kafka-connect spark-structured-streaming spark-kafka-integration

Unable to push data from a Kafka topic to a Postgres table


I am trying to load the data written to a Kafka topic into a Postgres table. I can see that the topic receives new messages every second, and the data looks good.

However, when I use the below JDBC sink configuration it is not able to load the data in the Postgres table:

{
  "name": "JdbcSinkConnectorConnector_0",
  "config": {
    "name": "JdbcSinkConnectorConnector_0",
    "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
    "topics": "sample_b",
    "connection.url": "jdbc:postgresql://postgres:5432/",
    "connection.user": "postgres",
    "connection.password": "********",
    "insert.mode": "insert",
    "table.name.format": "${topic}",
    "auto.create": "true"
  }
}

PySpark code that writes the data to the Kafka topic:

transformed_df.selectExpr("CAST(id AS STRING) AS key", "to_json(struct(*)) AS value") \
.writeStream.outputMode(outputMode='Append').format('kafka') \
.option("kafka.bootstrap.servers", "localhost:9092") \
.option("topic", "sample_b") \
.option("checkpointLocation", "src/checkpointLoc") \
.start() \
.awaitTermination()

When I check the topic sample_b in the Kafka Control Center, the data appears in this format:

{
  "id": "00037167-0894-4373-9a56-44c49d2285c9",
  "is_active": true,
  "is_deleted": false,
  "created_by": 70516,
  "created_at": "2024-10-05T13:42:25.069+05:30",
  "created_ip": "10.160.0.76",
  "created_dept_id": 4,
  "updated_by": 70516,
  "updated_at": "2024-10-05T14:55:55.218+05:30",
  "updated_ip": "10.84.0.1",
  "updated_dept_id": 4,
  "sql_id": 0,
  "ipa_no": "0",
  "pe_id": 165587147,
  "uid": "22516767",
  "mr_no": "P5942023",
  "site_id": 1,
  "entered_date": "2024-10-05"
}

Next, I tried pushing data through another Kafka topic named test01. I created the topic via KSQL and inserted data into it, and with the same JDBC sink configuration the data reached the Postgres table without any issues.

CREATE STREAM TEST01 (KEY_COL VARCHAR KEY, COL1 INT, COL2 VARCHAR)
    WITH (KAFKA_TOPIC='test01', PARTITIONS=1, VALUE_FORMAT='AVRO');
INSERT INTO TEST01 (KEY_COL, COL1, COL2) VALUES ('V', 4, 'EOO');

The data for test01 looks like this in the Kafka Control Center:

{
  "COL1": {
    "int": 4
  },
  "COL2": {
    "string": "EOO"
  }
}

I can see the schema difference between the two topics. So what exact change do I need to make in the sample_b topic writing so that it matches the test01 payload format?

I don't see any errors in the Kafka Connect logs.


Solution

  • The sample_b topic holds JSON, because it is written with Spark's to_json function.

    The test01 topic holds Avro, because of VALUE_FORMAT='AVRO' in ksqlDB.


    As with any Kafka application, the serialization format must match on both ends of the topic: an Avro deserializer will not accept JSON, and vice versa.

    "it is not able to load the data in the Postgres table"

    The Connect server has both logs and a status endpoint in its REST API (GET /connectors/<name>/status). Check both for errors; a failed task reports its stack trace in the status response.

    My suspicion is that you have Kafka Connect configured to read Avro data instead of JSON.

    Configure the converters to fix it. https://www.confluent.io/blog/kafka-connect-deep-dive-converters-serialization-explained/

    "make in the sample_b topic writing so that it matches the test01 payload format?"

    You'd need a to_avro function, but not the one built into Spark: its output will not work with Confluent's AvroConverter, which expects the "magic byte" wire format (as you've previously discovered).

    See https://github.com/absaoss/abris/blob/master/documentation/python-documentation.md
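
To make the mismatch concrete, here is a rough sketch in plain Python (not part of the original pipeline) that inspects the first bytes of a message value. It assumes Confluent's Schema Registry wire format, where a value starts with a zero "magic byte" followed by a 4-byte big-endian schema ID. The helper name describe_payload, the schema ID 42, and the sample byte strings are made up for illustration.

```python
import json
import struct

MAGIC_BYTE = 0  # first byte of Confluent's Schema Registry wire format


def describe_payload(raw: bytes) -> str:
    """Guess how a Kafka message value is framed, based on its first bytes."""
    if len(raw) >= 5 and raw[0] == MAGIC_BYTE:
        # Bytes 1-4 hold the Schema Registry schema ID as a big-endian integer.
        (schema_id,) = struct.unpack(">I", raw[1:5])
        return f"confluent-framed (schema id {schema_id})"
    try:
        json.loads(raw.decode("utf-8"))
        return "plain json"
    except (UnicodeDecodeError, ValueError):
        return "unknown"


# Roughly what to_json(struct(*)) writes: UTF-8 JSON text with no framing.
spark_style = json.dumps(
    {"id": "00037167-0894-4373-9a56-44c49d2285c9", "site_id": 1}
).encode("utf-8")

# Hypothetical framed value: magic byte, schema ID 42, then a stand-in body
# (the body here is a placeholder, not a real Avro record for any schema).
framed_style = bytes([MAGIC_BYTE]) + struct.pack(">I", 42) + b"\x08\x06EOO"

print(describe_payload(spark_style))
print(describe_payload(framed_style))
```

A value from sample_b starts with "{" rather than the zero byte, so a converter expecting the framed format has nothing it can look up in the Schema Registry. Spark's built-in to_avro emits the Avro body without this 5-byte header, which is why a Confluent-aware encoder is needed instead.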