I use Python to read a large CSV file, transform the data (mostly string operations), and then write the results to a single parquet file. In the transformation, rows are independent: one row in the CSV file yields one row in the parquet file.
In my current implementation, I divide the data into chunks (via pandas.read_csv()) and then transform the chunked data, allocating the work across most of the cores in my CPU (via multiprocessing.ProcessPoolExecutor). Once all tasks are complete, I concatenate the results into a single, large pandas dataframe and then write it to a parquet file.
The code below provides a minimal example.
Concatenating the results maxes out memory usage. Is there a safe, more efficient process that writes each chunk of transformed data once it becomes available? I've read about ThreadPoolExecutor, Queues, and futures but I haven't found a clean example of how to combine all of these multiprocessing techniques in a case like mine.
import pyarrow as pa
import pyarrow.parquet as pq
import pandas as pd
import regex as re
from concurrent.futures import ProcessPoolExecutor
from multiprocessing import cpu_count, Queue
filename = 'input.csv'
# Create example of input file
n_rows = int(25 * 1e6)  # a mid-sized dataset; I am anticipating much larger files
input_df = pd.DataFrame([f'User_id_{i}_says_"hello!".' for i in range(n_rows)], columns=['A'])
input_df.to_csv(filename, header=True, index=False)
# Example of data transformation
def clean_text(s):
    regex = re.compile('[^a-zA-Z]')
    return regex.sub('', s)
def process(df):
    # Split each string into 5 columns.
    out = pd.DataFrame(df['A'].str.split('_').to_list())
    # Convert numeric id to string
    out[2] = out[2].astype(str)
    # Remove non-alphabetic characters from column 4 entries.
    out[4] = out[4].apply(clean_text)
    # Rename columns
    out.rename(columns={i: f'A{i}' for i in out.columns}, inplace=True)
    return out
if __name__ == "__main__":
    # Set parquet data schema
    schema = pa.schema((f'A{i}', pa.string()) for i in range(5))

    # Multiprocessing implementation
    core_count = cpu_count() - 3  # my machine has 24 cores

    # Read input data in chunks
    chunk_size = int(1e5)
    chunks = pd.read_csv(filename, header=0, na_filter=False, chunksize=chunk_size)

    # Allocate transformation tasks
    with pq.ParquetWriter('processed.parquet', schema=schema, compression='GZIP') as writer:
        with ProcessPoolExecutor(core_count) as executor:
            results = executor.map(process, chunks)
            df = pd.concat(results, axis=0)
            # Convert df to a record batch and write it out
            transformed_batch = pa.RecordBatch.from_pandas(df, schema=schema)
            writer.write_batch(transformed_batch)
There are two ideas I would suggest. Combined, they can reduce memory usage on this example from 11 GB to 0.4 GB.
I ran your program on my 4-core system (with core_count = 4) and measured the maximum RSS of the parent process. For me, the parent process uses 11 GB. This will be our baseline.
import resource
def get_maxrss_bytes():
    return resource.getrusage(resource.RUSAGE_SELF).ru_maxrss * 1024

if __name__ == "__main__":
    print(f"{get_maxrss_bytes() / 1e6:.2f} MB used at max RSS")
(Note: This benchmark ignores child process RSS. Most pages are likely to be shared anyway.)
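(If you do want a rough view of the workers as well, the resource module can also report child usage. This is only a sketch, not part of the benchmark above: RUSAGE_CHILDREN counts only children that have already terminated and been waited for, and its ru_maxrss is the largest single child, not the sum.)
import resource

def get_children_maxrss_bytes():
    # Largest RSS among terminated, waited-for child processes (the workers).
    # Like ru_maxrss for the parent, this is reported in kilobytes on Linux.
    return resource.getrusage(resource.RUSAGE_CHILDREN).ru_maxrss * 1024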
The first thing I would try is to consume the map results eagerly. Rather than buffering them all, iterate over the results as they arrive and write each one to disk.
Example:
with pq.ParquetWriter('processed.parquet', schema=schema, compression='GZIP') as writer:
    with ProcessPoolExecutor(core_count) as executor:
        # chunksize=1 keeps map() from batching inputs - the data is already chunked
        for result in executor.map(process, chunks, chunksize=1):
            transformed_batch = pa.RecordBatch.from_pandas(result, schema=schema)
            writer.write_batch(transformed_batch)
This reduces the memory usage to 3.4 GB.
Warning: The resulting parquet file is 67% larger. I suspect this is because the row groups are quite small, which wastes space because a fixed amount of metadata must be stored for each row group. Increasing chunk_size to 1e6 reduces this storage cost, at the cost of using more memory. Alternatively, you could buffer up dataframes until you have a million rows' worth of data before writing them to the parquet file, as in the sketch below.
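A rough sketch of that buffering idea, reusing schema, core_count, chunks, and process from above (buffer and target_rows are just illustrative names, not from the original code):
target_rows = int(1e6)  # rows to accumulate before writing a row group
buffer, buffered_rows = [], 0

with pq.ParquetWriter('processed.parquet', schema=schema, compression='GZIP') as writer:
    with ProcessPoolExecutor(core_count) as executor:
        for result in executor.map(process, chunks, chunksize=1):
            buffer.append(result)
            buffered_rows += len(result)
            if buffered_rows >= target_rows:
                # Write one larger row group instead of many small ones
                df = pd.concat(buffer, ignore_index=True)
                writer.write_batch(pa.RecordBatch.from_pandas(df, schema=schema))
                buffer, buffered_rows = [], 0
        if buffer:
            # Flush whatever is left over
            df = pd.concat(buffer, ignore_index=True)
            writer.write_batch(pa.RecordBatch.from_pandas(df, schema=schema))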
Note: I found this answer to be a very helpful reference when writing this.
I added a bunch of print statements to the process() function and the for result in ... loop, and I found that the program would often run hundreds of calls to process() before running the main loop once. This can happen if one of the calls to process() runs slowly.
What you'd really like is to limit the number of results stored in memory at once. I found an answer which has some helpful code for this. Copying their code into your program, I called it like this.
# Allocate transformation task
with pq.ParquetWriter('processed.parquet', schema=schema, compression='GZIP') as writer:
    with ProcessPoolExecutor(core_count) as executor:
        # Prepare 2 extra jobs while waiting for jobs to complete
        n_outstanding_jobs = core_count + 2
        for result in lazy_executor_map(process, chunks, executor, n_concurrent=n_outstanding_jobs):
            transformed_batch = pa.RecordBatch.from_pandas(result, schema=schema)
            writer.write_batch(transformed_batch)
This reduces the memory use to 0.43 GB.
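For reference, here is a minimal sketch of what such a helper could look like. This is my own version under the same assumptions (a bounded number of tasks in flight, results yielded in input order), not necessarily identical to the linked answer's code:
from collections import deque

def lazy_executor_map(fn, iterable, executor, n_concurrent=6):
    # Keep at most n_concurrent tasks in flight and yield results in input
    # order, so only a bounded number of results ever sit in memory.
    futures = deque()
    it = iter(iterable)

    def submit_next():
        # Schedule one more input item, if any remain
        for item in it:
            futures.append(executor.submit(fn, item))
            break

    # Fill the pipeline
    for _ in range(n_concurrent):
        submit_next()

    while futures:
        # Block on the oldest future, then top the pipeline back up
        result = futures.popleft().result()
        submit_next()
        yield result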