I have created a structured property for my 0.14.0.2 instance of Datahub via CLI, but I cannot find any information online about how to apply that property to the datasets that come through my ingestion pipelines. I tried using an add_dataset_properties transformer with the listed property being the id of my structured property, but that just created an unchangeable custom property.
Is there a way to add a structured property to every dataset that passes through a pipeline automatically so that the value can be changed in the UI after the fact, or can you only apply it manually, after the data has been ingested?
Edit:
transformer:
- type: 'add_dataset_properties'
  config:
    semantics: PATCH
    # NOTE(review): hyphens in the module path may prevent Python from
    # importing the resolver module — confirm; underscores in the file
    # name (structured_property_resolver_file) are safer.
    add_properties_resolver_class: structured_property-resolver-file.PropertyResolverClass
python code:
from typing import Dict
from datahub.ingestion.transformer.add_dataset_properties import AddDatasetPropertiesResolverBase
class PropertyResolverClass(AddDatasetPropertiesResolverBase):
    """Resolver plugged into the ``add_dataset_properties`` transformer.

    NOTE(review): values returned here are written as plain *custom*
    properties, not structured properties — the transformer cannot attach
    a structured property, which is why this approach did not work for
    the original goal.
    """

    def get_properties_to_add(self, entity_urn: str) -> Dict[str, str]:
        # Same key/value pair is applied to every dataset the pipeline sees,
        # regardless of entity_urn.
        return {"structuredPropertyName": "desired default value"}
