Tags: python, apache-spark, pyspark, apache-kafka, jdbc

Unable to get Postgres data in the right format via Kafka, the JDBC source connector, and PySpark


I have created a table in Postgres:

CREATE TABLE IF NOT EXISTS public.sample_a
(
    id text COLLATE pg_catalog."default" NOT NULL,
    is_active boolean NOT NULL,
    is_deleted boolean NOT NULL,
    created_by integer NOT NULL,
    created_at timestamp with time zone NOT NULL,
    created_ip character varying(30) COLLATE pg_catalog."default" NOT NULL,
    created_dept_id integer NOT NULL,
    updated_by integer,
    updated_at timestamp with time zone,
    updated_ip character varying(30) COLLATE pg_catalog."default",
    updated_dept_id integer,
    deleted_by integer,
    deleted_at timestamp with time zone,
    deleted_ip character varying(30) COLLATE pg_catalog."default",
    deleted_dept_id integer,
    sql_id bigint NOT NULL,
    ipa_no character varying(30) COLLATE pg_catalog."default" NOT NULL,
    pe_id bigint NOT NULL,
    uid character varying(30) COLLATE pg_catalog."default" NOT NULL,
    mr_no character varying(15) COLLATE pg_catalog."default" NOT NULL,
    site_id integer NOT NULL,
    entered_date date NOT NULL,
    CONSTRAINT pk_patient_dilation PRIMARY KEY (id)
);

and I have inserted the data as below:

INSERT INTO sample_a (id, is_active, is_deleted, created_by, created_at, created_ip, created_dept_id, updated_by, updated_at, updated_ip, updated_dept_id, deleted_by, deleted_at, deleted_ip, deleted_dept_id, sql_id, ipa_no, pe_id, uid, mr_no, site_id, entered_date)
VALUES ('00037167-0894-4373-9a56-44c49d2285c9', TRUE, FALSE, 70516, '2024-10-05 08:12:25.069941+00','10.160.0.76', 4, 70516, '2024-10-05 09:25:55.218961+00', '10.84.0.1',4,NULL, NULL, NULL, NULL, 0,0,165587147,'22516767','P5942023',1,'10/5/24');

Now, I have created the JDBC source connector config as below:

{
  "name": "JdbcSourceConnectorConnector_0",
  "config": {
    "value.converter.schema.registry.url": "http://schema-registry:8081",
    "key.converter.schema.registry.url": "http://schema-registry:8081",
    "name": "JdbcSourceConnectorConnector_0",
    "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
    "key.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "connection.url": "jdbc:postgresql://postgres:5432/",
    "connection.user": "postgres",
    "connection.password": "********",
    "table.whitelist": "sample_a",
    "mode": "bulk"
  }
}
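
For reference, a config like this can be registered with a plain POST to the Kafka Connect REST API. A minimal sketch, assuming Connect listens on localhost:8083 and the JSON above is saved as jdbc_source_sample_a.json (both the host/port and the file name depend on your setup):

import requests

# Sketch: register the JDBC source connector via the Kafka Connect REST API.
# Host/port and file name are assumptions; adjust them to your environment.
with open("jdbc_source_sample_a.json") as f:
    connector_config = f.read()

resp = requests.post(
    "http://localhost:8083/connectors",
    headers={"Content-Type": "application/json"},
    data=connector_config,
)
resp.raise_for_status()
print(resp.json())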

When the data is pushed from the DB to the Kafka topic, I can see it in a readable format in the Confluent Control Center. Since I am using bulk mode, the whole table is continuously reloaded.

My problem is that when I fetch the data via PySpark, it is not readable:

from pyspark.sql.session import SparkSession
from pyspark.sql.functions import col

spark = SparkSession \
    .builder \
    .appName("Kafka_Test") \
    .config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0") \
    .getOrCreate()

df = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "sample_a") \
    .option("startingOffsets","latest") \
    .load()

df.selectExpr("cast(value as string) as value").writeStream.format("console").start()

spark.streams.awaitAnyTermination()

Output:

H00037167-0894-4373-9a56-44c49d2285c9?ڹ??d10.160.0.7??????d10.84.0.0????22516767P5942023¸  

So how do I access the specific attributes? Do I need a deserializer class?

TIA.

As per Nimi's suggestion, I was able to fetch the schema via the Schema Registry and tried to use the from_avro method, but it gave me this error:

import json
from pyspark.sql.avro.functions import from_avro  # requires the spark-avro package, e.g. org.apache.spark:spark-avro_2.12:3.3.0

jsonSchema = {"type":"record","name":"sample_a","fields":[{"name":"id","type":"string"},{"name":"is_active","type":"boolean"},{"name":"is_deleted","type":"boolean"},{"name":"created_by","type":"int"},{"name":"created_at","type":{"type":"long","connect.version":1,"connect.name":"org.apache.kafka.connect.data.Timestamp","logicalType":"timestamp-millis"}},{"name":"created_ip","type":"string"},{"name":"created_dept_id","type":"int"},{"name":"updated_by","type":["null","int"],"default":None},{"name":"updated_at","type":["null",{"type":"long","connect.version":1,"connect.name":"org.apache.kafka.connect.data.Timestamp","logicalType":"timestamp-millis"}],"default":None},{"name":"updated_ip","type":["null","string"],"default":None},{"name":"updated_dept_id","type":["null","int"],"default":None},{"name":"deleted_by","type":["null","int"],"default":None},{"name":"deleted_at","type":["null",{"type":"long","connect.version":1,"connect.name":"org.apache.kafka.connect.data.Timestamp","logicalType":"timestamp-millis"}],"default":None},{"name":"deleted_ip","type":["null","string"],"default":None},{"name":"deleted_dept_id","type":["null","int"],"default":None},{"name":"sql_id","type":"long"},{"name":"ipa_no","type":"string"},{"name":"pe_id","type":"long"},{"name":"uid","type":"string"},{"name":"mr_no","type":"string"},{"name":"site_id","type":"int"},{"name":"entered_date","type":{"type":"int","connect.version":1,"connect.name":"org.apache.kafka.connect.data.Date","logicalType":"date"}}],"connect.name": "sample_a"}
df.select(from_avro("value", json.dumps(jsonSchema)).alias("sample_a")) \
    .select("sample_a.*").writeStream.format("console").start()

Error:

org.apache.spark.SparkException: Malformed records are detected in record parsing. Current parse Mode: FAILFAST. To process malformed records as null result, try setting the option 'mode' as 'PERMISSIVE'.
    at org.apache.spark.sql.avro.AvroDataToCatalyst.nullSafeEval(AvroDataToCatalyst.scala:113)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:760)
    at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.$anonfun$run$1(WriteToDataSourceV2Exec.scala:435)
    at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1538)
    at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.run(WriteToDataSourceV2Exec.scala:480)
    at org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.$anonfun$writeWithV2$2(WriteToDataSourceV2Exec.scala:381)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:136)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1504)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.lang.ArrayIndexOutOfBoundsException: Index 70516 out of bounds for length 2
    at org.apache.avro.io.parsing.Symbol$Alternative.getSymbol(Symbol.java:460)
    at org.apache.avro.io.ResolvingDecoder.readIndex(ResolvingDecoder.java:283)
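
As an aside, the Avro schema string does not have to be hard-coded as above; it can be fetched from the Schema Registry REST API at runtime. A minimal sketch, assuming the default subject name sample_a-value and that the registry is reachable on localhost:8081 (the variable name registrySchema is just for illustration):

import requests

# Fetch the latest value schema registered for the "sample_a" topic.
# The default TopicNameStrategy registers it under the subject "sample_a-value".
resp = requests.get("http://localhost:8081/subjects/sample_a-value/versions/latest")
resp.raise_for_status()
registrySchema = resp.json()["schema"]  # already a JSON-encoded Avro schema string

Since the registry returns the schema as a JSON string, it can be passed to from_avro directly, without json.dumps.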

Solution

  • Finally, I was able to solve the problem using this blog. The author has explained the concept well.

    The approach suggested by Nimi was right. What was missing was the magic-bytes trick; treating them the right way solved the issue (see the sketch after this list).

    I will have to read about magic bytes in more detail, though.

    I would like to thank OneCricketeer and Nimi for their time. Much appreciated.
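
For anyone hitting the same ArrayIndexOutOfBoundsException: Confluent's Avro converter writes each record in its wire format, a single magic byte plus a 4-byte schema ID, followed by the Avro payload, and plain from_avro does not know about that header. The sketch below is the idea rather than the blog's exact code, assuming the standard 5-byte header and the jsonSchema dict from above:

import json
from pyspark.sql.functions import expr
from pyspark.sql.avro.functions import from_avro  # requires the spark-avro package

# Drop the 5-byte Confluent header (1 magic byte + 4-byte schema ID)
# and hand only the remaining Avro payload to from_avro.
avro_df = df.select(expr("substring(value, 6, length(value) - 5)").alias("avro_value"))

decoded = avro_df.select(
    from_avro("avro_value", json.dumps(jsonSchema), {"mode": "PERMISSIVE"}).alias("sample_a")
).select("sample_a.*")

decoded.writeStream.format("console").start()
spark.streams.awaitAnyTermination()

With PERMISSIVE, records that still fail to parse come back as nulls instead of killing the query, but the columns only decode correctly once the header is stripped.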