I'm trying to create two custom Kedro DataSets and I'm not quite sure how to configure their __init__
methods.
The first DataSet is simple: it's read-only and fetches RSS items from a static URL.
catalog entry:
rss_feed_extract:
  type: kedro_workbench.extras.datasets.RSSDataSet.RSSFeedExtract
  url: https://api.msrc.microsoft.com/update-guide/rss
class definition:
class RSSFeedExtract(AbstractDataSet):
    def __init__(self, url: str):
        self._url = url  # <- how/when does kedro pass this from the catalog entry?

    def _save(self, data):
        raise DataSetError("read-only dataset")

    def _load(self):
        dictionary = feedparser.parse(self._url)
        # some minor processing
        return dict(dictionary)
the function mapped to the node definition:
def extract_rss_feed() -> Dict[str, Any]:
    raw_rss_feed = RSSFeedExtract()  # <- do I manually pass the catalog property here or does kedro?
    raw_rss_feed.load()  # <- do I manually call the load method or does the pipeline?
    return {'key_1': 'value_1', 'key_2': 'value_2'}
the node definition in the pipeline:
node(
    func=extract_rss_feed,
    inputs=None,  # <- not sure if I'm supposed to pass the dataset name here or not
    outputs='rss_feed_for_transforming',
    name="extract_rss_feed",
),
when I try to run the above, I get the following error:
TypeError: RSSFeedExtract.__init__() missing 1 required positional argument: 'url'
So I just need some help understanding how to pass catalog properties into the DataSet classes.
I tried to create an instance of the built-in APIDataset and got that working, but I didn't need to pass the url property to the function/node calling it, so I'm not sure what I'm doing wrong here. My thinking was that the node doesn't take an input because it is fetching data, hence inputs=None. But maybe I'm supposed to set inputs='rss_feed_extract'? I'm not clear.
What I want is to create a class that fetches rss items and returns a dictionary of the items and I want the node that references it to output an in-memory DataSet of the dictionary that I can pass into the next node to filter and transform the feed items.
Any guidance is appreciated.
Yes, you're supposed to pass the dataset name as input to the node, and the node function (in your case extract_rss_feed) should not use the dataset class directly.
Kedro first instantiates the *DataSet class with the appropriate arguments from the YAML file (in your case url), and then passes the return value of .load() to the node function.
As a result, your node functions never see *DataSet objects, only primitive types (or whatever you return from _load). For example, nodes matched to a pandas.CSVDataSet would not see a CSVDataSet, but a pd.DataFrame.
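To make that wiring concrete, here is a minimal plain-Python sketch of what the runner does behind the scenes. The stand-in class and the feed contents are hypothetical illustrations, not real Kedro internals:

```python
# Plain-Python stand-ins for what Kedro's runner does with a catalog entry.
# The class below and its fake feed data are hypothetical, not Kedro code.
catalog_entry = {
    "type": "RSSFeedExtract",  # Kedro resolves this to the dataset class
    "url": "https://api.msrc.microsoft.com/update-guide/rss",
}

class RSSFeedExtractStandIn:
    def __init__(self, url: str):
        self._url = url

    def load(self):
        # stand-in for feedparser.parse(self._url) plus minor processing
        return {"key1": "value_1", "key2": "value_2"}

def extract_rss_feed(feed: dict) -> dict:
    # the node function only ever sees the loaded data, never the dataset
    return {"key1": feed["key1"], "key2": feed["key2"]}

# 1. Kedro builds the dataset from the YAML kwargs (everything except 'type'):
dataset = RSSFeedExtractStandIn(url=catalog_entry["url"])
# 2. Kedro calls .load() and passes the result to the node function:
result = extract_rss_feed(dataset.load())
print(result)  # {'key1': 'value_1', 'key2': 'value_2'}
```

This is why you never call RSSFeedExtract() yourself inside the node function: the runner has already done both steps for you by the time your function runs.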
Your code would look like this:
# pipeline.py
...
node(
    func=extract_rss_feed,
    inputs='rss_feed_extract',
    outputs='rss_feed_for_transforming',
    name="extract_rss_feed",
),

# nodes.py
def extract_rss_feed(feed: Dict) -> Dict:
    return {"key1": feed["key1"], "key2": feed["key2"]}
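For completeness, the dataset itself could be sketched along these lines (a sketch, assuming kedro and feedparser are installed; the import fallback, the error message, and the deferred feedparser import are my additions, not part of the original question):

```python
from typing import Any, Dict

try:
    from kedro.io import AbstractDataSet, DataSetError
except ImportError:  # stand-in stubs so this sketch runs without Kedro installed
    class AbstractDataSet:
        pass

    class DataSetError(Exception):
        pass


class RSSFeedExtract(AbstractDataSet):
    """Read-only dataset that fetches items from a static RSS URL."""

    def __init__(self, url: str):
        # Kedro passes `url` from the catalog entry's keyword arguments
        self._url = url

    def _load(self) -> Dict[str, Any]:
        import feedparser  # deferred so the sketch imports without feedparser

        feed = feedparser.parse(self._url)
        # minor processing would go here
        return dict(feed)

    def _save(self, data) -> None:
        raise DataSetError("RSSFeedExtract is read-only")

    def _describe(self) -> Dict[str, Any]:
        return {"url": self._url}
```

With this in place, the catalog entry's url keyword is handed straight to __init__ when Kedro builds the catalog, and the node function receives whatever _load returns.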