Tags: google-bigquery, beam, google-bigquery-storage-api

When using Apache Beam locally, how can I use persistent caching for BigQuery query results?


For development purposes, I'd like to cache the results of queries made in BigQuery by the beam.io.ReadFromBigQuery connector, so that I can load them quickly from the local file system the next time I run the exact same query.

The problem is that I cannot run any PTransform before beam.io.ReadFromBigQuery that checks whether a cached result exists and, if it does, skips reading from BigQuery.

So far I have come up with two possible solutions:

  1. Creating a custom beam.DoFn for reading from BigQuery. It would include the caching mechanism, but might underperform compared to the existing connector. A variation would be to inherit from the existing connector, but that requires knowledge of Beam "under the hood", which might be overwhelming.
  2. Implementing the caching while building the pipeline, so that the resulting step is chosen according to whether the cache exists (apache_beam.io.textio.ReadAllFromText if it does, beam.io.ReadFromBigQuery if it does not); see the rough sketch after this list.
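
For illustration, a minimal sketch of the second approach could pick the source transform while the pipeline is being constructed. The cache location, the glob-based existence check, and the use of beam.io.ReadFromText in place of ReadAllFromText are assumptions made for this example, not part of any Beam API:

    import glob

    import apache_beam as beam

    CACHE_PATTERN = "/tmp/bq_cache/*"  # hypothetical local cache location


    def read_step(query):
        # Decide at pipeline-construction time which source to use:
        # if cached files already exist locally, read them; otherwise query BigQuery.
        if glob.glob(CACHE_PATTERN):
            return beam.io.ReadFromText(CACHE_PATTERN)
        return beam.io.ReadFromBigQuery(query=query, use_standard_sql=True)


    with beam.Pipeline() as pipeline:
        rows = pipeline | "Read" >> read_step("SELECT ...")  # placeholder query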

Solution

  • I found out that preceding beam.io.ReadFromBigQuery with any other PTransform is impossible by design. Unfortunately this is currently not well reflected in the Python SDK docs, but according to Apache Beam's official Java documentation, "A root PTransform conceptually has no input" - and since the equivalent Java transform BigQueryIO.Read() takes PBegin (rather than a PCollection) as its input, that rules out preceding it with anything else.

    However, I found a workaround which resembles the second approach that I suggested in the question: implementing a beam.PTransform (not a beam.DoFn) that returns the appropriate transform when the pipeline is built, according to whether the cache exists. It looks like this (the glob-based cache check and the cache_pattern argument are only one way to fill in the caching logic):

    import glob

    import apache_beam as beam


    class ReadFromBigQueryWithCache(beam.PTransform):

        def __init__(self, query, cache_pattern="path/to/cached_results"):
            # cache_pattern is an illustrative addition: the location where a
            # previous run would have exported the query results (e.g. as Avro files).
            super().__init__()
            self.query = query
            self.cache_pattern = cache_pattern

        def expand(self, input_or_inputs):
            # Caching logic goes here; this glob-based existence check is just
            # one possible implementation.
            if glob.glob(self.cache_pattern):
                return input_or_inputs | beam.io.ReadFromAvro(
                    file_pattern=self.cache_pattern
                )
            return input_or_inputs | beam.io.ReadFromBigQuery(
                query=self.query
            )
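
    Usage then looks the same as for beam.io.ReadFromBigQuery itself: the composite transform is applied directly to the pipeline object (the PBegin). The query string below is a placeholder:

    with beam.Pipeline() as pipeline:
        rows = (
            pipeline
            | "Read" >> ReadFromBigQueryWithCache(query="SELECT ...")  # placeholder query
            | "Print" >> beam.Map(print)
        )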