I'm trying to run a simple Beam pipeline to extract data from a BQ table using SQL and push to a GCS bucket.
import apache_beam as beam
import pyarrow as pa
from apache_beam import Pipeline
from apache_beam.options.pipeline_options import PipelineOptions


class SimplePipeline:
    def run_pipeline(self, known_args, pipeline_args, streaming_flag):
        pipeline_options = PipelineOptions(pipeline_args, streaming=streaming_flag, save_main_session=True)
        custom_options = pipeline_options.view_as(self.get_pipeline_options_class())
        log_info("Starting Pipeline")
        sql_query = "SELECT col1, col2, col3 FROM `project.dataset.table`"
        with Pipeline(options=pipeline_options) as pipeline:
            bq_source = beam.io.BigQuerySource(query=sql_query, use_standard_sql=True)
            # Parquet schema for the three columns returned by the query
            apromore_event_schema = pa.schema([
                pa.field("col1", "string", metadata={'parent': 'id'}),
                pa.field("col2", "string", metadata={'parent': 'name'}),
                pa.field("col3", "string", metadata={'parent': 'age'}),
            ])
            process_data = (
                pipeline
                | "ReadFromBigQuery" >> beam.io.Read(bq_source)
                | "Write to Parquet" >> beam.io.parquetio.WriteToParquet(
                    file_path_prefix='gs://project-gcs/test/',
                    schema=apromore_event_schema,
                    file_name_suffix='.parquet')
            )
My requirement is to pass the SQL from a file (a plain .sql file) rather than as an inline string, so that the SQL stays modular. So far I've tried the following, but it did not work:
with open("file_query.sql", "r") as myfile:
    sql_query = myfile.read()
where the contents of my file_query.sql file look like:
"SELECT col1, col2, col3 FROM `project.dataset.table`"
Any help on this, folks?
Do you have any information on the failure you get? I ran almost an exact copy of what you shared, and it seems to work fine for me.
file_query.sql
SELECT * FROM `bigquery-public-data`.`baseball`.`games_post_wide`;
pipeline.py
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

pipeline_options = PipelineOptions()  # add your runner/project options here

with beam.Pipeline(options=pipeline_options) as pipeline, \
        open("file_query.sql", "r") as file_content:
    sql_query = file_content.read()
    bq_source = beam.io.BigQuerySource(query=sql_query, use_standard_sql=True)

    (
        pipeline
        | 'ReadFromBigQuery' >> beam.io.Read(bq_source)
        | 'Print to Console' >> beam.Map(print)
    )
Running the above code is printing rows to the console.
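If you also want to keep the Parquet output to GCS from your original snippet, here is a rough sketch of the same pattern (untested, reusing the column names, schema and bucket path from your post, which you'd adjust to your own project):

import apache_beam as beam
import pyarrow as pa
from apache_beam.options.pipeline_options import PipelineOptions

pipeline_options = PipelineOptions()  # add your runner/project options here

# Read the query text up front; strip() guards against a trailing newline.
with open("file_query.sql", "r") as file_content:
    sql_query = file_content.read().strip()

with beam.Pipeline(options=pipeline_options) as pipeline:
    bq_source = beam.io.BigQuerySource(query=sql_query, use_standard_sql=True)

    # Schema and GCS path copied from your snippet - adjust as needed.
    event_schema = pa.schema([
        pa.field("col1", "string"),
        pa.field("col2", "string"),
        pa.field("col3", "string"),
    ])

    (
        pipeline
        | 'ReadFromBigQuery' >> beam.io.Read(bq_source)
        | 'WriteToParquet' >> beam.io.parquetio.WriteToParquet(
            file_path_prefix='gs://project-gcs/test/',
            schema=event_schema,
            file_name_suffix='.parquet')
    )

One thing worth double-checking: the .sql file should contain only the bare SELECT statement, without surrounding quotes, since whatever the file contains is handed to BigQuery verbatim.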