apache-spark, pyspark, apache-kafka, avro

Avro schema evolution (backward compatibility) returns null with PySpark Structured Streaming


Avro backward compatibility returns a null record.

I send a record encoded with schema_ver1.avsc and a record encoded with schema_ver2.avsc to Kafka. Then I query the PySpark Structured Streaming memory sink named avro_sink3 and decode with the ver2 schema (avro_json_schema_ver2).

I expect records like this:

expected records

+-----------------------------------+
|from_avro(value)                   |
+-----------------------------------+
|{ver1_yuki_schema, login, 4, 1}    | 
|{ver2_yuki_schema, login2, 4, 1000}| 
+-----------------------------------+

However, I get the output below instead.

actual output

+-----------------------------------+
|from_avro(value)                   |
+-----------------------------------+
|{null, null, null, null}           |  -> encoded with schema_ver1.avsc, decoded with schema_ver2.avsc (record No.1)
|{ver2_yuki_schema, login2, 4, 1000}|  -> encoded with schema_ver2.avsc, decoded with schema_ver2.avsc (record No.2)
+-----------------------------------+

How should I fix this problem?

pyspark streaming sink

memory_stream_check29 = df \
  .select("value").alias("value") \
  .writeStream \
  .format("memory") \
  .trigger(processingTime="5 seconds") \
  .option("checkpointLocation", "/tmp/kafka/avro_file11131/") \
  .queryName("avro_sink3") \
  .start()
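
For context, df is assumed to be the raw Kafka source stream; a minimal sketch of how it might be defined (broker address and topic name are placeholders):

# Sketch of the assumed Kafka source for df; broker and topic are placeholders.
df = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("subscribe", "devices")
    .option("startingOffsets", "earliest")
    .load()
)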

query memory sink

spark.sql("select * from avro_sink3").select(from_avro("value",avro_json_schema_ver2, {"mode" : "PERMISSIVE"})).show(truncate=False)

schema_ver1.avsc

{
  "namespace": "root",
  "type": "record",
  "name": "Device",
  "fields": [
    { "name": "id", "type": "string" },
    { "name": "type", "type": "string" },
    { "name": "sendtime", "type": "int" },
  ]
}

schema_ver2.avsc

{
  "namespace": "root",
  "type": "record",
  "name": "Device",
  "fields": [
    { "name": "id", "type": "string" },
    { "name": "type", "type": "string" },
    { "name": "sendtime", "type": "int" },
    { "name": "temp", "type": "string", "default": "1" }
  ]
}
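
For reference, the two records could be produced roughly like this; a sketch assuming fastavro and kafka-python, with the broker address, topic name, and record values as placeholders:

import json
from io import BytesIO
from fastavro import parse_schema, schemaless_writer
from kafka import KafkaProducer

# Sketch only: broker address, topic name and record values are placeholders.
producer = KafkaProducer(bootstrap_servers="localhost:9092")

for avsc, record in [
    ("schema_ver1.avsc", {"id": "ver1_yuki_schema", "type": "login", "sendtime": 4}),
    ("schema_ver2.avsc", {"id": "ver2_yuki_schema", "type": "login2", "sendtime": 4, "temp": "1000"}),
]:
    with open(avsc) as f:
        schema = parse_schema(json.load(f))
    buf = BytesIO()
    schemaless_writer(buf, schema, record)   # raw Avro binary, no embedded writer schema
    producer.send("devices", buf.getvalue())

producer.flush()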


environment: Spark 3.2


Solution

  • I ended up with the following program.

    If the decoded record is null, fall back to another schema version.

    
    
    from pyspark.sql import functions as F
    from typing import List, Tuple
    from pyspark.sql.avro.functions import from_avro
    
    __all__ = ["decode_per_version"]
    
    def decode_per_version(
        df,
        *,
        bytes_col: str,
        schemas: List[Tuple[int, str]], 
        key_field: str = "id", 
        passthrough: Tuple[str, ...] = ("partition", "offset"),
        mode: str = "PERMISSIVE",
    ):
    
        remain = df.select(bytes_col, *passthrough) 
        decoded = []
    
        for ver, schema_json in schemas:
            # Try to decode the raw bytes with this schema version.
            with_payload = remain.withColumn(
                "payload",
                from_avro(F.col(bytes_col), schema_json, {"mode": mode})
            )
    
            # In PERMISSIVE mode, from_avro returns an all-null struct when the bytes
            # cannot be decoded with this schema, so a null key_field marks a failed decode.
            ok = (
                with_payload
                .where(F.col(f"payload.{key_field}").isNotNull())
                .select("payload.*", bytes_col, *passthrough)
                .withColumn("schema_version", F.lit(ver))
            )
            decoded.append(ok)
    
            # Records that failed to decode are retried with the next schema version.
            remain = (
                with_payload
                .where(F.col(f"payload.{key_field}").isNull())
                .select(bytes_col, *passthrough)
            )
    
        # --- DLQ: records that no schema version could decode ---
        dlq_df = (
            remain.withColumn("schema_version", F.lit(None))
        )
    
        return decoded, dlq_df
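
    A rough usage sketch of the function above; reading both schema files, trying the newest version first, and unioning the per-version results are assumptions about the wiring:

    # Rough usage sketch: load both schema versions and union the per-version results.
    with open("schema_ver2.avsc") as f2, open("schema_ver1.avsc") as f1:
        schemas = [(2, f2.read()), (1, f1.read())]   # try the newest schema first

    decoded, dlq = decode_per_version(
        spark.sql("select * from avro_sink3"),
        bytes_col="value",
        schemas=schemas,
        passthrough=(),   # the memory sink above only kept the value column
    )

    # Rows decoded with ver1 have no temp column, so union by name with missing columns allowed.
    result = (
        decoded[0]
        .unionByName(decoded[1], allowMissingColumns=True)
        .fillna({"temp": "1"})   # apply the ver2 default for rows decoded with ver1
    )
    result.show(truncate=False)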