Tags: python, pyspark, kedro

How to use a MemoryDataset in Kedro with PySpark


I am trying to use a MemoryDataset with Kedro, so that the intermediate result is not saved to disk.

# nodes.py
def preprocess_format_tracksessions(tracksess: DataFrame, userid_profiles: pd.DataFrame, parameters: Dict) -> MemoryDataset:
    ...

In the pipeline I am defining the node output and inputs:

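# pipeline.py
from kedro.pipeline import Pipeline, node, pipeline

# Assumes the standard Kedro layout, with the node functions in nodes.py.
from .nodes import preprocess_format_tracksessions, process_tracksessions


def create_pipeline(**kwargs) -> Pipeline:
    return pipeline([
        node(
            func=preprocess_format_tracksessions,
            inputs=["track_sessions", "userid_profiles", "parameters"],
            outputs="ts_formatted",
            name="preprocess_format_tracksessions",
        ),
        node(
            func=process_tracksessions,
            inputs=["ts_formatted", "parameters"],
            outputs="results_summary",
            name="process_format_tracksessions",
        ),
    ])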

And in the catalog I am defining:

ts_formatted:
  type: MemoryDataSet 

But every time I run this I get the error below, probably because I have misunderstood how to set it up. Any help is much appreciated:

DatasetError: Failed while saving data to data set MemoryDataset().
It appears that you are attempting to reference SparkContext from a broadcast variable, action, or transformation.
SparkContext can only be used on the driver, not in code that it run on workers. For more information, see
SPARK-5063.

Solution

  • Hi @gaut, you may need to add copy_mode: assign to your YAML definition:

    ts_formatted:
      type: MemoryDataSet
      copy_mode: assign
    

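    With copy_mode: assign, Kedro keeps a reference to the returned object instead of copying it. The error most likely comes from Kedro trying to copy an object that references the SparkContext, which Spark does not allow. For ordinary MemoryDataSets that are not explicitly defined in the catalog, Kedro infers the copy mode from the type of the data, as shown in the source: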

    https://docs.kedro.org/en/stable/_modules/kedro/io/memory_dataset.html#MemoryDataset.__init__
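
To illustrate why the copy mode matters, here is a minimal sketch that uses only the Python standard library. `FakeSparkFrame` and `TinyMemoryStore` are hypothetical names invented for this example; they are not part of Kedro or PySpark, and the lock is only a stand-in for a driver-only handle such as a SparkContext reference. The point is that a deep copy has to walk into every attribute of the object, while plain assignment never touches it.

```python
import copy
import threading


class FakeSparkFrame:
    """Hypothetical stand-in for an object tied to a live driver-side handle."""

    def __init__(self, rows):
        self.rows = rows
        # A lock cannot be deep-copied, loosely mimicking a SparkContext reference.
        self._driver_handle = threading.Lock()


class TinyMemoryStore:
    """Illustrative in-memory store with a copy_mode switch (not Kedro's code)."""

    def __init__(self, copy_mode="deepcopy"):
        if copy_mode not in ("deepcopy", "copy", "assign"):
            raise ValueError(f"unknown copy_mode: {copy_mode!r}")
        self.copy_mode = copy_mode
        self._data = None

    def save(self, data):
        if self.copy_mode == "deepcopy":
            # Recursively copies every attribute, including the handle.
            self._data = copy.deepcopy(data)
        elif self.copy_mode == "copy":
            # Shallow copy: new outer object, shared attributes.
            self._data = copy.copy(data)
        else:
            # "assign": store a reference, nothing is copied.
            self._data = data

    def load(self):
        return self._data


frame = FakeSparkFrame(rows=[1, 2, 3])

# Assigning works because the object is never copied.
assign_store = TinyMemoryStore(copy_mode="assign")
assign_store.save(frame)
same_object = assign_store.load() is frame

# Deep-copying fails because the handle cannot be duplicated.
try:
    TinyMemoryStore(copy_mode="deepcopy").save(frame)
    deepcopy_failed = False
except TypeError:
    deepcopy_failed = True
```

In Kedro itself the same choice is made through the `copy_mode` argument of `MemoryDataset`, which is what the YAML entry above sets. The trade-off with assign is that downstream nodes receive the very same object, so in-place mutation by one node is visible to the others.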