
Using parameters and dynamic content in pre-SQL script for Azure Data Factory data flow sink transformation

I have a pipeline parameter called query_sink (type string) it comes from a database and the posible values for the parameter could be

possible values

The second record is something like IF EXISTS(...) DELETE FROM table1 WHERE country = 1

So, I want to use a dataflow where in the sink transformation use the parameter query_sink.

Then use it in the pre SQL script in the sink transformation, For now I just pass the parameter without changes like this

but I have problems when the value in the database is null

'sink1': There are no batches in the input script."

I'm not sure if the parameter in that case is null or is it as if the parameter didn't receive anything. What I need is in the case the parameter value is a query(second record of the first image) execut it, but in the other case(first record of the first image) do nothing.

Edit: the input for the pipeline parameter receive this value


    You can pass the below dynamic content to the dataflow parameter where query is a pipeline parameter.

    @if(greater(length(pipeline().parameters.query),4), concat('''',pipeline().parameters.query,''''),concat('''','DECLARE @id AS INT;',''''))

    Give this to a string variable and pass the string to the dataflow parameter like below.

    This is my source table:

    target table final_target2 before deleting:

    Target table when query parameter value is delete from final_target2 where country=1;.

    You can see the records with country=1 were deleted and new records from final_source1 were inserted above.

    Target table when query parameter value is NULL. Here in this case I am Executing a sample query DECLARE @id AS INT; which does nothing to target table.

    My pipeline JSON:

        "name": "pipeline1",
        "properties": {
            "activities": [
                    "name": "Set variable1",
                    "type": "SetVariable",
                    "dependsOn": [],
                    "userProperties": [],
                    "typeProperties": {
                        "variableName": "var",
                        "value": {
                            "value": "@if(greater(length(pipeline().parameters.query),4), concat('''',pipeline().parameters.query,''''),concat('''','DECLARE @id AS INT;',''''))",
                            "type": "Expression"
                    "name": "Data flow1",
                    "type": "ExecuteDataFlow",
                    "dependsOn": [
                            "activity": "Set variable1",
                            "dependencyConditions": [
                    "policy": {
                        "timeout": "0.12:00:00",
                        "retry": 0,
                        "retryIntervalInSeconds": 30,
                        "secureOutput": false,
                        "secureInput": false
                    "userProperties": [],
                    "typeProperties": {
                        "dataflow": {
                            "referenceName": "dataflow1",
                            "type": "DataFlowReference",
                            "parameters": {
                                "query_sink": {
                                    "value": "@variables('var')",
                                    "type": "Expression"
                        "compute": {
                            "coreCount": 8,
                            "computeType": "General"
                        "traceLevel": "Fine"
            "parameters": {
                "query": {
                    "type": "string",
                    "defaultValue": "NULL"
            "variables": {
                "var": {
                    "type": "String"
            "annotations": [],
            "lastPublishTime": "2023-03-27T15:35:50Z"
        "type": "Microsoft.DataFactory/factories/pipelines"

    My dataflow JSON:

        "name": "dataflow1",
        "properties": {
            "type": "MappingDataFlow",
            "typeProperties": {
                "sources": [
                        "dataset": {
                            "referenceName": "source2_table",
                            "type": "DatasetReference"
                        "name": "source1"
                "sinks": [
                        "dataset": {
                            "referenceName": "final_target",
                            "type": "DatasetReference"
                        "name": "sink1"
                "transformations": [],
                "scriptLines": [
                    "     query_sink as string",
                    "          id as integer,",
                    "          country as integer",
                    "     ),",
                    "     allowSchemaDrift: true,",
                    "     validateSchema: false,",
                    "     isolationLevel: 'READ_UNCOMMITTED',",
                    "     format: 'table') ~> source1",
                    "source1 sink(allowSchemaDrift: true,",
                    "     validateSchema: false,",
                    "     input(",
                    "          name as string",
                    "     ),",
                    "     deletable:false,",
                    "     insertable:true,",
                    "     updateable:false,",
                    "     upsertable:false,",
                    "     format: 'table',",
                    "     preSQLs:[($query_sink)],",
                    "     skipDuplicateMapInputs: true,",
                    "     skipDuplicateMapOutputs: true,",
                    "     errorHandlingOption: 'stopOnFirstError') ~> sink1"