The image above shows the table schema for a BigQuery table that is the input to an Apache Beam Dataflow job running on Spotify's Scio. If you aren't familiar with Scio, it's a Scala wrapper around the Apache Beam Java SDK; in particular, an SCollection wraps a PCollection. My input table is 136 GB on BigQuery disk, but the Dataflow UI reports the size of my SCollection as 504.91 GB.
I understand that BigQuery is likely much better at data compression and representation, but a ~3.7x blow-up in size seems quite high. To be very clear, I'm using the type-safe BigQuery case-class representation (let's call the class Clazz), so my SCollection is of type SCollection[Clazz] rather than SCollection[TableRow], TableRow being the native representation in the Beam Java SDK. Any tips on how to keep the memory footprint down? Is it related to a particular column type in my input: bytes, strings, records, floats, etc.?
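For concreteness, the read is set up roughly like this (a minimal sketch; the table reference is a placeholder and sc is the job's ScioContext):

import com.spotify.scio.bigquery._
import com.spotify.scio.values.SCollection

// The annotation generates the case class fields from the table schema at compile time.
@BigQueryType.fromTable("my-project:my_dataset.my_table") // placeholder table reference
class Clazz

val rows: SCollection[Clazz] = sc.typedBigQuery[Clazz]()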
This is likely due to the TableRow format, which stores the string name of each column alongside its value and thus adds to the size.
Consider using the following to create a PCollection of objects instead of TableRows. It lets you read directly into an object that matches the schema, which should reduce the data size somewhat.
/**
 * Reads from a BigQuery table or query and returns a {@link PCollection} with one element per
 * each row of the table or query result, parsed from the BigQuery AVRO format using the specified
 * function.
 *
 * <p>Each {@link SchemaAndRecord} contains a BigQuery {@link TableSchema} and a
 * {@link GenericRecord} representing the row, indexed by column name. Here is a
 * sample parse function that parses click events from a table.
 *
 * <pre>{@code
 * class ClickEvent { long userId; String url; ... }
 *
 * p.apply(BigQueryIO.read(new SerializableFunction<SchemaAndRecord, ClickEvent>() {
 *   public ClickEvent apply(SchemaAndRecord record) {
 *     GenericRecord r = record.getRecord();
 *     return new ClickEvent((Long) r.get("userId"), (String) r.get("url"));
 *   }
 * }).from("..."));
 * }</pre>
 */
public static <T> TypedRead<T> read(
    SerializableFunction<SchemaAndRecord, T> parseFn)
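Since you're on Scio, you can plug such a parse-function read in through ScioContext#customInput, which accepts any Beam PTransform from PBegin to a PCollection. A sketch under the assumption that ClickEvent, the field names, and the table reference are placeholders for your actual schema:

import com.spotify.scio.ScioContext
import com.spotify.scio.values.SCollection
import org.apache.beam.sdk.io.gcp.bigquery.{BigQueryIO, SchemaAndRecord}
import org.apache.beam.sdk.transforms.SerializableFunction

// Plain case class holding only the columns the job actually needs.
case class ClickEvent(userId: Long, url: String)

def readClickEvents(sc: ScioContext): SCollection[ClickEvent] =
  sc.customInput(
    "ReadClickEvents",
    BigQueryIO
      .read(new SerializableFunction[SchemaAndRecord, ClickEvent] {
        override def apply(input: SchemaAndRecord): ClickEvent = {
          // Avro GenericRecord for the row, indexed by column name.
          val r = input.getRecord
          ClickEvent(
            r.get("userId").asInstanceOf[Long],
            r.get("url").toString) // Avro strings are Utf8, so convert explicitly
        }
      })
      .from("my-project:my_dataset.my_table") // placeholder table reference
  )

Projecting down to just the needed columns in the parse function, rather than carrying every field of the row, is where most of the size reduction would come from.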