In a Kedro project, I have a dataset in catalog.yml that I need to increment by appending a few rows each time I run my pipeline.
#catalog.yml
my_main_dataset:
  type: pandas.SQLTableDataSet
  credentials: postgrey_credentials
  save_args:
    if_exists: append
  table_name: my_dataset_name
However, I cannot rely only on append in my catalog parameters, since I need to make sure I do not insert already existing dates into my dataset, to avoid duplicates. I also cannot create a node that takes my dataset both as input (to look up the existing dates and merge in the additional data) and as output, because that would create a cycle, which is forbidden (only DAGs are permitted). I'm stuck and do not see any elegant way to solve my issue. I looked at other threads but did not find anything relevant on Stack Overflow so far.
I tried a very ugly thing, which was to create an independent node in the same pipeline just to look into my dataset and record the min and max dates in global variables as a side effect, in order to use them in the main flow to control the append. It is not only ugly, it also fails, since I cannot control the order in which independent nodes of the same pipeline are run...
Ideally, I would like to achieve something like this, which is forbidden by Kedro the way I coded it (not a DAG):
#catalog.yml
my_main_dataset:
  type: pandas.SQLTableDataSet
  credentials: postgrey_credentials
  save_args:
    if_exists: append
  table_name: my_dataset_name

my_additional_dataset:
  type: pandas.SQLTableDataSet
  credentials: postgrey_credentials
  save_args:
    if_exists: append
  table_name: my__additional_dataset_name
#nodes.py
import pandas as pd

def increment_main_dataset(main_df, add_df):
    # Keep only the additional rows that are newer than the latest date
    # already present in the main dataset, then append them.
    last_date = main_df['date'].max()
    filtered_add_df = add_df.loc[add_df['date'] > last_date]
    main_df = pd.concat([main_df, filtered_add_df], axis=0)
    return main_df
#pipeline.py
from kedro.pipeline import Pipeline, node, pipeline
from .nodes import *

def create_pipeline(**kwargs) -> Pipeline:
    return pipeline([
        node(
            func=increment_main_dataset,
            inputs=["my_main_dataset", "my_additional_dataset"],
            outputs="my_main_dataset",
            name="increment-dataset",
        ),
    ])
It may not be the best solution, but a workaround is to define two Kedro datasets pointing to the same physical space: one for reading and the other for writing, both targeting the same file/table. Something like:
#catalog.yml
my_main_dataset_read: # same table as my_main_dataset_write
  type: pandas.SQLTableDataSet
  credentials: postgrey_credentials
  save_args:
    if_exists: append
  table_name: my_dataset_name

my_main_dataset_write: # same table as my_main_dataset_read
  type: pandas.SQLTableDataSet
  credentials: postgrey_credentials
  save_args:
    if_exists: append
  table_name: my_dataset_name

my_additional_dataset:
  type: pandas.SQLTableDataSet
  credentials: postgrey_credentials
  save_args:
    if_exists: append
  table_name: my__additional_dataset_name
#nodes.py - no changes from your code
import pandas as pd

def increment_main_dataset(main_df, add_df):
    # Keep only the additional rows that are newer than the latest date
    # already present in the main dataset, then append them.
    last_date = main_df['date'].max()
    filtered_add_df = add_df.loc[add_df['date'] > last_date]
    main_df = pd.concat([main_df, filtered_add_df], axis=0)
    return main_df
#pipeline.py
from kedro.pipeline import Pipeline, node, pipeline
from .nodes import *

def create_pipeline(**kwargs) -> Pipeline:
    return pipeline([
        node(
            func=increment_main_dataset,
            inputs=["my_main_dataset_read", "my_additional_dataset"],
            outputs="my_main_dataset_write",
            name="increment-dataset",
        ),
    ])
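One detail worth double-checking with this workaround: my_main_dataset_write still uses if_exists: append, while the node returns the full concatenation of the existing table and the new rows, so each run would re-append rows that are already stored. A minimal sketch of an adjustment, assuming the date column is the only de-duplication key (as in your node), is to return only the filtered new rows and let the append write just those; alternatively, keep the concat and switch the write dataset to if_exists: replace.
#nodes.py - hedged variant: return only the new rows, so that
# `if_exists: append` does not duplicate what is already in the table
import pandas as pd

def increment_main_dataset(main_df, add_df):
    # Rows of the additional data strictly newer than anything stored so far
    last_date = main_df['date'].max()
    return add_df.loc[add_df['date'] > last_date]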