Tags: yaml, data-ingestion, datahub

How to add a Structured Property to data passing through a pipeline in DataHub


I have created a structured property for my 0.14.0.2 instance of DataHub via the CLI, but I cannot find any information online about how to apply that property to the datasets that come through my ingestion pipelines. I tried using an add_dataset_properties transformer with the id of my structured property as the property key, but that just created an unchangeable custom property.

Is there a way to automatically add a structured property to every dataset that passes through a pipeline, so that its value can be changed in the UI after the fact, or can the property only be applied manually after the data has been ingested?
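
For reference, the structured property itself was created via the CLI; a minimal definition file and upsert command look roughly like this (the id and display name here are placeholders):

# structured_properties.yaml
- id: my_structured_property
  display_name: My Structured Property
  type: string
  cardinality: SINGLE
  entity_types:
    - dataset

# upserted with: datahub properties upsert -f structured_properties.yaml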

Edit:

transformers:

- type: 'add_dataset_properties'
  config:
    semantics: PATCH
    add_properties_resolver_class: structured_property_resolver_file.PropertyResolverClass

Python code:

from typing import Dict
from datahub.ingestion.transformer.add_dataset_properties import AddDatasetPropertiesResolverBase

class PropertyResolverClass(AddDatasetPropertiesResolverBase):
    def get_properties_to_add(self, entity_urn: str) -> Dict[str, str]:
        # The transformer treats these key/value pairs as plain custom properties,
        # which is why this did not produce a structured property.
        return {"structuredPropertyName": "desired default value"}

Solution

  • As far as I can tell, this is not possible with the ingestion transformers themselves. However, in the interest of helping others attempting a similar operation, I found DataHub Actions to be the best method for accomplishing this.

    Action YAML file:

    name: structured-property-applier
    source:
      type: "kafka"
      config:
        connection:
          bootstrap: ${KAFKA_BOOTSTRAP_SERVER:-localhost:9092}
          schema_registry_url: ${SCHEMA_REGISTRY_URL:-http://localhost:8081}
    action:
      type: "<path to python file>:StructuredPropertyAction"
      config:
        gms_endpoint: $GMS_ENDPOINT
        gms_token: $GMS_TOKEN
        datahub_rest_endpoint: $DATAHUB_REST_ENDPOINT
        datahub_token: $DATAHUB_TOKEN
        allowed_asset_types:
          - dataset
    

    Action Python file:

    from types import SimpleNamespace
    from datahub_actions.action.action import Action
    from datahub.ingestion.graph.client import DatahubClientConfig, DataHubGraph
    from datahub_actions.event.event_envelope import EventEnvelope
    from datahub_actions.pipeline.pipeline_context import PipelineContext
    from datahub.metadata.schema_classes import (
        DataProductPropertiesClass, DatasetPropertiesClass, StructuredPropertiesClass, StructuredPropertyValueAssignmentClass
    )
    
    from datahub.emitter.rest_emitter import DatahubRestEmitter
    from datahub.emitter.mcp import MetadataChangeProposalWrapper
    
    structured_properties_aspect_info = StructuredPropertyValueAssignmentClass(
        propertyUrn = "<REPLACE WITH STRUCTURED PROPERTY URN>",
        values = [<REPLACE WITH DESIRED DEFAULT VALUE>]
    )
    
    structured_properties_aspect = StructuredPropertiesClass(
        properties = [structured_properties_aspect_info]
    )
    
    ENTITY_TYPE_TO_CLASS_MAP = {
        "dataset": DatasetPropertiesClass,
        "dataproduct": DataProductPropertiesClass
    }
    
    def get_class_from_type(entity_type: str):
        return ENTITY_TYPE_TO_CLASS_MAP.get(entity_type.lower())
    
    class StructuredPropertyAction(Action):
        def __init__(self, context: PipelineContext, config: dict) -> None:
            graph_config = DatahubClientConfig(
                server=config['datahub_rest_endpoint'],
                token=config.pop('datahub_token', None)
            )
            self.graph = DataHubGraph(graph_config)
            self.ctx = context
            self.gms_endpoint = config.get("gms_endpoint")
            self.gms_token = config.get("gms_token")
            self.allowed_assets = config.get("allowed_asset_types", [])
    
            if config.get("update_old_entities"):
                self.update_old_entities()
    
        @classmethod
        def create(cls, config_dict: dict, ctx: PipelineContext) -> "Action":
            return cls(ctx, config_dict)
    
        def close(self) -> None:
            print("Closing action...")
    
        def fetch_entity(self, entity_urn: str, entity_type: str):
            try:
                entity_class = get_class_from_type(entity_type)
                entity = self.graph.get_aspect(entity_urn, entity_class)
                if entity is None:
                    # Return an empty placeholder so the caller still proceeds with emitting the aspect
                    return SimpleNamespace()
                return entity
            except Exception as e:
                print(f"Error fetching entity {entity_urn}: {e}")
                return None
    
        def act(self, event: EventEnvelope) -> None:
            entity_type = event.event.get("entityType", "").lower()
            # These filters can all be changed depending on what you're aiming for
            if entity_type not in self.allowed_assets:
                return
            category = event.event.get("category")
            if category != "LIFECYCLE":
                return
            operation = event.event.get("operation")
            change = event.event.get("changeType")
            if operation != "ADD" and operation != "CREATE" and change != "UPSERT":
                return
            urn = event.event.get("entityUrn")
            print(f"Adding structured property to {urn}.")
            self.add_structured_property(urn, entity_type)

        def add_structured_property(self, entity_urn: str, entity_type: str) -> None:
            entity = self.fetch_entity(entity_urn, entity_type)
            if entity is None:
                return
    
            metadata_change_proposal = MetadataChangeProposalWrapper(
                entityUrn = entity_urn,
                changeType = "UPSERT",
                entityType = entity_type,
                aspect = structured_properties_aspect
            )
    
            event_emitter = DatahubRestEmitter(self.gms_endpoint, token=self.gms_token)
            event_emitter.emit_mcp(metadata_change_proposal)
            event_emitter.close()
    

    When this action is running, all new datasets will have the structured property applied to them.
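
    To start the action, the yml above is passed to the Actions CLI; assuming it is saved as structured-property-applier.yaml, the command looks roughly like this:

    datahub actions -c structured-property-applier.yaml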

    One note: an error I encountered when trying to run this is that the Python file name and the Python class name must match; for instance, if your Python file is example_python_file.py, you would want to name your Action class ExamplePythonFile and set the action: type: in the yml file to example_python_file:ExamplePythonFile (assuming they're in the same directory).