My users run SQL queries whose results are about 100,000 rows by 20 columns, which these days I guess counts as a modest amount of data. The queries execute in a fraction of a second, but transmission to Arrow (and then to pandas) takes 5-10 seconds of wall-clock time, so I want to improve this. The tables are a natural fit for compression along the columns. I couldn't immediately see how to add compression to bigquery.to_arrow(), so I thought I'd start with the temporary destination table and ship that:
job_query = bq_client.query(query)
q = job_query.result()                        # wait for the query to finish
destination = job_query.destination           # the temporary (anonymous) destination table
destination_info = bq_client.get_table(destination)
destination_parts = str(destination_info).split(".")   # project, dataset, table
destination_table = "projects/{}/datasets/{}/tables/{}".format(*destination_parts)
requested_session = bigquery_storage.types.ReadSession()
requested_session.table = destination_table
requested_session.data_format = bigquery_storage.types.DataFormat.ARROW
# bigquery_storage.ReadSession.TableReadOptions.ResponseCompressionCodec has possible values
# 'RESPONSE_COMPRESSION_CODEC_LZ4' and 'RESPONSE_COMPRESSION_CODEC_UNSPECIFIED', but I couldn't
# get that to work (see the p.s. below), so I'll try the Arrow-level option instead.
# There are three choices; I tried each one in turn:
arrow_options = bigquery_storage.ArrowSerializationOptions(
    buffer_compression=bigquery_storage.ArrowSerializationOptions.CompressionCodec.COMPRESSION_UNSPECIFIED)
arrow_options = bigquery_storage.ArrowSerializationOptions(
    buffer_compression=bigquery_storage.ArrowSerializationOptions.CompressionCodec.ZSTD)
arrow_options = bigquery_storage.ArrowSerializationOptions(
    buffer_compression=bigquery_storage.ArrowSerializationOptions.CompressionCodec.LZ4_FRAME)
bq_storage_client.ArrowSerializationOptions = arrow_options  # is this correct?!
parent = "projects/{}".format(destination_parts[0])
session = bq_storage_client.create_read_session(
parent = parent,
read_session=requested_session,
max_stream_count = 1,
)
reader = bq_storage_client.read_rows(session.streams[0].name)
start_t = (time.time(), time.process_time())
df = reader.to_dataframe(session)
end_t = (time.time() - start_t[0], time.process_time() - start_t[1])
print("To->ReadSession->pandas Time/cpu", df.shape, end_t[0], end_t[1])
And the good news is that it runs fine with all three compression options. The bad news is that the wall-clock and CPU times are essentially identical regardless of which compression option I choose.
That makes me think I am not actually compressing the data. (a) Have I set this up correctly, or is there some other flag I should be setting? (b) Is there a way to check the actual number of bytes transmitted? Of course, there is always (c) the tables are small enough that compression isn't going to help, and/or (d) the compression is being done on the rows (which is useless to me) as opposed to the columns (which should be awesome).
Pointers appreciated.
p.s. If instead of the ArrowSerializationOptions I set
requested_session.read_options.response_compression_codec = (
    bigquery_storage.ReadSession.TableReadOptions.ResponseCompressionCodec.RESPONSE_COMPRESSION_CODEC_LZ4)
then I get the error: OSError: Invalid IPC stream: negative continuation token
The short answer is that LZ4_FRAME compression is automatically added if you do the simplest call with no extra parameters,
df_co = bq_client.query(sql_query).to_arrow(create_bqstorage_client=True).to_pandas()
so there is no need to add an explicit request for ArrowSerializationOptions.
In a bit more detail, _ARROW_COMPRESSION_SUPPORT is set to True in modern versions of google-cloud-bigquery (in _pandas_helpers.py), and a little further down one can find the lines
if _ARROW_COMPRESSION_SUPPORT:
    requested_session.read_options.arrow_serialization_options.buffer_compression = (
        ArrowSerializationOptions.CompressionCodec.LZ4_FRAME
    )
and voila, compression is turned on. The to_arrow call then includes the step
session = bqstorage_client.create_read_session(
    parent="projects/{}".format(project_id),
    read_session=requested_session,
    max_stream_count=requested_streams,
)
where the compression option has already been added, so there does not seem to be much value in requesting additional compression. Notice also that this is essentially what I had broken out by hand, so there really wasn't much value in doing that either. Following along in the code, it finally builds the RPC request
parent: "projects/my-table"
read_session {
data_format: ARROW
table: "projects/my-table/datasets/_9eee1035c3558f558823a7ce70f6126ce5792454/tables/anon3f64d460_217c_4493_a0f8_3c959d746614"
read_options {
arrow_serialization_options {
buffer_compression: LZ4_FRAME
}
}
}
max_stream_count: 1
and presumably BigQuery is indeed honoring that request, which is why the timings are the same whether or not the user adds ArrowSerializationOptions by hand.
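As a partial answer to my own question (b) about measuring the bytes actually transmitted, one thing that seems to work is to iterate the raw read stream and add up the sizes of the serialized Arrow record batches in each ReadRowsResponse. This is only a sketch: it assumes the bigquery_storage_v1 reader yields ReadRowsResponse messages carrying an arrow_record_batch.serialized_record_batch bytes field, and iterating the stream this way consumes it, so the reader has to be re-created before calling to_dataframe.
# Sketch only: count serialized bytes received on one stream (this consumes the stream)
reader = bq_storage_client.read_rows(session.streams[0].name)
total_bytes = 0
total_rows = 0
for response in reader:                       # each item is a ReadRowsResponse
    batch = response.arrow_record_batch       # serialized (possibly compressed) Arrow batch
    total_bytes += len(batch.serialized_record_batch)
    total_rows += response.row_count
print("rows:", total_rows, "serialized bytes received:", total_bytes)
Comparing that total with and without buffer_compression set should show directly whether the LZ4_FRAME request is being honored.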
There was also an error in the code I originally posted. This does not work:
arrow_options = bigquery_storage.ArrowSerializationOptions(
    buffer_compression=bigquery_storage.ArrowSerializationOptions.CompressionCodec.LZ4_FRAME)
bq_storage_client.ArrowSerializationOptions = arrow_options  # just sets a stray attribute on the client; it is silently ignored
The formulation that does work is
requested_session.read_options.arrow_serialization_options.buffer_compression = (
bigquery_storage.ArrowSerializationOptions.CompressionCodec.LZ4_FRAME)
Setting things up on the ReadSession directly does give the advantage of being able to increase max_stream_count, select only the columns you need, add row filters, etc., as in the sketch below.
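For completeness, here is a sketch of what that might look like, reusing bq_storage_client, destination_table and parent from the code above; the selected_fields and row_restriction values are placeholders, and the service may hand back fewer streams than max_stream_count asks for.
import pandas as pd

requested_session = bigquery_storage.types.ReadSession()
requested_session.table = destination_table
requested_session.data_format = bigquery_storage.types.DataFormat.ARROW
# the compression formulation that works
requested_session.read_options.arrow_serialization_options.buffer_compression = (
    bigquery_storage.ArrowSerializationOptions.CompressionCodec.LZ4_FRAME)
# pull only the columns you need, and optionally filter rows server-side
requested_session.read_options.selected_fields = ["col_a", "col_b"]   # placeholder column names
requested_session.read_options.row_restriction = "col_a > 0"          # placeholder filter

session = bq_storage_client.create_read_session(
    parent=parent,
    read_session=requested_session,
    max_stream_count=4,            # ask for up to four parallel streams
)
# read each stream into pandas and concatenate
frames = [
    bq_storage_client.read_rows(stream.name).to_dataframe(session)
    for stream in session.streams
]
df = pd.concat(frames, ignore_index=True)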