I have a struct streaming job which reads message from Kafka topic then saves to dbfs. The code is as follows:
input_stream = spark.readStream \
.format("kafka") \
.options(**kafka_options) \
.load() \
.transform(create_raw_features)
# tranformation by 7 days rolling window
def transform_func(df):
window_spec = window("event_timestamp", "7 days", "1 day")
return df \
.withWatermark(eventTime="event_timestamp", delayThreshold="2 days") \
.groupBy(window_spec.alias("window"), "customer_id") \
.agg(count("*").alias("count")) \
.select("window.end", "customer_id", "count")
result = input_stream.transform(transform_func)
query = result \
.writeStream \
.format("memory") \
.queryName("test") \
.option("truncate","false").start()
I can see the checkpointing is working fine. However, there is no data output.
spark.table("test").show(truncate=False)
Show empty table. Any clue why?
I found the issue. in the Spark documentation output mode section, it states:
Append mode uses watermark to drop old aggregation state. But the output of a windowed aggregation is delayed the late threshold specified in withWatermark() as by the modes semantics, rows can be added to the Result Table only once after they are finalized (i.e. after watermark is crossed).
Since I didn't specify the output mode explicitly, append
is applied implicitly, which means the first output will occur only after the watermark threshold is passed.
To get the output per micro-batch, use output mode update
or complete
instead.
This works for me now
query = result \
.writeStream \
.format("memory") \
.outputMode("update") \
.queryName("test") \
.option("truncate","false").start()