google-cloud-platform google-cloud-dataflow apache-beam

Concurrent Processing in GCP Dataflow Job


I am trying to run three different classes concurrently using ThreadPoolExecutor, but the Dataflow job does not run them concurrently; it runs each class one after another. Also, if I set max_workers inside the ThreadPoolExecutor, the job still scales down to one worker once it starts running. I cannot figure out how to modify the code so that all the classes run concurrently. I have also tried passing autoscaling_algorithm=NONE when launching the job from the console, but it is reported as an unrecognized argument.

import argparse
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
import concurrent.futures

class StudentId(beam.DoFn):
    def process(self, element):
        # code block for generating unique ids
        pass

class ReadExcel(beam.DoFn):
    def process(self, element, file_path):
        # code block for reading Excel file
        pass

class DqcP(beam.DoFn):
    def process(self, element, bq_project, sid):
        # code block for dqc_p function
        pass

class DqcR(beam.DoFn):
    def process(self, element, bq_project, sid):
        # code block for dqc_r function
        pass

def run_pipeline(pipeline):
    return pipeline.run()

def run(argv=None):
    parser = argparse.ArgumentParser()
    parser.add_argument('--landing', required=True, type=str)
    parser.add_argument('--BQ_project', required=True, type=str)

    known_args, pipeline_args = parser.parse_known_args(argv)
    pipeline_options = PipelineOptions(pipeline_args)

    with beam.Pipeline(options=pipeline_options) as pipeline:
        # Generate unique ids
        sid = (pipeline | 'sid' >> beam.Create([None]) | 'sid_gen' >> beam.ParDo(StudentId()))

        # Read reference Excel file
        ref_excel = (pipeline | 'start reading' >> beam.Create([None])
                     | 'read ref excel' >> beam.ParDo(ReadExcel(), known_args.landing))

    # Run DQC_P and DQC_R concurrently
    with concurrent.futures.ThreadPoolExecutor() as executor:
        # Submit DQC_P pipeline
        dqc_p_future = executor.submit(run_pipeline, beam.Pipeline(options=pipeline_options))
        dqc_p_pipeline = dqc_p_future.result()
        dqc_p_result = (ref_excel | 'dqc_p' >> beam.ParDo(DqcP(), known_args.BQ_project, beam.pvalue.AsSingleton(sid)))

        # Submit DQC_R pipeline
        dqc_r_future = executor.submit(run_pipeline, beam.Pipeline(options=pipeline_options))
        dqc_r_pipeline = dqc_r_future.result()
        dqc_r_result = (ref_excel | 'dqc_r' >> beam.ParDo(DqcR(), known_args.BQ_project, beam.pvalue.AsSingleton(sid)))

    # Wait for DQC_P and DQC_R pipelines to finish
    dqc_p_result.pipeline.run()
    dqc_r_result.pipeline.run()

if __name__ == '__main__':
    run()


Solution

  • Your code won't work as written. I do not see why you need concurrent.futures.ThreadPoolExecutor() at all: your code is supposed to define the entire pipeline once and then submit it to Dataflow, where it will be executed in parallel by many workers.

    Your code should be changed to something like this:

    with beam.Pipeline(options=pipeline_options) as pipeline:
        # Generate unique ids
        sid = (pipeline | 'sid' >> beam.Create([None]) | 'sid_gen' >> beam.ParDo(StudentId()))
    
        # Read reference Excel file
        ref_excel = (pipeline | 'start reading' >> beam.Create([None])
                         | 'read ref excel' >> beam.ParDo(ReadExcel(), known_args.landing))
    
        ref_excel | 'dqc_p' >> beam.ParDo(DqcP(), known_args.BQ_project, beam.pvalue.AsSingleton(sid))
    
        ref_excel | 'dqc_r' >> beam.ParDo(DqcR(), known_args.BQ_project, beam.pvalue.AsSingleton(sid))
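
    Worker parallelism is then controlled through Dataflow pipeline options, not through Python threads: the max_workers argument of ThreadPoolExecutor only affects local threads in the process that launches the job and has no effect on how many Dataflow workers execute your DoFns. As a rough sketch, assuming the standard Dataflow runner options (the project, region, and bucket below are placeholders), the options could be set like this:

    # Hypothetical values; replace the project, region and bucket with your own.
    pipeline_options = PipelineOptions(
        pipeline_args,
        runner='DataflowRunner',
        project='your-gcp-project',
        region='us-central1',
        temp_location='gs://your-bucket/tmp',
        autoscaling_algorithm='NONE',  # disable autoscaling
        num_workers=3,                 # keep a fixed pool of workers
    )

    The same options can also be passed on the command line (for example --autoscaling_algorithm=NONE --num_workers=3): since your script uses parser.parse_known_args, any flag it does not define itself falls through to pipeline_args and is picked up by PipelineOptions. With autoscaling_algorithm set to NONE the job keeps the fixed number of workers given by num_workers; with the default THROUGHPUT_BASED autoscaling, Dataflow is free to scale a small job down to a single worker, which matches what you observed.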