python google-cloud-platform google-cloud-dataflow apache-beam

Apache Beam pipeline function does not run in parallel


I have a DoFn in my pipeline, which runs on GCP Dataflow and is supposed to do some processing per product in parallel.

from apache_beam import DoFn

class Step1(DoFn):
    def process(self, element):
        # Get a list of products from the element
        for idx, product in enumerate(product_list):
            yield product, idx

import logging
from datetime import datetime

logger = logging.getLogger(__name__)

class Step2(DoFn):
    def process(self, element):
        # Unpack the (product, index) pair emitted by Step1
        product, index = element
        logger.info(f"::: Processing product number {index} STARTED at {datetime.now()}:::::")
        # Do some processing ....
        logger.info(f"::: FINISHED product number {index} at {datetime.now()}:::::")

with Pipeline(options=pipeline_options) as pipeline:
    results = (
        pipeline
        | "Read from PubSub" >> io.ReadFromPubSub()
        | "Product list"     >> ParDo(Step1())
        | "Process Product"  >> ParDo(Step2())
        | "Group data" >> GroupBy()
        ...
    )

So Step2 is supposed to run per product in parallel. But what I actually get in the logs is:

::: Processing product number 0 STARTED at <some_time> :::::
::: FINISHED product number 0 at <some_time>:::::
::: Processing product number 1 STARTED at <some_time> :::::
::: FINISHED product number 1 at <some_time>:::::
::: Processing product number 2 STARTED at <some_time> :::::
::: FINISHED product number 2 at <some_time>:::::
::: Processing product number 3 STARTED at <some_time> :::::
::: FINISHED product number 3 at <some_time>:::::
...

This shows that instead of running Step2 in parallel, everything runs sequentially, which takes a long time to finish for a large number of products.

Is there something I'm missing here? Aren't ParDo functions supposed to run in parallel?

Update

As the Apache Beam documentation suggests, I tried the following options in PipelineOptions, and I double-checked that they were actually set on the job in GCP, but the result was the same:
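
(The exact options I tried aren't listed here. Purely as an illustration, worker-parallelism options are passed to PipelineOptions like this; the values below are placeholders, not my actual job settings.)

from apache_beam.options.pipeline_options import PipelineOptions

# Illustrative values only -- not the settings from the actual job
pipeline_options = PipelineOptions(
    runner="DataflowRunner",
    max_num_workers=10,                    # autoscaling ceiling for the job
    number_of_worker_harness_threads=16,   # threads per SDK worker process
)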


Solution

  • After checking the execution graph of my Dataflow job, I realised that both of those steps had been fused into a single stage. This means that the number of input elements of that fused stage determines its parallelism: if the stage is fed just one element, the whole thing is handled by a single thread.

    To prevent that from happening, I had to apply a fusion-prevention step, such as Reshuffle, between the two steps (see the note after the logs below for why this breaks fusion).

    with Pipeline(options=pipeline_options) as pipeline:
        results = (
            pipeline
            | "Read from PubSub" >> io.ReadFromPubSub()
            | "Product list"     >> ParDo(Step1())
            | "Reshuffle" >> Reshuffle()
            | "Process Product"  >> ParDo(Step2())
            | "Group data" >> GroupBy()
            ...
        )
    

    And with that, I got the following in the logs:

    ::: Processing product number 17 STARTED at 2023-11-03 14:27:59.371644:::::
    ::: Processing product number 10 STARTED at 2023-11-03 14:27:59.353624:::::
    ::: Processing product number 4 STARTED at 2023-11-03 14:27:59.364676:::::
    ::: Processing product number 12 STARTED at 2023-11-03 14:27:59.327599:::::
    ::: Processing product number 14 STARTED at 2023-11-03 14:27:59.326156:::::
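
    Why this works: Reshuffle is roughly equivalent to pairing every element with a random key, grouping by that key, and then dropping the key again. That forces a materialisation (shuffle) boundary between Step1 and Step2, so the elements emitted by Step1 can be redistributed across workers and threads instead of staying on the thread that produced them. A hand-rolled sketch of that idea, for illustration only (the ManualFusionBreak transform below is hypothetical and not what I actually run; the built-in Reshuffle also handles the windowing/trigger details a streaming pipeline needs):

    import random
    import apache_beam as beam

    class ManualFusionBreak(beam.PTransform):
        """Illustrative only: roughly what Reshuffle does under the hood."""
        def expand(self, pcoll):
            return (
                pcoll
                | "Add random key" >> beam.Map(lambda x: (random.randint(0, 1000), x))
                | "Group" >> beam.GroupByKey()               # forces a shuffle boundary
                | "Drop key" >> beam.FlatMap(lambda kv: kv[1])
            )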