I am running Kafka producer code on Databricks Runtime 12.2, testing Avro serialization of messages with the Confluent Schema Registry. I configured the 'to_avro' function to read the schema from the schema registry, but I am getting the error below:
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 99.0 failed 4 times, most recent failure: Lost task 0.3 in stage 99.0 (TID 125) (10.52.30.102 executor 0): org.spark_project.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Schema not found; error code: 40403
from pyspark.sql import functions as F
from pyspark.sql.avro.functions import to_avro

df_tpch_orders.withColumn("value", to_avro(
    data = df_tpch_orders.o_comment,
    options = schema_registry_options,
    schemaRegistryAddress = schema_registry_address,
    subject = F.lit("demo-topic-value"),
))
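For context, schema_registry_address and schema_registry_options are set up roughly as below. The endpoint URL and the credential variables are placeholders; the option keys are, as far as I can tell, the ones Databricks documents for Schema Registry authentication:

schema_registry_address = "https://psrc-xxxxx.us-east-2.aws.confluent.cloud"  # placeholder URL
schema_registry_options = {
    "confluent.schema.registry.basic.auth.credentials.source": "USER_INFO",
    # sr_api_key / sr_api_secret are placeholder credential variables
    "confluent.schema.registry.basic.auth.user.info": f"{sr_api_key}:{sr_api_secret}",
}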
The code works when I pass the Avro schema directly instead of reading it from the schema registry. I also confirmed that the schema is present under the correct subject name.
I found this, which says it might be because the data is incompatible with the provided schema, but that is not true in my case: when I hardcode the schema (or read it manually via the Confluent API and pass it in), the code works.
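For reference, this is roughly the hardcoded variant that works for me. The schema string here is an assumption on my part: o_comment is a plain string column, whose Avro schema is just "string":

from pyspark.sql.avro.functions import to_avro

# Passing the Avro schema JSON directly instead of the registry options works.
# '"string"' is the Avro schema for a plain string column.
df_ok = df_tpch_orders.withColumn(
    "value",
    to_avro(df_tpch_orders.o_comment, '"string"'),
)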
We ran into a similar issue. The clue was in the Spark log4j logs:
25/03/05 13:12:17 DEBUG RestService: Sending POST with input {"schema":"{\"type\":\"record\",\"name\":\"key_struct\",\"fields\":...}
For us, the culprit was the conversion to an Avro schema, which sets the record name to key_struct. This did not match the name (and namespace) already registered in the schema registry, and in that case the client reports that the schema does not exist even when the data schema is otherwise identical.
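One way to confirm the mismatch is to fetch the schema registered under the subject and compare its name/namespace against the schema in the POST body from the DEBUG log. A minimal sketch, assuming basic-auth credentials and the standard Confluent REST endpoint:

import json
import requests

# Fetch the latest schema registered under the subject and inspect its record name.
resp = requests.get(
    f"{schema_registry_address}/subjects/demo-topic-value/versions/latest",
    auth=(sr_api_key, sr_api_secret),  # assumed basic-auth credentials
)
registered = json.loads(resp.json()["schema"])
print(registered.get("name"), registered.get("namespace"))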
Note: to see the log lines above, you may need to add:
# Raise the schema-registry REST client's log level via the JVM log4j API
log4jLogger = spark._jvm.org.apache.log4j
logger = log4jLogger.LogManager.getLogger(
    "org.spark_project.confluent.kafka.schemaregistry.client.rest.RestService"
)
logger.setLevel(log4jLogger.Level.DEBUG)
spark.sparkContext.setLogLevel("DEBUG")
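With DEBUG enabled, the POST body in the RestService output shows the exact record name and namespace that Spark sends to the registry, which you can then compare against the registered schema.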