python · pandas · kedro

Kedro - How to update a dataset in a Kedro pipeline, given that a dataset cannot be both input and output of a node (only DAGs are allowed)?


In a Kedro project, I have a dataset in catalog.yml that I need to increment by adding a few lines each time I call my pipeline.

#catalog.yml
my_main_dataset:
  type: pandas.SQLTableDataSet
  credentials: postgrey_credentials
  save_args:
    if_exists: append
  table_name: my_dataset_name

However, I cannot simply rely on append in my catalog parameters, since I need to make sure I do not insert already-existing dates into my dataset, to avoid duplicates. Nor can I create a node that takes my dataset both as input (to look for already-existing dates and merge with the additional data) and as output, since that would create a cycle, which is forbidden (only DAGs are permitted). I'm stuck and do not see any elegant way to solve this issue. I looked at other threads but did not find anything relevant on Stack Overflow so far.

I tried a very ugly workaround: an independent node in the same pipeline whose only job is to read my dataset and record its min and max dates in global variables as a side effect, so that the main flow can use them to control the append. It is not only ugly, it also fails, since I cannot control the order in which independent nodes of the same pipeline are run...

Ideally, I would like to achieve something like the following, which Kedro forbids the way I coded it (not a DAG):

#catalog.yml
my_main_dataset:
  type: pandas.SQLTableDataSet
  credentials: postgrey_credentials
  save_args:
    if_exists: append
  table_name: my_dataset_name

my_additional_dataset:
  type: pandas.SQLTableDataSet
  credentials: postgrey_credentials
  save_args:
    if_exists: append
  table_name: my__additional_dataset_name
#nodes.py
import pandas as pd

def increment_main_dataset(main_df, add_df):
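  # keep only the rows of add_df that are strictly newer than anything in main_df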
  last_date = main_df['date'].max()
  filtered_add_df = add_df.loc[add_df['date'] > last_date]
  main_df = pd.concat([main_df, filtered_add_df], axis=0)
  return main_df
#pipeline.py

from kedro.pipeline import Pipeline, node, pipeline
from .nodes import increment_main_dataset

def create_pipeline(**kwargs) -> Pipeline:
  return pipeline([
    node(
      func=increment_main_dataset,
      inputs=["my_main_dataset", "my_additional_dataset"],
      outputs="my_main_dataset",
      name="increment-dataset",
    ),
  ])

Solution

  • It may not be the best solution, but a workaround is to set up two Kedro datasets pointing to the same physical space: one for reading and one for writing, both referring to the same file/table. Something like:

    #catalog.yml
    my_main_dataset_read:  # same physical table as my_main_dataset_write
      type: pandas.SQLTableDataSet
      credentials: postgrey_credentials
      table_name: my_dataset_name
    
    my_main_dataset_write:  # same physical table as my_main_dataset_read
      type: pandas.SQLTableDataSet
      credentials: postgrey_credentials
      save_args:
        if_exists: replace  # the node returns the full merged table, so append would duplicate every existing row
      table_name: my_dataset_name
    
    my_additional_dataset:
      type: pandas.SQLTableDataSet
      credentials: postgrey_credentials
      save_args:
        if_exists: append
      table_name: my__additional_dataset_name
    
    #nodes.py - no changes from your code
    import pandas as pd
    
    def increment_main_dataset(main_df, add_df):
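      # keep only the rows of add_df that are strictly newer than anything in main_df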
      last_date = main_df['date'].max()
      filtered_add_df = add_df.loc[add_df['date'] > last_date]
      main_df = pd.concat([main_df, filtered_add_df], axis=0)
      return main_df
    
    #pipeline.py
    
    from kedro.pipeline import Pipeline, node, pipeline
    from .nodes import increment_main_dataset
    
    def create_pipeline(**kwargs) -> Pipeline:
      return pipeline([
        node(
          func=increment_main_dataset,
          inputs=["my_main_dataset_read", "my_additional_dataset"],
          outputs="my_main_dataset_write",
          name="increment-dataset",
        ),
      ])
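
    If rewriting the whole table on every run gets too expensive, a variant is to keep if_exists: append on my_main_dataset_write (as in your original catalog) and have the node return only the genuinely new rows, so that pandas appends just those. A minimal sketch, assuming the same date column as above and a non-empty main table (if the table can start empty, you would need to handle that case separately, since max() would return NaN):

    #nodes.py - append-only variant
    import pandas as pd
    
    def increment_main_dataset(main_df, add_df):
      # keep only the rows strictly newer than anything already stored;
      # the write dataset's if_exists: append then inserts just these rows
      last_date = main_df['date'].max()
      new_rows = add_df.loc[add_df['date'] > last_date]
      # guard against duplicate dates within the additional data itself
      return new_rows.drop_duplicates(subset='date')

    Either way, the two catalog entries keep the pipeline a valid DAG, since Kedro sees my_main_dataset_read and my_main_dataset_write as distinct datasets even though they point at the same table.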