pyspark databricks spark-streaming azure-databricks spark-structured-streaming

Unexpected Behavior using WHEN | OTHERWISE


We have developed a streaming process that uses several other Delta tables to enrich the final data product.

Let's call FinalDataProduct the Delta table where the data is inserted; semiLayout a DataFrame that combines all the tables used to enrich the final table and is merged into FinalDataProduct; and stagingTable a Delta table that is joined in to return the shifts (the column is named 'SHIFTS' and holds the values we need, which can be many).

stagingTable is left-joined to the semiLayout DataFrame on one common key (for example, if semiLayout has a key with value 21451, stagingTable has the same value).

Sometimes stagingTable has no row that can be joined to the semiLayout DataFrame, which leaves the 'SHIFTS' column empty or null.

After the join, in the final operations before the merge we apply a withColumn using the following logic: .withColumn("SHIFTS", when(col("SHIFTS").isNull(), lit(1)).otherwise(col("SHIFTS"))).

In summary, we ensure that the SHIFTS column can never be NULL.

However, we have had a few cases where it is NULL.

What I have tried so far:

It has only happened in a few records: for example, out of 1 million records, 60 are missing the value. But if I re-run the code in batch, or if I delete the checkpoints in the clone table, the value is filled in.

Has anyone experienced behavior like this? Could it be that the condition .withColumn("SHIFTS", when(col("SHIFTS").isNull(), lit(1).otherwise(col("SHIFTS"))) is not detecting the null and is interpreting it as something else? If so, why does it only happen when running in streaming?

If anyone has experienced something similar, I would appreciate your help or suggestions. Even if you have not been in a similar situation but have an idea or approach, feel free to share it in a comment.


Solution

  • Apparently, the code you submitted is not exact.
    You wrote this:
    .withColumn("SHIFTS", when(col("SHIFTS")).isNull(), lit(1).otherwise(col("SHIFTS")))
    It should probably be this:
    .withColumn("SHIFTS", when(col("SHIFTS").isNull(), lit(1)).otherwise(col("SHIFTS")))

    You can achieve the same with: withColumn("SHIFTS", coalesce(col("SHIFTS"), lit(1)))