I have an issue with the apache_beam.transforms.deduplicate.Deduplicate
transform. Please look at the code sample below:
with beam.Pipeline() as pipeline:
    (
        pipeline
        # | 'Load' >> beam.Create(['a', 'b', 'b'])  ## <- works fine
        | 'Load' >> beam.io.ReadFromText('./input.txt')  ## <- breaks Dedup
        | 'Dedup' >> Deduplicate(processing_time_duration=1000).with_input_types(AnyStr)
        | 'Print' >> beam.Map(print)
    )
If I create the collection manually, everything works fine and as expected. But when I try to load something from disk (a text file, Avro files, etc.), Deduplicate stops working and throws an exception:
Traceback (most recent call last):
  File "/Users/ds/.pyenv/versions/3.9.14/lib/python3.9/site-packages/apache_beam/runners/direct/executor.py", line 370, in call
    self.attempt_call(
  File "/Users/ds/.pyenv/versions/3.9.14/lib/python3.9/site-packages/apache_beam/runners/direct/executor.py", line 404, in attempt_call
    evaluator.start_bundle()
  File "/Users/ds/.pyenv/versions/3.9.14/lib/python3.9/site-packages/apache_beam/runners/direct/transform_evaluator.py", line 867, in start_bundle
    self.runner.start()
  File "/Users/ds/.pyenv/versions/3.9.14/lib/python3.9/site-packages/apache_beam/runners/common.py", line 1475, in start
    self._invoke_bundle_method(self.do_fn_invoker.invoke_start_bundle)
  File "/Users/ds/.pyenv/versions/3.9.14/lib/python3.9/site-packages/apache_beam/runners/common.py", line 1460, in _invoke_bundle_method
    self._reraise_augmented(exn)
  File "/Users/ds/.pyenv/versions/3.9.14/lib/python3.9/site-packages/apache_beam/runners/common.py", line 1507, in _reraise_augmented
    raise new_exn.with_traceback(tb)
  File "/Users/ds/.pyenv/versions/3.9.14/lib/python3.9/site-packages/apache_beam/runners/common.py", line 1458, in _invoke_bundle_method
    bundle_method()
  File "/Users/ds/.pyenv/versions/3.9.14/lib/python3.9/site-packages/apache_beam/runners/common.py", line 559, in invoke_start_bundle
    self.signature.start_bundle_method.method_value())
  File "/Users/ds/.pyenv/versions/3.9.14/lib/python3.9/site-packages/apache_beam/runners/direct/sdf_direct_runner.py", line 122, in start_bundle
    self._invoker = DoFnInvoker.create_invoker(
TypeError: create_invoker() got an unexpected keyword argument 'output_processor' [while running 'Load/Read/SDFBoundedSourceReader/ParDo(SDFBoundedSourceDoFn)/pair']
This happens only with the Deduplicate and DeduplicatePerKey transformations. All other things like ParDo, Map, etc. work fine.
I hope this helps to diagnose the issue.
Indeed, I tested your code and it doesn't work. I may be wrong, but I think the Deduplicate PTransform is better suited to jobs with windowing logic (processing time and event time).
It works with beam.Create (even though it's a bounded source) but not with ReadFromText, because the element type is not inferred:
E TypeError: create_invoker() got an unexpected keyword argument 'output_processor'
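To make the processing-time semantics concrete, here is a plain-Python sketch (not Beam code; the function name and signature are my own invention): an element is dropped when it has already been emitted within the configured duration, which is roughly what Deduplicate(processing_time_duration=...) does per element.

```python
import time

# Conceptual sketch of processing-time deduplication (plain Python, not Beam).
# An element is dropped if it was already emitted within `duration_s` seconds.
def dedup_by_processing_time(events, duration_s, clock=time.monotonic):
    last_seen = {}  # element -> processing time of its last emission
    for el in events:
        now = clock()
        if el in last_seen and now - last_seen[el] < duration_s:
            continue  # duplicate within the window; drop it
        last_seen[el] = now
        yield el

print(list(dedup_by_processing_time(['a', 'b', 'b'], 10)))  # ['a', 'b']
```

With this model it is clear the transform keys its state on processing time, which matters in streaming but is mostly incidental in a one-shot batch read.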
I propose another solution that works in your case and is better suited to deduplicating data in a batch job with a bounded source:
def test_dedup(self):
    with TestPipeline() as p:
        (
            p
            # | 'Load' >> beam.Create(['a', 'b', 'b'])  ## <- works fine
            | 'Load' >> beam.io.ReadFromText(f'{ROOT_DIR}/input.txt')  ## <- breaks Dedup
            # | 'Dedup' >> Deduplicate(processing_time_duration=1000).with_input_types(AnyStr)
            | 'Group by' >> beam.GroupBy(lambda el: el)
            | 'Get key' >> beam.Map(lambda t: t[0])
            | 'Print' >> beam.Map(self.print_el)
        )
The input.txt content is:
1
2
2
3
4
The output PCollection is:
1
2
3
4
I used GroupBy on the current element, which gives me a Tuple per key => 2 -> [2, 2], and then I added a Map that keeps only the deduplicated key of each Tuple.
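As a plain-Python sketch of what these two stages do (not Beam code; `records` here stands in for the input PCollection):

```python
records = ['1', '2', '2', '3', '4']

# GroupBy(lambda el: el): pair each distinct key with its occurrences,
# e.g. '2' -> ['2', '2'].
groups = {}
for el in records:
    groups.setdefault(el, []).append(el)

# Map(lambda t: t[0]): keep only the key of each (key, group) pair.
deduped = [key for key, group in groups.items()]
print(deduped)  # ['1', '2', '3', '4'] (dicts preserve insertion order)
```

As a side note, Beam also ships a built-in Distinct transform (apache_beam.transforms.util.Distinct) that applies essentially this same pattern, so it may be worth trying in place of the explicit GroupBy + Map.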