I'm trying to parallelize time series forecasting in Python using Dask. Each time series is a column of the data, and all columns share a common index of monthly dates. I have a custom forecasting function that returns a time series object with the fitted and forecasted values. I want to apply this function to every column of a dataframe (every time series) and return a new dataframe containing all of the resulting series, to be uploaded to a DB. I've gotten the code to work by running:
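import dask.dataframe as dd

data = pandas_df.copy()
ddata = dd.from_pandas(data, npartitions=1)
# scheduler="processes" replaces the older get=dask.multiprocessing.get keyword
res = ddata.map_partitions(
    lambda df: df.apply(forecast_func, axis=0)
).compute(scheduler="processes")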
My question: is there a way in Dask to partition by column instead of by row? In this use case I need to keep the ordered time index intact for the forecasting function to work correctly.
If not, how would I re-format the data to allow efficient large-scale forecasting to be possible, and still return the data in the format I need to then push to a DB?
I've used the dask.delayed solution and it's working really well: it takes about 1/3 of the time, just using a local cluster.
For anyone interested, here is the solution I've implemented:
from dask.distributed import Client, LocalCluster
import pandas as pd
import dask
# threads_per_worker replaces the older ncores keyword
cluster = LocalCluster(n_workers=3, threads_per_worker=3)
client = Client(cluster)
# get list of time series back
output = []
for i in small_df:
    # one delayed task per column, so each task sees the full ordered series
    forecasted_series = dask.delayed(custom_forecast_func)(small_df[i])
    output.append(forecasted_series)
total = dask.delayed(output).compute()
# combine list of series into 1 dataframe
full_df = pd.concat(total, ignore_index=False, keys=small_df.columns,
                    names=['time_series_names', 'Date'])
final_df = full_df.to_frame().reset_index()
final_df.columns = ['time_series_names','Date','value_variable']
final_df.head()
This gives you the melted dataframe structure, so if you want the series to be the columns, you can transform it with:
pivoted_df = final_df.pivot(index='Date', columns='time_series_names', values='value_variable')
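The reshaping steps above can be tried in isolation with plain pandas. This is only a minimal sketch: the two toy series, the names series_a / series_b, and the dates are made up for illustration and stand in for whatever the forecasting function actually returns.

```python
import numpy as np
import pandas as pd

# Hypothetical monthly index and two toy series standing in for real forecasts.
dates = pd.date_range("2020-01-01", periods=4, freq="MS")
series_list = [
    pd.Series(np.arange(4, dtype=float), index=dates),
    pd.Series(np.arange(4, dtype=float) * 10, index=dates),
]
names = ["series_a", "series_b"]

# Stack the series into one long Series with a (name, date) MultiIndex.
full = pd.concat(series_list, keys=names, names=["time_series_names", "Date"])

# Long ("melted") frame: one row per (series, date) pair.
long_df = full.to_frame(name="value_variable").reset_index()

# Wide frame: one column per series, indexed by date.
wide_df = long_df.pivot(
    index="Date", columns="time_series_names", values="value_variable"
)
```

The long frame is probably the easier one to push to a DB table, since adding a new series adds rows rather than columns; the wide frame is handier for plotting or comparing series side by side.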
small_df is a pandas DataFrame with one column per time series and Date as the index.
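If you want to try the per-column dask.delayed pattern without the real data, here is a rough sketch under some assumptions: toy_df is a made-up frame in the shape described (monthly Date index, one column per series), and naive_forecast is a hypothetical stand-in that just repeats the last value, not my actual forecasting function. It uses scheduler="threads" so it runs without starting a cluster.

```python
import dask
import numpy as np
import pandas as pd

# Hypothetical input: monthly Date index, one column per time series.
dates = pd.date_range("2021-01-01", periods=6, freq="MS", name="Date")
toy_df = pd.DataFrame(
    {
        "series_a": np.arange(6, dtype=float),
        "series_b": np.arange(6, dtype=float) ** 2,
    },
    index=dates,
)


def naive_forecast(series, horizon=3):
    """Stand-in for the real forecasting function: repeat the last value."""
    # Month-start dates following the last observed date.
    future = pd.date_range(series.index[-1], periods=horizon + 1, freq="MS")[1:]
    forecast = pd.Series(series.iloc[-1], index=future)
    # Return observed values followed by the forecasted ones.
    return pd.concat([series, forecast])


# One delayed task per column; each task receives the full, ordered series.
tasks = [dask.delayed(naive_forecast)(toy_df[col]) for col in toy_df.columns]
results = dask.compute(*tasks, scheduler="threads")

# Reassemble the returned series as columns of a single wide frame.
forecast_df = pd.concat(results, axis=1, keys=toy_df.columns)
forecast_df.index.name = "Date"
```

Because every task gets a whole column, the time index is never split across partitions, which is the property the forecasting function needs.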