As far as I can tell, this is not possible. However, in the interest of helping others attempting a similar operation, I found Datahub Actions to be the best method for accomplishing this.
Action yaml/yml file:
# DataHub Actions pipeline: consumes the metadata change stream from Kafka
# and hands each event to the custom StructuredPropertyAction below.
name: structured-property-applier
source:
  type: "kafka"
  config:
    connection:
      # Defaults target a local quickstart deployment; override via env vars.
      bootstrap: ${KAFKA_BOOTSTRAP_SERVER:-localhost:9092}
      schema_registry_url: ${SCHEMA_REGISTRY_URL:-http://localhost:8081}
action:
  # "<module path>:<class name>" of the custom action implementation.
  type: "<path to python file>:StructuredPropertyAction"
  config:
    # gms_* values are used by the emitter that writes the aspect;
    # datahub_* values are used by the graph client that reads entities.
    gms_endpoint: $GMS_ENDPOINT
    gms_token: $GMS_TOKEN
    datahub_rest_endpoint: $DATAHUB_REST_ENDPOINT
    datahub_token: $DATAHUB_TOKEN
    # Entity types the action will process (matched against the event's
    # lower-cased entityType).
    allowed_asset_types:
      - dataset
Action python file:
from types import SimpleNamespace
from datahub_actions.action.action import Action
from datahub.ingestion.graph.client import DatahubClientConfig, DataHubGraph
from datahub_actions.event.event_envelope import EventEnvelope
from datahub_actions.pipeline.pipeline_context import PipelineContext
from datahub.metadata.schema_classes import (
DataProductPropertiesClass, DatasetPropertiesClass, StructuredPropertiesClass, StructuredPropertyValueAssignmentClass
)
from datahub.emitter.rest_emitter import DatahubRestEmitter
from datahub.emitter.mcp import MetadataChangeProposalWrapper
# Value assignment for the structured property that the action stamps onto
# every matching entity. Fill in both placeholders before running.
structured_properties_aspect_info = StructuredPropertyValueAssignmentClass(
    propertyUrn = "<REPLACE WITH STRUCTURED PROPERTY URN>",
    values = [<REPLACE WITH DESIRED DEFAULT VALUE>]  # placeholder — not valid Python until replaced
)
# The aspect emitted for every entity the action processes; built once at
# module load and reused.
structured_properties_aspect = StructuredPropertiesClass(
    properties = [structured_properties_aspect_info]
)
# Registry of supported entity types mapped to their "properties" aspect
# class; anything outside this registry resolves to None.
ENTITY_TYPE_TO_CLASS_MAP = {
    "dataset": DatasetPropertiesClass,
    "dataproduct": DataProductPropertiesClass,
}


def get_class_from_type(entity_type: str):
    """Return the properties aspect class for *entity_type* (case-insensitive).

    Returns None when the type is not in the registry.
    """
    normalized = entity_type.lower()
    return ENTITY_TYPE_TO_CLASS_MAP.get(normalized)
class StructuredPropertyAction(Action):
    """DataHub Action that stamps a default structured property onto newly
    created entities (e.g. datasets) as they appear on the metadata change
    stream, so the value can later be edited in the UI.
    """

    def __init__(self, context: PipelineContext, config: dict) -> None:
        """Build the graph client and cache configuration values.

        Reads ``datahub_rest_endpoint``/``datahub_token`` for the graph
        client (entity reads) and ``gms_endpoint``/``gms_token`` for the
        REST emitter (aspect writes).
        """
        graph_config = DatahubClientConfig(
            server=config['datahub_rest_endpoint'],
            # get() rather than pop() so the caller's config dict is not mutated.
            token=config.get('datahub_token'),
        )
        self.graph = DataHubGraph(graph_config)
        self.ctx = context
        self.gms_endpoint = config.get("gms_endpoint")
        self.gms_token = config.get("gms_token")
        self.allowed_assets = config.get("allowed_asset_types", [])
        # Optional one-shot backfill hook, only invoked when explicitly enabled.
        # NOTE(review): update_old_entities is not defined anywhere in this
        # file — define it (or drop this flag) before enabling, otherwise
        # this raises AttributeError.
        if config.get("update_old_entities"):
            self.update_old_entities()

    @classmethod
    def create(cls, config_dict: dict, ctx: PipelineContext) -> "Action":
        """Factory entry point invoked by the Actions framework."""
        return cls(ctx, config_dict)

    def close(self) -> None:
        """Framework shutdown hook; nothing to release beyond logging."""
        print("Closing action...")

    def fetch_entity(self, entity_urn: str, entity_type: str):
        """Fetch the properties aspect for *entity_urn*.

        Returns an empty SimpleNamespace when the aspect does not exist yet
        (the caller still proceeds), or None when the fetch fails (the
        caller skips the entity).
        """
        try:
            entity_class = get_class_from_type(entity_type)
            entity = self.graph.get_aspect(entity_urn, entity_class)
            if entity is None:
                return SimpleNamespace()
            return entity
        except Exception as e:
            print(f"Error fetching entity {entity_urn}: {e}")
            return None

    def act(self, event: EventEnvelope) -> None:
        """Filter incoming events and apply the structured property to
        newly created entities of an allowed type.
        """
        entity_type = event.event.get("entityType", "").lower()
        # These filters can all be changed depending on what you're aiming for
        if entity_type not in self.allowed_assets:
            return
        category = event.event.get("category")
        if category != "LIFECYCLE":
            return
        operation = event.event.get("operation")
        change = event.event.get("changeType")
        # Proceed when ANY of ADD / CREATE / UPSERT matches; bail out only
        # when none of them do.
        if operation != "ADD" and operation != "CREATE" and change != "UPSERT":
            return
        urn = event.event.get("entityUrn")
        self.add_classification(urn, entity_type)
        # Log after the write attempt so the message does not claim success
        # before the property has actually been applied.
        print(f"Added classification to {urn}.")

    def add_classification(self, entity_urn: str, entity_type: str) -> None:
        """Emit an UPSERT MCP attaching the module-level structured-property
        aspect to *entity_urn*. Skips silently when the entity fetch failed.
        """
        entity = self.fetch_entity(entity_urn, entity_type)
        if entity is None:
            # Fetch failed; skip rather than blindly overwrite.
            return
        metadata_change_proposal = MetadataChangeProposalWrapper(
            entityUrn=entity_urn,
            changeType="UPSERT",
            entityType=entity_type,
            aspect=structured_properties_aspect,
        )
        # Close the emitter in a finally block so the HTTP session is
        # released even when emit_mcp raises.
        event_emitter = DatahubRestEmitter(self.gms_endpoint, token=self.gms_token)
        try:
            event_emitter.emit_mcp(metadata_change_proposal)
        finally:
            event_emitter.close()
When this action is running, all new datasets will be tagged with the structured property.
One note: an error I encountered when trying to run this is that the Python file name and Python class name must match; for instance, if your Python file was example_python_file.py
, then you would want to name your Action class ExamplePythonFile
and have the action: type: in the yml file be example_python_file:ExamplePythonFile
(assuming they're in the same directory).