apache-spark, pyspark, apache-kafka, apache-spark-sql

Unable to format the Kafka topic data via PySpark


I am pushing data to a Kafka topic via KSQL with the code below:

CREATE STREAM TEST01 (KEY_COL VARCHAR KEY, COL1 INT, COL2 VARCHAR)
  WITH (KAFKA_TOPIC='test01', PARTITIONS=1, VALUE_FORMAT='AVRO');

INSERT INTO TEST01 (KEY_COL, COL1, COL2) VALUES ('X',1,'FOO');
INSERT INTO TEST01 (KEY_COL, COL1, COL2) VALUES ('Y',2,'BAR');

Now when I run the PySpark code below to fetch the data and print it to the console, it prints the values along with some junk characters:

from pyspark.sql.session import SparkSession
spark = SparkSession \
    .builder \
    .appName("Kafka_Test") \
    .config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0") \
    .getOrCreate()

df = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "test01") \
    .option("startingOffsets","earliest") \
    .load()

query = df.selectExpr("CAST(value AS STRING) AS value") \
    .writeStream \
    .outputMode("append") \
    .format("console") \
    .start()
query.awaitTermination()  # block so the query keeps running when launched as a script

Output:

(screenshot: the console sink prints the rows, but the values appear surrounded by unreadable binary characters)

How can I decode this output properly?

As per Nimi's suggestion, I tried the code below, fetching the schema from the Schema Registry, but it gives me null records. It is not fetching the actual values.

import json
from pyspark.sql.avro.functions import from_avro

# Avro value schema as registered by KSQL for topic "test01"
jsonSchema = {
    "type": "record",
    "name": "KsqlDataSourceSchema",
    "namespace": "io.confluent.ksql.avro_schemas",
    "fields": [
        {"name": "COL1", "type": ["null", "int"], "default": None},
        {"name": "COL2", "type": ["null", "string"], "default": None}
    ],
    "connect.name": "io.confluent.ksql.avro_schemas.KsqlDataSourceSchema"
}

df.select(from_avro("value", json.dumps(jsonSchema)).alias("sample_a")) \
    .writeStream.format("console").start()

Output:

+------------+
|    sample_a|
+------------+
|{null, null}|
|{null, null}|
|{null, null}|
+------------+
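
For reference, this is roughly how that schema can be fetched programmatically from the Schema Registry's REST API instead of being pasted by hand (a sketch; the localhost:8081 address is an assumption and may differ in your setup):

import requests

# KSQL registers value schemas under the subject "<topic>-value";
# the registry address below is an assumption, adjust for your setup
resp = requests.get("http://localhost:8081/subjects/test01-value/versions/latest")
jsonSchemaStr = resp.json()["schema"]  # a JSON-encoded Avro schema string,
                                       # usable directly as from_avro's schema argument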

Solution

  • Finally, I was able to solve the problem using this blog. The author explains the concept well.

    The approach Nimi suggested was right. What was missing was the handling of Confluent's "magic byte": values serialized through the Schema Registry are prefixed with one magic byte (0x0) and a 4-byte schema ID before the Avro payload, so from_avro cannot decode the raw value as-is. Stripping those first five bytes and decoding only the remaining payload solved the issue.

    I still want to read up on the magic byte and the Confluent wire format in more detail, though.
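
    For completeness, a minimal sketch of that fix, reusing the jsonSchema defined above and assuming the topic uses Confluent's standard wire format (one magic byte plus a 4-byte schema ID before the Avro payload; Spark SQL's substring is 1-based, so the payload starts at byte 6):

    import json
    from pyspark.sql.functions import expr
    from pyspark.sql.avro.functions import from_avro

    # strip the 5-byte Confluent header (magic byte + schema ID) before decoding
    payload = expr("substring(value, 6, length(value) - 5)")

    df.select(from_avro(payload, json.dumps(jsonSchema)).alias("sample_a")) \
        .select("sample_a.COL1", "sample_a.COL2") \
        .writeStream.format("console").start()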