I need help with Mapping Data Flow in ADF.
The problem is that I have a dynamic source with dynamic columns; the dataset column names are unknown at design time. I have a JSON document that describes rules, each consisting of a Column name and a Value given as a regular expression.
Every single row in the dynamic dataset should be evaluated against these rules, and rows where the Column and Value criteria are not met should be filtered away.
In the JSON document we could have "Column": "ScanAddress" and "Value": "^PLC.*", i.e. if the dataset contains a column named ScanAddress whose row values match the regular expression, then those rows are kept.
Most of the time there are multiple rules for each row.
In the following JSON document we have three rules. The first two rules must evaluate two columns in the source dataset, and the last must evaluate just one. It is very dynamic.
{
  "Events": [
    {
      "Id": "",
      "Class": "Alarm",
      "Category": "Network",
      "Specifications": [
        {
          "Specification": "Comm error",
          "Filters": [
            {
              "Column": "Control",
              "DataType": "integer",
              "Value": 1
            },
            {
              "Column": "ScanAddress",
              "DataType": "string",
              "Value": "^.*_System._Error.*$"
            }
          ]
        },
        {
          "Specification": "Comm error",
          "Filters": [
            {
              "Column": "ControlOnOff",
              "DataType": "integer",
              "Value": 1
            },
            {
              "Column": "ScanAddress",
              "DataType": "string",
              "Value": "PLC:\".*\"1"
            }
          ]
        },
        {
          "Specification": "Comm error",
          "Filters": [
            {
              "Column": "ControlOnOff",
              "DataType": "boolean",
              "Value": 1
            }
          ]
        }
      ]
    }
  ]
}
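To make the rule evaluation concrete: a row satisfies the first "Comm error" specification when both of its filters hold. As a rough sketch in the Data Flow expression language (my assumption of how a single rule translates; byName() resolves the column by name at runtime, so it also works with drifted columns):

// True when Control equals 1 and ScanAddress matches the rule's regular expression
toInteger(byName('Control')) == 1 && regexMatch(toString(byName('ScanAddress')), '^.*_System._Error.*$')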
The solution that I am building is a classification system that looks up the column and value for a specific item/row and classifies each item based on the JSON rule set.
Here is a data sample. Each row needs to be tagged with Class and Category, and the data saved back to a Delta Lake. The schema is drifting and the Bronze dataset is schemaless.
PntNo,PntName,Control,ControlOnOff,AlarmSegment,ScanAddress,AlarmDescription,AlarmDescription2,pntname,AlarmCount,AlarmLatest
1,"V010A1:ERROR_ALM",1,0,"VAND:A_ALM","PLC:\"V010A1\":10","Målebrønd: Dataopsamlingsfejl","Dataopsamlingsfejl","V010A1:_ERROR_ALM",26,2021-05-22 00:59:09.7070000
42,"SYSTEM:ERROR",1,0,"SYS:A_ALM","PLC:\"SYSTEM\":10","Dataopsamlingsfejl","Dataopsamling - status","SYSTEM:DATA_ERROR",32,2022-06-03 13:13:48.5830000
43,"V00_FI01L:ALM",1,0,"VAND:A_ALM","B:\"DB3,B2.0\":BOOL","FI01LS01 - Filter 1 - Lav niveau","Filter 1 - Lav niveau","V00_FI01LS01:ALM",12,2019-11-06 00:35:18.0000000
44,"V001_FI01L:ALM_BLK",1,0,"VAND:C_BLK","B:\"DB4,B172.2\":BOOL","FI0S01 - Filter 1 - Lav niveau: Alarm blokeret","FI0101:Bloker","V00_FI01:ALM_BLK",2,2018-12-30 12:57:25.0000000
Expected output:
PntNo,PntName,Control,ControlOnOff,AlarmSegment,ScanAddress,AlarmDescription,AlarmDescription2,pntname,AlarmCount,AlarmLatest,Specification,SpecificationDescription,Class,Category
1,"V010A1:ERROR_ALM",1,0,"VAND:A_ALM","PLC:\"V010A1\":10","Målebrønd: Dataopsamlingsfejl","Dataopsamlingsfejl","V010A1:_ERROR_ALM",26,2021-05-22 00:59:09.7070000,"Dataopsamlingsfejl","Alarmer i forbindelse med dataopsamling","Alarm","Network"
As you can see, four additional columns are added to the output (Specification, SpecificationDescription, Class, Category).
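Conceptually the tagging could be done with a Derived Column per new column once the matching rule is known. A minimal sketch for Class, with the rule conditions hard-coded (in the real flow they have to come from the JSON / cached rule set, which is exactly the part I am stuck on):

// Class becomes 'Alarm' when the row matches one of the filter sets, otherwise it stays empty
case(
    toInteger(byName('Control')) == 1 && regexMatch(toString(byName('ScanAddress')), '^.*_System._Error.*$'), 'Alarm',
    toInteger(byName('ControlOnOff')) == 1, 'Alarm',
    ''
)
// Specification, SpecificationDescription and Category would follow the same pattern with their own values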
Tried solutions
As of now I have output the rule set to a cached sink. From there I have tried to evaluate multiple column values based on the cached sink's array of objects, but without any luck. I have also tried a custom cross join, but with no luck either.
The big issue is that I need to evaluate a dynamic number of columns. Looking for help. Thanks
My challenge was to filter my Bronze data on multiple columns located in an array.
I used another approach that I think works very well.
I am using the Assert activity and a Conditional Split activity.
The Assert activity easily lets me add multiple filters. All rows that evaluate to true() go into one bucket and those that evaluate to false() go into another. The Conditional Split then helps me split the two buckets.
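Roughly, each filter set from the JSON becomes one assert of type "Expects true" with its own assert ID, and the Conditional Split afterwards routes rows on the assert outcome. A sketch of the expressions involved (the assert ID 'commError1' is just an example name, and I use the isError() expression function to detect rows that failed a given assert):

// Assert 'commError1' (Expects true): the row satisfies the first filter set from the rule document
toInteger(byName('Control')) == 1 && regexMatch(toString(byName('ScanAddress')), '^.*_System._Error.*$')

// Conditional Split condition: rows where the assert failed go to the "no match" stream
isError('commError1')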