The Spark Scala code snippet below reproduces the behavior I'm trying to understand. At a high level, we construct two tuples, each containing a DF and a Bloom filter of the `id` column of the respective DF. Then we filter `b` so that any rows whose IDs are contained in `a`'s Bloom filter are removed, and store the union of this filtered result and `a` as `c`.
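```scala
// Run in spark-shell, where `spark` is predefined.
import org.apache.spark.sql.functions.{col, udf}
import spark.implicits._

// a: a DF plus a Bloom filter over its id column
val a_df = Seq(("b", List("4", "16"))).toDF("id", "data")
val a_bloom_filter = a_df.stat.bloomFilter(col("id"), 2, 0.000001)
val a = (a_df, a_bloom_filter)
println("a")
a_df.show(20, false)

// b: same shape, with a Bloom filter sized identically
val b_df = Seq(("b", List("4")), ("c", List("4"))).toDF("id", "data")
val b_bloom_filter = b_df.stat.bloomFilter(col("id"), 2, 0.000001)
val b = (b_df, b_bloom_filter)
println("b")
b_df.show(20, false)

// Keep only rows of b whose id is (probably) not in a's Bloom filter
val a_bloom_filter_udf = udf((s: String) => !a._2.mightContain(s))
val filtered_b_df = b._1.filter(a_bloom_filter_udf(col("id")))
val c = a._1.union(filtered_b_df)
println("c")
c.show(20, false)

// Merge b's filter into a's filter, then show the same c again
val merged_bloom_filter = a._2.mergeInPlace(b._2)
println("c")
c.show(20, false)
```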
Running this in the Spark REPL produces output I do not understand:
```
a
+---+-------+
|id |data   |
+---+-------+
|b  |[4, 16]|
+---+-------+

b
+---+----+
|id |data|
+---+----+
|b  |[4] |
|c  |[4] |
+---+----+

c
+---+-------+
|id |data   |
+---+-------+
|b  |[4, 16]|
|c  |[4]    |
+---+-------+

c
+---+-------+
|id |data   |
+---+-------+
|b  |[4, 16]|
+---+-------+
```
Specifically, why does `c` seem to change when we perform the `mergeInPlace` operation? My expectation is that `c` would not change between calls to `show`.
Reading the documentation for `mergeInPlace`, I see that "The mutations happen to this instance.", which is `a._2`, or `a`'s Bloom filter in this case.
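The documented behavior can be checked without a SparkSession by using `org.apache.spark.util.sketch.BloomFilter` directly. This is a minimal sketch, assuming the `spark-sketch` classes are on the classpath (they are in spark-shell); both filters use the same parameters as the `bloomFilter(col("id"), 2, 0.000001)` calls above, and `MergeInPlaceDemo` and `newFilter` are hypothetical names for illustration. It shows that `mergeInPlace` hands back the receiver itself, so `merged_bloom_filter` and `a._2` are the same object.

```scala
import org.apache.spark.util.sketch.BloomFilter

object MergeInPlaceDemo {
  // Hypothetical helper: a filter with the question's sizing (2 expected items, fpp 0.000001).
  def newFilter(ids: String*): BloomFilter = {
    val filter = BloomFilter.create(2L, 0.000001)
    ids.foreach(id => filter.putString(id))
    filter
  }

  def main(args: Array[String]): Unit = {
    val aFilter = newFilter("b")      // stands in for a._2
    val bFilter = newFilter("b", "c") // stands in for b._2

    val merged = aFilter.mergeInPlace(bFilter)

    // The return value is the receiver itself, not a fresh filter.
    println("merged eq aFilter: " + (merged eq aFilter))
    // The receiver now reports an ID that was only ever put into bFilter.
    println("aFilter might contain c: " + aFilter.mightContainString("c"))
  }
}
```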
My current hypothesis is that after the `mergeInPlace` call, `a._2` gets mutated, so `a._2 := a._2 OR b._2`, which in this example is effectively a filter over the IDs `(b, c)`. Then, for the second `show()`, it seems like everything gets re-evaluated:

1. `a._2` now effectively holds `(b, c)`.
2. `filtered_b_df` is re-evaluated to an empty DF, because both of `b._1`'s IDs are found in the "new" Bloom filter `a._2`.
3. `c` is re-evaluated as the union of `a._1` and `filtered_b_df` (an empty DF), so it is effectively just `a._1`, which is exactly what the second `show()` prints.
This belief seems to be reinforced by the fact that calling `b._2.mergeInPlace(a._2)` instead of `a._2.mergeInPlace(b._2)` produces the expected behavior (i.e. `c` does not change), presumably because `a._2` is not clobbered, and clobbering it is what seems to trigger some sort of re-evaluation.
Yep. You answered it yourself, even though you find it odd. `show()` is an Action that (re-)triggers the execution path.
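The same re-execution can be reproduced without a Bloom filter: a mutable object that a UDF closes over is read again every time an action runs the plan. In this minimal sketch a `scala.collection.mutable.Set` stands in for `a._2`, and `count()` plays the role of `show()`; `LazyReevaluationDemo`, `run` and `excluded` are hypothetical names, and it assumes Spark SQL on the classpath with a local master.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, udf}
import scala.collection.mutable

object LazyReevaluationDemo {
  // Returns (rows kept by the first action, rows kept by the second action).
  def run(): (Long, Long) = {
    val spark = SparkSession.builder().master("local[1]").appName("lazy-reeval-demo").getOrCreate()
    import spark.implicits._
    try {
      // Hypothetical mutable state standing in for a._2; the UDF closes over it.
      val excluded = mutable.Set("b")
      val notExcluded = udf((s: String) => !excluded.contains(s))

      // Same IDs as b_df in the question.
      val df = spark.sparkContext.parallelize(Seq("b", "c")).toDF("id")
      // Only a plan is built here; nothing executes yet.
      val filtered = df.filter(notExcluded(col("id")))

      val before = filtered.count() // action 1: only "c" survives
      excluded += "c"               // mutate the captured state, as mergeInPlace does to a._2
      val after = filtered.count()  // action 2: the plan runs again and sees the mutation
      (before, after)
    } finally {
      spark.stop()
    }
  }

  def main(args: Array[String]): Unit = {
    val (before, after) = run()
    println(s"before=$before after=$after")
  }
}
```

Nothing about `filtered` changed between the two actions; only the state its UDF reads did, which mirrors what happens to `c` in the question.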
The re-evaluation occurs ... explanation for `val merged_bloom_filter = a._2.mergeInPlace(b._2)` is correct. But it is not the explanation for the reversed call. Re-evaluation happens there as well; it is just that, in the reverse situation, the filter that `mergeInPlace` mutates, `b._2`, is not actually used by your code on re-execution: the UDF only reads `a._2`.
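The reversed call can be checked in isolation: `b._2.mergeInPlace(a._2)` mutates only `b._2`, while `a._2`, the only filter the UDF reads, keeps exactly the bits it had. This minimal sketch compares the filter's serialized bytes (via `BloomFilter.writeTo`) before and after the reversed merge; `ReverseMergeDemo`, `newFilter`, `bytesOf` and `reverseMergeLeavesAUnchanged` are hypothetical names.

```scala
import java.io.ByteArrayOutputStream
import org.apache.spark.util.sketch.BloomFilter

object ReverseMergeDemo {
  // Hypothetical helper with the question's sizing (2 expected items, fpp 0.000001).
  def newFilter(ids: String*): BloomFilter = {
    val filter = BloomFilter.create(2L, 0.000001)
    ids.foreach(id => filter.putString(id))
    filter
  }

  // Serialized form of a filter, used only to detect whether its bits changed.
  def bytesOf(filter: BloomFilter): Array[Byte] = {
    val out = new ByteArrayOutputStream()
    filter.writeTo(out)
    out.toByteArray
  }

  // Performs the reversed merge and reports whether aFilter was left untouched.
  def reverseMergeLeavesAUnchanged(): Boolean = {
    val aFilter = newFilter("b")      // stands in for a._2 (read by the UDF)
    val bFilter = newFilter("b", "c") // stands in for b._2 (never read by the UDF)
    val aBefore = bytesOf(aFilter)
    bFilter.mergeInPlace(aFilter)     // only bFilter is mutated
    bytesOf(aFilter).sameElements(aBefore)
  }

  def main(args: Array[String]): Unit =
    println("a unchanged after b.mergeInPlace(a): " + reverseMergeLeavesAUnchanged())
}
```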
So, if you were seeking confirmation, it is confirmed. And there is no unexpected behaviour.
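If the goal is for `c` to stay as it was, one option is to merge into a copy of `a._2` rather than into `a._2` itself. This is a minimal sketch, assuming that round-tripping the filter through `writeTo` and `BloomFilter.readFrom` is an acceptable way to get an independent copy; `MergeIntoCopy`, `copyOf` and `unionOf` are hypothetical names, not part of Spark's API.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream}
import org.apache.spark.util.sketch.BloomFilter

object MergeIntoCopy {
  // Hypothetical helper: deep copy via the filter's own serialization format.
  def copyOf(filter: BloomFilter): BloomFilter = {
    val out = new ByteArrayOutputStream()
    filter.writeTo(out)
    BloomFilter.readFrom(new ByteArrayInputStream(out.toByteArray))
  }

  // Hypothetical helper: union of two compatible filters; neither argument is mutated.
  def unionOf(x: BloomFilter, y: BloomFilter): BloomFilter =
    copyOf(x).mergeInPlace(y)

  def main(args: Array[String]): Unit = {
    val aFilter = BloomFilter.create(2L, 0.000001)
    aFilter.putString("b")
    val bFilter = BloomFilter.create(2L, 0.000001)
    bFilter.putString("b")
    bFilter.putString("c")

    val merged = unionOf(aFilter, bFilter)
    println("merged might contain c: " + merged.mightContainString("c"))
    println("merged is a different object from aFilter: " + !(merged eq aFilter))
  }
}
```

In the question's code this would be `val merged_bloom_filter = MergeIntoCopy.unionOf(a._2, b._2)`, which leaves `a._2`, and therefore the UDF behind `c`, untouched. Materializing `c` before merging (for example `val c_snapshot = c.localCheckpoint()`) should also pin its rows, at the cost of storing them on the executors; plain `cache()` is weaker, because evicted partitions are recomputed from the lineage.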