pysparkapache-kafkaavroconfluent-schema-registryapicurio-registry

Facing Errors in Pyspark while deserializing avro formatted data coming from kafka using Apicurio


I am working on pyspark, getting avro encoded data through apicurio from kafka topic with spark structured streaming and want to de-serialize it and apply transformations and then save it to hive. I am facing following error if I donot use "mode as PERMISSIVE":

ERROR: " Malformed records are detected in record parsing. Current parse Mode: FAILFAST. To process malformed records as null result, try setting the option 'mode' as 'PERMISSIVE'."

If I use "mode as PERMISSIVE", then I got all nulls as seen below:

-------------------------------------------
Batch: 8
-------------------------------------------
+------+-----+------+----+-----+-----------+
|before|after|source|op  |ts_ms|transaction|
+------+-----+------+----+-----+-----------+
|null  |null |null  |null|null |null       |
|null  |null |null  |null|null |null       |
|null  |null |null  |null|null |null       |
|null  |null |null  |null|null |null       |
|null  |null |null  |null|null |null       |
|null  |null |null  |null|null |null       |
|null  |null |null  |null|null |null       |
|null  |null |null  |null|null |null       |
|null  |null |null  |null|null |null       |
|null  |null |null  |null|null |null       |
|null  |null |null  |null|null |null       |
|null  |null |null  |null|null |null       |
|null  |null |null  |null|null |null       |
|null  |null |null  |null|null |null       |
|null  |null |null  |null|null |null       |
|null  |null |null  |null|null |null       |
|null  |null |null  |null|null |null       |
|null  |null |null  |null|null |null       |
|null  |null |null  |null|null |null       |
+------+-----+------+----+-----+-----------+

If I print my schema it shows that it reads it right

{"type":"record","name":"Envelope","namespace":"namespace","fields":[{"name":"before","type":["null",{"type":"record","name":"Value","fields":[{"name":"ID","type":"double"},{"name":"name","type":["null","string"],"default":null}],"connect.name":"Value"}],"default":null},{"name":"after","type":["null","Value"],"default":null},{"name":"source","type":{"type":"record","name":"Source","namespace":"io.debezium.connector.oracle","fields":[{"name":"version","type":"string"},{"name":"connector","type":"string"},{"name":"name","type":"string"},{"name":"ts_ms","type":"long"},{"name":"snapshot","type":[{"type":"string","connect.version":1,"connect.parameters":{"allowed":"true,last,false,incremental"},"connect.default":"false","connect.name":"io.debezium.data.Enum"},"null"],"default":"false"},{"name":"db","type":"string"},{"name":"sequence","type":["null","string"],"default":null},{"name":"schema","type":"string"},{"name":"table","type":"string"},{"name":"txId","type":["null","string"],"default":null},{"name":"scn","type":["null","string"],"default":null},{"name":"commit_scn","type":["null","string"],"default":null},{"name":"lcr_position","type":["null","string"],"default":null},{"name":"rs_id","type":["null","string"],"default":null},{"name":"ssn","type":["null","int"],"default":null},{"name":"redo_thread","type":["null","int"],"default":null}],"connect.name":"io.debezium.connector.oracle.Source"}},{"name":"op","type":"string"},{"name":"ts_ms","type":["null","long"],"default":null},{"name":"transaction","type":["null",{"type":"record","name":"ConnectDefault","namespace":"io.confluent.connect.avro","fields":[{"name":"id","type":"string"},{"name":"total_order","type":"long"},{"name":"data_collection_order","type":"long"}]}],"default":null}],"connect.name":"connectname"}

Interestingly, I am giving schema registry's url so I dont think that schema is wrong, Code I am using is as follows:


# pyspark imports
import pyspark.sql.functions as func
from pyspark.sql import SparkSession
from pyspark.sql.avro.functions import from_avro, to_avro
from pyspark.sql.types import StringType, TimestampType, IntegerType, BinaryType

# schema registry imports
from confluent_kafka.schema_registry import SchemaRegistryClient
import time
import pandas as pd
# import json
from pyspark.conf import SparkConf

#kafka_url = "kafka-broker:29092"
schema_registry_url = "http://localhost:8082/apis/ccompat/v6"
kafka_producer_topic = "topic"
#kafka_analyzed_topic = "wikimedia.processed"
schema_registry_subject = f"{kafka_producer_topic}-value"
#schema_registry_analyzed_data_subject = f"{kafka_analyzed_topic}-value"

KAFKA_TOPIC_NAME = "topic"
kafka_bootstrap_servers = 'ip:9092'

# UDF function
binary_to_string_udf = func.udf(lambda x: str(int.from_bytes(x, byteorder='big')), StringType())

def get_schema_from_schema_registry(schema_registry_url, schema_registry_subject):
    sr = SchemaRegistryClient({'url': schema_registry_url})
    latest_version = sr.get_latest_version(schema_registry_subject)
    return sr, latest_version

