I have a large gzipped CSV file (.csv.gz) uploaded to a dataset; it's about 14GB in size and 40GB when uncompressed. Is there a way to decompress, read, and write it out to a dataset using Python Transforms without causing the executor to OOM?
I'm going to combine a few tactics in answering this question.
First, since we are dealing with raw files, I want to write this test-driven, following the method discussed here. Iterating on raw files with full checks + builds would be far too slow, so I'll start off by creating a small sample .csv file and compressing it for much faster development.
My sample .csv file looks like the following:
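Anything that lines up with the test assertions further down will do here; for illustration, a col_1/col_2 header plus a single data row (the values themselves are arbitrary):

col_1,col_2
foo,bar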
I then compressed it using command-line utilities and added it to my code repository by cloning the repository to my local machine, adding the file to my development branch, and pushing the result back up into my Foundry instance.
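I used gzip on the command line, but if you'd rather stay in Python, the standard library can produce the same fixture. This is just a sketch; the file names mirror my example and are otherwise arbitrary:

import gzip
import shutil

# Compress the sample CSV into the .csv.gz fixture used by the tests.
with open("test.csv", "rb") as src, gzip.open("test.csv.gz", "wb") as dst:
    shutil.copyfileobj(src, dst)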
I also made a test directory in my repository as I want to ensure my parsing logic is properly verified.
This resulted in my repository looking like the following:
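The exact layout depends on your repository template, so treat the paths below as approximate rather than copied verbatim, but it was roughly:

transforms-python/
    build.gradle
    setup.py
    src/
        myproject/
            datasets/
                parse_gzip.py
    test/
        test_gzip_csv.py
        test.csv.gz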
Protip: don't forget to modify your setup.py and build.gradle files to enable testing and specifically to package up your small test file.
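I won't reproduce my full build files here, but on the setup.py side the relevant piece is making sure the compressed fixture gets packaged alongside the test code. With setuptools that's typically done via package_data; the package key and glob below are assumptions you'd adapt to wherever your test file actually lives:

from setuptools import find_packages, setup

setup(
    name="myproject",
    version="0.0.1",
    packages=find_packages(),
    # Assumption: the test fixture sits inside an importable package; adjust
    # the key/glob to your layout so resource_filename(...) can locate
    # test.csv.gz at test time.
    package_data={"": ["*.csv.gz"]},
)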
I also need my parsing logic to sit outside my my_compute_function method so that it's available to my test methods, so parse_gzip.py looks like the following:
from transforms.api import transform, Output, Input
from transforms.verbs.dataframes import union_many


def read_files(spark_session, paths):
    # Let Spark's native CSV reader handle the gzip decompression for us.
    parsed_dfs = []
    for file_name in paths:
        parsed_df = spark_session.read.option("header", "true").csv(file_name)
        parsed_dfs += [parsed_df]
    output_df = union_many(*parsed_dfs)
    return output_df


@transform(
    the_output=Output("ri.foundry.main.dataset.my-awesome-output"),
    the_input=Input("ri.foundry.main.dataset.my-awesome-input"),
)
def my_compute_function(the_input, the_output, ctx):
    session = ctx.spark_session
    input_filesystem = the_input.filesystem()
    hadoop_path = input_filesystem.hadoop_path
    # Build fully qualified paths for every .csv.gz file in the input dataset.
    files = [hadoop_path + "/" + file_status.path for file_status in input_filesystem.ls('**/*.csv.gz')]
    output_df = read_files(session, files)
    the_output.write_dataframe(output_df)
Consequently, my test_gzip_csv.py file looks like so:
from myproject.datasets import parse_gzip
from pkg_resources import resource_filename


def test_compressed_csv(spark_session):
    file_path = resource_filename(__name__, "test.csv.gz")
    parsed_df = parse_gzip.read_files(spark_session, [file_path])
    assert parsed_df.count() == 1
    assert set(parsed_df.columns) == {"col_1", "col_2"}
It's important to see that this methodology doesn't use the .files() call on the filesystem; it uses the .ls() method to get an iterator of file names. This is done on purpose because we don't need to parse the files ourselves inside the executors; we simply hand the paths to Spark's native .csv reader and let existing functionality parse the compressed files.
Spark's own methods for reading these files are the better choice here, rather than writing your own decompressor / .csv parser: gzip files aren't splittable, so each file is read by a single task, but Spark streams the decompressed content instead of materializing the whole file in memory. If you were to decompress and parse them manually, you would risk OOMing your job and need to throw more memory at it in order for it to succeed. At the scale you are operating at, it's also advisable not to process individual files in Python, as its performance will not match Spark's.
Note that I also use the transforms.verbs.dataframes.union_many method here to gracefully handle files that have different schemas. You can specify 'narrow', 'wide' and 'strict' options to control how differing schemas are handled; refer to your product documentation to see which best fits your needs.
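As a sketch only: the exact keyword argument name below is an assumption on my part, so double-check the union_many documentation for your release, but choosing a mode looks roughly like this:

from transforms.verbs.dataframes import union_many

# Assumption: the schema-handling mode is passed as a keyword argument named
# `how`; verify the parameter name and the allowed values ('narrow', 'wide',
# 'strict') against your product documentation.
output_df = union_many(*parsed_dfs, how="wide")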