I have about 400 csv data files of varying schemas that I am looping through in ADF for processing. I am trying to use a Mapping Data Flow with schema drift and a SQL Server column-mapping table to map the data from each csv file to a final SQL table. I am having loads of trouble getting the list of column names from the csv and joining it to the source column of the mapping table so that I can sink the data directly, or rename the columns and then sink, etc. byName() doesn't work because I do not know the name of the column I need to map. I've tried joins, but again I'm not able to obtain the column names from the csv file. Any ideas are appreciated!
A sample input is a csv file with firstname, lastname, and employeeid. The source files vary though. All 400 could be different.
For output, I would like the data written to a single table in SQL Server based on the mappings in the mapping table.
There are no data transformations; I just need to copy the data from each csv file into the appropriate column of the SQL table.
I have tried the above scenario by joining the mapping table and the source csv files. But after the join, the mapping table columns are also treated as columns of the source. And when I tried to filter the csv column names in the Select transformation's Rule-based mapping using the mapping table's source column names, there was no option to reference the mapping table's source column names there.
So, you can use a cache sink in the Dataflow to compare the source csv column names to the mapping table data. The cache sink enables the cached lookup functions, which can be used in the Select transformation's Rule-based mapping.
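As a quick reference (a sketch only; I am assuming the cache sink is named sink1, which is what it is called later in this answer), the cached lookup functions look like this in the Data Flow expression language:
sink1#output()      -> returns the first row of the cache sink
sink1#outputs()     -> returns all cached rows as an array
sink1#lookup(key)   -> returns the row matching the key columns defined on the cache sink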
Here is a sample mapping table that I created in the SQL database:
source_mapping    sink_mapping
id                Id
name              Name
age               Age
I have created the final target table as below.
create table final_target2(Id int,Name varchar(50),Age int);
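For completeness, the mapping table shown above can be created with something like the following (a sketch; the table name column_mapping and the varchar lengths are my assumptions, only the two columns and the three rows come from the sample above):
create table column_mapping
(
    source_mapping varchar(100),
    sink_mapping   varchar(100)
);
-- rows from the sample mapping table above
insert into column_mapping (source_mapping, sink_mapping)
values ('id', 'Id'), ('name', 'Name'), ('age', 'Age');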
In the Dataflow, take two sources, one for the source csv files and another for the above mapping table.
After the mapping table source, take a Derived column transformation and create a new column with any constant string value (for example, a column named group_key with the expression 'x'; both names are arbitrary). The new column will have the same value for every row of the mapping table.
This column will be used in the Group by of an Aggregate transformation so that a JSON array of the source_mapping and sink_mapping columns can be built. Take an Aggregate transformation, put the above column in the Group by section, and in the Aggregates section create a new column mapping with the below expression.
collect(@(source_mapping=source_mapping,sink_mapping=sink_mapping))
This will generate the required JSON array.
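With the sample mapping table above, the cached mapping column should come out roughly like this (rendered here as JSON purely for illustration):
[
    {"source_mapping":"id","sink_mapping":"Id"},
    {"source_mapping":"name","sink_mapping":"Name"},
    {"source_mapping":"age","sink_mapping":"Age"}
]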
Now, add a sink to this and select the sink type as cache.
Now, after the csv files source, take a select transformation and click on Add mapping -> Rule-based mapping. Use the below expressions involving the cached lookup functions.
Source's columns:
contains(sink1#output().mapping,#item.source_mapping==name)
Name as:
find(sink1#output().mapping,#item.source_mapping==$$).sink_mapping
Here, for each source column, the first expression takes the JSON array that we created in the Aggregate transformation and checks whether the source column name equals any of the source_mapping key values. It returns true if the source column name exists in source_mapping.
The second expression takes the column name that matched in the first expression ($$), finds its object in the JSON array, and returns the sink_mapping value of that object.
In other words, the first expression checks whether the source column exists in the mapping, and if it does, the second expression renames it to the corresponding sink column name from the mapping table.
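For example, suppose one of the csv files has a column literally named name (this is only an illustration of how the two expressions evaluate at runtime; the literal 'name' stands in for the runtime name and $$ placeholders):
contains(sink1#output().mapping, #item.source_mapping == 'name')             -> true, so the column is kept
find(sink1#output().mapping, #item.source_mapping == 'name').sink_mapping    -> 'Name', so the column is renamed to Name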
Add your final SQL table as sink after this transformation.
Before running the pipeline, set the write order of sink1 (the cache sink) to 1 and of the target table sink (sink2) to 2, so that the cache is populated before the target table is written.
Now, debug the pipeline with your csv files, and each csv file will be copied to the target table with its columns renamed according to the mapping table.