I'm trying to create a DataFrame from a CSV file located in an Azure Data Lake (gen2) in the CDM format. The file definitions live in a model.json file at the top level of the container, which describes every entity in the data lake. This data is output by Microsoft's automatic CDS replication to Azure Data Lake.
My goal is to read this file and do some processing in Azure Databricks. I can successfully read the model.json file and extract the column names for each entity, but I run into certain CSV files that have fewer columns than model.json describes, and, as you can imagine, applying the full list of column names to these header-less CSV files results in an error:
java.lang.IllegalArgumentException: requirement failed: The number of columns doesn't match.
Below are some code snippets describing the transformation process. Any help is appreciated. If there is an easier way to process the data in these CSV files, I'd be interested in hearing that too.
Loading the model.json file
from pyspark.sql.functions import explode, col, collect_list

# Read the top-level model.json and break the entity definitions out into rows
model = spark.read.json(base_path + "model.json", multiLine=True)
entities = model.select(explode(model["entities"]).alias("entity"))
entity_info = entities.select("entity.name", "entity.attributes", "entity.partitions")
Extracting the column names and file paths from the JSON file
# filtered_entity_info is entity_info filtered down to the entities of interest
# (the filtering step is omitted here)
entity_metadata = (
    filtered_entity_info.withColumn("attributes", explode("attributes"))
    .select("name", "partitions", col("attributes")["name"].alias("column_name"))
)

# Collapse the exploded attribute rows back into one list of column names per entity
entity_metadata = (
    entity_metadata.groupBy("name", "partitions")
    .agg(collect_list("column_name").alias("columns"))
)

# One row per partition file: entity name, file path and the full column list
entity_metadata = (
    entity_metadata.withColumn("partitions", explode("partitions"))
    .select("name", col("partitions")["location"].alias("filePath"), "columns")
)
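At this point each row of entity_metadata pairs an entity name with one partition file path and the entity's full column list, which is what the collect() further down iterates over. A quick way to confirm the shape:
entity_metadata.printSchema()
# roughly: name (string), filePath (string), columns (array<string>)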
Loading the file, applying the column names in an attempt to create a DF
def build_file_url(file_url):
    # model.json stores absolute https URLs; rewrite them onto the mounted base path
    url = file_url.split(blob_container_name + "/")[1]
    return base_path + url


def populate_entity_df(tableName, url, column_names):
    file_path = build_file_url(url)
    df = (
        spark.read.option("header", "false")
        .option("inferSchema", "true")
        .option("delimiter", ",")
        .option("dateFormat", "yyyy-MM-dd'T'HH:mm:ss'Z'")
        .option("multiLine", "true")
        .csv(file_path)
    )
    # Apply the column names from model.json -- this is where the mismatch error is thrown
    return df.toDF(*column_names)
array_of_metadatas = entity_metadata.collect()
opportunity_metadata = next(x for x in array_of_metadatas if x.name == "opportunity")
opportunity_df = populate_entity_df(opportunity_metadata.name, opportunity_metadata.filePath, opportunity_metadata.columns)
And, if you're interested, here is an example of the model.json file.
{
"name": "cdm",
"description": "cdm",
"version": "1.0",
"entities": [
{
"$type": "LocalEntity",
"name": "account",
"description": "account",
"annotations": [
{
"name": "Athena:PartitionGranularity",
"value": "Year"
},
{
"name": "Athena:InitialSyncState",
"value": "Completed"
},
{
"name": "Athena:InitialSyncDataCompletedTime",
"value": "9/1/2020 3:43:50 PM"
}
],
"attributes": [
{
"name": "Id",
"dataType": "guid"
},
{
"name": "SinkCreatedOn",
"dataType": "dateTime"
},
{
"name": "SinkModifiedOn",
"dataType": "dateTime"
},
{
"name": "statecode",
"dataType": "int64"
},
{
"name": "statuscode",
"dataType": "int64"
},
...
],
"partitions": [
{
"name": "2020",
"location": "https://<storage account>.dfs.core.windows.net:443/<blob container>/opportunity/Snapshot/2020_1602009522.csv",
"fileFormatSettings": {
"$type": "CsvFormatSettings",
"columnHeaders": false,
"delimiter": ",",
"quoteStyle": "QuoteStyle.Csv",
"csvStyle": "CsvStyle.QuoteAlways",
"encoding": "UTF-8"
},
"annotations": [
{
"name": "Athena:PartitionYear",
"value": "2020"
}
]
}
]
}
]
}
Turns out it's a classic case of the outputted CSV files not having a comma for every column. I didn't spot this because Dynamics 365 entities have hundreds of columns, and seeing 378 commas on a row instead of the expected 387 didn't quite register when looking at the files.
jim,12,
bob,13,programmer,texas,houston
jane,88,director,alaska
PySpark's .csv reader, when no explicit schema is supplied, takes the number of columns from the first row and silently drops any extra fields on subsequent rows.
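A minimal sketch of that behaviour, using the three example rows above (fed in through an RDD just to keep the example self-contained):
rows = ["jim,12,", "bob,13,programmer,texas,houston", "jane,88,director,alaska"]
rdd = spark.sparkContext.parallelize(rows)

df = spark.read.option("header", "false").csv(rdd)
print(len(df.columns))  # 3 -- the column count comes from the first row
df.show()               # the extra fields on the longer rows are gone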
To get around this I use the list of column names to generate a schema at runtime.
from pyspark.sql.types import StructType, StructField, StringType

def get_schema(cols):
    # One nullable StringType field per column name pulled from model.json
    arr = []
    for col in cols:
        arr.append(StructField(col, StringType(), True))
    return StructType(arr)
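For example, using the opportunity metadata collected earlier:
schema = get_schema(opportunity_metadata.columns)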
I'm just using StringType for now, but it seems easy enough to later pull the dataType from the entity definitions as well and map it to a proper Spark type.
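A rough sketch of what that mapping could look like; the cdm_to_spark dict and the StringType fallback below are just guesses based on the dataType values that appear in model.json:
from pyspark.sql.types import StructType, StructField, StringType, LongType, TimestampType

# Hypothetical mapping from model.json dataType strings to Spark types;
# anything unrecognised falls back to StringType
cdm_to_spark = {
    "guid": StringType(),  # Spark has no native GUID type, keep it as a string
    "int64": LongType(),
    "dateTime": TimestampType(),
}

def get_typed_schema(attributes):
    # attributes: the list of {"name", "dataType"} entries for one entity
    return StructType([
        StructField(a["name"], cdm_to_spark.get(a["dataType"], StringType()), True)
        for a in attributes
    ])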
To tie it all together, here is how the schema is applied:
# With the schema supplied up front the reader no longer guesses the column count
# from the first row; short rows are padded with nulls instead of breaking
df = (
    spark.read.option("header", "false")
    .schema(schema)
    .option("delimiter", ",")
    .option("dateFormat", "yyyy-MM-dd'T'HH:mm:ss'Z'")
    .option("multiLine", "true")
    .csv(file_path)
)
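And, for completeness, this is roughly how populate_entity_df from the question ends up looking once it builds the schema instead of calling toDF (a recombination of the snippets above, so treat it as a sketch):
def populate_entity_df(tableName, url, column_names):
    file_path = build_file_url(url)
    schema = get_schema(column_names)
    return (
        spark.read.option("header", "false")
        .schema(schema)
        .option("delimiter", ",")
        .option("dateFormat", "yyyy-MM-dd'T'HH:mm:ss'Z'")
        .option("multiLine", "true")
        .csv(file_path)
    )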