I have a large DataFrame that I need to run a lot of matching operations over, and in the past I have always used the method below. However, the DataFrame I am currently trying to process comes from a 2 GB CSV file, and my computer struggles to multiprocess over it, even with only one partition. I assume this is because splitting the DataFrame into chunks for multiprocessing roughly doubles the amount of memory needed, which is more than my computer can handle. This is my current code:
from concurrent.futures import ProcessPoolExecutor

import numpy as np
import pandas as pd
from tqdm import tqdm

def parallelize_dataframe(df, func, additional_param, num_partitions):
    df_split = np.array_split(df, num_partitions)
    results = []
    with ProcessPoolExecutor(max_workers=num_partitions) as executor:
        futures = {executor.submit(func, chunk, additional_param): chunk for chunk in df_split}
        for future in tqdm(futures, total=len(futures), desc="Overall progress"):
            results.append(future.result())
    return pd.concat(results)
Any help is greatly appreciated.
For tasks like this, I would suggest pre-processing the CSV file to split it into approximately equal chunks, and having each child process read its own chunk, rather than reading everything in the main process and sending it to the children. Sending that data from the main process to a child carries quite a bit of overhead (and memory), since each chunk has to be pickled and copied. Here's an example:
import os
from multiprocessing import Pool
from io import BytesIO
import pandas as pd

csvfile = r"c:\some\example\data.csv"
chunksize = 2**20  # 1 MiB chunk size (try different values depending on file size and processing speed)

# example csv contents
# colA,colB,colC
# 1,2,3
# 4,5,6
# 7,8,9
# ...

def sum_cols(args):
    file_start, file_end, col_names = args  # unpack tuple args as Pool.imap only passes a single arg
    with open(csvfile, "rb") as f:
        f.seek(file_start)
        buf = BytesIO(f.read(file_end - file_start))  # store chunk of csv in a buffer to pass to pandas
    df = pd.read_csv(buf, names=col_names)  # col_names aren't in the chunk so pass them explicitly
    return df.sum()

if __name__ == "__main__":
    filesize = os.path.getsize(csvfile)
    with open(csvfile, "rb") as f:
        firstline = f.readline()
        # the file is opened in binary mode, so decode the header (assuming UTF-8) to get str column names
        headers = [col_title.strip() for col_title in firstline.decode("utf-8").split(",")]
        startpoints = []
        endpoints = []
        # scan the file without reading in much data to find good chunk boundaries (start and end on newline);
        # stopping once the whole file is covered means no empty chunk is ever produced
        while f.tell() < filesize:
            startpoints.append(f.tell())
            f.seek(chunksize, 1)  # skip ahead by chunksize bytes
            f.readline()  # find the end of the current line
            endpoints.append(min(f.tell(), filesize))  # seeking past the end of the file is allowed, so clamp
    arglist = [(start, end, headers) for start, end in zip(startpoints, endpoints)]
    with Pool() as pool:
        print(sum(pool.imap(sum_cols, arglist)))
I have not used polars and can't write an answer based on it, but my understanding is that it is very fast.