I am loading a predefined schema from a JSON file for a specific dataset that I ingest into Azure Data Lake. The JSON file containing the schema is also stored in the Data Lake.
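```python
import json
from pyspark.sql.types import StructType

varSchema = 'abfss://landing@[hidden].dfs.core.windows.net/' + parSourceSystemName + '/' + parDatasetName + '.json'
# wholeTextFiles returns (path, content) pairs; take the content of the single schema file
rdd = spark.sparkContext.wholeTextFiles(varSchema)
text = rdd.collect()[0][1]
schema_dict = json.loads(text)  # renamed from `dict` to avoid shadowing the built-in
dataSchema = StructType.fromJson(schema_dict)
```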
I want to get the number of fields in this schema so I can compare it to the number of columns in a DataFrame loaded from a file in my landing container. That would tell me whether the schema of the new landing data has changed: if the schema says there should be 20 fields but the landing data file contains 21, I know the source system added a new field.
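As a side note, the expected field count can also be read without building a `StructType`. Assuming the schema file is in the format that `StructType.json()` writes (a top-level object with `"type": "struct"` and a `"fields"` list), each entry in `fields` is one top-level column. A minimal sketch; `SCHEMA_JSON` and `count_schema_fields` are hypothetical names, and the sample string stands in for the file read from the Data Lake:

```python
import json

# Hypothetical sample of a schema file in the format produced by StructType.json()
SCHEMA_JSON = """
{
  "type": "struct",
  "fields": [
    {"name": "id", "type": "integer", "nullable": false, "metadata": {}},
    {"name": "name", "type": "string", "nullable": true, "metadata": {}},
    {"name": "amount", "type": "double", "nullable": true, "metadata": {}}
  ]
}
"""


def count_schema_fields(schema_text: str) -> int:
    """Return the number of top-level fields in a Spark StructType JSON document."""
    schema_dict = json.loads(schema_text)
    # Each entry in "fields" corresponds to one top-level column
    return len(schema_dict["fields"])


expected = count_schema_fields(SCHEMA_JSON)
print(expected)  # 3
```

A nested struct field counts as a single entry here, which matches how `DataFrame.columns` lists only top-level columns.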
```python
import json
from pyspark.sql.types import StructType

varSchema = 'abfss://landing@[hidden].dfs.core.windows.net/' + parSourceSystemName + '/' + parDatasetName + '.json'
rdd = spark.sparkContext.wholeTextFiles(varSchema)
text = rdd.collect()[0][1]
schema_dict = json.loads(text)
dataSchema = StructType.fromJson(schema_dict)

# Create an empty DataFrame with the schema (optional; the count below only needs dataSchema)
empty_DF = spark.createDataFrame([], dataSchema)

# Load the actual landing data into another DataFrame
landing_data_DF = spark.read.format("csv").load("abfss://landing@[hidden].dfs.core.windows.net/path/to/datafile.csv")
```
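Then get the number of fields in the schema and the number of columns in the landing DataFrame, and compare them. `len()` works directly on a `StructType` and returns its number of top-level fields. (I've assumed print statements are what you want here.)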
```python
num_fields_in_schema = len(dataSchema)
num_columns_in_data = len(landing_data_DF.columns)

# I used print statements, but you can handle each case however you prefer
if num_fields_in_schema == num_columns_in_data:
    print("No schema change.")
elif num_fields_in_schema < num_columns_in_data:
    print("The source system added new fields.")
else:
    print("The source system removed fields.")
```
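Comparing counts only catches changes that alter the total: a renamed column, or one field added while another is dropped, leaves the counts equal. Comparing column names catches those cases too. A minimal sketch, assuming the CSV is read with a header row (without `.option("header", "true")` Spark names the columns `_c0`, `_c1`, ..., so they would never match the schema's names); `diff_columns` is a hypothetical helper, and in Spark its inputs would be `dataSchema.fieldNames()` and `landing_data_DF.columns`:

```python
from typing import List, Tuple


def diff_columns(expected: List[str], actual: List[str]) -> Tuple[List[str], List[str]]:
    """Return (added, removed) column names, each in the order it appears in its source list."""
    expected_set = set(expected)
    actual_set = set(actual)
    # Columns in the landing data that the schema does not know about
    added = [c for c in actual if c not in expected_set]
    # Columns the schema expects that are missing from the landing data
    removed = [c for c in expected if c not in actual_set]
    return added, removed


# Hypothetical example values; in Spark use dataSchema.fieldNames() and landing_data_DF.columns
expected_cols = ["id", "name", "amount"]
landing_cols = ["id", "name", "amount", "currency"]
added, removed = diff_columns(expected_cols, landing_cols)
print(added, removed)  # ['currency'] []
```

Because it reports the actual names, this also tells you which field the source system added or dropped, not just that the count changed.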