pyspark azure-data-factory pyspark-schema

PySpark: Get Number of Columns from DataSchema


I am loading a predefined schema from a JSON file for a specific dataset I ingest into an Azure Data Lake. The JSON file that contains the schema is also stored on the Data Lake.

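import json
from pyspark.sql.types import StructType

varSchema = 'abfss://landing@[hidden].dfs.core.windows.net/'+parSourceSystemName+'/'+parDatasetName+'.json'

# wholeTextFiles yields (path, content) pairs; take the content of the single file
rdd = spark.sparkContext.wholeTextFiles(varSchema)
text = rdd.collect()[0][1]
schema_dict = json.loads(text)  # named schema_dict to avoid shadowing the built-in dict
dataSchema = StructType.fromJson(schema_dict)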

I want to get the number of fields in this schema variable so I can compare it to the number of columns of a dataframe that was loaded from a file in my landing container, to determine whether there is a schema change in the new landing data.

If the schema states that there should be 20 fields but the landing data file contains 21, I would know that the source system added a new field.


Solution

  • Load the schema from the JSON file

    import json
    from pyspark.sql.types import StructType

    varSchema = 'abfss://landing@[hidden].dfs.core.windows.net/'+parSourceSystemName+'/'+parDatasetName+'.json'
    rdd = spark.sparkContext.wholeTextFiles(varSchema)
    text = rdd.collect()[0][1]
    schema_dict = json.loads(text)
    dataSchema = StructType.fromJson(schema_dict)
    

    Create an empty DataFrame with the schema

    empty_DF = spark.createDataFrame([], dataSchema)
    

    Load the actual data into another DF

    landing_data_DF = spark.read.format("csv").load("abfss://landing@[hidden].dfs.core.windows.net/path/to/datafile.csv")
    

    Get the number of fields in the schema and the number of columns in the landing data DF, then compare them:

    num_fields_in_schema = len(dataSchema)
    
    num_columns_in_data = len(landing_data_DF.columns)
    
    # Print statements are used here; replace them with logging or an exception as needed
    
    if num_fields_in_schema == num_columns_in_data:
        print("No schema change.")
    elif num_fields_in_schema < num_columns_in_data:
        print("The source system added new fields.")
    else:
        print("The source system removed fields.")
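A count comparison alone will not notice a renamed field, or one field added while another is removed. As a rough sketch, the same check can also be done by name on the schema JSON itself, without Spark. This assumes the file uses the layout that StructType.fromJson reads, a "fields" list whose entries each have a "name". The function name compare_to_schema and the sample columns are hypothetical, made up for illustration.

```python
import json


def compare_to_schema(schema_json_text, data_columns):
    """Compare a Spark schema JSON document against a list of column names."""
    # Assumption: the JSON has the StructType layout,
    # i.e. {"type": "struct", "fields": [{"name": ...}, ...]}.
    schema_fields = [f["name"] for f in json.loads(schema_json_text)["fields"]]
    # Columns present in the data but not in the schema, and the reverse.
    added = [c for c in data_columns if c not in schema_fields]
    removed = [f for f in schema_fields if f not in data_columns]
    return {
        "schema_count": len(schema_fields),
        "data_count": len(data_columns),
        "added": added,
        "removed": removed,
    }


# Hypothetical two-field schema and a landing file that gained one column.
example_schema = json.dumps({
    "type": "struct",
    "fields": [
        {"name": "id", "type": "integer", "nullable": True, "metadata": {}},
        {"name": "name", "type": "string", "nullable": True, "metadata": {}},
    ],
})
result = compare_to_schema(example_schema, ["id", "name", "created_at"])
print(result)
```

With the variables from the answer, the call would be compare_to_schema(text, landing_data_DF.columns). The name comparison is only meaningful if the CSV is read with the header option enabled; otherwise Spark assigns default column names such as _c0, _c1, and only the counts can be compared.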