I developed a Python Kafka producer that sends multiple JSON records as a single ndjson (newline-delimited JSON) binary string to a Kafka topic. Then I'm trying to read these messages in Spark Structured Streaming with PySpark as follows:
events_df = df.select(from_json(col("value").cast("string"), schema).alias("value"))
but this code only works when the value holds a single JSON document. If the value contains multiple records as newline-delimited JSON, Spark can't decode it correctly.
I don't want to send a separate Kafka message for each event. How can I achieve this?
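For context, the producer side looks roughly like this (a minimal sketch assuming kafka-python; the record contents are illustrative):

```
import json
from kafka import KafkaProducer

producer = KafkaProducer(bootstrap_servers="localhost:9092")

events = [{"id": 1, "type": "click"}, {"id": 2, "type": "view"}]

# Join the records into one newline-delimited JSON payload and send it
# as a single Kafka message.
payload = "\n".join(json.dumps(e) for e in events).encode("utf-8")
producer.send("quickstart-events", payload)
producer.flush()
```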
I managed to do what I was looking for by splitting the full text string on newlines and then exploding the resulting array into rows, which are then parsed with the schema:
```
from pyspark.sql.functions import col, explode, from_json, split

events = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "quickstart-events") \
    .option("startingOffsets", "earliest") \
    .load() \
    .selectExpr("CAST(value AS STRING) as data")

# Split each message into one string per line and explode into one row per event.
events = events.select(explode(split(events.data, '\n')))

# Parse each line with the event schema; explode's default output column is named "col".
events = events.select(from_json(col("col"), event_schema).alias('value'))

# Flatten the parsed struct into top-level columns.
events = events.selectExpr('value.*')
```
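To verify the result, the stream can be written out, for example to the console (a minimal sketch for local testing; the sink and output mode are illustrative):

```
query = events.writeStream \
    .format("console") \
    .outputMode("append") \
    .start()

query.awaitTermination()
```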