pythonkedro

How can I register kedro data catalog programmatically in Kedro 0.18?


For various reasons (mainly ability to dynamically construct file paths) I like to define the data catalog programatically, and not use yaml file to define datasets e.g.

DataCatalog(
    {"products": ParquetDataSet(filepath=f{PREFIX}/products.parquet") 
...
})

In kedro 0.17 there was an easy way to register the catalog in order to use it's datasets in pipeline definition, via register_catalog hook.

However in 0.18 this hook is absent and there is no RegistrationSpecs section among hook specifications.

What is there a way to replace register_catalog in kedro 0.18?

I searched the documentation and tried to implement catalog in yaml file, but that doesn't fit project's requirements. TemplatedConfigLoader is probably an option, but i does't allow to implement custom logic, constructing paths to datasets.


Solution

  • Not sure if this is the precise functionality that you're looking for, but I've been programmatically adding datasets by using a combination of after_context_created and after_catalog_created hooks.

    Just create an "add" method for the dataset that you require and use the docs to see what args are needed. In the example below, I have my data in S3, so I created a method to grab my creds from credentials.yml and passed them to PickleDataSet.

    import logging
    from kedro.config import ConfigLoader
    from kedro.framework.project import settings
    from kedro.framework.hooks import hook_impl
    from kedro.extras.datasets.pickle.pickle_dataset import PickleDataSet
    
    
    class ProjectHooks:
        @property
        def _logger(self):
            return logging.getLogger(__name__)
    
        @hook_impl
        def after_context_created(self, context):
            self.project_path = context.project_path
            self._logger.info(f"Project path: {self.project_path}")
    
        def _get_credentials(self, key):
            conf_path = f"{self.project_path}/{settings.CONF_SOURCE}"
            conf_loader = ConfigLoader(conf_source=conf_path, env="local")
            return conf_loader.get("credentials*")[key]
    
        def add_pickle_dataset(self, name, folder, layer=None):
            self.catalog.add(
                data_set_name=name,
                data_set=PickleDataSet(
                    filepath=f"s3://root/data/{folder}/{name}",
                    credentials=self._get_credentials("dev_s3"),
                ),
                replace=True,
            )
            if layer:
                self.catalog.layers[layer].add(name)
            self._logger.info(f"Added dataset '{name}' to the data catalog.")
    
        @hook_impl
        def after_catalog_created(self, catalog):
            self.catalog = catalog
            datasets = self.catalog.load("params:datasets")
            for dataset in datasets:
                self.add_pickle_dataset(
                    name=f"{dataset}",
                    folder="07_model_output",
                    layer="Model Output",
                )