Tags: azure, azure-data-factory, transformation, denormalization

Dynamically mapping arbitrary delimited files to a single JSON structure in ADF


I am looking for help implementing a "know nothing" import and load routine in Azure. The input would be, load over load, an arbitrary delimited file of N columns and M rows. The output would be a JSON map that essentially says [{row1, field1, dataValue}, {row1, field2, dataValue}, … {rowM, fieldN, dataValue}]. For example, assume an incoming pipe-delimited file named EMPS2024 as follows:

EMPID|EMPNAME|STARTDT|ENDDT|SALARY
1234|Jane Jones|2019-01-01|NULL|100000
2215|Don Davis|2010-04-01|2023-12-31|99000
7448|Mary Mays|2008-09-30|NULL|70000

JSON OUTPUT:

{
    "filename": "EMPS2024",
    "Data": [
        { "Row": "1", "Field": "EMPID", "DataValue": "1234" },
        { "Row": "1", "Field": "EMPNAME", "DataValue": "Jane Jones" },
        { "Row": "1", "Field": "STARTDT", "DataValue": "2019-01-01" },
        { "Row": "1", "Field": "ENDDT", "DataValue": "NULL" },
        { "Row": "1", "Field": "SALARY", "DataValue": "100000" },
        { "Row": "2", "Field": "EMPID", "DataValue": "2215" },
        { "Row": "2", "Field": "EMPNAME", "DataValue": "Don Davis" },
        { "Row": "2", "Field": "STARTDT", "DataValue": "2010-04-01" },
        { "Row": "2", "Field": "ENDDT", "DataValue": "2023-12-31" },
        { "Row": "2", "Field": "SALARY", "DataValue": "99000" },
        { "Row": "3", "Field": "EMPID", "DataValue": "7448" },
        { "Row": "3", "Field": "EMPNAME", "DataValue": "Mary Mays" },
        { "Row": "3", "Field": "STARTDT", "DataValue": "2008-09-30" },
        { "Row": "3", "Field": "ENDDT", "DataValue": "NULL" },
        { "Row": "3", "Field": "SALARY", "DataValue": "70000" }
    ]
}

The next file might be an Expenses file with 35 unrelated columns, the file after that might be Sales with 20 unrelated columns, and so on. The only thing they have in common is that they are all delimited text, and they will all be mapped to a common output of Row, Field, Value. Whatever comes next could be a unicorn, e.g. "GreatLakesFishes", with content not known until runtime, but any delimited file can be written to the uniform JSON output.

Can this be done in ADF?

We have had some success writing an "AnyDelimitedInput" routine using Adeptia, but we need to do the same via ADF into an Azure SQL Database. The output could just as easily be a heap table with the structure FileName|RowNum|FieldName|DataValue.


Solution

  • You can use an ADF mapping data flow like the one below to achieve this requirement.

    First, make sure the schema/mapping is cleared both in the source dataset and in the data flow's source projection. With no fixed schema (and schema drift allowed on the source), the same data flow can be reused for different input files.
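
    For reference, the source dataset can be as minimal as the sketch below. DelimitedText2 is the source dataset referenced by the data flow further down; the linked service name, file location, and delimiter are placeholders you would adapt ('|' matches the EMPS2024 example). The important parts are firstRowAsHeader and the empty schema array:

    {
        "name": "DelimitedText2",
        "properties": {
            "linkedServiceName": {
                "referenceName": "AzureBlobStorage1",
                "type": "LinkedServiceReference"
            },
            "type": "DelimitedText",
            "typeProperties": {
                "location": {
                    "type": "AzureBlobStorageLocation",
                    "fileName": "EMPS2024.txt",
                    "container": "input"
                },
                "columnDelimiter": "|",
                "firstRowAsHeader": true
            },
            "schema": []
        }
    }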

    After the source, the data flow applies the following transformations in order:

    1. A derived column (ColumnnamesColumnsVaues) that captures the column names (columnNames()) and the current row's values (columns()) as two string arrays, cols and one.
    2. A surrogate key (surrogateKeyaddsKey) that assigns a sequential key to each row, starting at 1.
    3. A derived column (derivedColumnbuildsJSONstring) that uses mapIndex() over cols to build one small JSON string per cell, combining the row key, the field name, and the cell value.
    4. A select (selectforDataColumn) that keeps only the resulting Data column.
    5. A flatten (flattensDataColumn) that unrolls the Data array so there is one row per cell.
    6. A parse (parsetheDatatoJSON) that turns each JSON string into a complex column with Key, Field, and DataValue.
    7. An aggregate (aggregateForArrayOfObjects) that collects all cells into a single array of objects.
    8. A derived column (derivedColumnForFilename) that adds the filename (hard-coded in this example to the test file 'sample_blank.csv').
    9. The sink (sink1), which writes a single JSON output file.

    This is my complete data flow JSON, which you can use as a basis for building yours:

    {
        "name": "dataflow2",
        "properties": {
            "type": "MappingDataFlow",
            "typeProperties": {
                "sources": [
                    {
                        "dataset": {
                            "referenceName": "DelimitedText2",
                            "type": "DatasetReference"
                        },
                        "name": "source1"
                    }
                ],
                "sinks": [
                    {
                        "dataset": {
                            "referenceName": "Json1",
                            "type": "DatasetReference"
                        },
                        "name": "sink1"
                    }
                ],
                "transformations": [
                    {
                        "name": "surrogateKeyaddsKey"
                    },
                    {
                        "name": "derivedColumnbuildsJSONstring"
                    },
                    {
                        "name": "ColumnnamesColumnsVaues"
                    },
                    {
                        "name": "selectforDataColumn"
                    },
                    {
                        "name": "flattensDataColumn"
                    },
                    {
                        "name": "derivedColumnForFilename"
                    },
                    {
                        "name": "parsetheDatatoJSON"
                    },
                    {
                        "name": "aggregateForArrayOfObjects"
                    }
                ],
                "scriptLines": [
                    "source(allowSchemaDrift: true,",
                    "     validateSchema: false,",
                    "     ignoreNoFilesFound: false) ~> source1",
                    "ColumnnamesColumnsVaues keyGenerate(output(key as long),",
                    "     startAt: 1L,",
                    "     stepValue: 1L) ~> surrogateKeyaddsKey",
                    "surrogateKeyaddsKey derive(Data = mapIndex(cols,concat('{\"Key\":\"',toString(key),'\",\"Field\":\"',#item,'\",\"DataValue\":\"',one[#index],'\"}'))) ~> derivedColumnbuildsJSONstring",
                    "source1 derive(cols = columnNames(),",
                    "          one = split(concatWS(',', toString(columns())),',')) ~> ColumnnamesColumnsVaues",
                    "derivedColumnbuildsJSONstring select(mapColumn(",
                    "          each(match(name=='Data'))",
                    "     ),",
                    "     skipDuplicateMapInputs: true,",
                    "     skipDuplicateMapOutputs: true) ~> selectforDataColumn",
                    "selectforDataColumn foldDown(unroll(Data, Data),",
                    "     mapColumn(",
                    "          Data",
                    "     ),",
                    "     skipDuplicateMapInputs: false,",
                    "     skipDuplicateMapOutputs: false) ~> flattensDataColumn",
                    "aggregateForArrayOfObjects derive(filename = 'sample_blank.csv') ~> derivedColumnForFilename",
                    "flattensDataColumn parse(Data = Data ? (Key as string,",
                    "          Field as string,",
                    "          DataValue as string),",
                    "     format: 'json',",
                    "     documentForm: 'arrayOfDocuments') ~> parsetheDatatoJSON",
                    "parsetheDatatoJSON aggregate(Data = collect(@(Key=Data.Key,Field=Data.Field,Datavalue=Data.DataValue))) ~> aggregateForArrayOfObjects",
                    "derivedColumnForFilename sink(allowSchemaDrift: true,",
                    "     validateSchema: false,",
                    "     partitionFileNames:['res2.json'],",
                    "     umask: 0022,",
                    "     preCommands: [],",
                    "     postCommands: [],",
                    "     skipDuplicateMapInputs: true,",
                    "     skipDuplicateMapOutputs: true,",
                    "     partitionBy('hash', 1)) ~> sink1"
                ]
            }
        }
    }
    

    Execute the data flow from an ADF pipeline and you will get the desired JSON output file.
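
    The pipeline itself only needs an Execute Data Flow activity pointing at this data flow. A minimal sketch (pipeline and activity names are placeholders; adjust the compute settings as needed):

    {
        "name": "pipeline1",
        "properties": {
            "activities": [
                {
                    "name": "Run dataflow2",
                    "type": "ExecuteDataFlow",
                    "typeProperties": {
                        "dataFlow": {
                            "referenceName": "dataflow2",
                            "type": "DataFlowReference"
                        },
                        "compute": {
                            "coreCount": 8,
                            "computeType": "General"
                        }
                    }
                }
            ]
        }
    }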


    I have tested this with multiple files that have different numbers of rows and columns, and it produces the expected results.
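
    If you would rather land the data in the Azure SQL Database heap table (FileName|RowNum|FieldName|DataValue) mentioned in the question, the stream after the flatten/parse steps can feed an Azure SQL sink instead of (or in addition to) the JSON sink. As a rough sketch only, with assumed linked-service and table names, such a sink dataset would look like:

    {
        "name": "AzureSqlTable1",
        "properties": {
            "linkedServiceName": {
                "referenceName": "AzureSqlDatabase1",
                "type": "LinkedServiceReference"
            },
            "type": "AzureSqlTable",
            "typeProperties": {
                "schema": "dbo",
                "table": "FileFieldValue"
            }
        }
    }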