We're trying to use DataFrame.groupBy()
when joining a one to many relationship.
@dlt.table(name = silver_table_name)
def silver():
appointmentsDf = spark.readStream.table(f"{bronze_catalog_name}.{bronze_schema_name}.{bronze_appointments_table_name}")
# TODO doesn't work
# appointmentsDf = appointmentsDf.withWatermark("inserted_datetime", "1 hour")
answersDf = spark.readStream.table(f"{bronze_catalog_name}.{bronze_schema_name}.{bronze_answers_table_name}")
# TODO - doesn't work
# answersDf = answersDf.withWatermark("answer_inserted_datetime", "1 hour")
df = appointmentsDf.join(answersDf, appointmentsDf["id"] == answersDf["appointmentId"], "inner")
# TODO - doesn't work
# df = df.withWatermark("inserted_datetime", "1 hour")
df = df.groupBy('appointmentId') \
.agg(max(when(col("prompt") == lit(QUESTION_BALL_FITTING), 'answer').otherwise(None)).alias('answer'))
We get the following error:
com.databricks.pipelines.common.errors.DLTAnalysisException: Failed to start stream inno_gametime_data_204615620297433 in either append mode or complete mode.
Append mode error: Append output mode not supported when there are streaming aggregations on streaming DataFrames/DataSets without watermark;
When we uncomment any of the .withWatermark()
statements above, even using all three of them at the same time, we are still getting the exact same error message.
Any idea what we're doing wrong?
Yeah, not an expert but been going through the same issues lately and might have some leads for you.
"Chaining multiple stateful operations on streaming Datasets is not supported with Update and Complete mode."
I know, it doesn't say that it is not supported with "Append" mode, but better safe than sorry. It's also easier to check/debug your code if you split it into smaller steps. If everything works, you can still try to combine it again afterwards.
See the code example here for joins: https://spark.apache.org/docs/3.5.1/structured-streaming-programming-guide.html#inner-joins-with-optional-watermarking
When talking about joins there are different requirements for various types of joins (inner is easier than outer etc.)
And check the example here for a watermark/window aggregation: https://spark.apache.org/docs/3.5.1/structured-streaming-programming-guide.html#handling-late-data-and-watermarking
By the way, you don't necessarily have to use a window in your groupBy clause, you can also group by the event-time column (that you used in the watermark). But one of the two options you must go for, or your watermark has zero effect in that aggregation.
An effective way of checking if your watermarks work in groupBy cases, is to start an incremental DLT pipeline refresh (not a full refresh!) and look at the number of processed/written records. It should only process the streaming increment and not the total number of records... If you see the same number of records (or growing) in each DLT update, you are doing something wrong, because DLT is then switching to "complete" mode automatically (which you don't want usually in a streaming workload).