Tags: python, sql, google-cloud-dataflow, apache-beam, dataflow

Pass/refer to a SQL file in Apache Beam instead of a string


I'm trying to run a simple Beam pipeline that extracts data from a BigQuery table using SQL and pushes it to a GCS bucket.

import logging

import apache_beam as beam
import pyarrow as pa
from apache_beam import Pipeline
from apache_beam.options.pipeline_options import PipelineOptions


class SimplePipeline:
    def run_pipeline(self, known_args, pipeline_args, streaming_flag):
        pipeline_options = PipelineOptions(
            pipeline_args, streaming=streaming_flag, save_main_session=True)
        custom_options = pipeline_options.view_as(self.get_pipeline_options_class())
        logging.info("Starting Pipeline")

        sql_query = "SELECT col1, col2, col3 FROM `project.dataset.table`"

        with Pipeline(options=pipeline_options) as pipeline:
            bq_source = beam.io.BigQuerySource(query=sql_query, use_standard_sql=True)

            # Arrow schema describing the three columns read from BigQuery.
            event_schema = pa.schema([
                pa.field("col1", pa.string(), metadata={'parent': 'id'}),
                pa.field("col2", pa.string(), metadata={'parent': 'name'}),
                pa.field("col3", pa.string(), metadata={'parent': 'age'}),
            ])

            process_data = (
                pipeline
                | "ReadFromBigQuery" >> beam.io.Read(bq_source)
                | "WriteToParquet" >> beam.io.parquetio.WriteToParquet(
                    file_path_prefix='gs://project-gcs/test/',
                    schema=event_schema,
                    file_name_suffix='.parquet')
            )

My requirement is to pass the SQL from a file (a simple .sql file) rather than as a string, so that the SQL can be modularized. So far I've tried the following, which did not work:

with open("file_query.sql", "r") as myfile:
    sql_query = myfile.read()

where the contents of file_query.sql are simply:

SELECT col1, col2, col3 FROM `project.dataset.table`
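
For context, the file is read at pipeline-construction time on the machine launching the job, so I suspect a relative path could be part of the problem. Here is a sketch of what I'd like to end up with; the load_sql helper is hypothetical, and resolving the path relative to the module is an assumption in case the working directory differs at launch time:

from pathlib import Path

# Hypothetical helper: resolve the .sql file next to this module so the read
# does not depend on the process's current working directory.
def load_sql(filename: str) -> str:
    return (Path(__file__).parent / filename).read_text()

sql_query = load_sql("file_query.sql")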

Any help on this, folks?


Solution

  • Any information on the failure you get? I ran almost an exact copy of what you shared, and it works fine for me.

    file_query.sql

    SELECT * FROM `bigquery-public-data`.`baseball`.`games_post_wide`;
    

    pipeline.py

    with beam.Pipeline(options=pipeline_options) as pipeline, \
        open("file_query.sql", "r") as file_content:
      sql_query = file_content.read()
      bq_source = beam.io.BigQuerySource(query=sql_query, use_standard_sql=True)
    
      (
          pipeline
          | 'ReadFromBigQuery' >> beam.io.Read(bq_source)
          | 'Print to Console' >> beam.Map(print)
      )
    
    

    Running the above code prints rows to the console.
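
    One caveat: beam.io.BigQuerySource is deprecated in recent Beam SDK releases. On a newer SDK, the same file-based query can be read with beam.io.ReadFromBigQuery; here is a rough sketch, assuming a GCS temp_location is available for the BigQuery export (the bucket name below is a placeholder):

    import apache_beam as beam
    from apache_beam.options.pipeline_options import PipelineOptions

    # Placeholder options: ReadFromBigQuery stages an export in GCS, so a
    # temp_location (the bucket name here is hypothetical) must be set.
    pipeline_options = PipelineOptions(temp_location='gs://my-bucket/tmp')

    with beam.Pipeline(options=pipeline_options) as pipeline, \
        open("file_query.sql", "r") as file_content:
      sql_query = file_content.read()

      (
          pipeline
          | 'ReadFromBigQuery' >> beam.io.ReadFromBigQuery(
              query=sql_query, use_standard_sql=True)
          | 'Print to Console' >> beam.Map(print)
      )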