I am trying to run three different classes concurrently using ThreadPoolExecutor, but the Dataflow job is not running them concurrently; it runs each class one after another. Also, even when I set max_workers in the ThreadPoolExecutor, the job still scales down to one worker once it starts running. I cannot figure out how to modify the code so that all the classes run concurrently. I also tried passing autoscaling_algorithm=NONE when launching the job from the console, but it is reported as an unrecognized argument.
import argparse
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
import concurrent.futures

class StudentId(beam.DoFn):
    def process(self, element):
        # code block for generating unique ids
        pass

class ReadExcel(beam.DoFn):
    def process(self, element, file_path):
        # code block for reading Excel file
        pass

class DqcP(beam.DoFn):
    def process(self, element, bq_project, sid):
        # code block for dqc_p function
        pass

class DqcR(beam.DoFn):
    def process(self, element, bq_project, sid):
        # code block for dqc_r function
        pass

def run_pipeline(pipeline):
    return pipeline.run()

def run(argv=None):
    parser = argparse.ArgumentParser()
    parser.add_argument('--landing', required=True, type=str)
    parser.add_argument('--BQ_project', required=True, type=str)
    known_args, pipeline_args = parser.parse_known_args(argv)
    pipeline_options = PipelineOptions(pipeline_args)

    with beam.Pipeline(options=pipeline_options) as pipeline:
        # Generate unique ids
        sid = (pipeline | 'sid' >> beam.Create([None]) | 'sid_gen' >> beam.ParDo(StudentId()))

        # Read reference Excel file
        ref_excel = (pipeline | 'start reading' >> beam.Create([None])
                     | 'read ref excel' >> beam.ParDo(ReadExcel(), known_args.landing))

        # Run DQC_P and DQC_R concurrently
        with concurrent.futures.ThreadPoolExecutor() as executor:
            # Submit DQC_P pipeline
            dqc_p_future = executor.submit(run_pipeline, beam.Pipeline(options=pipeline_options))
            dqc_p_pipeline = dqc_p_future.result()
            dqc_p_result = (ref_excel | 'dqc_p' >> beam.ParDo(DqcP(), known_args.BQ_project, beam.pvalue.AsSingleton(sid)))

            # Submit DQC_R pipeline
            dqc_r_future = executor.submit(run_pipeline, beam.Pipeline(options=pipeline_options))
            dqc_r_pipeline = dqc_r_future.result()
            dqc_r_result = (ref_excel | 'dqc_r' >> beam.ParDo(DqcR(), known_args.BQ_project, beam.pvalue.AsSingleton(sid)))

        # Wait for DQC_P and DQC_R pipelines to finish
        dqc_p_result.pipeline.run()
        dqc_r_result.pipeline.run()

if __name__ == '__main__':
    run()
Your code won't work as written, and the concurrent.futures.ThreadPoolExecutor() is unnecessary. With Beam you define the entire pipeline graph up front and then submit it to Dataflow, which executes it across many workers; independent branches of the graph run concurrently without any threading on your side. Threads in your launcher process have no effect on how Dataflow schedules the work.
Your code should be changed to something like this:
with beam.Pipeline(options=pipeline_options) as pipeline:
    # Generate unique ids
    sid = (pipeline | 'sid' >> beam.Create([None]) | 'sid_gen' >> beam.ParDo(StudentId()))

    # Read reference Excel file
    ref_excel = (pipeline | 'start reading' >> beam.Create([None])
                 | 'read ref excel' >> beam.ParDo(ReadExcel(), known_args.landing))

    # Both DQC branches hang off ref_excel; Dataflow runs them in parallel
    ref_excel | 'dqc_p' >> beam.ParDo(DqcP(), known_args.BQ_project, beam.pvalue.AsSingleton(sid))
    ref_excel | 'dqc_r' >> beam.ParDo(DqcR(), known_args.BQ_project, beam.pvalue.AsSingleton(sid))
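As for the unrecognized autoscaling_algorithm argument: in the Beam Python SDK, worker settings are ordinary pipeline options, so they must be passed as --flags on the job's command line; your parse_known_args already forwards any flags it doesn't recognize into pipeline_args. A minimal sketch of a launch command, assuming the Dataflow runner and placeholder script, project, region, and bucket names:

# Launch with a fixed pool of three workers (all names below are placeholders)
python dqc_pipeline.py \
  --landing=gs://my-bucket/ref.xlsx \
  --BQ_project=my-project \
  --runner=DataflowRunner \
  --project=my-project \
  --region=us-central1 \
  --temp_location=gs://my-bucket/tmp \
  --autoscaling_algorithm=NONE \
  --num_workers=3

With --autoscaling_algorithm=NONE, --num_workers pins the worker count. Note, though, that worker count only controls data-level parallelism; the two DQC branches above run in parallel regardless, because they are independent stages of the same pipeline graph.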