python apache-beam apache-beam-io

Apache Beam – issue with Deduplicate function


I have an issue with the apache_beam.transforms.deduplicate.Deduplicate transform. Please look at the code sample below:

    import apache_beam as beam
    from apache_beam.transforms.deduplicate import Deduplicate
    from typing import AnyStr

    with beam.Pipeline() as pipeline:
        (
            pipeline
            # | 'Load' >> beam.Create(['a', 'b', 'b'])  ## <- works fine
            | 'Load' >> beam.io.ReadFromText('./input.txt')  ## <- breaks Dedup
            | 'Dedup' >> Deduplicate(processing_time_duration=1000).with_input_types(AnyStr)
            | 'Print' >> beam.Map(print)
        )

If I create the collection manually, everything works fine and as expected. But when I try to load something from disk (a text file, Avro files, etc.), Deduplicate stops working and throws an exception:

Traceback (most recent call last):
  File "/Users/ds/.pyenv/versions/3.9.14/lib/python3.9/site-packages/apache_beam/runners/direct/executor.py", line 370, in call
    self.attempt_call(
  File "/Users/ds/.pyenv/versions/3.9.14/lib/python3.9/site-packages/apache_beam/runners/direct/executor.py", line 404, in attempt_call
    evaluator.start_bundle()
  File "/Users/ds/.pyenv/versions/3.9.14/lib/python3.9/site-packages/apache_beam/runners/direct/transform_evaluator.py", line 867, in start_bundle
    self.runner.start()
  File "/Users/ds/.pyenv/versions/3.9.14/lib/python3.9/site-packages/apache_beam/runners/common.py", line 1475, in start
    self._invoke_bundle_method(self.do_fn_invoker.invoke_start_bundle)
  File "/Users/ds/.pyenv/versions/3.9.14/lib/python3.9/site-packages/apache_beam/runners/common.py", line 1460, in _invoke_bundle_method
    self._reraise_augmented(exn)
  File "/Users/ds/.pyenv/versions/3.9.14/lib/python3.9/site-packages/apache_beam/runners/common.py", line 1507, in _reraise_augmented
    raise new_exn.with_traceback(tb)
  File "/Users/ds/.pyenv/versions/3.9.14/lib/python3.9/site-packages/apache_beam/runners/common.py", line 1458, in _invoke_bundle_method
    bundle_method()
  File "/Users/ds/.pyenv/versions/3.9.14/lib/python3.9/site-packages/apache_beam/runners/common.py", line 559, in invoke_start_bundle
    self.signature.start_bundle_method.method_value())
  File "/Users/ds/.pyenv/versions/3.9.14/lib/python3.9/site-packages/apache_beam/runners/direct/sdf_direct_runner.py", line 122, in start_bundle
    self._invoker = DoFnInvoker.create_invoker(
TypeError: create_invoker() got an unexpected keyword argument 'output_processor' [while running 'Load/Read/SDFBoundedSourceReader/ParDo(SDFBoundedSourceDoFn)/pair']

This happens only with the Deduplicate and DeduplicatePerKey transforms. Everything else, like ParDo, Map, etc., works fine.


Solution

  • I hope this helps.

    Indeed, I tested your code and it doesn't work. I may be wrong, but I think the Deduplicate PTransform is better suited to jobs with windowing logic (processing time and event time).
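
    For example, in a streaming job it would typically be wired up like this (a minimal, untested sketch; the Pub/Sub subscription path is hypothetical):

        import apache_beam as beam
        from apache_beam.options.pipeline_options import PipelineOptions
        from apache_beam.transforms.deduplicate import Deduplicate

        options = PipelineOptions(streaming=True)
        with beam.Pipeline(options=options) as pipeline:
            (
                pipeline
                # Hypothetical subscription; any unbounded source fits here.
                | 'Read' >> beam.io.ReadFromPubSub(
                    subscription='projects/my-project/subscriptions/my-sub')
                | 'Decode' >> beam.Map(lambda b: b.decode('utf-8'))
                # Drop repeats seen within 10 minutes of processing time.
                | 'Dedup' >> Deduplicate(processing_time_duration=10 * 60)
                | 'Print' >> beam.Map(print)
            )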

    It works with beam.Create (even though it's a bounded source) but not with ReadFromText, because a type is not inferred:

    E     TypeError: create_invoker() got an unexpected keyword argument 'output_processor'
    

    I propose another solution that works in your case and is better suited to deduplicating data in a batch job with a bounded source:

        def test_dedup(self):
            with TestPipeline() as p:
                (
                    p
                    # | 'Load' >> beam.Create(['a', 'b', 'b'])  ## <- works fine
                    | 'Load' >> beam.io.ReadFromText(f'{ROOT_DIR}/input.txt')  ## <- breaks Dedup
                    # | 'Dedup' >> Deduplicate(processing_time_duration=1000).with_input_types(AnyStr)
                    | 'Group by' >> beam.GroupBy(lambda el: el)
                    | 'Get key' >> beam.Map(lambda t: t[0])
                    | 'Print' >> beam.Map(self.print_el)
                )
    

    The input.txt content is:

    1
    2
    2
    3
    4
    

    The output PCollection is:

    1
    2
    3
    4
    

    I used GroupBy on the element itself, which gives me a tuple per key (e.g. 2 -> [2, 2]), and then I mapped each tuple to its key to get the deduplicated elements.
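
    As a side note, Beam also ships a built-in beam.Distinct transform that packages the same "group by element, keep the key" pattern; a minimal sketch (I have not verified it against your exact setup):

        import apache_beam as beam

        with beam.Pipeline() as p:
            (
                p
                | 'Load' >> beam.io.ReadFromText('./input.txt')
                # Distinct is built from Map/CombinePerKey/Keys, so it should
                # behave like the GroupBy approach on a bounded source.
                | 'Dedup' >> beam.Distinct()
                | 'Print' >> beam.Map(print)
            )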