python, pandas, dataframe, pyspark, automl

How can I implement Python AutoML libraries (like PyCaret, auto-sklearn, etc.) on a PySpark DataFrame?


I am trying to run AutoML over a PySpark DataFrame, but I couldn't find any documentation or library specific to this. Can we use PyCaret, MLJAR, or any other AutoML library on PySpark DataFrames via pandas_udfs?
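For concreteness, the pattern I have in mind is PySpark's grouped-map API, where each group is handed to a plain pandas function inside which an in-memory AutoML call could run (a minimal sketch; the column names and the stub function are placeholders, not from any library):

    import pandas as pd
    from pyspark.sql import SparkSession
    
    spark = SparkSession.builder.getOrCreate()
    sdf = spark.createDataFrame(
        pd.DataFrame({"group": ["a", "a", "b"], "y": [0, 1, 0]})
    )
    
    def automl_per_group(pdf: pd.DataFrame) -> pd.DataFrame:
        # An in-memory AutoML call (e.g. PyCaret's setup()/compare_models())
        # would replace this stub, which just returns the group size.
        return pd.DataFrame({"group": [pdf["group"].iloc[0]],
                             "rows": [len(pdf)]})
    
    res = sdf.groupBy("group").applyInPandas(automl_per_group,
                                             schema="group string, rows long")
    res.show()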


Solution

  • There are two ways to do this. The first is to partition the data and run PyCaret on each partition. There is a video about this setup here: https://www.youtube.com/watch?v=fvLRrwAbLd0

    and an article here: https://towardsdatascience.com/scaling-pycaret-with-spark-or-dask-through-fugue-60bdc3ce133f

    Those cover the full details, but I'll also leave the code snippets here per StackOverflow guidelines.

    import numpy as np
    import pandas as pd
    import fugue_spark  # registers the Spark execution engine with Fugue
    from fugue import transform
    from pycaret.classification import setup, compare_models, pull
    
    # Output schema for the per-partition leaderboard
    schema = """Model:str, Accuracy:float, AUC:float, Recall:float, Prec:float, 
    F1:float, Kappa:float, MCC:float, TT_Sec:float, Sex:str"""
    
    def wrapper(df: pd.DataFrame) -> pd.DataFrame:
        # Run a full PyCaret experiment on one partition of the data
        clf = setup(data=df,
                    target='Survived',
                    session_id=123,
                    silent=True,
                    verbose=False,
                    html=False)
        models = compare_models(fold=10,
                                round=4,
                                sort="Accuracy",
                                turbo=True,
                                n_select=5,
                                verbose=False)
        results = pull().reset_index(drop=True)
        # Fugue can't have spaces or . in column names
        results = results.rename(columns={"TT (Sec)": "TT_Sec",
                                          "Prec.": "Prec"})
        results['Sex'] = df.iloc[0]["Sex"]
        return results.iloc[0:5]  # keep the top 5 models per partition
    
    
    # Partition the pandas DataFrame by Sex and run wrapper on each
    # partition across the Spark cluster
    res = transform(df.replace({np.nan: None}), wrapper, schema=schema,
                    partition={"by": "Sex"}, engine="spark")
    res = res.toPandas()
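
    Before scaling out, the same wrapper can be sanity-checked locally: omitting engine="spark" makes transform() run on Fugue's default local pandas engine and return a pandas DataFrame (a minimal sketch; the sample size is arbitrary):

    # Run the identical wrapper on a small pandas sample with Fugue's
    # local engine; the output comes back as a pandas DataFrame.
    sample = df.replace({np.nan: None}).head(200)
    local_res = transform(sample, wrapper, schema=schema,
                          partition={"by": "Sex"})
    print(local_res)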
    

    The second way to use PyCaret on Spark/Dask/Ray is distributed training of the candidate models. You can use the FugueBackend to achieve this. Documentation for that can be found here: https://fugue-tutorials.readthedocs.io/tutorials/integrations/ecosystem/pycaret.html

    The code example for this:

    from pycaret.classification import setup, compare_models, pull
    from pycaret.parallel import FugueBackend
    from pyspark.sql import SparkSession
    
    spark = SparkSession.builder.getOrCreate()
    
    # setup() must be called on the training data before compare_models,
    # e.g. setup(data=df, target="Survived", session_id=123)
    
    # Train and evaluate the candidate models as distributed Spark jobs
    compare_models(n_select=2, parallel=FugueBackend(spark))
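
    Note the difference between the two approaches: the first produces a separate leaderboard per data partition, while the FugueBackend distributes the training of the candidate models and selects over the full dataset. As with a local run, the comparison grid should still be retrievable afterwards; a minimal sketch (assuming setup() was called as above):

    # compare_models returns the selected models; pull() retrieves the
    # latest scoring grid as a pandas DataFrame for inspection.
    results = pull()
    print(results)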