Background: I am using TFX pipelines with Flink as the Beam runner (a Flink session cluster deployed via flink-on-k8s-operator). The Flink cluster has 2 task managers with 16 cores each, and parallelism is set to 32. The TFX components call beam.io.ReadFromTFRecord to load data, passing in a glob file pattern. My dataset consists of TFRecords split across 160 files. When I run the component, processing for all 160 files ends up in a single Flink subtask, i.e. the effective parallelism is 1. See the images below:
I have tried all manner of Beam/Flink options and different versions of Beam/Flink but the behaviour remains the same.
Furthermore, the behaviour affects anything that uses apache_beam.io.iobase.SDFBoundedSourceReader; for example, apache_beam.io.parquetio.ReadFromParquet has the same issue. Is there some obscure setting in my configuration, or is this a bug in the Flink runner? I have also searched far and wide across the internet and can't find any mention of this issue, other than suggestions to use beam.Reshuffle, which doesn't help.
It seems that the obscure setting I was missing was --experiments=pre_optimize=all in the Beam pipeline options. With this flag, the following code runs during pipeline translation and a RESHUFFLE is included in the splittable DoFn expansion: https://github.com/apache/beam/blob/v2.32.0/sdks/python/apache_beam/runners/portability/fn_api_runner/translations.py#L1433
For those reading this in the future: this worked with Beam 2.32.0 and Flink 1.13.2. The internals will no doubt change at some point, so this answer may eventually no longer be relevant.