yaml data-ingestion datahub

How to add a structured property to datasets passing through an ingestion pipeline in DataHub


I have created a structured property on my DataHub 0.14.0.2 instance via the CLI, but I cannot find any information online about how to apply that property to the datasets that come through my ingestion pipelines. I tried using an add_dataset_properties transformer, with the property key set to the id of my structured property, but that just created a custom property that cannot be edited.

Is there a way to add a structured property to every dataset that passes through a pipeline automatically so that the value can be changed in the UI after the fact, or can you only apply it manually, after the data has been ingested?

Edit:

transformer:

- type: 'add_dataset_properties'
  config:
    semantics: PATCH
    add_properties_resolver_class: structured_property-resolver-file.PropertyResolverClass

python code:

from typing import Dict
from datahub.ingestion.transformer.add_dataset_properties import AddDatasetPropertiesResolverBase

class PropertyResolverClass(AddDatasetPropertiesResolverBase):
    def get_properties_to_add(self, entity_urn: str) -> Dict[str, str]:
        return {"structuredPropertyName": "desired default value"}

Solution

  • EDIT:

    There is actually a page in the documentation for this that I stumbled across by accident. I do not know whether it was posted recently or Google was conspiring to hide it from me, but it lays out how to do this quite simply. My answer from before this edit had the slight problem of overwriting all the other data on the dataset, so it did not actually answer the question properly. The code in the documentation is not itself an action, but it can easily be configured as one.

    # Inlined from /metadata-ingestion/examples/library/dataset_add_structured_properties_patch.py
    from datahub.emitter.mce_builder import make_dataset_urn
    from datahub.ingestion.graph.client import DataHubGraph, DataHubGraphConfig
    from datahub.specific.dataset import DatasetPatchBuilder
    
    # Create DataHub Client
    datahub_client = DataHubGraph(DataHubGraphConfig(server="http://localhost:8080"))
    
    # Create Dataset URN
    dataset_urn = make_dataset_urn(platform="hive", name="fct_users_created", env="PROD")
    
    # Create Dataset Patch to Add and Remove Structured Properties
    patch_builder = DatasetPatchBuilder(dataset_urn)
    patch_builder.add_structured_property(
        "urn:li:structuredProperty:retentionTimeInDays", 12
    )
    patch_builder.remove_structured_property(
        "urn:li:structuredProperty:customClassification"
    )
    patch_mcps = patch_builder.build()
    
    # Emit Dataset Patch
    for patch_mcp in patch_mcps:
        datahub_client.emit(patch_mcp)
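
To apply the same default to many datasets rather than one, the patch loop above could be wrapped in a small helper. This is only a sketch under assumptions: `structured_property_urn` and `apply_structured_property` are hypothetical names, not part of DataHub, and the builder and emitter are passed in as arguments so the helper itself has no DataHub import. It assumes the builder exposes `add_structured_property` and `build` as in the snippet above.

```python
from typing import Any, Callable, Iterable


def structured_property_urn(property_id: str) -> str:
    """Turn a bare structured property id into the URN form used in the patch above."""
    prefix = "urn:li:structuredProperty:"
    return property_id if property_id.startswith(prefix) else prefix + property_id


def apply_structured_property(
    dataset_urns: Iterable[str],
    property_id: str,
    value: Any,
    make_builder: Callable[[str], Any],  # assumed: e.g. DatasetPatchBuilder
    emit: Callable[[Any], None],         # assumed: e.g. datahub_client.emit
) -> int:
    """Patch one structured property onto each dataset; return how many patches were emitted."""
    property_urn = structured_property_urn(property_id)
    emitted = 0
    for dataset_urn in dataset_urns:
        # One patch builder per dataset, mirroring the single-dataset example.
        builder = make_builder(dataset_urn)
        builder.add_structured_property(property_urn, value)
        for patch_mcp in builder.build():
            emit(patch_mcp)
            emitted += 1
    return emitted
```

With the names from the documentation snippet, a call might look like `apply_structured_property([dataset_urn], "retentionTimeInDays", 12, DatasetPatchBuilder, datahub_client.emit)`. Because these are patches, the other aspects of each dataset should be left alone, which is what the earlier transformer-based attempt got wrong.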