Avro backward compatibility returns null records.
I send one record encoded with schema_ver1.avsc and one record encoded with schema_ver2.avsc to Kafka, then query a PySpark Structured Streaming memory sink named avro_sink3, decoding with the ver2 schema (avro_json_schema_ver2).
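For reference, the records could have been produced roughly like this (a sketch assuming fastavro and kafka-python; the topic name devices and the broker address are hypothetical):
import io, json
from fastavro import parse_schema, schemaless_writer
from kafka import KafkaProducer

producer = KafkaProducer(bootstrap_servers="localhost:9092")

def encode(schema_path, record):
    # raw Avro body with no container-file header, which is what from_avro expects
    with open(schema_path) as f:
        schema = parse_schema(json.load(f))
    buf = io.BytesIO()
    schemaless_writer(buf, schema, record)
    return buf.getvalue()

producer.send("devices", encode("schema_ver1.avsc",
    {"id": "ver1_yuki_schema", "type": "login", "sendtime": 4}))
producer.send("devices", encode("schema_ver2.avsc",
    {"id": "ver2_yuki_schema", "type": "login2", "sendtime": 4, "temp": "1000"}))
producer.flush()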
I expect records like this:
expected output
+-----------------------------------+
|from_avro(value)                   |
+-----------------------------------+
|{ver1_yuki_schema, login, 4, 1}    |
|{ver2_yuki_schema, login2, 4, 1000}|
+-----------------------------------+
However, I got the output below instead:
actual output
+-----------------------------------+
|from_avro(value)                   |
+-----------------------------------+
|{null, null, null, null}           | -> encoded by schema_ver1.avsc, decoded by schema_ver2.avsc (record No.1)
|{ver2_yuki_schema, login2, 4, 1000}| -> encoded by schema_ver2.avsc, decoded by schema_ver2.avsc (record No.2)
+-----------------------------------+
How should I fix this problem?
pyspark streaming sink
memory_stream_check29 = df \
    .select("value") \
    .writeStream \
    .format("memory") \
    .trigger(processingTime="5 seconds") \
    .option("checkpointLocation", "/tmp/kafka/avro_file11131/") \
    .queryName("avro_sink3") \
    .start()
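Here, df is assumed to be a plain Kafka source stream, something like this (broker address and topic name are hypothetical):
df = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "devices") \
    .option("startingOffsets", "earliest") \
    .load()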
query memory sink
spark.sql("select * from avro_sink3").select(from_avro("value",avro_json_schema_ver2, {"mode" : "PERMISSIVE"})).show(truncate=False)
schema_ver1.avsc
{
  "namespace": "root",
  "type": "record",
  "name": "Device",
  "fields": [
    { "name": "id", "type": "string" },
    { "name": "type", "type": "string" },
    { "name": "sendtime", "type": "int" }
  ]
}
schema_ver2.avsc
{
  "namespace": "root",
  "type": "record",
  "name": "Device",
  "fields": [
    { "name": "id", "type": "string" },
    { "name": "type", "type": "string" },
    { "name": "sendtime", "type": "int" },
    { "name": "temp", "type": "string", "default": "1" }
  ]
}
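Note that the two schemas are backward compatible in plain Avro terms: a quick sanity check (a sketch, assuming fastavro) shows a ver2 reader resolves ver1 bytes and fills in the default for temp, so the nulls come from how from_avro applies the schema, not from the schemas themselves:
import io, json
from fastavro import parse_schema, schemaless_writer, schemaless_reader

ver1 = parse_schema(json.load(open("schema_ver1.avsc")))
ver2 = parse_schema(json.load(open("schema_ver2.avsc")))

buf = io.BytesIO()
schemaless_writer(buf, ver1, {"id": "ver1_yuki_schema", "type": "login", "sendtime": 4})
buf.seek(0)
# writer schema = ver1, reader schema = ver2 -> "temp" gets its default "1"
print(schemaless_reader(buf, ver1, ver2))
# {'id': 'ver1_yuki_schema', 'type': 'login', 'sendtime': 4, 'temp': '1'}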
environment: Spark 3.2
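Aside: since Spark 3.0, SPARK-27506 is supposed to let from_avro take an avroSchema option holding an evolved reader schema, with the positional schema treated as the writer schema. If that holds on Spark 3.2, decoding the ver1-encoded bytes into the ver2 shape would look roughly like this (an unverified sketch; avro_json_schema_ver1 is assumed to hold the ver1 schema JSON):
spark.sql("select * from avro_sink3").select(
    # positional arg = writer schema (ver1), avroSchema option = reader schema (ver2)
    from_avro("value", avro_json_schema_ver1,
              {"avroSchema": avro_json_schema_ver2, "mode": "PERMISSIVE"})
).show(truncate=False)
You still have to know (or try) the writer schema per record, which is why the fallback below tries each version in turn.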
I ended up with the following program. Raw Avro bytes are not self-describing, and from_avro uses the supplied schema as the writer schema, so ver1-encoded records cannot be resolved against the ver2 schema and come back as all-null structs in PERMISSIVE mode. The workaround: if decoding yields null, fall back to another schema version.
from pyspark.sql import functions as F
from typing import List, Tuple
from pyspark.sql.avro.functions import from_avro

__all__ = ["decode_per_version"]

def decode_per_version(
    df,
    *,
    bytes_col: str,
    schemas: List[Tuple[int, str]],
    key_field: str = "id",
    passthrough: Tuple[str, ...] = ("partition", "offset"),
    mode: str = "PERMISSIVE",
):
    """Try each (version, schema_json) in order; rows whose key field decodes
    to null fall through to the next schema. Returns (decoded frames, DLQ)."""
    remain = df.select(bytes_col, *passthrough)
    decoded = []
    for ver, schema_json in schemas:
        with_payload = remain.withColumn(
            "payload",
            from_avro(F.col(bytes_col), schema_json, {"mode": mode}),
        )
        # Rows where the key field decoded are accepted for this version.
        ok = (
            with_payload
            .where(F.col(f"payload.{key_field}").isNotNull())
            .select("payload.*", bytes_col, *passthrough)
            .withColumn("schema_version", F.lit(ver))
        )
        decoded.append(ok)
        # Rows that failed to decode stay in the pool for the next version.
        remain = (
            with_payload
            .where(F.col(f"payload.{key_field}").isNull())
            .select(bytes_col, *passthrough)
        )
    # --- DLQ: rows that no schema version could decode ---
    dlq_df = remain.withColumn("schema_version", F.lit(None).cast("int"))
    return decoded, dlq_df
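Usage would look like this (a sketch: the newest schema goes first so older versions only catch what ver2 cannot decode; passthrough is empty because only value was kept in the memory sink; avro_json_schema_ver1 is assumed to be defined like avro_json_schema_ver2):
decoded_frames, dlq_df = decode_per_version(
    spark.sql("select * from avro_sink3"),
    bytes_col="value",
    schemas=[(2, avro_json_schema_ver2), (1, avro_json_schema_ver1)],
    passthrough=(),
)
for frame in decoded_frames:
    frame.show(truncate=False)
dlq_df.show(truncate=False)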