I am looking for help implementing a "know nothing" import and load routine in Azure. The input would be, load-over-load, an arbitrary delimited file of N columns and M rows. The output would be a JSON map that essentially says "[{row1, field1, dataValue}, {row1, field2, dataValue}, ..., {rowM, fieldN, dataValue}]". For example, assume an incoming pipe-delimited file named EMPS2024 as follows:
EMPID|EMPNAME|STARTDT|ENDDT|SALARY
1234|Jane Jones|2019-01-01|NULL|100000
2215|Don Davis|2010-04-01|2023-12-31|99000
7448|Mary Mays|2008-09-30|NULL|70000
JSON OUTPUT:
{
  "filename": "EMPS2024",
  "Data": [
    { "Row": "1", "Field": "EMPID", "DataValue": "1234" },
    { "Row": "1", "Field": "EMPNAME", "DataValue": "Jane Jones" },
    { "Row": "1", "Field": "STARTDT", "DataValue": "2019-01-01" },
    { "Row": "1", "Field": "ENDDT", "DataValue": "NULL" },
    { "Row": "1", "Field": "SALARY", "DataValue": "100000" },
    { "Row": "2", "Field": "EMPID", "DataValue": "2215" },
    { "Row": "2", "Field": "EMPNAME", "DataValue": "Don Davis" },
    { "Row": "2", "Field": "STARTDT", "DataValue": "2010-04-01" },
    { "Row": "2", "Field": "ENDDT", "DataValue": "2023-12-31" },
    { "Row": "2", "Field": "SALARY", "DataValue": "99000" },
    { "Row": "3", "Field": "EMPID", "DataValue": "7448" },
    { "Row": "3", "Field": "EMPNAME", "DataValue": "Mary Mays" },
    { "Row": "3", "Field": "STARTDT", "DataValue": "2008-09-30" },
    { "Row": "3", "Field": "ENDDT", "DataValue": "NULL" },
    { "Row": "3", "Field": "SALARY", "DataValue": "70000" }
  ]
}
The next file might be an Expenses file with 35 unrelated columns, the file after that might be Sales with 20 unrelated columns, and so on. The only thing they have in common is that they are all delimited text, and they will all be mapped to the common output of Row, Field, Value. What comes next could be a unicorn, e.g. "GreatLakesFishes", with content not known until runtime, but any delimited file can be written to the uniform JSON output.
Can this be done in ADF?
We have had some success writing an "AnyDelimitedInput" routine using Adeptia, but we need to do the same via ADF into an Azure SQL Database. The output could just as easily be a heap table with the structure FileName|RowNum|FieldName|DataValue.
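For reference, this is the logic we are after, sketched here in plain Python purely to illustrate the row/field/value mapping (the file name and the csv-based reader are just for the illustration, not part of the intended ADF implementation):

import csv
import json
import os

def unpivot_delimited(path, delimiter="|"):
    # Read any delimited file and emit the uniform Row/Field/DataValue map.
    with open(path, newline="") as f:
        reader = csv.reader(f, delimiter=delimiter)
        header = next(reader)  # column names are unknown until runtime
        data = []
        for row_num, row in enumerate(reader, start=1):
            for field, value in zip(header, row):
                data.append({"Row": str(row_num), "Field": field, "DataValue": value})
    return {"filename": os.path.splitext(os.path.basename(path))[0], "Data": data}

print(json.dumps(unpivot_delimited("EMPS2024.txt"), indent=2))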
You can use an ADF mapping data flow to achieve this requirement, as follows.
First, make sure the schema mapping/projection is cleared (left empty) both in the source dataset and in the data flow source. This is what allows the same data flow to be reused for different input files.
After the source, add the following transformations.
Take a derivedColumn transformation and create two columns named cols and one with the expressions below:

cols - columnNames()
one - split(concatWS(',', toString(columns())),',')

The cols column stores an array of all the input column names, and the one column stores an array of all the input column values for each row. (Because the second expression joins the row's values with commas and then splits them again, it assumes the data values themselves contain no commas.)
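For example, for the first data row of the sample EMPS2024 file, these two derived columns would evaluate to roughly the following (shown as Python literals for illustration only):

cols = ["EMPID", "EMPNAME", "STARTDT", "ENDDT", "SALARY"]      # from columnNames()
one = ["1234", "Jane Jones", "2019-01-01", "NULL", "100000"]   # the row's values as strings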
Next, take a surrogateKey transformation that adds a column named key, starting at 1 and incrementing by 1; this supplies the row number.
Then take another derivedColumn transformation and create a new column Data with the expression below:

mapIndex(cols,concat('{"Key":"',toString(key),'","Field":"',#item,'","DataValue":"',one[#index],'"}'))

This creates a Data column of type array which contains one JSON string of the form {row, field, dataValue} for every field of every row. (This script uses Key as the property name where the question uses Row; rename it in the expression if you prefer.)
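For the same first sample row, the resulting Data array holds one JSON string per field, roughly like this (Python literals, illustration only):

Data = [
    '{"Key":"1","Field":"EMPID","DataValue":"1234"}',
    '{"Key":"1","Field":"EMPNAME","DataValue":"Jane Jones"}',
    '{"Key":"1","Field":"STARTDT","DataValue":"2019-01-01"}',
    '{"Key":"1","Field":"ENDDT","DataValue":"NULL"}',
    '{"Key":"1","Field":"SALARY","DataValue":"100000"}',
]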
Now take a select transformation and apply a rule-based mapping (each(match(name=='Data'))) so that only the Data column is passed through as output.
After this, take a flatten transformation that unrolls the Data array, so that every JSON string becomes its own row in a single Data column.
The output from the above contains only rows of JSON strings; these need to be converted into JSON objects. Use a parse transformation for this, with the following mapping:

(Key as string, Field as string, DataValue as string)
Parse outputs complex (JSON object) columns. To turn these into a single JSON array of objects, use an aggregate transformation: leave the Group by section empty (no grouping) and use the expression below for the Data column.

collect(@(Key=Data.Key,Field=Data.Field,DataValue=Data.DataValue))
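Conceptually, the whole stream collapses here to a single row whose Data column is an array of objects, roughly as follows (Python literals, truncated, illustration only):

Data = [
    {"Key": "1", "Field": "EMPID", "DataValue": "1234"},
    {"Key": "1", "Field": "EMPNAME", "DataValue": "Jane Jones"},
    # and so on: one object per field per row, 15 in total for the sample file
]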
This produces the desired Data array. Now add the filename with another derivedColumn after the aggregate and set your file name there. If you run the data flow in a loop over different files, pass the file name from the pipeline ForEach into a data flow parameter and reference that parameter (data flow parameters are referenced as $parameterName in expressions) in this derived column instead of a hard-coded value.
At the end, add a sink with a target JSON dataset to write out the filename and Data structure.
To check the data preview after every transformation, you can use a sample input file as the source. When switching between different input files, again make sure the source mapping/projection stays cleared.
This is the complete data flow JSON, which you can use in building yours:
{
    "name": "dataflow2",
    "properties": {
        "type": "MappingDataFlow",
        "typeProperties": {
            "sources": [
                {
                    "dataset": {
                        "referenceName": "DelimitedText2",
                        "type": "DatasetReference"
                    },
                    "name": "source1"
                }
            ],
            "sinks": [
                {
                    "dataset": {
                        "referenceName": "Json1",
                        "type": "DatasetReference"
                    },
                    "name": "sink1"
                }
            ],
            "transformations": [
                {
                    "name": "surrogateKeyaddsKey"
                },
                {
                    "name": "derivedColumnbuildsJSONstring"
                },
                {
                    "name": "ColumnnamesColumnsVaues"
                },
                {
                    "name": "selectforDataColumn"
                },
                {
                    "name": "flattensDataColumn"
                },
                {
                    "name": "derivedColumnForFilename"
                },
                {
                    "name": "parsetheDatatoJSON"
                },
                {
                    "name": "aggregateForArrayOfObjects"
                }
            ],
            "scriptLines": [
                "source(allowSchemaDrift: true,",
                " validateSchema: false,",
                " ignoreNoFilesFound: false) ~> source1",
                "ColumnnamesColumnsVaues keyGenerate(output(key as long),",
                " startAt: 1L,",
                " stepValue: 1L) ~> surrogateKeyaddsKey",
                "surrogateKeyaddsKey derive(Data = mapIndex(cols,concat('{\"Key\":\"',toString(key),'\",\"Field\":\"',#item,'\",\"DataValue\":\"',one[#index],'\"}'))) ~> derivedColumnbuildsJSONstring",
                "source1 derive(cols = columnNames(),",
                " one = split(concatWS(',', toString(columns())),',')) ~> ColumnnamesColumnsVaues",
                "derivedColumnbuildsJSONstring select(mapColumn(",
                " each(match(name=='Data'))",
                " ),",
                " skipDuplicateMapInputs: true,",
                " skipDuplicateMapOutputs: true) ~> selectforDataColumn",
                "selectforDataColumn foldDown(unroll(Data, Data),",
                " mapColumn(",
                " Data",
                " ),",
                " skipDuplicateMapInputs: false,",
                " skipDuplicateMapOutputs: false) ~> flattensDataColumn",
                "aggregateForArrayOfObjects derive(filename = 'sample_blank.csv') ~> derivedColumnForFilename",
                "flattensDataColumn parse(Data = Data ? (Key as string,",
                " Field as string,",
                " DataValue as string),",
                " format: 'json',",
                " documentForm: 'arrayOfDocuments') ~> parsetheDatatoJSON",
                "parsetheDatatoJSON aggregate(Data = collect(@(Key=Data.Key,Field=Data.Field,DataValue=Data.DataValue))) ~> aggregateForArrayOfObjects",
                "derivedColumnForFilename sink(allowSchemaDrift: true,",
                " validateSchema: false,",
                " partitionFileNames:['res2.json'],",
                " umask: 0022,",
                " preCommands: [],",
                " postCommands: [],",
                " skipDuplicateMapInputs: true,",
                " skipDuplicateMapOutputs: true,",
                " partitionBy('hash', 1)) ~> sink1"
            ]
        }
    }
}
Execute the data flow from an ADF pipeline and you will get the desired JSON output file in the sink location.
I have checked this with multiple files having different numbers of rows and columns, and it gives the expected results.