azure-data-factory azure-cosmosdb

Azure Data Factory: Date comparisons against Cosmos DB not working in Data Flow with a parameter


So I'm sending a variable to my data flow's parameter called LastRunTimestamp as a string. The timestamp is in this format: 2024-11-21T00:00:00.0000000Z

My query is something like "SELECT * FROM c WHERE c.Discriminator = 'D1' AND c.InsertDateTime > '$LastRunTimestamp'"

The query doesn't error out, but it's definitely disregarding the parameter. If I hard code or test this in data explorer, it works. But I just cannot get this to work as a parameter.

I've tried quite a few different ways of doing this including concat('SELECT * FROM c WHERE c.InsertDateTime > ','D1',' AND c.InsertDateTime > ',$LastRunTimestamp)

but every time I get the query to work, it seems to always disregard the parameter. I've also tried concatenating the string without using the concat function etc. I don't know what else I can try.

JSON Definition of data flow

{
    "name": "dataflow1",
    "properties": {
        "type": "MappingDataFlow",
        "typeProperties": {
            "sources": [
                {
                    "dataset": {
                        "referenceName": "ContainerTesting",
                        "type": "DatasetReference"
                    },
                    "name": "source1"
                }
            ],
            "sinks": [
                {
                    "dataset": {
                        "referenceName": "TestTable",
                        "type": "DatasetReference"
                    },
                    "name": "sink2"
                },
                {
                    "dataset": {
                        "referenceName": "Json1",
                        "type": "DatasetReference"
                    },
                    "name": "sink3"
                }
            ],
            "transformations": [
                {
                    "name": "derivedColumn1"
                },
                {
                    "name": "derivedColumn2"
                }
            ],
            "scriptLines": [
                "parameters{",
                "     LastRunTimestamp as string,",
                "}",
                "source(output(",
                "          Id as integer,",
                "          UpdateDateTime as string,",
                "          PartitionKey as integer,",
                "          Discriminator as string,",
                "          InsertDateTime as string",
                "     ),",
                "     allowSchemaDrift: true,",
                "     validateSchema: false,",
                "     limit: 100,",
                "     query: ("SELECT c.Id, c.Discriminator, c.PartitionKey, c.InsertDateTime, c.UpdateDateTime FROM c where c.Discriminator = 'D1' AND c.InsertDateTime > '$LastRunTimestamp'"),",
                "     format: 'documentQuery',",
                "     systemColumns: false) ~> source1",
                "source1 derive(Blob_Location = concat('REDACTED', toString(Id), '.json'),",
                "          Load_Time = toString(currentUTC(),'yyyy-MM-dd HH:mm:ss'),",
                "          LastTimestamp = $LastRunTimestamp) ~> derivedColumn1",
                "source1 derive(IdAsString = toString(Id)) ~> derivedColumn2",
                "derivedColumn1 sink(allowSchemaDrift: true,",
                "     validateSchema: false,",
                "     deletable:false,",
                "     insertable:true,",
                "     updateable:false,",
                "     upsertable:false,",
                "     format: 'table',",
                "     skipDuplicateMapInputs: true,",
                "     skipDuplicateMapOutputs: true,",
                "     errorHandlingOption: 'stopOnFirstError') ~> sink2",
                "derivedColumn2 sink(allowSchemaDrift: true,",
                "     validateSchema: false,",
                "     rowUrlColumn:'IdAsString',",
                "     skipDuplicateMapInputs: true,",
                "     skipDuplicateMapOutputs: true,",
                "     mapColumn(",
                "          Id,",
                "          UpdateDateTime,",
                "          PartitionKey,",
                "          Discriminator,",
                "          InsertDateTime,",
                "          IdAsString",
                "     )) ~> sink3"
            ]
        }
    }
}

Solution

  • To use a parameter in a Cosmos DB query through a dynamic expression, you need to pass the string parameter's value enclosed in single quotes inside the query text.

    To do this, build the query with concat, like below:

    concat("SELECT * FROM c WHERE c.Discriminator = 'D1' AND c.InsertDateTime > '" ,$LastRunTimestamp,"'")
    

    The expression then evaluates to the query below, with the parameter's value substituted in, and the data flow picks up the parameter value correctly:

    SELECT * FROM c WHERE c.Discriminator = 'D1' AND c.InsertDateTime > '<LastRunTimestamp value>'
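
    The difference between the two forms can be sketched outside Data Factory. The Python below is only an illustration, not data flow code: the concat helper is a hypothetical stand-in for the data flow concat() function, and it assumes (as the behavior in the question suggests) that a parameter name written inside a quoted string is kept as literal text rather than replaced by its value.

```python
def concat(*parts: str) -> str:
    """Hypothetical stand-in for the data flow concat(): joins strings in order."""
    return "".join(parts)


# Example value of $LastRunTimestamp, taken from the question.
last_run_timestamp = "2024-11-21T00:00:00.0000000Z"

# Form from the question: the parameter name sits inside the string literal,
# so (under the assumption above) it is sent to Cosmos DB as plain text.
literal_query = (
    "SELECT * FROM c WHERE c.Discriminator = 'D1' "
    "AND c.InsertDateTime > '$LastRunTimestamp'"
)

# Form from the answer: the parameter is a separate concat argument, and the
# single quotes around it belong to the neighbouring string pieces.
built_query = concat(
    "SELECT * FROM c WHERE c.Discriminator = 'D1' AND c.InsertDateTime > '",
    last_run_timestamp,
    "'",
)

print(literal_query)
print(built_query)
```

    In the first query the comparison is made against the text $LastRunTimestamp itself, which would explain why the filter appears to be ignored. In the second, the timestamp value ends up between the single quotes.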
