We have a batch GCP Dataflow job that is failing on a Reshuffle() step with the following error:
ValueError: Error decoding input stream with coder WindowedValueCoder[TupleCoder[LengthPrefixCoder[DeterministicFastPrimitivesCoder], StateBackedIterableCoder[LengthPrefixCoder[FastPrimitivesCoder]]]]
We have a step that generates n lists and passes each list to the next PCollection for processing. Without a "Reshuffle()" step, these two steps are fused together and the lists from the first step are processed sequentially instead of in parallel. The Reshuffle() was added to break the fusion and distribute the lists across workers, but we are getting the error above during the "GroupByKey" step inside the Reshuffle().
The output of PCollection 1 is list objects, which the Reshuffle() is expected to pass to the next PCollection.
I have tried changing the output of the first PCollection from a list to an object containing a list and received the same error.
We've seen the same issue on Python 3.11 and 3.12. We are using the DataflowRunner.
The pipeline is set up as follows:
with beam.Pipeline(options=beam_options) as p:
    (p
     | 'Get source data and split into lists of rows' >> beam.ParDo(GetDataAndSplit())
     | 'Reshuffle' >> beam.Reshuffle()
     | 'Process lists from previous PCollection' >> beam.ParDo(ProcessData())
    )
It seems there is a mismatch between the expected and actual data types during deserialization after the shuffle. StateBackedIterableCoder is used for iterables that might not fit in memory on a single worker, and its presence in the error message indicates that Beam is trying to decode an iterable (likely your list).
Instead of returning the whole list from your GetDataAndSplit ParDo, yield the elements of the list individually. (Note that beam.Create is a root transform for in-memory data and can't be used inside a DoFn.) Emitting rows one at a time means each element is encoded with a simple coder, which ensures proper encoding across the shuffle and avoids the iterable-decoding issue.
class GetDataAndSplit(beam.DoFn):
    def process(self, element):
        # ... your logic to create the list of rows ...
        for row in list_of_rows:
            yield row  # emit each row individually