I am working in PySpark, reading Avro-encoded data (serialized through Apicurio) from a Kafka topic with Spark Structured Streaming. I want to deserialize it, apply transformations, and then save it to Hive. I get the following error if I do not set the mode to "PERMISSIVE":
ERROR: " Malformed records are detected in record parsing. Current parse Mode: FAILFAST. To process malformed records as null result, try setting the option 'mode' as 'PERMISSIVE'."
If I use "mode as PERMISSIVE", then I got all nulls as seen below:
-------------------------------------------
Batch: 8
-------------------------------------------
+------+-----+------+----+-----+-----------+
|before|after|source|op |ts_ms|transaction|
+------+-----+------+----+-----+-----------+
|null |null |null |null|null |null |
|null |null |null |null|null |null |
|null |null |null |null|null |null |
|null |null |null |null|null |null |
|null |null |null |null|null |null |
|null |null |null |null|null |null |
|null |null |null |null|null |null |
|null |null |null |null|null |null |
|null |null |null |null|null |null |
|null |null |null |null|null |null |
|null |null |null |null|null |null |
|null |null |null |null|null |null |
|null |null |null |null|null |null |
|null |null |null |null|null |null |
|null |null |null |null|null |null |
|null |null |null |null|null |null |
|null |null |null |null|null |null |
|null |null |null |null|null |null |
|null |null |null |null|null |null |
+------+-----+------+----+-----+-----------+
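For reference, the only difference between the failing run and the all-null run is the options dict passed to from_avro (a minimal sketch; wikimedia_df, fixedValue, and the schema string are defined in the full code further down):

import pyspark.sql.functions as func
from pyspark.sql.avro.functions import from_avro

schema_str = latest_version_wikimedia.schema.schema_str

# FAILFAST aborts the query on the first record that cannot be parsed;
# PERMISSIVE turns every unparseable record into a row of nulls instead.
failfast_df = wikimedia_df.select(
    from_avro(func.col("fixedValue"), schema_str, {"mode": "FAILFAST"}).alias("wikimedia"))
permissive_df = wikimedia_df.select(
    from_avro(func.col("fixedValue"), schema_str, {"mode": "PERMISSIVE"}).alias("wikimedia"))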
If I print the schema, it shows that it is being read correctly:
{"type":"record","name":"Envelope","namespace":"namespace","fields":[{"name":"before","type":["null",{"type":"record","name":"Value","fields":[{"name":"ID","type":"double"},{"name":"name","type":["null","string"],"default":null}],"connect.name":"Value"}],"default":null},{"name":"after","type":["null","Value"],"default":null},{"name":"source","type":{"type":"record","name":"Source","namespace":"io.debezium.connector.oracle","fields":[{"name":"version","type":"string"},{"name":"connector","type":"string"},{"name":"name","type":"string"},{"name":"ts_ms","type":"long"},{"name":"snapshot","type":[{"type":"string","connect.version":1,"connect.parameters":{"allowed":"true,last,false,incremental"},"connect.default":"false","connect.name":"io.debezium.data.Enum"},"null"],"default":"false"},{"name":"db","type":"string"},{"name":"sequence","type":["null","string"],"default":null},{"name":"schema","type":"string"},{"name":"table","type":"string"},{"name":"txId","type":["null","string"],"default":null},{"name":"scn","type":["null","string"],"default":null},{"name":"commit_scn","type":["null","string"],"default":null},{"name":"lcr_position","type":["null","string"],"default":null},{"name":"rs_id","type":["null","string"],"default":null},{"name":"ssn","type":["null","int"],"default":null},{"name":"redo_thread","type":["null","int"],"default":null}],"connect.name":"io.debezium.connector.oracle.Source"}},{"name":"op","type":"string"},{"name":"ts_ms","type":["null","long"],"default":null},{"name":"transaction","type":["null",{"type":"record","name":"ConnectDefault","namespace":"io.confluent.connect.avro","fields":[{"name":"id","type":"string"},{"name":"total_order","type":"long"},{"name":"data_collection_order","type":"long"}]}],"default":null}],"connect.name":"connectname"}
Interestingly, I am fetching the schema from the schema registry's URL, so I don't think the schema itself is wrong; the all-null rows under PERMISSIVE just mean that every record fails to parse. The code I am using is as follows:
# pyspark imports
import pyspark.sql.functions as func
from pyspark.sql import SparkSession
from pyspark.sql.avro.functions import from_avro, to_avro
from pyspark.sql.types import StringType, TimestampType, IntegerType, BinaryType
# schema registry imports
from confluent_kafka.schema_registry import SchemaRegistryClient
import time
import pandas as pd
# import json
from pyspark.conf import SparkConf
#kafka_url = "kafka-broker:29092"
schema_registry_url = "http://localhost:8082/apis/ccompat/v6"
kafka_producer_topic = "topic"
#kafka_analyzed_topic = "wikimedia.processed"
schema_registry_subject = f"{kafka_producer_topic}-value"
#schema_registry_analyzed_data_subject = f"{kafka_analyzed_topic}-value"
KAFKA_TOPIC_NAME = "topic"
kafka_bootstrap_servers = 'ip:9092'
# UDF function
binary_to_string_udf = func.udf(lambda x: str(int.from_bytes(x, byteorder='big')), StringType())
def get_schema_from_schema_registry(schema_registry_url, schema_registry_subject):
    sr = SchemaRegistryClient({'url': schema_registry_url})
    latest_version = sr.get_latest_version(schema_registry_subject)
    return sr, latest_version
if __name__ == "__main__":
    print("Welcome Here")
    print("Data Processing App Start Form Here")
    print(time.strftime("%Y-%m-%d %H:%M:%S"))

    conf = SparkConf().setAppName("MyApp002").setMaster("local[*]") \
        .set("spark.executor.instances", "10") \
        .set("spark.executor.memory", "28g") \
        .set("spark.executor.cores", "16")
    spark = SparkSession.builder.master("local[*]").appName("Pyspark App to Load WSLOG From Raw To Currated In Realtime").config(conf=conf).getOrCreate()
    spark.sparkContext.setLogLevel("ERROR")
    spark.conf.set("spark.sql.shuffle.partitions", "2")
    spark.conf.set("spark.sql.adaptive.enabled", "true")
    spark.conf.set("spark.sql.codegen.wholeStage", "false")
    spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")

    wikimedia_df = spark \
        .readStream \
        .format("kafka") \
        .option("kafka.bootstrap.servers", kafka_bootstrap_servers) \
        .option("subscribe", KAFKA_TOPIC_NAME) \
        .option("startingOffsets", "earliest") \
        .option("spark.streaming.kafka.maxRatePerPartition", "50") \
        .load()
    wikimedia_df.printSchema()

    # remove the first 5 bytes (Confluent-style magic byte + 4-byte schema id) from value
    wikimedia_df = wikimedia_df.withColumn('fixedValue', func.expr("substring(value, 6, length(value)-5)"))

    # get schema id from value
    #wikimedia_df = wikimedia_df.withColumn('valueSchemaId', binary_to_string_udf(func.expr("substring(value, 2, 4)")))
    #print(wikimedia_df)

    # get the latest schema from the registry using the subject name
    _, latest_version_wikimedia = get_schema_from_schema_registry(schema_registry_url, schema_registry_subject)
    print(latest_version_wikimedia.schema.schema_str)

    # deserialize data
    fromAvroOptions = {"mode": "PERMISSIVE"}
    decoded_output = wikimedia_df.select(
        from_avro(
            func.col("fixedValue"), latest_version_wikimedia.schema.schema_str, fromAvroOptions
        )
        .alias("wikimedia")
    )
    wikimedia_value_df = decoded_output.select("wikimedia.*")
    wikimedia_value_df.printSchema()

    wikimedia_value_df \
        .writeStream \
        .format("console") \
        .trigger(processingTime='1 second') \
        .outputMode("append") \
        .option("truncate", "false") \
        .start() \
        .awaitTermination()
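The Hive sink mentioned at the top is not in this code yet; once the decoding works, the console sink could be swapped for something like the following (a sketch only; it assumes the session is built with .enableHiveSupport() and that curated.wikimedia is a hypothetical target table name):

# Write each micro-batch of the decoded stream into a Hive-backed table.
def write_to_hive(batch_df, batch_id):
    batch_df.write.mode("append").saveAsTable("curated.wikimedia")

wikimedia_value_df.writeStream \
    .foreachBatch(write_to_hive) \
    .option("checkpointLocation", "/tmp/checkpoints/wikimedia") \
    .outputMode("append") \
    .start() \
    .awaitTermination()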
The Debezium connector configuration I used is as follows:
{
    "name": "wslog-debezium-xstream102",
    "config":
    {
        "connector.class": "io.debezium.connector.oracle.OracleConnector",
        "tasks.max": "1",
        "database.server.name": "abc",
        "database.hostname": "ip-addr",
        "database.port": "port",
        "database.user": "username",
        "database.password": "pass",
        "key.converter": "io.apicurio.registry.utils.converter.AvroConverter",
        "key.converter.apicurio.registry.url": "http://localhost:8082/apis/registry/v2",
        "key.converter.apicurio.registry.auto-register": "true",
        "key.converter.apicurio.registry.find-latest": "false",
        "key.converter.enhanced.avro.schema.support": "true",
        "value.converter": "io.apicurio.registry.utils.converter.AvroConverter",
        "value.converter.apicurio.registry.url": "http://localhost:8082/apis/registry/v2",
        "value.converter.apicurio.registry.auto-register": "true",
        "value.converter.apicurio.registry.find-latest": "false",
        "value.converter.enhanced.avro.schema.support": "true",
        "database.out.server.name": "outbound_servername",
        "database.dbname": "VAULSYS",
        "database.history.kafka.bootstrap.servers": "ipr:9092",
        "snapshot.mode": "initial",
        "database.history.kafka.topic": "schema-changes.inventory",
        "table.include.list": "tablename",
        "column.include.list": "column list",
        "database.history.store.only.captured.tables.ddl": true,
        "database.history.skip.unparseable.ddl": true,
        "offset.flush.interval.ms": 10000,
        "offset.flush.timeout.ms": 60000,
        "log.mining.batch.size.min": 10000,
        "log.mining.batch.size.max": 300000,
        "log.mining.batch.size.default": 50000,
        "log.mining.view.fetch.size": 20000,
        "decimal.handling.mode": "double",
        "time.precision.mode": "connect"
    }
}
I can see that I am getting Avro-encoded data through a Kafka console consumer:
TABLE606472771rڸ���b
��5A6DATA
ENCODER-oracle-encodedstr�ڽ��trueENCODE
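Looking at the first few bytes of one raw message value also shows how long the registry header is (a small check that can be done outside Spark; raw_bytes is a placeholder for the value bytes of a single consumed message):

# raw_bytes: value of one Kafka message as Python bytes (placeholder)
magic = raw_bytes[0]                                    # 0x0 marks a registry-framed payload
id_4_bytes = int.from_bytes(raw_bytes[1:5], "big")      # Confluent wire format: 4-byte schema ID
id_8_bytes = int.from_bytes(raw_bytes[1:9], "big")      # Apicurio default: 8-byte global ID
print(magic, id_4_bytes, id_8_bytes)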
I can't figure out my mistake here. Has anyone else faced the same thing while working with Kafka and Spark with Apicurio?
I have resolved this issue by simply changing the substring start position from 5 to 10, as can be seen in the following code:
## data_df3 = datadf_reserves.withColumn("fixedValue", func.expr("substring(value, 5, length(value))"))
data_df3 = datadf_reserves.withColumn("fixedValue", func.expr("substring(value, 10, length(value))"))
I knew that there are always 5 header ("magic") bytes to strip when decoding Avro produced with the Confluent serializer, but I did not know why 10 worked here. The likely explanation: Confluent's wire format prefixes the payload with 1 magic byte plus a 4-byte schema ID (5 bytes in total), while Apicurio's Avro converter by default writes 1 magic byte plus an 8-byte (long) global ID, i.e. 9 header bytes, so with the 1-based substring the actual Avro payload starts at position 10.
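A slightly more explicit version of the same fix, in case it helps anyone else (a sketch under the assumptions above: a 1 + 8 byte Apicurio header; valueGlobalId is just a column name I added for verification):

import pyspark.sql.functions as func
from pyspark.sql.types import StringType

# Bytes 2-9 hold the 8-byte (big-endian) global ID that Apicurio registered the schema under.
global_id_udf = func.udf(lambda b: str(int.from_bytes(b, byteorder='big')), StringType())

data_df3 = datadf_reserves \
    .withColumn("valueGlobalId", global_id_udf(func.expr("substring(value, 2, 8)"))) \
    .withColumn("fixedValue", func.expr("substring(value, 10, length(value) - 9)"))

If you would rather keep the original 5-byte stripping, I believe the Apicurio serde can also be switched to Confluent-compatible 4-byte IDs (the apicurio.registry.as-confluent option), but I have not tested that, so treat it as an assumption to verify against the Apicurio Registry docs.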