azure-data-factoryazure-mapping-data-flow

ADF Column Name Validation and Data Validation


I am trying to add some validation to my ADF pipeline. Is there a way to achieve the following validation in ADF?

  1. Validate column header and return error message. There is a list of required column names that I need to check against the raw Excel file. For example, the raw file might have column A,B,C,D, but the required columns are A,B,E. So is there a way to validate and return an error message that the column E is missing in the raw file?
  2. Validate the data type in data mapping flow, if column A should be a numeric field but some of the cells have text in it, or column B should be datetime type but has a number in it. Is there a way to check values in each row and return error message if the data validation fails on that row?

Solution

  • Adding to @Nandan, you can use Get Meta data activity structure like below.

    This is my repro for your reference:

    First, I have used 2 parameters for column names and Data types.

    enter image description here

    Get Meta data activity:

    enter image description here

    Get Meta activity output array:

    enter image description here

    Then I have created two arrays to get the above names and columns using forEach.

    Then I have used two filter activities to filter the above parameter arrays.

    The used if activity to check the parameter arrays length and filter activity output arrays lengths.

    If its true, the inside True activities you can use your copy activity or Data flow as per your requirement. Inside False activities, use a fail activity.

    My pipeline JSON:

    {
    "name": "pipeline1",
    "properties": {
        "activities": [
            {
                "name": "Get Metadata1",
                "type": "GetMetadata",
                "dependsOn": [],
                "policy": {
                    "timeout": "0.12:00:00",
                    "retry": 0,
                    "retryIntervalInSeconds": 30,
                    "secureOutput": false,
                    "secureInput": false
                },
                "userProperties": [],
                "typeProperties": {
                    "dataset": {
                        "referenceName": "Excel1",
                        "type": "DatasetReference"
                    },
                    "fieldList": [
                        "structure"
                    ],
                    "storeSettings": {
                        "type": "AzureBlobFSReadSettings",
                        "enablePartitionDiscovery": false
                    }
                }
            },
            {
                "name": "Filtering names",
                "type": "Filter",
                "dependsOn": [
                    {
                        "activity": "Getting names and columns as list",
                        "dependencyConditions": [
                            "Succeeded"
                        ]
                    }
                ],
                "userProperties": [],
                "typeProperties": {
                    "items": {
                        "value": "@pipeline().parameters.names",
                        "type": "Expression"
                    },
                    "condition": {
                        "value": "@contains(variables('namesvararray'),item())",
                        "type": "Expression"
                    }
                }
            },
            {
                "name": "Filtering types",
                "type": "Filter",
                "dependsOn": [
                    {
                        "activity": "Filtering names",
                        "dependencyConditions": [
                            "Succeeded"
                        ]
                    }
                ],
                "userProperties": [],
                "typeProperties": {
                    "items": {
                        "value": "@variables('typevararray')",
                        "type": "Expression"
                    },
                    "condition": {
                        "value": "@contains(variables('typevararray'), item())",
                        "type": "Expression"
                    }
                }
            },
            {
                "name": "Getting names and columns as list",
                "type": "ForEach",
                "dependsOn": [
                    {
                        "activity": "Get Metadata1",
                        "dependencyConditions": [
                            "Succeeded"
                        ]
                    }
                ],
                "userProperties": [],
                "typeProperties": {
                    "items": {
                        "value": "@activity('Get Metadata1').output.structure",
                        "type": "Expression"
                    },
                    "isSequential": true,
                    "activities": [
                        {
                            "name": "Append names",
                            "type": "AppendVariable",
                            "dependsOn": [],
                            "userProperties": [],
                            "typeProperties": {
                                "variableName": "namesvararray",
                                "value": {
                                    "value": "@item().name",
                                    "type": "Expression"
                                }
                            }
                        },
                        {
                            "name": "Append types",
                            "type": "AppendVariable",
                            "dependsOn": [
                                {
                                    "activity": "Append names",
                                    "dependencyConditions": [
                                        "Succeeded"
                                    ]
                                }
                            ],
                            "userProperties": [],
                            "typeProperties": {
                                "variableName": "typevararray",
                                "value": {
                                    "value": "@item().type",
                                    "type": "Expression"
                                }
                            }
                        }
                    ]
                }
            },
            {
                "name": "If Condition1",
                "type": "IfCondition",
                "dependsOn": [
                    {
                        "activity": "Filtering types",
                        "dependencyConditions": [
                            "Succeeded"
                        ]
                    }
                ],
                "userProperties": [],
                "typeProperties": {
                    "expression": {
                        "value": "@and(equals(length(pipeline().parameters.names),activity('Filtering names').output.FilteredItemsCount),equals(length(pipeline().parameters.columns),activity('Filtering types').output.FilteredItemsCount))",
                        "type": "Expression"
                    },
                    "ifFalseActivities": [
                        {
                            "name": "Fail1",
                            "type": "Fail",
                            "dependsOn": [],
                            "userProperties": [],
                            "typeProperties": {
                                "message": "Some of the headers or types are not as required",
                                "errorCode": "240"
                            }
                        }
                    ],
                    "ifTrueActivities": [
                        {
                            "name": "Set variable1",
                            "type": "SetVariable",
                            "dependsOn": [],
                            "userProperties": [],
                            "typeProperties": {
                                "variableName": "sample",
                                "value": "All good"
                            }
                        }
                    ]
                }
            }
        ],
        "parameters": {
            "names": {
                "type": "array",
                "defaultValue": [
                    "A",
                    "B",
                    "C"
                ]
            },
            "columns": {
                "type": "array",
                "defaultValue": [
                    "String",
                    "String",
                    "String"
                ]
            }
        },
        "variables": {
            "namesvararray": {
                "type": "Array"
            },
            "typevararray": {
                "type": "Array"
            },
            "sample": {
                "type": "String"
            }
        },
        "annotations": []
    }
    }
    

    My pipeline failed and got error:

    enter image description here