I have a pipeline parameter called query_sink (type string). Its value comes from a database, and the possible values for the parameter take two forms: the first record is NULL, and the second record is something like IF EXISTS(...) DELETE FROM table1 WHERE country = 1.
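Spelled out, such a statement could look like the following; the EXISTS predicate is elided above, so this expansion is only a guess:

IF EXISTS (SELECT 1 FROM table1 WHERE country = 1) -- hypothetical predicate; the original is elided
    DELETE FROM table1 WHERE country = 1;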
So I want to use a dataflow where the sink transformation uses the parameter query_sink as its pre-SQL script. For now I just pass the parameter through without changes.
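Since the parameter goes through untouched, the dynamic content on the dataflow's query_sink parameter is simply:

@pipeline().parameters.query_sink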
But I have problems when the value in the database is NULL; the dataflow fails with:
"'sink1': There are no batches in the input script."
I'm not sure whether the parameter is null in that case, or whether it's as if the parameter didn't receive anything at all. What I need is: when the parameter value is a query (the second record above), execute it; in the other case (the first record, NULL) do nothing.
Edit: the input the pipeline parameter receives is exactly the value described above. To restate: if the parameter value is a query, execute it; if it is NULL, do nothing.
You can pass the below dynamic content to the dataflow parameter, where query is a pipeline parameter.
@if(greater(length(pipeline().parameters.query),4), concat('''',pipeline().parameters.query,''''),concat('''','DECLARE @id AS INT;',''''))
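For example, with the two sample inputs used further below, this expression evaluates as follows (the outer quotes are literal, so the dataflow receives a quoted string):

query = delete from final_target2 where country=1  ->  'delete from final_target2 where country=1'
query = NULL                                       ->  'DECLARE @id AS INT;'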
The greater(length(...),4) check tells a real query apart from the literal string NULL, which has exactly 4 characters: a real query is passed through wrapped in single quotes, while NULL is replaced with the harmless statement DECLARE @id AS INT;. Give this expression to a string variable and pass that variable to the dataflow parameter, as in the pipeline JSON below.
This is my source table:
Target table final_target2 before deleting:
Target table when the query parameter value is delete from final_target2 where country=1:
You can see the records with country=1 were deleted and new records from final_source1 were inserted above.
Target table when the query parameter value is NULL: in this case the sample query DECLARE @id AS INT; is executed, which does nothing to the target table.
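Between these runs I am checking the target with a plain select; the column names here are assumptions based on the dataflow's source schema shown later:

-- id and country are assumed from the dataflow source projection
SELECT id, country FROM final_target2;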
My pipeline JSON:
{
    "name": "pipeline1",
    "properties": {
        "activities": [
            {
                "name": "Set variable1",
                "type": "SetVariable",
                "dependsOn": [],
                "userProperties": [],
                "typeProperties": {
                    "variableName": "var",
                    "value": {
                        "value": "@if(greater(length(pipeline().parameters.query),4), concat('''',pipeline().parameters.query,''''),concat('''','DECLARE @id AS INT;',''''))",
                        "type": "Expression"
                    }
                }
            },
            {
                "name": "Data flow1",
                "type": "ExecuteDataFlow",
                "dependsOn": [
                    {
                        "activity": "Set variable1",
                        "dependencyConditions": [
                            "Succeeded"
                        ]
                    }
                ],
                "policy": {
                    "timeout": "0.12:00:00",
                    "retry": 0,
                    "retryIntervalInSeconds": 30,
                    "secureOutput": false,
                    "secureInput": false
                },
                "userProperties": [],
                "typeProperties": {
                    "dataflow": {
                        "referenceName": "dataflow1",
                        "type": "DataFlowReference",
                        "parameters": {
                            "query_sink": {
                                "value": "@variables('var')",
                                "type": "Expression"
                            }
                        }
                    },
                    "compute": {
                        "coreCount": 8,
                        "computeType": "General"
                    },
                    "traceLevel": "Fine"
                }
            }
        ],
        "parameters": {
            "query": {
                "type": "string",
                "defaultValue": "NULL"
            }
        },
        "variables": {
            "var": {
                "type": "String"
            }
        },
        "annotations": [],
        "lastPublishTime": "2023-03-27T15:35:50Z"
    },
    "type": "Microsoft.DataFactory/factories/pipelines"
}
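If you want to exercise both branches without the portal, here is a sketch using the Azure CLI (requires the datafactory extension; the resource group and factory names are placeholders):

# run with a real query -> pre-SQL deletes the matching rows
az datafactory pipeline create-run --resource-group <rg> --factory-name <factory> \
    --name pipeline1 --parameters '{"query": "delete from final_target2 where country=1"}'

# run with NULL -> pre-SQL is the do-nothing DECLARE
az datafactory pipeline create-run --resource-group <rg> --factory-name <factory> \
    --name pipeline1 --parameters '{"query": "NULL"}'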
My dataflow JSON:
{
    "name": "dataflow1",
    "properties": {
        "type": "MappingDataFlow",
        "typeProperties": {
            "sources": [
                {
                    "dataset": {
                        "referenceName": "source2_table",
                        "type": "DatasetReference"
                    },
                    "name": "source1"
                }
            ],
            "sinks": [
                {
                    "dataset": {
                        "referenceName": "final_target",
                        "type": "DatasetReference"
                    },
                    "name": "sink1"
                }
            ],
            "transformations": [],
            "scriptLines": [
                "parameters{",
                " query_sink as string",
                "}",
                "source(output(",
                " id as integer,",
                " country as integer",
                " ),",
                " allowSchemaDrift: true,",
                " validateSchema: false,",
                " isolationLevel: 'READ_UNCOMMITTED',",
                " format: 'table') ~> source1",
                "source1 sink(allowSchemaDrift: true,",
                " validateSchema: false,",
                " input(",
                " name as string",
                " ),",
                " deletable:false,",
                " insertable:true,",
                " updateable:false,",
                " upsertable:false,",
                " format: 'table',",
                " preSQLs:[($query_sink)],",
                " skipDuplicateMapInputs: true,",
                " skipDuplicateMapOutputs: true,",
                " errorHandlingOption: 'stopOnFirstError') ~> sink1"
            ]
        }
    }
}
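As an alternative to the length check, you could compare the parameter against the literal string directly. An untested sketch of an equivalent Set variable expression, assuming the database always yields either the exact string NULL or a query:

@if(equals(pipeline().parameters.query,'NULL'), concat('''','DECLARE @id AS INT;',''''), concat('''',pipeline().parameters.query,''''))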