
How to use both `with_outputs` and `with_output_types` in Apache Beam (Python SDK)?


An Apache Beam PTransform can have `with_output_types` appended to it, and a `ParDo` can also have `with_outputs` appended. For example:

pcoll | beam.CombinePerKey(sum).with_output_types(typing.Tuple[str, int])

and

(words | beam.ParDo(ProcessWords(), cutoff_length=2, marker='x')
             .with_outputs('above_cutoff_lengths', 'marked strings',
                           main='below_cutoff_strings')
)

(Both of these examples are taken from the Apache Beam documentation, if you want some context.)

But I cannot seem to find any documentation on how to combine them. For instance, can I do something like this?

(words | beam.ParDo(ProcessWords(), cutoff_length=2, marker='x')
             .with_outputs('above_cutoff_lengths', 'marked strings',
                           main='below_cutoff_strings')
             .with_output_types(str, IndexError, str)
)

Solution

  • Disclaimer: I could be wrong, since you have not described the actual problem or error. Furthermore, the DirectRunner (which the playground uses) completely ignores type hints! To verify that this actually solves your issue, run it on a runner that takes type hints into account (e.g., Dataflow).

    Assuming the error you encounter is

    TypeError: PTransform.with_output_types() takes 2 positional arguments but 4 were given
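
    This message is ordinary Python argument checking: `self` plus three hints makes four positional arguments for a method that accepts only `self` and one hint. Below is a minimal sketch that reproduces it without Beam, assuming only the one-parameter signature `with_output_types(self, type_hint)` implied by the message. `FakeTransform` is a hypothetical stand-in, not a Beam class.

```python
from typing import Tuple


class FakeTransform:
    """Hypothetical stand-in, NOT a Beam class: it only mirrors the
    one-parameter signature implied by the error message."""

    def with_output_types(self, type_hint):
        # Store the single hint and return self so calls can be chained.
        self.output_hint = type_hint
        return self


transform = FakeTransform()

# Three separate hints: self + 3 arguments = 4 positional arguments.
error_message = None
try:
    transform.with_output_types(str, IndexError, str)
except TypeError as err:
    error_message = str(err)
    print(error_message)

# A single Tuple[...] object is one argument, so this call succeeds.
chained = transform.with_output_types(Tuple[bool, str])
print(chained.output_hint)
```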

    If you look at the documentation of `with_output_types`, it expects only a single type hint, but you are passing three separate ones. You need to wrap your type hints in a `Tuple`, e.g.

    import apache_beam as beam
    from typing import Tuple  # <- this is the important piece
    
    class DoFnWithOutputs(beam.DoFn):
      def process(self, element):
        if element == 1:
          # Main output (tag "one" below): a str.
          yield "one"
        else:
          # Additional output tagged "not_one": a bool.
          yield beam.pvalue.TaggedOutput("not_one", False)
    
    
    with beam.Pipeline() as pipeline:
      input_data = pipeline | 'Create data' >> beam.Create([1, 2, 3, 1])
    
      being_one, not_being_one = (
        input_data
        | "DoFn" >> beam.ParDo(DoFnWithOutputs())
                    .with_outputs("not_one", main="one")
                    .with_output_types(Tuple[bool, str])   # Note the wrapping within 'Tuple'
      )
      
      (
        being_one
        | "print 1" >> beam.Map(print)
      )
    
      (
        not_being_one
        | "print != 1" >> beam.Map(print)
      )
      
    

    which you can execute directly within the Beam playground.
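
    To make the expected result concrete, here is a plain-Python sketch (no Beam involved) of the routing `DoFnWithOutputs` performs on the input `[1, 2, 3, 1]`. `route` is a hypothetical helper, and the two lists stand in for the two output PCollections. The sketch unpacks them in the same order as the pipeline above: main output first, then "not_one".

```python
from typing import Dict, List


def route(elements: List[int]) -> Dict[str, list]:
    """Hypothetical helper (not part of Beam) that mimics DoFnWithOutputs:
    element 1 goes to the main output "one"; every other element is
    emitted as False under the "not_one" tag."""
    outputs: Dict[str, list] = {"one": [], "not_one": []}
    for element in elements:
        if element == 1:
            outputs["one"].append("one")
        else:
            outputs["not_one"].append(False)
    return outputs


routed = route([1, 2, 3, 1])
# Same unpacking order as the pipeline: main output first, then "not_one".
being_one, not_being_one = routed["one"], routed["not_one"]
print(being_one)
print(not_being_one)
```

    The sketch also shows that the main output carries `str` elements while the "not_one" output carries `bool` elements. Unlike these lists, a real PCollection has no guaranteed element order, so the pipeline's printed lines may appear in any order.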