For development purposes, I'd like to cache the results of BigQuery queries made by the beam.io.ReadFromBigQuery connector, so that I can load them quickly from the local file system the next time the exact same query runs.
The problem is that I cannot run any PTransform before beam.io.ReadFromBigQuery to check whether a cached result exists and, if so, skip reading from BigQuery.
Currently I came up with two possible solutions:

1. Implementing a custom beam.DoFn for reading from BigQuery. It would include the caching mechanism, but it might underperform compared to the existing connector. One variation might be inheriting from the existing connector, but that would require knowledge of Beam "under the hood", which might be overwhelming.
2. Preceding the read with a step that checks whether a cache exists, and branching into reading either the cached files or BigQuery itself (with apache_beam.io.textio.ReadAllFromText or beam.io.ReadFromBigQuery, respectively).

I found out that preceding beam.io.ReadFromBigQuery with any other PTransform is impossible by design. Unfortunately this is currently not well reflected in the Python SDK docs, but according to Apache Beam's official Java documentation, "A root PTransform conceptually has no input". Since the equivalent Java PTransform, BigQueryIO.Read(), takes PBegin as its input, that rules out putting anything before it.
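The same is visible in the Python SDK: the connector is applied directly to the pipeline object (which plays the role of PBegin), not to the output of an earlier transform. A minimal sketch, assuming a placeholder query and that the usual GCP credentials and temporary dataset are configured:

import apache_beam as beam

with beam.Pipeline() as pipeline:
    # ReadFromBigQuery is a root transform: it hangs off the pipeline itself
    # (a PBegin), so nothing can be placed upstream of it.
    rows = pipeline | "ReadFromBQ" >> beam.io.ReadFromBigQuery(
        query="SELECT 1 AS x",  # placeholder query
        use_standard_sql=True)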
However, I found a workaround which resembles the second approach that I suggested in the question: implementing a beam.PTransform (not a beam.DoFn) that dynamically returns the appropriate transform when the pipeline is built, depending on whether a cache exists. It looks like this:
import glob

import apache_beam as beam


class ReadFromBigQueryWithCache(beam.PTransform):
    def __init__(self, query, cache_pattern="path/to/cached_results"):
        super().__init__()
        self.query = query
        # File pattern pointing at previously cached results (placeholder path).
        self.cache_pattern = cache_pattern

    def expand(self, input_or_inputs):
        # The choice is made at pipeline-construction time, not at run time.
        # Here the cache check is a simple file-existence test; replace it with
        # whatever caching logic fits your setup.
        if glob.glob(self.cache_pattern):
            # Cache hit: read the stored results from the local file system.
            return input_or_inputs | beam.io.ReadFromAvro(
                file_pattern=self.cache_pattern)
        # Cache miss: fall back to the regular BigQuery connector.
        return input_or_inputs | beam.io.ReadFromBigQuery(query=self.query)
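A minimal sketch of how the transform might be used (the class above is assumed to be importable, and the query is a placeholder):

import apache_beam as beam

query = "SELECT name, value FROM `my-project.my_dataset.my_table`"  # placeholder

with beam.Pipeline() as pipeline:
    rows = pipeline | "ReadWithCache" >> ReadFromBigQueryWithCache(query=query)
    # Downstream transforms consume `rows` the same way whether the data came
    # from the local cache or from BigQuery.

Note that something still has to populate the cache once, for example by writing the query results with beam.io.WriteToAvro (which requires an Avro schema) to the location that the cache check looks at.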