PySpark: Comparing Datasets and Identifying Unchanged Records


I am trying to implement a generic historization procedure for my data in PySpark. The data have a key, several business information columns, and some additional technical metadata. The metadata should not be considered when identifying changes, but it should be saved whenever a change is identified (for example, the exact timestamp when a record was read).
The new data arrive in a Spark DataFrame, and I have already loaded the existing data into a Spark DataFrame as well. For example:

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

old = spark.createDataFrame([("foo", "f", 1), ("bar", "b", 1), ("", None, 1)], ["text", "first_character", "version"])
new = spark.createDataFrame([("foo", "f", 2), ("Bar", "B", 2), ("", None, 2)], ["text", "first_character", "version"])

old.show()
new.show()

The sample contains a text column (the key column), an information column holding the first character of the text, and a "version" column that stands in for my technical metadata.

+----+---------------+-------+
|text|first_character|version|
+----+---------------+-------+
| foo|              f|      1|
| bar|              b|      1|
|    |           null|      1|
+----+---------------+-------+

+----+---------------+-------+
|text|first_character|version|
+----+---------------+-------+
| foo|              f|      2|
| Bar|              B|      2|
|    |           null|      2|
+----+---------------+-------+

Now I am looking for the unchanged records. If we ignore the version column, the first record is unchanged; in the second record, the first letter was changed to upper case; and the third record is also unchanged.

My first idea was to use an anti-join to filter out the unchanged records, so that only the new or changed ones remain:

new.join(old, ['text', 'first_character'], "left_anti").show()

However, this returns:

+----+---------------+-------+
|text|first_character|version|
+----+---------------+-------+
|    |           null|      2|
| Bar|              B|      2|
+----+---------------+-------+

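The reason is clear: in the record with empty text (present in both versions), first_character is null, and comparing null with null in the join condition does not evaluate to "equal" but to "unknown" (null). The row therefore finds no match in old, and the anti-join keeps it.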
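To make the "unknown" result concrete, here is a small plain-Python model of SQL's three-valued logic, using None for both NULL and the "unknown" truth value. It is only an illustration of the semantics, not Spark code, and the helper names (sql_eq, sql_eq_null_safe, sql_and, rows_match) are hypothetical. A join treats a pair of rows as matching only when the whole condition is true, so a single unknown comparison is enough to lose the match.

```python
# Plain-Python model of SQL three-valued logic (illustration only, not Spark).
# None stands for both a NULL value and the UNKNOWN truth value.

def sql_eq(a, b):
    """Standard SQL '=': any NULL operand makes the result UNKNOWN (None)."""
    if a is None or b is None:
        return None
    return a == b


def sql_eq_null_safe(a, b):
    """Null-safe '<=>' (eqNullSafe): NULL <=> NULL is True, never UNKNOWN."""
    if a is None and b is None:
        return True
    if a is None or b is None:
        return False
    return a == b


def sql_and(x, y):
    """Three-valued AND: False wins, then UNKNOWN, otherwise True."""
    if x is False or y is False:
        return False
    if x is None or y is None:
        return None
    return True


def rows_match(left, right, eq):
    """AND the per-column comparisons; only a definite True counts as a match."""
    result = True
    for a, b in zip(left, right):
        result = sql_and(result, eq(a, b))
    return result is True


new_row = ("", None)  # (text, first_character) in the new data
old_row = ("", None)  # the same record in the old data

print(rows_match(new_row, old_row, sql_eq))            # False: anti-join keeps the row
print(rows_match(new_row, old_row, sql_eq_null_safe))  # True: anti-join drops the row
```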

I have a workaround for this problem:

new.select('text','first_character')\
   .exceptAll(old.select('text','first_character'))\
   .join(new.select('text','version'), 'text','inner')\
   .show()

This returns the expected result:

+----+---------------+-------+
|text|first_character|version|
+----+---------------+-------+
| Bar|              B|      2|
+----+---------------+-------+
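The workaround succeeds because Spark's set operations, exceptAll included, compare whole rows in a null-safe way, so two nulls count as equal there. As a rough plain-Python model, assuming rows are tuples and None stands for null, EXCEPT ALL behaves like a multiset difference in which None equals None. The function name except_all is hypothetical, and unlike this model, Spark does not guarantee output order.

```python
from collections import Counter


def except_all(left_rows, right_rows):
    """Model of EXCEPT ALL: remove one left row per identical right row.

    Python tuple equality treats None == None as True, which mirrors the
    null-safe row comparison used by set operations.
    """
    remaining = Counter(right_rows)
    result = []
    for row in left_rows:
        if remaining[row] > 0:
            remaining[row] -= 1  # consume one matching copy from the right side
        else:
            result.append(row)
    return result


new_rows = [("foo", "f"), ("Bar", "B"), ("", None)]
old_rows = [("foo", "f"), ("bar", "b"), ("", None)]
print(except_all(new_rows, old_rows))  # [('Bar', 'B')]
```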

My question is: is there an option to make anti-joins consider two null values as equal, or is there a simpler workaround than mine?


Solution

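  • Use Column.eqNullSafe(), the null-safe equality test (the <=> operator in Spark SQL). Unlike ==, it returns true when both sides are null and false when only one side is null, so the record with a null first_character finds its match and is removed by the anti-join: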

    new.join(old,
             (new.text.eqNullSafe(old.text)) &
             (new.first_character.eqNullSafe(old.first_character)),
             "left_anti") \
        .show()