For various reasons (mainly the ability to dynamically construct file paths) I like to define the data catalog programmatically rather than declaring datasets in a YAML file, e.g.

DataCatalog(
    {
        "products": ParquetDataSet(filepath=f"{PREFIX}/products.parquet"),
        ...
    }
)
In Kedro 0.17 there was an easy way to register the catalog so that its datasets could be used in pipeline definitions, via the register_catalog hook. However, in 0.18 this hook is absent, and there is no RegistrationSpecs section among the hook specifications.

Is there a way to replace register_catalog in Kedro 0.18?
I searched the documentation and tried defining the catalog in a YAML file, but that doesn't fit the project's requirements. TemplatedConfigLoader is probably an option, but it doesn't allow implementing custom logic for constructing dataset paths.
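To illustrate the limitation: as far as I can tell, the most TemplatedConfigLoader can do is substitute static values into the YAML, not run arbitrary Python while paths are resolved. A minimal sketch, assuming Kedro 0.18 (the prefix and file names below are made up):

# settings.py -- TemplatedConfigLoader only interpolates static values
# (e.g. ${prefix}) into catalog.yml; no custom path-building logic.
from kedro.config import TemplatedConfigLoader

CONFIG_LOADER_CLASS = TemplatedConfigLoader
CONFIG_LOADER_ARGS = {"globals_pattern": "*globals.yml"}

# conf/base/globals.yml (hypothetical):
#   prefix: s3://my-bucket/data
#
# catalog.yml could then use:
#   products:
#     type: pandas.ParquetDataSet
#     filepath: ${prefix}/products.parquet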
Not sure if this is the precise functionality you're looking for, but I've been programmatically adding datasets using a combination of the after_context_created and after_catalog_created hooks.
Just create an "add" method for the dataset type you require and use the docs to see which args it needs. In the example below, my data lives in S3, so I created a method to grab my creds from credentials.yml and pass them to PickleDataSet.
import logging

from kedro.config import ConfigLoader
from kedro.framework.project import settings
from kedro.framework.hooks import hook_impl
from kedro.extras.datasets.pickle.pickle_dataset import PickleDataSet


class ProjectHooks:
    @property
    def _logger(self):
        return logging.getLogger(__name__)

    @hook_impl
    def after_context_created(self, context):
        # Stash the project path so we can locate conf/ later on.
        self.project_path = context.project_path
        self._logger.info(f"Project path: {self.project_path}")

    def _get_credentials(self, key):
        # Read the given key from conf/local/credentials.yml.
        conf_path = f"{self.project_path}/{settings.CONF_SOURCE}"
        conf_loader = ConfigLoader(conf_source=conf_path, env="local")
        return conf_loader.get("credentials*")[key]

    def add_pickle_dataset(self, name, folder, layer=None):
        # Register (or replace) a PickleDataSet pointing at S3.
        self.catalog.add(
            data_set_name=name,
            data_set=PickleDataSet(
                filepath=f"s3://root/data/{folder}/{name}",
                credentials=self._get_credentials("dev_s3"),
            ),
            replace=True,
        )
        if layer:
            # Tag the dataset with a layer (used for grouping in Kedro-Viz).
            self.catalog.layers[layer].add(name)
        self._logger.info(f"Added dataset '{name}' to the data catalog.")

    @hook_impl
    def after_catalog_created(self, catalog):
        self.catalog = catalog
        # "datasets" is a list of dataset names defined in parameters.yml.
        datasets = self.catalog.load("params:datasets")
        for dataset in datasets:
            self.add_pickle_dataset(
                name=f"{dataset}",
                folder="07_model_output",
                layer="Model Output",
            )
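For completeness, the hooks also need to be registered in settings.py, and the datasets parameter has to exist. A minimal sketch of the wiring, assuming the default Kedro 0.18 project layout (the module path and dataset names below are placeholders):

# settings.py -- make Kedro pick up the hooks above.
from my_project.hooks import ProjectHooks  # hypothetical module path

HOOKS = (ProjectHooks(),)

# conf/base/parameters.yml then lists the datasets to register, e.g.:
#
#   datasets:
#     - model_a_predictions
#     - model_b_predictions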