apache-spark, pyspark, databricks, parquet, microsoft-fabric

Reading multiple Parquet files in PySpark notebook


When reading multiple Parquet files into a DataFrame, subsequent transformations seem to be evaluated per Parquet file, when they should be evaluated against the DataFrame as a whole.

I am working inside a Fabric notebook using PySpark. I am trying to read multiple Parquet files into one DataFrame. Each Parquet file has the same number of columns, but there is a chance the column schemas may differ; for example, a column called "adjustment" may be type int, but type string if left empty. I am currently reading the files into my DataFrame as

from pyspark.sql.functions import col, input_file_name, split

df = spark.read.schema(schema).parquet(*files).withColumn(
    "file_name", split(input_file_name(), "/").getItem(8)
)

where I have specified a schema and files is a list of paths to the files I want to load from my lake. file_name is just the name of the file, which contains a date.

When I run

display(df.where(col("file_name").contains("2024-10-01")))

it displays the DataFrame fine, similar to display(df), but when I run

display(df.where(col("file_name").contains("2024-12-01")))

it gives me this error:

org.apache.spark.SparkException: Parquet column cannot be converted in file abfss://workspace@onelake.dfs.fabric.microsoft.com/lakehouse/path/to/my/data/Data_2024-12-01. Column: [data.adjustment], Expected: string, Found: INT64.

I have tried specifying the schema, and I have tried .cache() and .persist() on the DataFrame, but every time I end up with this error. I thought it was something to do with lazy evaluation, but I cannot think of what else to do apart from reading in each Parquet file separately and then unioning them after enforcing schema changes on each column. Thanks in advance for any help.


Solution

  • but I cannot think of what else to do apart from reading in each Parquet file separately and then unioning them after enforcing schema changes on each column

    Yes, that's what you do. Reading multiple Parquet files in a single statement only works if their schemas match.

    So loop over the files in Python, reading each into a DataFrame, add transforms to make the schemas match, and then union them together into your final DataFrame, as in the sketch below.
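
    For example, here is a minimal sketch of that approach. It assumes the same schema and files variables from the question, lets Spark infer each file's schema on read, and then casts every top-level column to the type declared in schema before unioning:

    from functools import reduce
    from pyspark.sql.functions import col, input_file_name, split

    # Read each file on its own, cast every column to the target schema,
    # then union the parts by column name.
    parts = []
    for path in files:
        part = (
            spark.read.parquet(path)
            # Cast columns so files where "adjustment" was written as INT64
            # line up with files where it was written as string.
            .select([col(f.name).cast(f.dataType) for f in schema.fields])
            .withColumn("file_name", split(input_file_name(), "/").getItem(8))
        )
        parts.append(part)

    # unionByName matches columns by name rather than position.
    df = reduce(lambda a, b: a.unionByName(b), parts)

    Since the error mentions a nested field (data.adjustment), casting the whole struct column to the struct type in schema usually covers the nested mismatch as well; if it does not, you may need to rebuild that struct column explicitly before the union.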