In the Tour of Beam documentation on combine functions, it says the following:
When you apply a Combine transform, you must provide the function that contains the logic for combining the elements or values. The combining function should be commutative and associative, as the function is not necessarily invoked exactly once on all values with a given key. Because the input data (including the value collection) may be distributed across multiple workers, the combining function might be called multiple times to perform partial combining on subsets of the value collection.
For a combine function, the emphasized phrase above ("not necessarily invoked exactly once on all values with a given key") seemed like a huge limitation that would drastically complicate how to write such a function. But then the very next section of the tutorial gives an example that, as I understand it, breaks this restriction:
Example averager in tutorial:
class AverageFn(beam.CombineFn):
    def create_accumulator(self):
        return (0.0, 0)

    def add_input(self, sum_count, input):
        (sum, count) = sum_count
        return sum + input, count + 1

    def merge_accumulators(self, accumulators):
        sums, counts = zip(*accumulators)
        return sum(sums), sum(counts)

    def extract_output(self, sum_count):
        (sum, count) = sum_count
        return sum / count if count else float('NaN')
and the tutorial says:
a local accumulator tracks the running sum of values (the numerator value for our final average division) and the number of values summed so far (the denominator value). It may be called any number of times in a distributed fashion.
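For reference, this is roughly how I understand a CombineFn like this gets applied; the pipeline below is my own minimal sketch, not code from the tutorial:

import apache_beam as beam

with beam.Pipeline() as pipeline:
    (
        pipeline
        | beam.Create([1, 2, 3, 4])
        | beam.CombineGlobally(AverageFn())   # reduces the whole PCollection to a single average
        | beam.Map(print)                     # prints 2.5
    )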
To explain my question / confusion, let me create a toy example with the values [5, 15, 1004, -3, -8, 11], and let's say that the PCollection is split across two "instances":
[15, 1004, 11]
[5, -3, -8]
As I understand the documentation, it is saying that an average of the first subset, [15, 1004, 11], might be calculated twice, while the second one only once, for instance. That would create an average of 228 instead of 171, if I "double-counted" the representation of [15, 1004, 11].
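Spelling out the arithmetic behind those two numbers (plain Python, just to show where I get 171 and 228 from):

values = [5, 15, 1004, -3, -8, 11]
subset_a = [15, 1004, 11]   # the subset I imagine being combined twice
subset_b = [5, -3, -8]

correct_avg = sum(values) / len(values)
# 1024 / 6 ≈ 170.67, which rounds to 171

double_counted_avg = (2 * sum(subset_a) + sum(subset_b)) / (2 * len(subset_a) + len(subset_b))
# (2 * 1030 + (-6)) / (2 * 3 + 3) = 2054 / 9 ≈ 228.2, which rounds to 228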
This is obviously not what happens, so what am I missing?
I understand that the class provides functions that are completely commutative and associative, but not functions that are idempotent, and idempotence seems to be what is implied by saying they are 100% correct even when "not necessarily invoked exactly once".
This is important to understand -- how many times can a combiner function be called? -- because:
I am a statistician by training, so I first thought that maybe there is some sort of Law of large numbers phenomenon, where there is no guarantee of exact correctness, but a discrepancy that is vanishingly small when working with the large datasets used with Beam.
But I think this is untrue, and the answers will be completely deterministic, even on large datasets (excluding windowing and watermarking issues, which may inject some randomness).
I think you misread the documentation. "Not necessarily invoked exactly once on all values" means that the combining function is either invoked
once, on all of the values together (i.e. [5, 15, 1004, -3, -8, 11]), or
several times, on subsets of the values (e.g. [15, 1004, 11] and [5, -3, -8]), whose partial results are then merged.
It does not mean (in this context) that the incoming data is processed twice. Furthermore, "it may be called any number of times in a distributed fashion" means that the bundles can take any form and are defined at runtime by the runner, i.e. [5, 15, 1004, -3, -8, 11] could be split into bundles in any way, for example:
[5] and [15, 1004, -3, -8, 11]
[5, 15] and [1004, -3, -8, 11]
[5], [15], [1004], [-3], [-8], [11]
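You can convince yourself that every one of these bundlings produces the same result by simulating the CombineFn lifecycle by hand. The simulate helper below is purely illustrative (it is not how a runner actually schedules the calls), but it follows the documented lifecycle: one accumulator per bundle, then a single merge and extraction:

def simulate(combine_fn, bundles):
    # One accumulator per bundle: every value is passed to add_input exactly once.
    accumulators = []
    for bundle in bundles:
        acc = combine_fn.create_accumulator()
        for value in bundle:
            acc = combine_fn.add_input(acc, value)
        accumulators.append(acc)
    # The partial (sum, count) results are merged once, then the output is extracted.
    return combine_fn.extract_output(combine_fn.merge_accumulators(accumulators))

fn = AverageFn()
print(simulate(fn, [[5, 15, 1004, -3, -8, 11]]))              # ~170.67
print(simulate(fn, [[15, 1004, 11], [5, -3, -8]]))            # ~170.67
print(simulate(fn, [[5], [15], [1004], [-3], [-8], [11]]))    # ~170.67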
That's why it states "[...] Because the input data (including the value collection) may be distributed across multiple workers, the combining function might be called multiple times to perform partial combining on subsets of the value collection", and why "The combining function should be commutative and associative [...]".
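One way to see what commutativity and associativity buy you here: the order in which partial (sum, count) accumulators are merged cannot change the result, which you can check directly with the AverageFn from the question:

fn = AverageFn()
acc_a = (1030.0, 3)   # partial result for [15, 1004, 11]
acc_b = (-6.0, 3)     # partial result for [5, -3, -8]
assert fn.merge_accumulators([acc_a, acc_b]) == fn.merge_accumulators([acc_b, acc_a])
# Both orders merge to (1024.0, 6), so extract_output returns ~170.67 either way.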