I am trying to use a MemoryDataset with Kedro, so that the intermediate result is not saved to disk.
# nodes.py
def preprocess_format_tracksessions(tracksess: DataFrame, userid_profiles: pd.DataFrame, parameters: Dict) -> MemoryDataset:
In the pipeline I am defining the node inputs and outputs:
# pipeline.py
from kedro.pipeline import Pipeline, node, pipeline

from .nodes import preprocess_format_tracksessions, process_tracksessions


def create_pipeline(**kwargs) -> Pipeline:
    return pipeline(
        [
            node(
                func=preprocess_format_tracksessions,
                inputs=["track_sessions", "userid_profiles", "parameters"],
                outputs="ts_formatted",
                name="preprocess_format_tracksessions",
            ),
            node(
                func=process_tracksessions,
                inputs=["ts_formatted", "parameters"],
                outputs="results_summary",
                name="process_format_tracksessions",
            ),
        ]
    )
And in the catalog I am defining:
ts_formatted:
  type: MemoryDataSet
But every time I run it I get the error below, surely because of some misunderstanding on my part about how to proceed. Any help would be much appreciated:
DatasetError: Failed while saving data to data set MemoryDataset().
It appears that you are attempting to reference SparkContext from a broadcast variable, action, or transformation.
SparkContext can only be used on the driver, not in code that it run on workers. For more information, see
SPARK-5063.
Hi @gaut, you may need to add this to your YAML definition:
ts_formatted:
  type: MemoryDataSet
  copy_mode: assign
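The reason is that the default copy mode (a deep copy) ends up trying to serialise the Spark DataFrame, and touching its SparkContext reference that way is exactly what raises the SPARK-5063 error; copy_mode: assign tells the MemoryDataset to hand the same object back without copying it. If you ever need the equivalent in code rather than YAML, a minimal sketch would look like this (assuming a recent Kedro where the class is named MemoryDataset; older releases expose it as kedro.io.MemoryDataSet):

# A minimal sketch of the equivalent programmatic catalog entry,
# assuming Kedro >= 0.19 naming (MemoryDataset).
from kedro.io import DataCatalog, MemoryDataset

catalog = DataCatalog(
    {
        # "assign" returns the stored object as-is, with no copy,
        # which is what a Spark DataFrame needs
        "ts_formatted": MemoryDataset(copy_mode="assign"),
    }
)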
For MemoryDataSets that are not explicitly defined in the catalog, Kedro will infer the copy mode for you, as you can see here:
https://docs.kedro.org/en/stable/_modules/kedro/io/memory_dataset.html#MemoryDataset.__init__
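Roughly, that inference just looks at the type of the data. This is a paraphrased sketch of the logic, not the exact Kedro source:

# Paraphrased sketch of Kedro's copy-mode inference, not the exact source.
def _infer_copy_mode(data):
    # pandas DataFrames and numpy arrays get a shallow .copy()
    if type(data).__module__.startswith(("pandas", "numpy")):
        return "copy"
    # objects merely named "DataFrame" (e.g. Spark) are assigned as-is
    if type(data).__name__ == "DataFrame":
        return "assign"
    # everything else falls back to a deep copy
    return "deepcopy"

For Spark data the inferred mode is therefore assign, which is the same behaviour the explicit copy_mode: assign entry above opts into.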