I have a dataframe with data for multiple companies and countries that I am trying to transform in parallel using a function. The data takes a format like this, but the real thing is much larger and has many more clients:
I have made a Dask dataframe with a blended column, combining CompanyGroupName and Country, that I am trying to use as the index for partitioning.
I don't want to use npartitions > 1 when creating the Dask dataframe, because (as far as I can tell) that can arbitrarily put half of a client's data in one partition and the other half in another.
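To illustrate what I mean, here is a minimal toy example (the company and country values are made up) showing that dd.from_pandas with npartitions > 1 chunks rows purely by position, so one client's rows can straddle a partition boundary:

import dask.dataframe as dd
import pandas as pd

toy = pd.DataFrame({
    'CompanyGroupName': ['Acme', 'Acme', 'Acme', 'Beta'],
    'Country': ['UK', 'UK', 'UK', 'US'],
})
ddf = dd.from_pandas(toy, npartitions=2)
# Rows are split by position: the first two Acme rows land in partition 0,
# while the third Acme row ends up in partition 1 alongside Beta
print(ddf.map_partitions(len).compute())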
Here is what I have tried. My question is: how can I properly split my dataframe into the specified partitions so that each one is processed in parallel through my transformation function?
Many thanks in advance for any help
import dask.dataframe as dd
import pandas as pd

def calculate_predictions(df):
    df['CompanyCountryCombined'] = df['CompanyGroupName'].astype(str) + '-' + df['Country']

    # Convert pandas DataFrame to Dask DataFrame with one partition
    ddf = dd.from_pandas(df, npartitions=1)
    # Here I am trying to repartition the data across the combined index
    ddf = ddf.set_index('CompanyCountryCombined', sorted=True).repartition(partition_size="10MB")

    list_of_output_columns = []  # defined elsewhere
    # Define meta DataFrame to specify the output format
    meta = pd.DataFrame(columns=list_of_output_columns)

    ddf = ddf.sort_values(by=['CompanyCountryCombined', 'Date'])

    models = {}  # defined elsewhere too

    # Process the groups through process_forecast_group, which is also defined elsewhere
    final_results = ddf.groupby('CompanyCountryCombined').apply(
        lambda partition: process_forecast_group(partition, models),
        meta=meta
    ).compute()

    return final_results
I ended up solving this by building a sorted list of the unique partition keys and then repartitioning over those divisions:
import dask.dataframe as dd
import pandas as pd
def calculate_predictions(df):
    def create_dask_df(pandas_df, columns_to_combine: list):
        # Create the column the data will be partitioned on from the desired column list
        pandas_df['combined'] = pandas_df[columns_to_combine].astype(str).agg('-'.join, axis=1)
        # Dask requires the data to be partitioned; start with everything in one partition
        ddf = dd.from_pandas(pandas_df, npartitions=1)
        # Set the index of the Dask dataframe to the combined column
        # (sorted=True tells Dask the rows are already ordered by 'combined')
        ddf = ddf.set_index('combined', sorted=True)
        # Get the unique combined values into a sorted list, ready to repartition by these values
        list_of_uniques = sorted(pandas_df['combined'].unique().tolist())
        # Dask divisions are partition boundaries, so n partitions need n + 1 values;
        # repeat the last value so the final key still gets its own partition
        division_list = list_of_uniques + [list_of_uniques[-1]]
        # Repartition the Dask dataframe so each unique combined value maps to one partition
        ddf = ddf.repartition(divisions=division_list)
        return ddf

    columns_to_partition_over = ['CompanyGroupName', 'Country']
    ddf = create_dask_df(df, columns_to_partition_over)

    list_of_output_columns = []  # defined elsewhere
    # Define meta DataFrame to specify the output format
    meta = pd.DataFrame(columns=list_of_output_columns)
    models = {}  # defined elsewhere too

    # Process each partition through process_forecast_group, which is also defined elsewhere
    final_results = ddf.map_partitions(
        lambda partition: process_forecast_group(partition.sort_values('Date'), models),
        meta=meta
    ).compute()

    return final_results
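As a quick sanity check, here is a small standalone sketch of the same repartitioning trick using hypothetical keys and dates, showing that each unique combined key ends up in its own partition:

import dask.dataframe as dd
import pandas as pd

toy = pd.DataFrame({
    'combined': ['A-UK', 'A-UK', 'B-US', 'C-DE'],
    'Date': pd.to_datetime(['2024-01-01', '2024-01-02', '2024-01-01', '2024-01-01']),
})
ddf = dd.from_pandas(toy, npartitions=1).set_index('combined', sorted=True)
uniques = sorted(toy['combined'].unique().tolist())
ddf = ddf.repartition(divisions=uniques + [uniques[-1]])

print(ddf.npartitions)                    # 3 -- one partition per unique key
print(ddf.divisions)                      # ('A-UK', 'B-US', 'C-DE', 'C-DE')
print(ddf.map_partitions(len).compute())  # 2, 1, 1 rows respectively

With the data laid out this way, map_partitions hands each company-country group to the transformation function as a complete unit, which is what the groupby-based attempt in the question was trying to achieve.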