apache-spark, pyspark, apache-kafka, spark-structured-streaming, ndjson

Read newline delimited json from Kafka message in Spark Structured Streaming


I developed a Python Kafka producer that sends multiple JSON records as a single NDJSON binary string to a Kafka topic. I'm then trying to read these messages in Spark Structured Streaming with PySpark as follows:

# df is the streaming DataFrame loaded from Kafka via spark.readStream...load()
events_df = df.select(from_json(col("value").cast("string"), schema).alias("value"))

but this code only works with a single JSON document. If the value contains multiple records as newline-delimited JSON, Spark can't decode them correctly.
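
For context, the producer side looks roughly like this (a minimal sketch assuming the kafka-python client and a local broker; the topic name and event fields are illustrative):

    import json
    from kafka import KafkaProducer

    producer = KafkaProducer(bootstrap_servers="localhost:9092")

    # Two illustrative events joined with newlines into one NDJSON payload,
    # sent as a single Kafka message.
    events = [{"id": 1, "type": "click"}, {"id": 2, "type": "view"}]
    payload = "\n".join(json.dumps(e) for e in events).encode("utf-8")
    producer.send("quickstart-events", value=payload)
    producer.flush()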

I don't want to send a separate Kafka message for each individual event. How can I achieve this?


Solution

  • I managed to do what I was looking for in this way: splitting the full text string on newlines, then exploding the resulting array into rows, each of which is parsed with the schema:

        from pyspark.sql.functions import col, explode, split, from_json

        # Read the raw Kafka value as one string per message.
        events = spark.readStream \
            .format("kafka") \
            .option("kafka.bootstrap.servers", "localhost:9092") \
            .option("subscribe", "quickstart-events") \
            .option("startingOffsets", "earliest") \
            .load() \
            .selectExpr("CAST(value AS STRING) as data")

        # Split the NDJSON payload on newlines and explode the array into
        # one row per JSON document (explode's default output column is "col").
        events = events.select(explode(split(events.data, '\n')))
        # Parse each line against the schema, then flatten the struct.
        events = events.select(from_json(col("col"), event_schema).alias('value'))
        events = events.selectExpr('value.*')
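
    For completeness, a sketch of the pieces the snippet above assumes: an event_schema (the fields here are illustrative and should match your actual event structure; it must be defined before the from_json call) and a console sink to actually run the query:

        from pyspark.sql.types import StructType, StructField, IntegerType, StringType

        # Illustrative schema; replace the fields with the real event structure.
        event_schema = StructType([
            StructField("id", IntegerType()),
            StructField("type", StringType()),
        ])

        # Write the parsed rows to the console to verify the pipeline.
        query = events.writeStream \
            .format("console") \
            .outputMode("append") \
            .start()
        query.awaitTermination()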