google-bigquery, google-bigquery-storage-api

Does data compression actually do anything on Query->to_arrow()?


My users generate SQL queries that return about 100,000 rows and 20 columns which, these days, I guess would be considered a modest amount of data. The queries execute in a fraction of a second, but transmission to Arrow (and then to pandas) takes 5-10 seconds of clock time, so I want to improve this. The tables are a natural fit for compression along the columns. I couldn't immediately see how to add compression to bigquery's to_arrow(), so I thought I'd start with the temporary results table and ship that myself via the Storage API:

import time

from google.cloud import bigquery, bigquery_storage

bq_client = bigquery.Client()
bq_storage_client = bigquery_storage.BigQueryReadClient()

job_query = bq_client.query(query)
q = job_query.result()                        # wait for the query to finish
destination = job_query.destination           # TableReference to the anonymous results table
destination_table = "projects/{}/datasets/{}/tables/{}".format(
    destination.project, destination.dataset_id, destination.table_id)

requested_session = bigquery_storage.types.ReadSession()
requested_session.table = destination_table
requested_session.data_format = bigquery_storage.types.DataFormat.ARROW

# bigquery_storage.ReadSession.TableReadOptions.ResponseCompressionCodec has possible values
# 'RESPONSE_COMPRESSION_CODEC_LZ4' and 'RESPONSE_COMPRESSION_CODEC_UNSPECIFIED', but I couldn't
# get that to work (see the p.s. below), so I'll try the Arrow-level options - there are three choices:

# (each assignment overwrites the previous one - in practice only one of the three is used)
arrow_options = bigquery_storage.ArrowSerializationOptions(
    buffer_compression=bigquery_storage.ArrowSerializationOptions.CompressionCodec.COMPRESSION_UNSPECIFIED)

arrow_options = bigquery_storage.ArrowSerializationOptions(
    buffer_compression=bigquery_storage.ArrowSerializationOptions.CompressionCodec.ZSTD)

arrow_options = bigquery_storage.ArrowSerializationOptions(
    buffer_compression=bigquery_storage.ArrowSerializationOptions.CompressionCodec.LZ4_FRAME)

bq_storage_client.ArrowSerializationOptions = arrow_options   #is this correct?!

parent = "projects/{}".format(destination.project)
session = bq_storage_client.create_read_session(
    parent=parent,
    read_session=requested_session,
    max_stream_count=1,
)
reader = bq_storage_client.read_rows(session.streams[0].name)       

start_t = (time.time(), time.process_time())
df = reader.to_dataframe(session)
end_t = (time.time() - start_t[0], time.process_time()-start_t[1])
print("To->ReadSession->pandas Time/cpu", df.shape, end_t[0], end_t[1])

And the good news is that it runs fine with all three compression options. The bad news is that the clock and CPU times are essentially identical regardless of which compression option I choose.

This makes me think that I am not actually compressing the data... (a) Have I set the code up correctly, or is there some other flag that I should be setting? and/or (b) Is there a way to check the actual number of bytes transmitted? Of course, there is always (c) that the files are small enough that data compression isn't going to help, and/or (d) that the compression is being done on the rows (which is useless to me) as opposed to on the columns (which should be awesome).

Pointers appreciated

p.s. If, instead of the Arrow serialization options, I chose

     requested_session.read_options.response_compression_codec = (
         bigquery_storage.ReadSession.TableReadOptions.ResponseCompressionCodec.RESPONSE_COMPRESSION_CODEC_LZ4)

Then I got the error: OSError: Invalid IPC stream: negative continuation token


Solution

  • The short answer is that LZ4_FRAME compression is automatically added if you make the simplest call with no extra parameters

    df_co = bq_client.query(sql_query).to_arrow(create_bqstorage_client=True).to_pandas()
    

    so there is no reason to request ArrowSerializationOptions separately.
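
    One quick way to confirm that your installed client really does this is to inspect the flag the library sets internally; a minimal check, which relies on a private name that may change between releases:

    # _ARROW_COMPRESSION_SUPPORT is a private flag inside google-cloud-bigquery,
    # so treat this purely as a sanity check.
    from google.cloud.bigquery import _pandas_helpers
    print(_pandas_helpers._ARROW_COMPRESSION_SUPPORT)   # True -> LZ4_FRAME is requested automatically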

    In a bit more detail, _ARROW_COMPRESSION_SUPPORT is set to True in modern versions of google-cloud-bigquery (in _pandas_helpers.py), and there one can find the lines

        if _ARROW_COMPRESSION_SUPPORT:
            requested_session.read_options.arrow_serialization_options.buffer_compression = (
                ArrowSerializationOptions.CompressionCodec.LZ4_FRAME
            )
    

    and voila, compression is turned on. The to_arrow call then includes the step

    session = bqstorage_client.create_read_session(
        parent="projects/{}".format(project_id),
        read_session=requested_session,
        max_stream_count=requested_streams,
    )
    

    where the compression tag has already been added, so there does not seem to be particular value in requesting additional compression. Also notice that this is essentially what I had broken out by hand, so there really wasn't much value in doing that either. Following along in the code, it finally gets to build an RPC request

    parent: "projects/my-table"
    read_session {
      data_format: ARROW
      table: "projects/my-table/datasets/_9eee1035c3558f558823a7ce70f6126ce5792454/tables/anon3f64d460_217c_4493_a0f8_3c959d746614"
      read_options {
        arrow_serialization_options {
          buffer_compression: LZ4_FRAME
        }
      }
    }
    max_stream_count: 1
    

    and presumably BigQuery is indeed respecting that request, which is why the computation times are the same whether or not the user adds the ArrowSerializationOptions by hand.
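
    As for question (b) above - checking how many bytes are actually transmitted - one option is to iterate the raw ReadRowsResponse messages yourself and add up the sizes of the serialized Arrow record batches they carry. A rough sketch, run against its own reader so it doesn't interfere with the timing test above:

    # Sum the serialized (and, when buffer_compression is set, LZ4_FRAME-compressed)
    # Arrow record batch payloads as they arrive from the Storage API.
    raw_reader = bq_storage_client.read_rows(session.streams[0].name)
    wire_bytes = 0
    row_count = 0
    for response in raw_reader:              # each message is a ReadRowsResponse
        wire_bytes += len(response.arrow_record_batch.serialized_record_batch)
        row_count += response.row_count
    print("rows:", row_count, "approx. bytes on the wire:", wire_bytes)

    Comparing that number with and without compression in the read options should show directly whether the request is being honored.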

    There was also an error in the original code I posted. This does not work:

    arrow_options = bigquery_storage.ArrowSerializationOptions(
        buffer_compression=bigquery_storage.ArrowSerializationOptions.CompressionCodec.LZ4_FRAME)

    bq_storage_client.ArrowSerializationOptions = arrow_options   # just sets an unused attribute on the client; it does not affect the read session
    

    The formulation that does work is

    requested_session.read_options.arrow_serialization_options.buffer_compression = (
        bigquery_storage.ArrowSerializationOptions.CompressionCodec.LZ4_FRAME)
    

    Going through the Storage API by hand does still have the advantage that you can increase max_stream_count, select specific columns, add row filters, and so on, as in the sketch below.
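
    For example, a sketch of a session that combines the working compression line with column selection, a row filter, and more streams (the column names and the filter here are just placeholders):

    requested_session = bigquery_storage.types.ReadSession()
    requested_session.table = destination_table
    requested_session.data_format = bigquery_storage.types.DataFormat.ARROW
    # Only ship the columns you need and push a predicate down to the server.
    requested_session.read_options.selected_fields = ["col_a", "col_b"]   # placeholder column names
    requested_session.read_options.row_restriction = "col_a > 0"          # placeholder filter
    requested_session.read_options.arrow_serialization_options.buffer_compression = (
        bigquery_storage.ArrowSerializationOptions.CompressionCodec.LZ4_FRAME)

    session = bq_storage_client.create_read_session(
        parent=parent,
        read_session=requested_session,
        max_stream_count=4,        # the server may return up to 4 streams to read in parallel
    )

    Since compression is already on by default, the real wins here come from selected_fields and row_restriction trimming the data before it is shipped, and from reading several streams concurrently.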