Tags: python, arima, ray, facebook-prophet, modin

Using Prophet or Auto ARIMA with Ray


There is something about Ray for which I could not find a clear answer. Ray is a distributed framework for data processing and training. To make it work in a distributed fashion, you must use Modin or some other distributed data-analysis tool supported by Ray so the data can flow across the whole cluster. But what if I want to use a model like Facebook's Prophet or ARIMA that takes a pandas dataframe as input? If I pass pandas dataframes as arguments to the model functions, will the work run on just a single node, or is there a workaround that makes it run on the cluster?


Solution

  • Ray is able to train models with pandas dataframes as inputs!

    Currently, a slight work-around is needed for ARIMA, since it typically uses the statsmodels library behind the scenes. To ensure the models are serialized correctly, an extra pickle step is needed (see the short sketch after the link below). Ray may eliminate the need for the pickle work-around in the future.

    See explanation of pickle work-around: https://alkaline-ml.com/pmdarima/1.0.0/serialization.html
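
    As a minimal, hedged sketch of that pickle round-trip (synthetic data, for illustration only; not part of the notebook linked below):

    import pickle
    import numpy as np
    import pmdarima as pm

    # fit a small model on synthetic data
    model = pm.auto_arima(np.random.randn(30).cumsum(), suppress_warnings=True)

    # dumps() before shipping the model anywhere; loads() before predicting
    restored = pickle.loads(pickle.dumps(model))
    print(restored.predict(n_periods=3))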

    Here is an excerpt of code for Python 3.8 and Ray 1.8. Notice that the inputs to the train_model() and inference_model() functions are pandas dataframes; the extra pickle step is embedded within those functions. Full notebook: https://github.com/christy/AnyscaleDemos/blob/main/forecasting_demos/nyctaxi_arima_simple.ipynb

    import pickle
    import ray
    import pandas as pd
    import pmdarima as pm
    from pmdarima.model_selection import train_test_split
    
    # read 8 months of clean, aggregated monthly taxi data
    filename = "https://github.com/christy/MachineLearningTools/blob/master/data/clean_taxi_monthly.parquet?raw=true"
    g_month = pd.read_parquet(filename) 
    
    # Define a train_model function: by default, train on 6 months and hold out 2 for inference
    def train_model(theDF:pd.DataFrame, item_col:str
                    , item_value:str, target_col:str
                    , train_size:int=6) -> list:
    
        # split data into train/test
        train, test = train_test_split(theDF.loc[(theDF[item_col]==item_value), :], train_size=train_size)
        
        # fit an auto_arima model
        model = pm.auto_arima(y=train[target_col]
                              ,X=train.loc[:, (train.columns!=target_col) 
                                              & (train.columns!=item_col)]
                             )
        # extra pickle step so the statsmodels-backed model serializes correctly
        return [train, test, pickle.dumps(model)]
    
    
    # Define inference_model function
    def inference_model(model_pickle:bytes, test:pd.DataFrame
                        , timestamp_col:str, item_col:str, target_col:str) -> pd.DataFrame:
    
        # unpickle the model
        model = pickle.loads(model_pickle)
        
        # inference on test data
        # note: index= belongs to the pd.DataFrame constructor, not to model.predict()
        forecast = pd.DataFrame(model.predict(n_periods=test.shape[0]
                                              , X=test.loc[:, (test.columns!=target_col)
                                                              & (test.columns!=item_col)])
                                , index=test.index)
        
        return forecast
    
    
    # start up Ray locally for testing purposes
    NUM_CPU = 2
    ray.init(
        ignore_reinit_error=True
        , num_cpus=NUM_CPU
    )
    
    ###########
    # run your training as distributed jobs by using ray remote function calls
    ###########
        
    # Convert your regular python functions to ray remote functions
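    # num_returns=3 matches the 3-item list [train, test, model] that train_model returns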
    train_model_remote = ray.remote(train_model).options(num_returns=3)  
    inference_model_remote = ray.remote(inference_model)
        
    # Train every model
    item_list = list(g_month['pulocationid'].unique())
    model = []
    train = []
    test = []
    
    for v in item_list:
        # .remote() returns ObjectRefs immediately; the tasks run in parallel
        temp_train, temp_test, temp_model = \
            train_model_remote.remote(g_month
                                      , item_col='pulocationid', item_value=v
                                      , target_col='trip_quantity'
                                      , train_size=6)
        train.append(temp_train)
        test.append(temp_test)
        model.append(temp_model)
    
    # Inference every test dataset
    result=[]
    for p in range(len(item_list)):
        # ray remote eval
        result.append(inference_model_remote.remote(model[p], test[p]
                                                    , timestamp_col='pickup_monthly'
                                                    , item_col='pulocationid'
                                                    , target_col='trip_quantity'))
    
    # ray.get() blocks until all the requested ObjectRefs are available
    forecast = ray.get(result)
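
    The same pattern also covers Prophet, which the question asks about. Below is a minimal sketch, not a definitive implementation: it assumes prophet >= 1.0 (which provides the model_to_json / model_from_json serialization helpers), and per_location_dfs in the usage comment is a hypothetical list of per-location dataframes with Prophet's required 'ds'/'y' columns.

    import pandas as pd
    import ray
    from prophet import Prophet
    from prophet.serialize import model_to_json, model_from_json

    @ray.remote
    def train_prophet(df: pd.DataFrame) -> str:
        # Prophet expects a dataframe with columns 'ds' (timestamp) and 'y' (target)
        m = Prophet()
        m.fit(df)
        # serialize with Prophet's own helper, analogous to the pickle step above
        return model_to_json(m)

    @ray.remote
    def forecast_prophet(model_json: str, periods: int) -> pd.DataFrame:
        m = model_from_json(model_json)
        future = m.make_future_dataframe(periods=periods, freq="MS")
        return m.predict(future)

    # one remote task per series, mirroring the ARIMA loops above:
    # refs = [forecast_prophet.remote(train_prophet.remote(df), 2) for df in per_location_dfs]
    # forecasts = ray.get(refs)

    Because model_to_json() returns a plain string, Ray can pass the model between tasks without any extra pickle step.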