python google-cloud-platform encoding google-cloud-dataflow apache-beam

AttributeError: 'tuple' object has no attribute 'encode' during stateful DoFn


I am using a stateful and timely DoFn to process data 2 seconds after the end of the fixed window I am implementing.

I have tested a reproducible example of my code inside the Apache Beam playground. My data is in KV[str, str] format as an input to the DoFn. The only difference I can think of between the playground and my code is that the playground uses a DirectRunner and I am using a DataflowRunner.

Before my stateful DoFn I have another DoFn that converts each element of the input PCollection into the format the stateful DoFn expects:

class AddKeys(beam.DoFn):
    def __init__(self, settings):
        self.settings = settings

    def process(self, element):
        data = element["data"]

        for setting in self.settings:
            if setting["data"] == data:
                yield [
                    ("tuple key 1",
                     setting["tuple key 1"]),
                    ("tuple key 2", setting["tuple key 2"]),
                    ("tuple key 3", setting["tuple key 3"]),
                    ("element", str(element))
                ]

Then my stateful DoFn should take the output and process it:

class ProcessCollection(beam.DoFn):
    EXPIRY_TIMER = TimerSpec('expiry', TimeDomain.WATERMARK)
    BUFFER_STATE = BagStateSpec(
        'buffer', ListCoder(StrUtf8Coder()))

    def process(self, element,
                timer=beam.DoFn.TimerParam(EXPIRY_TIMER),
                window=beam.DoFn.WindowParam,
                buffer=beam.DoFn.StateParam(BUFFER_STATE)):

        timer.set(window.end + Duration(seconds=2))

        buffer.add(str(element))

    @on_timer(EXPIRY_TIMER)
    def expiry(self, buffer=beam.DoFn.StateParam(BUFFER_STATE)):
        events = buffer.read()

        for event in events:
            yield ''.join(event)

        buffer.clear()

The pipeline code calling the DoFns:

# continue with processing & branch to stateful DoFn
extra_processing = (
    raw_data_processing
    | "Add Group Keys"
    >> beam.Map(
        lambda message: add_group_key(
            message, SETTINGS)
        )
    | "Fixed Window"
    >> beam.WindowInto(
        window.FixedWindows(self.window_length),
        # if message late by 700ms, still accept
        allowed_lateness=window.Duration(seconds=0.7)
    )
    | "Group" >> beam.GroupByKey()
    | "Process Further" >> beam.ParDo(OtherDoFn(SETTINGS, CONFIG))
)

# process data with stateful DoFn
(
    extra_processing
    | "Add Keys" >> beam.ParDo(AddKeys(SETTINGS)).with_output_types(KV[str, str])
    | "Process Collection" >> beam.ParDo(ProcessCollection())
    | 'Log' >> beam.LogElements(with_timestamp=True)
)

The error I receive from Google Cloud is:

File "/usr/local/lib/python3.10/site-packages/apache_beam/coders/coders.py", line 429, in encode
    return value.encode('utf-8')
AttributeError: 'tuple' object has no attribute 'encode' [while running 'Add Keys-ptransform-51']

The full stack trace is in a pastebin due to its size.

Can anyone determine why this might be occurring?


Solution

  • Your assumption about why it works with the DirectRunner is right: that runner does not encode anything.

    I am not familiar with KV[...]; I have never used it, so I am not sure what you are telling Beam to encode with it. Nevertheless, I have encountered your error before, and it is definitely caused by a mismatch between the argument of with_output_types (or with_input_types, for that matter) and what you are actually returning.

    In order to fix your problem, you need to add the correct type hints, though I am not sure what they should be in your case, since you are returning a list of tuples of strings. You can verify that this is indeed causing the issue by starting the pipeline without the with_output_types - then Dataflow should stop showing the error.
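    As a rough sketch (the exact hint is my assumption, based on what AddKeys currently yields), you could either drop the hint while debugging or make it describe the list of (str, str) tuples that is actually returned:

    from typing import List, Tuple

    # either drop the hint entirely while debugging ...
    "Add Keys" >> beam.ParDo(AddKeys(SETTINGS))

    # ... or declare what AddKeys actually yields: a list of (str, str) tuples
    "Add Keys" >> beam.ParDo(AddKeys(SETTINGS)).with_output_types(List[Tuple[str, str]])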

    Edit:

    Stateful DoFns work per key (and window). If you want to buffer (and eventually join, as defined in your ProcessCollection) all of the incoming data, then you need to create a dummy key. This can be anything (int, bool, str); you just have to make sure it is the same for every data point, e.g.

    yield (
        "my_dummy_key",
        [
            ("tuple key 1", setting["tuple key 1"]),
            ("tuple key 2", setting["tuple key 2"]),
            ("tuple key 3", setting["tuple key 3"]),
            ("element", str(element))
        ]
    )
    

    By doing so, the whole list will always be added to your buffer.
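    If you keep the with_output_types on the "Add Keys" step, it would then have to describe this keyed shape rather than KV[str, str]; the hint below is only my assumption, matching the yield above:

    from typing import List, Tuple

    # element shape after adding the dummy key: (key, list of (str, str) tuples)
    "Add Keys" >> beam.ParDo(AddKeys(SETTINGS)).with_output_types(
        Tuple[str, List[Tuple[str, str]]])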

    Alternatively, if you want to join only the settings which belong to a specific key, you need to rewrite your AddKeys and make the key dynamic (see the sketch at the end):

    yield (
        self.get_dynamic_key(),    # the dynamic key method needs to be tailored to your use case / data
        setting[self.get_dynamic_key()]
    )
    

    This results in n different buffers (where n is the number of dynamic keys), and only the elements belonging to the same key are then buffered and joined within ProcessCollection.
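    As an example, here is a minimal sketch of a dynamically keyed AddKeys. The choice of the "data" field as the key and the get_dynamic_key helper are purely hypothetical - you would have to pick whatever grouping makes sense for your data:

    class AddKeys(beam.DoFn):
        def __init__(self, settings):
            self.settings = settings

        def get_dynamic_key(self, element):
            # hypothetical: reuse the "data" field as the key, so all elements
            # with the same "data" end up in the same buffer
            return element["data"]

        def process(self, element):
            data = element["data"]
            for setting in self.settings:
                if setting["data"] == data:
                    yield (
                        self.get_dynamic_key(element),  # key -> one buffer per key
                        str(element)                    # value buffered by ProcessCollection
                    )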