azure-data-factory

Using parameters and dynamic content in pre-SQL script for Azure Data Factory data flow sink transformation


I have a pipeline parameter called query_sink (type string). Its value comes from a database, and the possible values for the parameter could be:

(screenshot: possible parameter values)

The second record is something like `IF EXISTS(...) DELETE FROM table1 WHERE country = 1`.

So I want to use a data flow that uses the parameter query_sink in its sink transformation.

Then I use it in the pre-SQL script of the sink transformation. For now I just pass the parameter through unchanged, like this:

but I have problems when the value in the database is null. The sink fails with this error:

"'sink1': There are no batches in the input script."

I'm not sure whether the parameter is null in that case, or whether it's as if the parameter received nothing at all. What I need is: when the parameter value is a query (the second record in the first image), execute it; in the other case (the first record in the first image), do nothing.

Edit: the pipeline parameter receives this value:


Solution

  • What I need is: in the case the parameter value is a query (second record of the first image), execute it; in the other case (first record of the first image), do nothing.

    You can pass the dynamic content below to the data flow parameter, where query is a pipeline parameter.

    @if(greater(length(pipeline().parameters.query),4), concat('''',pipeline().parameters.query,''''),concat('''','DECLARE @id AS INT;',''''))
    
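    The effect of this expression can be sketched in plain Python (a rough illustration, not ADF's actual expression engine; `build_presql` is a hypothetical name, and it assumes that when the database value is null the parameter arrives as the literal string `NULL`, matching the pipeline's `defaultValue`):

```python
def build_presql(query: str) -> str:
    # Mirror the ADF expression:
    # @if(greater(length(query), 4),
    #     concat('''', query, ''''),
    #     concat('''', 'DECLARE @id AS INT;', ''''))
    # A real query is longer than 4 characters; the literal string
    # "NULL" (length 4) falls through to a harmless no-op statement,
    # so the sink's pre-SQL batch is never empty.
    if query is not None and len(query) > 4:
        return "'" + query + "'"
    return "'DECLARE @id AS INT;'"
```

    The key point of the design is that the sink always receives some statement to run, which avoids the "no batches in the input script" error.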

    Assign this expression to a string variable and pass that variable to the data flow parameter, as shown in the pipeline JSON below.


    This is my source table:


    Target table final_target2 before deleting:


    Target table when the query parameter value is `delete from final_target2 where country=1;`:


    You can see that the records with country=1 were deleted and the new records from final_source1 were inserted.

    Target table when the query parameter value is NULL. In this case the sample query `DECLARE @id AS INT;` is executed, which does nothing to the target table.


    My pipeline JSON:

    {
        "name": "pipeline1",
        "properties": {
            "activities": [
                {
                    "name": "Set variable1",
                    "type": "SetVariable",
                    "dependsOn": [],
                    "userProperties": [],
                    "typeProperties": {
                        "variableName": "var",
                        "value": {
                            "value": "@if(greater(length(pipeline().parameters.query),4), concat('''',pipeline().parameters.query,''''),concat('''','DECLARE @id AS INT;',''''))",
                            "type": "Expression"
                        }
                    }
                },
                {
                    "name": "Data flow1",
                    "type": "ExecuteDataFlow",
                    "dependsOn": [
                        {
                            "activity": "Set variable1",
                            "dependencyConditions": [
                                "Succeeded"
                            ]
                        }
                    ],
                    "policy": {
                        "timeout": "0.12:00:00",
                        "retry": 0,
                        "retryIntervalInSeconds": 30,
                        "secureOutput": false,
                        "secureInput": false
                    },
                    "userProperties": [],
                    "typeProperties": {
                        "dataflow": {
                            "referenceName": "dataflow1",
                            "type": "DataFlowReference",
                            "parameters": {
                                "query_sink": {
                                    "value": "@variables('var')",
                                    "type": "Expression"
                                }
                            }
                        },
                        "compute": {
                            "coreCount": 8,
                            "computeType": "General"
                        },
                        "traceLevel": "Fine"
                    }
                }
            ],
            "parameters": {
                "query": {
                    "type": "string",
                    "defaultValue": "NULL"
                }
            },
            "variables": {
                "var": {
                    "type": "String"
                }
            },
            "annotations": [],
            "lastPublishTime": "2023-03-27T15:35:50Z"
        },
        "type": "Microsoft.DataFactory/factories/pipelines"
    }
    

    My dataflow JSON:

    {
        "name": "dataflow1",
        "properties": {
            "type": "MappingDataFlow",
            "typeProperties": {
                "sources": [
                    {
                        "dataset": {
                            "referenceName": "source2_table",
                            "type": "DatasetReference"
                        },
                        "name": "source1"
                    }
                ],
                "sinks": [
                    {
                        "dataset": {
                            "referenceName": "final_target",
                            "type": "DatasetReference"
                        },
                        "name": "sink1"
                    }
                ],
                "transformations": [],
                "scriptLines": [
                    "parameters{",
                    "     query_sink as string",
                    "}",
                    "source(output(",
                    "          id as integer,",
                    "          country as integer",
                    "     ),",
                    "     allowSchemaDrift: true,",
                    "     validateSchema: false,",
                    "     isolationLevel: 'READ_UNCOMMITTED',",
                    "     format: 'table') ~> source1",
                    "source1 sink(allowSchemaDrift: true,",
                    "     validateSchema: false,",
                    "     input(",
                    "          name as string",
                    "     ),",
                    "     deletable:false,",
                    "     insertable:true,",
                    "     updateable:false,",
                    "     upsertable:false,",
                    "     format: 'table',",
                    "     preSQLs:[($query_sink)],",
                    "     skipDuplicateMapInputs: true,",
                    "     skipDuplicateMapOutputs: true,",
                    "     errorHandlingOption: 'stopOnFirstError') ~> sink1"
                ]
            }
        }
    }
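
    Note the sink line `preSQLs:[($query_sink)]`: the parameter value, already wrapped in single quotes by the pipeline expression, is substituted into the data flow script as a string literal. A rough Python sketch of that substitution (`render_presqls` is a hypothetical name; the real data flow engine performs this internally):

```python
def render_presqls(query_sink_value: str) -> str:
    # The data flow script line preSQLs:[($query_sink)] resolves by
    # substituting the parameter value verbatim, yielding a
    # one-element list of SQL statements for the sink to run
    # before it writes any rows.
    return f"preSQLs:[{query_sink_value}]"

print(render_presqls("'DECLARE @id AS INT;'"))
# prints preSQLs:['DECLARE @id AS INT;']
```

    This is why the pipeline expression adds the surrounding single quotes: without them, the substituted value would not be a valid string literal inside the script's list.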