Tags: apache-flink, apache-beam, tfx

Beam + Flink: No parallelism when using SDFBoundedSourceReader


Background: I am using TFX pipelines with Flink as the Beam runner (a Flink session cluster deployed via flink-on-k8s-operator). The Flink cluster has 2 TaskManagers with 16 cores each, and parallelism is set to 32. TFX components call beam.io.ReadFromTFRecord to load data, passing in a glob file pattern. I have a dataset of TFRecords split across 160 files, yet when I run a component, processing for all 160 files ends up in a single Flink subtask, i.e. the effective parallelism is 1. See the images below:

[Flink UI screenshots: all 160 file reads assigned to a single subtask]
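For reference, the read in question boils down to something like the sketch below. This is not the actual TFX component code; the file pattern, Flink master address, and the counting step are placeholders.

```python
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

# Placeholder options; in TFX these arrive via the pipeline's beam_pipeline_args.
options = PipelineOptions([
    "--runner=FlinkRunner",
    "--flink_master=localhost:8081",  # hypothetical session-cluster address
    "--parallelism=32",
])

with beam.Pipeline(options=options) as pipeline:
    counted = (
        pipeline
        # Glob matching the 160 TFRecord shards; the path is a placeholder.
        | "Read" >> beam.io.ReadFromTFRecord("gs://my-bucket/data/train-*.tfrecord")
        | "Count" >> beam.combiners.Count.Globally()
    )
```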

I have tried all manner of Beam/Flink options and different versions of Beam/Flink but the behaviour remains the same.

Furthermore, the behaviour affects anything that uses apache_beam.io.iobase.SDFBoundedSourceReader; apache_beam.io.parquetio.ReadFromParquet, for example, has the same issue. Is there some obscure setting in my configuration, or is this a bug in the Flink runner? I have searched far and wide and can't find any mention of this issue, other than suggestions to use beam.Reshuffle, which doesn't help (sketched below).
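For completeness, the commonly suggested workaround looks like the following (same placeholder pattern as above). It redistributes elements after the read, but the read itself still executes in one subtask.

```python
    workaround = (
        pipeline
        | "Read" >> beam.io.ReadFromTFRecord("gs://my-bucket/data/train-*.tfrecord")
        # Reshuffle redistributes downstream work across workers, but the
        # SDF-based read upstream of it still runs in a single subtask.
        | "Reshuffle" >> beam.Reshuffle()
    )
```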


Solution

  • It seems that the obscure setting I was missing was --experiments=pre_optimize=all in the Beam pipeline options (see the sketch after this answer). With it set, the following code runs and a RESHUFFLE is included in the Splittable DoFn expansion: https://github.com/apache/beam/blob/v2.32.0/sdks/python/apache_beam/runners/portability/fn_api_runner/translations.py#L1433

    For those reading this in the future: this worked with Beam 2.32.0 and Flink 1.13.2. This is no doubt going to change at some point, so this answer may eventually no longer be relevant.
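    Concretely, the fix is just one extra flag in the pipeline options. A minimal sketch, assuming the same placeholder cluster settings as in the question (in TFX, these options would typically be passed through the pipeline's beam_pipeline_args):

    ```python
    from apache_beam.options.pipeline_options import PipelineOptions

    options = PipelineOptions([
        "--runner=FlinkRunner",
        "--flink_master=localhost:8081",  # placeholder address
        "--parallelism=32",
        # The missing setting: enables the pre-optimization pass that
        # inserts a reshuffle into the Splittable DoFn expansion.
        "--experiments=pre_optimize=all",
    ])
    ```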