I have a business requirement to process a JSON API response using Azure Data Factory, preferably avoiding code-based alternatives such as Azure Functions. The API response has the form:
{
  "parentProperty": {
    "key1": {
      "key2": {
        "property1": x,
        "property2": y
      },
      "key3": {
        "property1": z,
        "property2": a
      }
    },
    "key4": {
      "key5": {
        "property1": b,
        "property2": c
      },
      "key6": {
        "property1": d,
        "property2": e
      }
    }
  }
}
This needs to be transformed into a table like:
Column1 | Column2 | Column3 | Column4 |
---|---|---|---|
key1 | key2 | x | y |
key1 | key3 | z | a |
key4 | key5 | b | c |
key4 | key6 | d | e |
This would be very easy in code, such as an Azure Function, but the operators in Azure Data Factory look limited for this type of transformation. Is there a way to transform this JSON using only the native operators?
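For reference, this is roughly the kind of function I would otherwise write (a minimal Python sketch, assuming the payload has already been parsed into a dictionary); the goal is to reproduce this with native Data Factory operators:

```python
# Minimal sketch of the code-based equivalent (e.g. inside an Azure Function),
# assuming the API response has already been parsed into a dictionary.
def flatten(payload: dict) -> list[tuple]:
    rows = []
    for level1_key, level1_value in payload["parentProperty"].items():
        for level2_key, level2_value in level1_value.items():
            rows.append((
                level1_key,                 # Column1, e.g. "key1"
                level2_key,                 # Column2, e.g. "key2"
                level2_value["property1"],  # Column3, e.g. x
                level2_value["property2"],  # Column4, e.g. y
            ))
    return rows
```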
Update: under current testing, the source and sink are:
Source: JSON in Blob Storage
Sink: Delimited text in Blob Storage
Transformations Attempted
You can achieve this requirement with a combination of select, derivedColumn and filter transformations in the dataflow, as shown below.
Note that this approach only works for this exact structure (three levels of nesting) and involves some manual steps.
Follow the transformations below in the dataflow.
Source - Add your JSON dataset here and make sure you select Single document in the JSON settings of the source.
derivedColumn2 - Create a column myjson and store the JSON string of parentProperty in it with the expression toString(parentProperty).
select1 - Extract the inner keys of the parentProperty object as new columns using rule-based mapping in the select transformation (in this example, the rule-based mapping each(parentProperty,match(true())) maps every child of parentProperty to its own column).
derivedColumn1 - Create 2 columns named columns and names. The columns column will store the array of extracted column names, which are the first-level key names, while the names column will store the array of all keys in the JSON in the same order. Use the below expressions for them.
columns - filter(columnNames(),(#item!='myjson'))
names - slice(map(split(myjson,'":{'),split(#item,'"')[size(split(#item,'"'))]),1,size(map(split(myjson,'":{'),split(#item,'"')[size(split(#item,'"'))]))-1)
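To make the intent of these two expressions concrete, here is a rough Python equivalent of what they produce for the sample payload (illustrative only; the data flow expression language is 1-based and handles strings slightly differently):

```python
import json

# Hypothetical stand-in for the parsed parentProperty object from the sample payload.
parent_property = {
    "key1": {"key2": {"property1": "x", "property2": "y"},
             "key3": {"property1": "z", "property2": "a"}},
    "key4": {"key5": {"property1": "b", "property2": "c"},
             "key6": {"property1": "d", "property2": "e"}},
}

# toString(parentProperty): the object serialized back into a compact JSON string.
myjson = json.dumps(parent_property, separators=(",", ":"))

# columns: the first-level keys surfaced by select1 (everything except myjson).
columns = list(parent_property.keys())            # ['key1', 'key4']

# names: every key in document order, mimicking the data flow expression:
# split on '":{', keep the text after the last '"' in each piece, drop the tail.
pieces = myjson.split('":{')
names = [p.split('"')[-1] for p in pieces][:-1]   # ['key1', 'key2', ..., 'key6']

print(columns, names)
```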
select2 - Remove the extracted columns from select1 and keep only the columns myjson, columns and names.
derivedColumn3 - Unfold the columns array column and store the result in a new column column1 with the expression toString(unfold(columns)). This will contain the first-level key names.
derivedColumn4 - Similarly, unfold the names array column and store the result in a new column column2 with the expression toString(unfold(names)). This will contain the second-level key names but also extra data: at this point column1 and column2 hold the first-level and second-level keys, but with duplicates and extra combinations.
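Unfolding columns and then names effectively behaves like a cross join of the two arrays, which is why duplicate and invalid pairings appear at this stage. A rough, illustrative Python equivalent of the state after derivedColumn4:

```python
# Illustrative only: unfolding 'columns' and then 'names' effectively yields
# every (column1, column2) combination, which the next two filters have to prune.
columns = ["key1", "key4"]
names = ["key1", "key2", "key3", "key4", "key5", "key6"]

pairs = [(c1, c2) for c1 in columns for c2 in names]
# 12 rows, e.g. ('key1', 'key1'), ('key1', 'key2'), ..., ('key4', 'key6')
```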
filter1 - This will filter out the duplicate key rows by comparing both columns and will also remove the first-level key names from column2. Use the expression (column1!=column2) && (!contains(columns,#item==column2)) for this.
filter2 - This will keep only the correct combinations of first-level and second-level keys: a row survives only when column2 appears after column1 in the names array and the distance between them is less than size(names)/size(columns), i.e. column2 lies inside its own parent's block of keys. Use the below expression for it.
(toInteger(split(find(mapIndex(names,concat(#item,'-',toString(#index))),instr(#item,column2)>0),'-')[2])>toInteger(split(find(mapIndex(names,concat(#item,'-',toString(#index))),instr(#item,column1)>0),'-')[2]))&&(divide(size(names),size(columns)))>(minus(toInteger(split(find(mapIndex(names,concat(#item,'-',toString(#index))),instr(#item,column2)>0),'-')[2]),toInteger(split(find(mapIndex(names,concat(#item,'-',toString(#index))),instr(#item,column1)>0),'-')[2])))
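In plain terms, the expression above does roughly the following (a hedged Python rendering of the same index arithmetic, not the actual data flow syntax, which locates the indexes via mapIndex/find/instr):

```python
# Hedged rendering of the filter2 logic: keep (column1, column2) only when column2
# comes after column1 in 'names' and still falls inside the same parent's block.
columns = ["key1", "key4"]
names = ["key1", "key2", "key3", "key4", "key5", "key6"]

# rows remaining after filter1
pairs = [(c1, c2) for c1 in columns for c2 in names if c1 != c2 and c2 not in columns]

block_size = len(names) / len(columns)   # divide(size(names), size(columns))
kept = [
    (c1, c2)
    for c1, c2 in pairs
    if names.index(c2) > names.index(c1)
    and (names.index(c2) - names.index(c1)) < block_size
]
# kept == [('key1', 'key2'), ('key1', 'key3'), ('key4', 'key5'), ('key4', 'key6')]
```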
This leaves only the valid first-level and second-level key pairs.
derivedColumn5 - Create the new columns column3 and column4 to get the inner property values. Use the below expressions for them.
column3 - replace(split(split(split(myjson,column2)[2],':')[3],',')[1],'"','')
column4 - replace(replace(split(split(split(myjson,column2)[2],':')[4],',')[1],'"',''),'}','')
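These expressions simply slice the serialized JSON string around the current second-level key. Roughly the same idea in Python (illustrative only; note the data flow arrays are 1-based while Python's are 0-based):

```python
# Illustrative Python version of the derivedColumn5 expressions (0-based indexing
# here versus the 1-based arrays of the data flow expression language).
def inner_values(myjson: str, column2: str) -> tuple[str, str]:
    after_key = myjson.split(column2)[1]                     # split(myjson, column2)[2]
    column3 = after_key.split(":")[2].split(",")[0].replace('"', "")
    column4 = (after_key.split(":")[3].split(",")[0]
               .replace('"', "").replace("}", ""))
    return column3, column4

# For the sample payload, inner_values(myjson, "key2") would return ("x", "y").
```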
This will give the required columns in the result CSV file.
This is my Dataflow JSON for your reference:
{
  "name": "dataflow6",
  "properties": {
    "type": "MappingDataFlow",
    "typeProperties": {
      "sources": [
        {
          "dataset": {
            "referenceName": "connstruct_json",
            "type": "DatasetReference"
          },
          "name": "source1"
        }
      ],
      "sinks": [
        {
          "dataset": {
            "referenceName": "construct_csv",
            "type": "DatasetReference"
          },
          "name": "sink1"
        }
      ],
      "transformations": [
        { "name": "select1" },
        { "name": "derivedColumn1" },
        { "name": "derivedColumn2" },
        { "name": "select2" },
        { "name": "derivedColumn3" },
        { "name": "derivedColumn4" },
        { "name": "filter1" },
        { "name": "filter2" },
        { "name": "derivedColumn5" },
        { "name": "select3" }
      ],
      "scriptLines": [
        "source(output(",
        " parentProperty as (key1 as (key2 as (property1 as string, property2 as string), key3 as (property1 as string, property2 as string)), key4 as (key5 as (property1 as string, property2 as string), key6 as (property1 as string, property2 as string)))",
        " ),",
        " allowSchemaDrift: true,",
        " validateSchema: false,",
        " ignoreNoFilesFound: false,",
        " documentForm: 'singleDocument') ~> source1",
        "derivedColumn2 select(mapColumn(",
        " each(parentProperty,match(true())),",
        " myjson",
        " ),",
        " skipDuplicateMapInputs: true,",
        " skipDuplicateMapOutputs: true) ~> select1",
        "select1 derive(columns = filter(columnNames(),(#item!='myjson')),",
        " names = slice(map(split(myjson,'\":{'),split(#item,'\"')[size(split(#item,'\"'))]),1,size(map(split(myjson,'\":{'),split(#item,'\"')[size(split(#item,'\"'))]))-1)) ~> derivedColumn1",
        "source1 derive(myjson = toString(parentProperty)) ~> derivedColumn2",
        "derivedColumn1 select(mapColumn(",
        " myjson,",
        " columns,",
        " names",
        " ),",
        " skipDuplicateMapInputs: true,",
        " skipDuplicateMapOutputs: true) ~> select2",
        "select2 derive(column1 = toString(unfold(columns))) ~> derivedColumn3",
        "derivedColumn3 derive(column2 = toString(unfold(names))) ~> derivedColumn4",
        "derivedColumn4 filter((column1!=column2) && (!contains(columns,#item==column2))) ~> filter1",
        "filter1 filter((toInteger(split(find(mapIndex(names,concat(#item,'-',toString(#index))),instr(#item,column2)>0),'-')[2])>toInteger(split(find(mapIndex(names,concat(#item,'-',toString(#index))),instr(#item,column1)>0),'-')[2]))&&(divide(size(names),size(columns)))>(minus(toInteger(split(find(mapIndex(names,concat(#item,'-',toString(#index))),instr(#item,column2)>0),'-')[2]),toInteger(split(find(mapIndex(names,concat(#item,'-',toString(#index))),instr(#item,column1)>0),'-')[2])))) ~> filter2",
        "filter2 derive(column3 = replace(split(split(split(myjson,column2)[2],':')[3],',')[1],'\"',''),",
        " column4 = replace(replace(split(split(split(myjson,column2)[2],':')[4],',')[1],'\"',''),'}','')) ~> derivedColumn5",
        "derivedColumn5 select(mapColumn(",
        " column1,",
        " column2,",
        " column3,",
        " column4",
        " ),",
        " skipDuplicateMapInputs: true,",
        " skipDuplicateMapOutputs: true) ~> select3",
        "select3 sink(allowSchemaDrift: true,",
        " validateSchema: false,",
        " umask: 0022,",
        " preCommands: [],",
        " postCommands: [],",
        " skipDuplicateMapInputs: true,",
        " skipDuplicateMapOutputs: true) ~> sink1"
      ]
    }
  }
}