
Facing errors in PySpark while deserializing Avro-formatted data coming from Kafka using Apicurio

I am working in PySpark. Using Spark Structured Streaming, I read Avro-encoded data from a Kafka topic (serialized through Apicurio Registry), and I want to deserialize it, apply transformations, and then save it to Hive. If I do not set the option "mode" to "PERMISSIVE", I get the following error:

ERROR: "Malformed records are detected in record parsing. Current parse Mode: FAILFAST. To process malformed records as null result, try setting the option 'mode' as 'PERMISSIVE'."

If I set "mode" to "PERMISSIVE", every column comes back null, as shown below:

Batch: 8
|before|after|source|op  |ts_ms|transaction|
|null  |null |null  |null|null |null       |
|null  |null |null  |null|null |null       |
|null  |null |null  |null|null |null       |
|null  |null |null  |null|null |null       |
|null  |null |null  |null|null |null       |
|null  |null |null  |null|null |null       |
|null  |null |null  |null|null |null       |
|null  |null |null  |null|null |null       |
|null  |null |null  |null|null |null       |
|null  |null |null  |null|null |null       |
|null  |null |null  |null|null |null       |
|null  |null |null  |null|null |null       |
|null  |null |null  |null|null |null       |
|null  |null |null  |null|null |null       |
|null  |null |null  |null|null |null       |
|null  |null |null  |null|null |null       |
|null  |null |null  |null|null |null       |
|null  |null |null  |null|null |null       |
|null  |null |null  |null|null |null       |

If I print the schema, it shows that the schema is read correctly.


Since I fetch the schema directly from the schema registry URL, I don't think the schema is wrong. The code I am using is as follows:

# pyspark imports
import pyspark.sql.functions as func
from pyspark.sql import SparkSession
from pyspark.sql.avro.functions import from_avro, to_avro
from pyspark.sql.types import StringType, TimestampType, IntegerType, BinaryType

# schema registry imports
from confluent_kafka.schema_registry import SchemaRegistryClient
import time
import pandas as pd
# import json
from pyspark.conf import SparkConf

#kafka_url = "kafka-broker:29092"
schema_registry_url = "http://localhost:8082/apis/ccompat/v6"
kafka_producer_topic = "topic"
#kafka_analyzed_topic = "wikimedia.processed"
schema_registry_subject = f"{kafka_producer_topic}-value"
#schema_registry_analyzed_data_subject = f"{kafka_analyzed_topic}-value"

kafka_bootstrap_servers = 'ip:9092'

# UDF: interpret a big-endian byte array (e.g. the schema ID bytes) as an integer and return it as a string
binary_to_string_udf = func.udf(lambda x: str(int.from_bytes(x, byteorder='big')), StringType())

def get_schema_from_schema_registry(schema_registry_url, schema_registry_subject):
    sr = SchemaRegistryClient({'url': schema_registry_url})
    latest_version = sr.get_latest_version(schema_registry_subject)
    return sr, latest_version

if __name__ == "__main__":
    print("Welcome Here")
    print("Data Processing App Start From Here")
    print(time.strftime("%Y-%m-%d %H:%M:%S"))

    conf = SparkConf().setAppName("MyApp002").setMaster("local[*]") \
            .set("spark.executor.instances", "10")\
            .set("spark.executor.memory", "28g") \
            .set("spark.executor.cores", "16") 

    spark = SparkSession.builder.master("local[*]").appName("Pyspark App to Load WSLOG From Raw To Currated In Realtime").config(conf=conf).getOrCreate()

    spark.conf.set("spark.sql.shuffle.partitions", "2")
    spark.conf.set("spark.sql.adaptive.enabled", "true")
    spark.conf.set("spark.sql.codegen.wholeStage", "false")
    spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")

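    wikimedia_df = (
        spark.readStream
        .format("kafka")
        .option("kafka.bootstrap.servers", kafka_bootstrap_servers)
        .option("subscribe", kafka_producer_topic)  # KAFKA_TOPIC_NAME was never defined
        .option("startingOffsets", "earliest")
        # spark.streaming.kafka.maxRatePerPartition is a DStream setting that
        # Structured Streaming ignores; maxOffsetsPerTrigger is its rate limit
        .option("maxOffsetsPerTrigger", "50")
        .load()
    )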

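    # strip the first 5 bytes (assumed Confluent header: magic byte + 4-byte schema ID);
    # Spark's substring() is 1-based, so the Avro payload starts at position 6
    wikimedia_df = wikimedia_df.withColumn('fixedValue', func.expr("substring(value, 6, length(value)-5)"))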

    #  get schema id from value
    #wikimedia_df = wikimedia_df.withColumn('valueSchemaId', binary_to_string_udf(func.expr("substring(value, 2, 4)")))

    # get schema using subject name
    _, latest_version_wikimedia = get_schema_from_schema_registry(schema_registry_url, schema_registry_subject)

    # deserialize data
    fromAvroOptions = {"mode": "PERMISSIVE"}
    decoded_output = wikimedia_df.select(
        from_avro(func.col("fixedValue"), latest_version_wikimedia.schema.schema_str, fromAvroOptions)
        .alias("wikimedia")
    )
    wikimedia_value_df = decoded_output.select("wikimedia.*")

    query = wikimedia_value_df \
        .writeStream \
        .format("console") \
        .trigger(processingTime='1 second') \
        .outputMode("append") \
        .option("truncate", "false") \
        .start()

    # keep the driver alive while the streaming query runs
    query.awaitTermination()
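
The two byte-level expressions above (`substring(value, 6, length(value)-5)` and the commented-out `substring(value, 2, 4)` fed to the UDF) can be checked outside Spark. Below is a minimal pure-Python sketch that mimics them, assuming Confluent-style framing (one 0x00 magic byte followed by a 4-byte big-endian schema ID); the helper names and the sample message are hypothetical.

```python
# Sketch: mimic the Spark byte-slicing expressions on raw bytes.
# ASSUMPTION: Confluent-style framing, i.e. 1 magic byte + 4-byte schema ID.


def spark_substring(data: bytes, pos: int, length: int) -> bytes:
    """Mimic Spark SQL substring(binary, pos, len) for pos >= 1 (1-based)."""
    start = pos - 1  # Spark positions are 1-based; Python slices are 0-based
    return data[start:start + length]


def schema_id_from_frame(data: bytes) -> int:
    """Mirror substring(value, 2, 4) decoded big-endian, as the UDF does."""
    return int.from_bytes(spark_substring(data, 2, 4), byteorder="big")


if __name__ == "__main__":
    # Hypothetical message: magic byte, schema ID 42, then made-up Avro bytes.
    avro_body = b"\x02\x06abc"
    value = b"\x00" + (42).to_bytes(4, "big") + avro_body

    print(schema_id_from_frame(value))  # 42
    print(spark_substring(value, 6, len(value) - 5) == avro_body)  # True
```

If the real messages carried a header longer than 5 bytes, `fixedValue` would still start with leftover header bytes, and `from_avro` would be handed input it cannot parse as the registered schema.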

The Debezium connector configuration I used is as follows:

    "name": "wslog-debezium-xstream102",
        "key.converter": "io.apicurio.registry.utils.converter.AvroConverter",
        "key.converter.apicurio.registry.url": "http://localhost:8082/apis/registry/v2",
        "": "true",
        "key.converter.apicurio.registry.find-latest": "false",
        "": "true",
        "value.converter": "io.apicurio.registry.utils.converter.AvroConverter",
        "value.converter.apicurio.registry.url": "http://localhost:8082/apis/registry/v2",
        "": "true",
        "value.converter.apicurio.registry.find-latest": "false",
        "": "true",
        "" : "outbound_servername",
        "column.include.list": "column list",

Using a Kafka consumer, I can confirm that Avro-encoded data is arriving on the topic.


I can't figure out my mistake here. Has anyone else faced the same problem while working with Kafka and Spark with Apicurio?


  • I resolved this issue simply by increasing the start position from 5 to 10, as in the following code:

    ## data_df3 = datadf_reserves.withColumn("fixedValue", func.expr("substring(value, 5, length(value))")) 
    data_df3 = datadf_reserves.withColumn("fixedValue", func.expr("substring(value, 10, length(value))")) 

    I knew that Confluent-encoded Avro always carries a 5-byte prefix (a magic byte followed by a 4-byte schema ID), but I don't know why 10 worked.
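
A likely explanation: Apicurio Registry 2.x (which the `/apis/registry/v2` converter URL suggests) does not use Confluent's 4-byte ID by default. Its default ID handler writes the ID as an 8-byte long, so the prefix is 1 + 8 = 9 bytes, and the Avro payload starts at 1-based position 10. In 2.x this is governed by the serde's `apicurio.registry.id-handler` setting, and `apicurio.registry.as-confluent` switches to the 4-byte form; please verify both against the docs for your registry version. The pure-Python sketch below (hypothetical helper names, made-up sample bytes) splits a framed value for either ID size and shows what goes wrong when the 4-byte assumption is applied to an 8-byte frame.

```python
from typing import Tuple

# Sketch only. ASSUMPTIONS: frames start with a 0x00 magic byte followed by a
# big-endian ID; id_size is 4 for Confluent framing and 8 for Apicurio 2.x's
# default ID handler. Check your serializer/converter settings.


def split_frame(value: bytes, id_size: int) -> Tuple[int, int, bytes]:
    """Split a framed Kafka value into (magic_byte, schema_id, avro_payload)."""
    header_len = 1 + id_size
    if len(value) < header_len:
        raise ValueError("value is shorter than the expected header")
    magic = value[0]
    if magic != 0:
        raise ValueError(f"unexpected magic byte: {magic}")
    schema_id = int.from_bytes(value[1:header_len], byteorder="big")
    return magic, schema_id, value[header_len:]


def spark_substring_start(id_size: int) -> int:
    """1-based position to pass to Spark's substring() to skip the header."""
    return 1 + id_size + 1


if __name__ == "__main__":
    payload = b"\x02\x06abc"  # made-up Avro bytes for illustration
    confluent_msg = b"\x00" + (7).to_bytes(4, "big") + payload
    apicurio_msg = b"\x00" + (7).to_bytes(8, "big") + payload

    print(split_frame(confluent_msg, 4))  # (0, 7, b'\x02\x06abc')
    print(split_frame(apicurio_msg, 8))   # (0, 7, b'\x02\x06abc')
    # Applying the 4-byte assumption to an 8-byte frame leaves 4 stray ID
    # bytes at the front of the "payload", which would be misread as Avro.
    print(split_frame(apicurio_msg, 4)[2])  # b'\x00\x00\x00\x07\x02\x06abc'
    print(spark_substring_start(4), spark_substring_start(8))  # 6 10
```

Under this assumption, `substring(value, 10, length(value))` in the fix above is exactly "skip a 9-byte Apicurio header", while the question's position 6 only skips a 5-byte Confluent header.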