json azure-data-factory

JSON restructuring using ADF


I have a business requirement to process the JSON response of an API using Azure Data Factory, preferably without resorting to code-based alternatives such as Azure Functions. The API response has the form:

{ 
  "parentProperty": {
       "key1": {
           "key2": {
               "property1": x,
               "property2": y
           },
           "key3": {
               "property1": z,
               "property2": a
           }
       },
       "key4": {
           "key5": {
               "property1": b,
               "property2": c
           },
           "key6": {
               "property1": d,
               "property2": e
           }
       }
  }
}

This needs to be transformed into a table like:

Column1 Column2 Column3 Column4
key1 key2 x y
key1 key3 z a
key4 key5 b c
key4 key6 d e

This would be very easy in code, such as an Azure Function, but the operators in Azure Data Factory look limited for this type of transformation. Is there a way to use the native operators to transform this JSON?

Update: Under current testing, the source and sink are:

Source: JSON (Blob Storage); Sink: Delimited Text (Blob Storage)

Transformations Attempted


Solution

  • You can achieve your requirement using a combination of select, derived column, and filter transformations in a mapping data flow, as shown below.

    Note that this approach only works for this exact structure (three levels of nesting), and it involves some manual steps.

    Apply the following transformations in the data flow.


    Result csv file:

    enter image description here

    This is my Dataflow JSON for your reference:

    {
        "name": "dataflow6",
        "properties": {
            "type": "MappingDataFlow",
            "typeProperties": {
                "sources": [
                    {
                        "dataset": {
                            "referenceName": "connstruct_json",
                            "type": "DatasetReference"
                        },
                        "name": "source1"
                    }
                ],
                "sinks": [
                    {
                        "dataset": {
                            "referenceName": "construct_csv",
                            "type": "DatasetReference"
                        },
                        "name": "sink1"
                    }
                ],
                "transformations": [
                    {
                        "name": "select1"
                    },
                    {
                        "name": "derivedColumn1"
                    },
                    {
                        "name": "derivedColumn2"
                    },
                    {
                        "name": "select2"
                    },
                    {
                        "name": "derivedColumn3"
                    },
                    {
                        "name": "derivedColumn4"
                    },
                    {
                        "name": "filter1"
                    },
                    {
                        "name": "filter2"
                    },
                    {
                        "name": "derivedColumn5"
                    },
                    {
                        "name": "select3"
                    }
                ],
                "scriptLines": [
                    "source(output(",
                    "          parentProperty as (key1 as (key2 as (property1 as string, property2 as string), key3 as (property1 as string, property2 as string)), key4 as (key5 as (property1 as string, property2 as string), key6 as (property1 as string, property2 as string)))",
                    "     ),",
                    "     allowSchemaDrift: true,",
                    "     validateSchema: false,",
                    "     ignoreNoFilesFound: false,",
                    "     documentForm: 'singleDocument') ~> source1",
                    "derivedColumn2 select(mapColumn(",
                    "          each(parentProperty,match(true())),",
                    "          myjson",
                    "     ),",
                    "     skipDuplicateMapInputs: true,",
                    "     skipDuplicateMapOutputs: true) ~> select1",
                    "select1 derive(columns = filter(columnNames(),(#item!='myjson')),",
                    "          names = slice(map(split(myjson,'\":{'),split(#item,'\"')[size(split(#item,'\"'))]),1,size(map(split(myjson,'\":{'),split(#item,'\"')[size(split(#item,'\"'))]))-1)) ~> derivedColumn1",
                    "source1 derive(myjson = toString(parentProperty)) ~> derivedColumn2",
                    "derivedColumn1 select(mapColumn(",
                    "          myjson,",
                    "          columns,",
                    "          names",
                    "     ),",
                    "     skipDuplicateMapInputs: true,",
                    "     skipDuplicateMapOutputs: true) ~> select2",
                    "select2 derive(column1 = toString(unfold(columns))) ~> derivedColumn3",
                    "derivedColumn3 derive(column2 = toString(unfold(names))) ~> derivedColumn4",
                    "derivedColumn4 filter((column1!=column2) && (!contains(columns,#item==column2))) ~> filter1",
                    "filter1 filter((toInteger(split(find(mapIndex(names,concat(#item,'-',toString(#index))),instr(#item,column2)>0),'-')[2])>toInteger(split(find(mapIndex(names,concat(#item,'-',toString(#index))),instr(#item,column1)>0),'-')[2]))&&(divide(size(names),size(columns)))>(minus(toInteger(split(find(mapIndex(names,concat(#item,'-',toString(#index))),instr(#item,column2)>0),'-')[2]),toInteger(split(find(mapIndex(names,concat(#item,'-',toString(#index))),instr(#item,column1)>0),'-')[2])))) ~> filter2",
                    "filter2 derive(column3 = replace(split(split(split(myjson,column2)[2],':')[3],',')[1],'\"',''),",
                    "          column4 = replace(replace(split(split(split(myjson,column2)[2],':')[4],',')[1],'\"',''),'}','')) ~> derivedColumn5",
                    "derivedColumn5 select(mapColumn(",
                    "          column1,",
                    "          column2,",
                    "          column3,",
                    "          column4",
                    "     ),",
                    "     skipDuplicateMapInputs: true,",
                    "     skipDuplicateMapOutputs: true) ~> select3",
                    "select3 sink(allowSchemaDrift: true,",
                    "     validateSchema: false,",
                    "     umask: 0022,",
                    "     preCommands: [],",
                    "     postCommands: [],",
                    "     skipDuplicateMapInputs: true,",
                    "     skipDuplicateMapOutputs: true) ~> sink1"
                ]
            }
        }
    }