
How do Kedro catalog properties get passed to DataSets when the pipeline is executed?


I'm trying to create two custom Kedro DataSets and I'm not quite sure how to configure the __init__ methods. The first DataSet is simple: it's read-only and fetches RSS items from a static URL. Catalog entry:

rss_feed_extract:
  type: kedro_workbench.extras.datasets.RSSDataSet.RSSFeedExtract
  url: https://api.msrc.microsoft.com/update-guide/rss

Class definition:

class RSSFeedExtract(AbstractDataSet):
    def __init__(self, url: str):
        self._url = url  # <- how/when does Kedro pass this from the catalog entry?

    def _save(self, data) -> None:
        raise DataSetError("RSSFeedExtract is read-only")

    def _load(self) -> Dict[str, Any]:
        dictionary = feedparser.parse(self._url)
        # some minor processing
        return dict(dictionary)

The function mapped to the node definition:

def extract_rss_feed() -> Dict[str, Any]:
    raw_rss_feed = RSSFeedExtract()  # <- do I manually pass the catalog property here or does Kedro?
    raw_rss_feed.load()  # <- do I manually call the load method or does the pipeline?

    return {'key_1': 'value_1', 'key_2': 'value_2'}

The node definition in the pipeline:

node(
    func=extract_rss_feed,
    inputs=None,  # <- not sure if I'm supposed to pass the dataset name here or not
    outputs='rss_feed_for_transforming',
    name="extract_rss_feed",
),

When I try to run the above, I get the following error: TypeError: RSSFeedExtract.__init__() missing 1 required positional argument: 'url'. So I just need some help understanding how to pass catalog properties into the DataSet classes.

I tried to create an instance of the built-in APIDataSet and got that working, but I didn't need to pass the url property to the function/node calling it, so I'm not sure what I'm doing wrong here. My thinking was that the node doesn't take an input because it is fetching data, so inputs=None. But maybe I'm supposed to set inputs='rss_feed_extract'? I'm not clear.

What I want is to create a class that fetches RSS items and returns a dictionary of the items, and I want the node that references it to output an in-memory DataSet of the dictionary that I can pass into the next node to filter and transform the feed items.

Any guidance is appreciated.


Solution

  • Yes, you're supposed to pass the dataset name as input to the node, and also to make the node function (in your case extract_rss_feed) not use the dataset class directly.

    Kedro first instantiates the *Dataset class with the appropriate arguments from the YAML file, in your case url, and then passes the return value of .load() to the node function.

    As a result, your node functions never see *Dataset objects, only primitive types (or whatever you return from _load). For example, nodes matched to a pandas.CSVDataSet would not see a CSVDataSet, but a pd.DataFrame.
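Conceptually, the catalog machinery does something like the following sketch (illustrative only, not Kedro's actual internals): it resolves the type key to a class, then forwards the remaining YAML keys as keyword arguments to that class's constructor. That is exactly why your RSSFeedExtract() call with no arguments raised the TypeError — you were bypassing this step.

```python
# Illustrative sketch, NOT Kedro's real implementation: a catalog
# entry is just a mapping whose non-"type" keys become __init__ kwargs.
catalog_entry = {
    "type": "kedro_workbench.extras.datasets.RSSDataSet.RSSFeedExtract",
    "url": "https://api.msrc.microsoft.com/update-guide/rss",
}

class RSSFeedExtract:
    """Stand-in for the real dataset class (no Kedro import needed here)."""
    def __init__(self, url: str):
        self._url = url

# Kedro resolves "type" to the class, then does the equivalent of:
kwargs = {k: v for k, v in catalog_entry.items() if k != "type"}
dataset = RSSFeedExtract(**kwargs)
```

So the url from catalog.yml reaches __init__ automatically at catalog-load time; neither your node function nor your pipeline code ever constructs the dataset.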

    Your code would look like this:

    # pipeline.py
    
    ...
        node(
            func=extract_rss_feed,
            inputs='rss_feed_extract',
            outputs='rss_feed_for_transforming',
            name="extract_rss_feed",
        ),
    
    
    # nodes.py
    
    def extract_rss_feed(feed: Dict) -> Dict:
        return {"key1": feed["key1"], "key2": feed["key2"]}
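For completeness, here is a minimal sketch of what the dataset class itself could look like. The _load/_save/_describe method names follow Kedro's AbstractDataSet contract; the feed fetch is stubbed so the example is self-contained (a real implementation would subclass AbstractDataSet and call feedparser.parse(self._url) in _load).

```python
from typing import Any, Dict

class RSSFeedExtract:
    """Sketch of a read-only dataset. In a real project this would
    subclass kedro.io.AbstractDataSet; the fetch is stubbed here."""

    def __init__(self, url: str):
        # Kedro passes `url` from the catalog entry when it builds the catalog.
        self._url = url

    def _load(self) -> Dict[str, Any]:
        # Real implementation: feedparser.parse(self._url), plus any
        # minor processing, returned as a plain dict.
        return {"feed_url": self._url, "entries": []}

    def _save(self, data: Dict[str, Any]) -> None:
        # Read-only dataset: Kedro convention is to raise DataSetError here.
        raise NotImplementedError("RSSFeedExtract is read-only")

    def _describe(self) -> Dict[str, Any]:
        return {"url": self._url}

ds = RSSFeedExtract("https://api.msrc.microsoft.com/update-guide/rss")
data = ds._load()
```

At runtime Kedro calls load() on the catalog entry named rss_feed_extract and hands the resulting dict to extract_rss_feed as its feed argument, so the node only ever sees plain data.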