if __name__ == "__main__":
    print("Welcome Here")
    print("Data Processing App Start Form Here")
    print(time.strftime("%Y-%m-%d %H:%M:%S"))


    conf = SparkConf().setAppName("MyApp002").setMaster("local[*]") \
            .set("spark.executor.instances", "10")\
            .set("spark.executor.memory", "28g") \
            .set("spark.executor.cores", "16") 

    spark = SparkSession.builder.master("local[*]").appName("Pyspark App to Load WSLOG From Raw To Currated In Realtime").config(conf=conf).getOrCreate()
    spark.sparkContext.setLogLevel("ERROR")

    spark.conf.set("spark.sql.shuffle.partitions", "2")
    spark.conf.set("spark.sql.adaptive.enabled", "true")
    spark.conf.set("spark.sql.codegen.wholeStage", "false")
    spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")
    spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")  


    wikimedia_df = spark \
        .readStream \
        .format("kafka") \
        .option("kafka.bootstrap.servers", kafka_bootstrap_servers) \
        .option("subscribe", KAFKA_TOPIC_NAME) \
        .option("startingOffsets", "earliest") \
        .option("spark.streaming.kafka.maxRatePerPartition", "50") \
        .load()
    
    wikimedia_df.printSchema()


    # remove first 5 bytes from value
    wikimedia_df = wikimedia_df.withColumn('fixedValue', func.expr("substring(value, 6, length(value)-5)"))

    #  get schema id from value
    #wikimedia_df = wikimedia_df.withColumn('valueSchemaId', binary_to_string_udf(func.expr("substring(value, 2, 4)")))
    #print(wikimedia_df)

    # get schema using subject name
    _, latest_version_wikimedia = get_schema_from_schema_registry(schema_registry_url, schema_registry_subject)
    print(latest_version_wikimedia.schema.schema_str)
    

    #deserialize data 
    fromAvroOptions = {"mode":"PERMISSIVE"}
    decoded_output = wikimedia_df.select(
        from_avro(
            func.col("fixedValue"), latest_version_wikimedia.schema.schema_str, fromAvroOptions
        )
        .alias("wikimedia")
    )
    wikimedia_value_df = decoded_output.select("wikimedia.*")
    wikimedia_value_df.printSchema()

    wikimedia_value_df \
        .writeStream \
        .format("console") \
        .trigger(processingTime='1 second') \
        .outputMode("append") \
        .option("truncate", "false") \
        .start() \
        .awaitTermination()

Debezium Connector Configurations I used are as follows:

{
    "name": "wslog-debezium-xstream102",
    "config":
    {
        "connector.class":"io.debezium.connector.oracle.OracleConnector",
        "tasks.max":"1",
        "database.server.name":"abc",
        "database.hostname":"ip-addr",
        "database.port":"port",
        "database.user":"username",
        "database.password":"pass",
        "key.converter": "io.apicurio.registry.utils.converter.AvroConverter",
        "key.converter.apicurio.registry.url": "http://localhost:8082/apis/registry/v2",
        "key.converter.apicurio.registry.auto-register": "true",
        "key.converter.apicurio.registry.find-latest": "false",
        "key.converter.enhanced.avro.schema.support": "true",
        "value.converter": "io.apicurio.registry.utils.converter.AvroConverter",
        "value.converter.apicurio.registry.url": "http://localhost:8082/apis/registry/v2",
        "value.converter.apicurio.registry.auto-register": "true",
        "value.converter.apicurio.registry.find-latest": "false",
        "value.converter.enhanced.avro.schema.support": "true",
        "database.out.server.name" : "outbound_servername",
        "database.dbname":"VAULSYS",
        "database.history.kafka.bootstrap.servers":"ipr:9092",
        "snapshot.mode":"initial",
        "database.history.kafka.topic":"schema-changes.inventory",
        "table.include.list":"tablename",
        "column.include.list": "column list",
        "database.history.store.only.captured.tables.ddl":true,
        "database.history.skip.unparseable.ddl":true,
        "offset.flush.interval.ms":10000,
        "offset.flush.timeout.ms":60000,
        "log.mining.batch.size.min":10000,
        "log.mining.batch.size.max":300000,
        "log.mining.batch.size.default":50000,
        "log.mining.view.fetch.size":20000,
        "decimal.handling.mode":"double",
        "time.precision.mode":"connect"
        
    }
}

I can see I am getting avro encoded data through kafka consumer.

TABLE606472771rڸ���b
��5A6DATA
                                           ENCODER-oracle-encodedstr�ڽ��trueENCODE

I can't figure it out my mistake here, Have anyone else face same thing while working on kafka and spark with apicurio ?


Solution

  • I have resolved this issue by just increase the the number from 5 by 10,can be seen in following code:

    ## data_df3 = datadf_reserves.withColumn("fixedValue", func.expr("substring(value, 5, length(value))")) 
    data_df3 = datadf_reserves.withColumn("fixedValue", func.expr("substring(value, 10, length(value))")) 
    

    I knew that there is always a 5 magic bytes padding present while we decode avro by confluent, But I dont know why when I use 10 it worked.