I'm working on an Apache Beam pipeline that processes data and writes it to BigQuery. The pipeline works perfectly when using the DirectRunner, but when I switch to the DataflowRunner, it completes without errors or warnings but doesn't insert any rows into BigQuery. Additionally, I see large leftover files in the temporary directory of my Cloud Storage bucket (gs://my-bucket/temp/bq_load/...), and no data appears in the target table.
Here’s the pipeline structure:
worker_options.sdk_container_image = '...'

with beam.Pipeline(options=pipeline_options) as p:
    processed_data = (
        p
        | "ReadFiles" >> beam.Create(FILE_LIST)
        | "ProcessFiles" >> beam.ParDo(ProcessAvroFileDoFn())
        | "WriteToBigQuery" >> beam.io.WriteToBigQuery(
            table=f"{PROJECT_ID}:{DATASET_ID}.{TABLE_ID}",
            schema=BQ_SCHEMA,
            write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
            create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
        )
    )
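For context, as a batch job on Dataflow this write goes through BigQuery load jobs: WriteToBigQuery stages files under a GCS temp location and then triggers load jobs into the table, which is where the gs://my-bucket/temp/bq_load/ files come from. Spelled out explicitly, the relevant parameters would look roughly like this (a sketch only, with illustrative constant values; this is not the eventual fix):

import apache_beam as beam

# Illustrative placeholders mirroring the constants in the pipeline above.
PROJECT_ID, DATASET_ID, TABLE_ID = "my-project", "my_dataset", "my_table"
BQ_SCHEMA = "name:STRING,value:INTEGER"  # illustrative schema string

write_to_bq = beam.io.WriteToBigQuery(
    table=f"{PROJECT_ID}:{DATASET_ID}.{TABLE_ID}",
    schema=BQ_SCHEMA,
    write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
    create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
    # Batch writes stage files in GCS and then run BigQuery load jobs.
    method=beam.io.WriteToBigQuery.Method.FILE_LOADS,
    # Staging root; the bq_load/ directory from the question sits under this.
    custom_gcs_temp_location="gs://my-bucket/temp",
)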
Key Observations:
The Dataflow job finishes successfully with no errors or warnings, yet the target table receives no rows, while the same pipeline writes correctly under the DirectRunner. After the job, large NDJSON temp files are left behind under gs://my-bucket/temp/bq_load/.
What I’ve Tried:
Inspecting the leftover temp files: I downloaded one of the temp files and verified that it contains valid NDJSON rows. Loading that file into BigQuery manually with the bq load command works fine (a sketch of the command is after this list).
Testing with other datasets: I tried many different inputs, but the issue persists.
Checking Dataflow logs: I looked at the logs in the Dataflow Monitoring Console but found no errors or warnings (an equivalent gcloud command is sketched after this list).
Different service account: running with a service account that lacks Dataflow permissions does throw an error, so it seems unlikely that worker permissions are the problem.
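For reference, the manual load was roughly this (the file and schema paths are placeholders for the downloaded temp file and the table schema):

# Loading the downloaded NDJSON temp file by hand succeeds.
bq load \
  --source_format=NEWLINE_DELIMITED_JSON \
  "PROJECT_ID:DATASET_ID.TABLE_ID" \
  ./downloaded_temp_file.json \
  ./bq_schema.json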
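The console's log entries can also be pulled from Cloud Logging directly; something along these lines is what I used to double-check for low-severity worker messages (job ID and project are placeholders):

# Pull the Dataflow job's log entries from Cloud Logging.
gcloud logging read \
  'resource.type="dataflow_step" AND resource.labels.job_id="JOB_ID"' \
  --project=PROJECT_ID \
  --limit=100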
I saw one other thread about this (Can't make apache beam write outputs to bigquery when using DataflowRunner) but nothing got resolved there.
Solution: the problem turned out to be that I had created the Google Cloud Storage Client object globally, at module level in my script. When I refactored the code to be modular and moved the storage client and bucket initialization into the setup() method of my DoFn, the pipeline started writing rows to BigQuery. I still don't know why it failed silently like this and left no trace; it was a real pain to debug.
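For reference, a minimal sketch of the working structure, with the GCS client created in DoFn.setup() rather than at module import time (the bucket name, the parse_avro helper, and the process body are placeholders; the real DoFn parses the Avro file and yields dicts matching BQ_SCHEMA):

import apache_beam as beam
from google.cloud import storage


class ProcessAvroFileDoFn(beam.DoFn):
    def setup(self):
        # Runs once per DoFn instance on the worker, after deserialization,
        # so nothing created here has to survive pickling of the pipeline graph.
        self.gcs_client = storage.Client()
        self.bucket = self.gcs_client.bucket("my-bucket")  # placeholder bucket

    def process(self, file_path):
        # Placeholder body: download the Avro file and yield row dicts
        # matching BQ_SCHEMA.
        blob = self.bucket.blob(file_path)
        data = blob.download_as_bytes()
        for row in parse_avro(data):  # parse_avro is a hypothetical helper
            yield row

The point of the refactor is that setup() executes on the worker itself, so the client is constructed where it is used instead of being captured from module-level state at submission time.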