pyspark

Efficiently process multiple Pyspark Dataframes


I'm new to Pyspark. I come from a SAS background. I'll try to keep this brief and pretty general.

My job involves working w/ medical claims where records are contained in separate monthly datasets. The datasets are very large but have identical schema. Billions of rows. They already exist in the Catalog.

I want to efficiently apply the same transformation to each month's dataframe (across many years) and then later append the collected records into a single dataframe. What is the best way to do this?

Below is some extremely simple code. My basic understanding is that Spark records the transformations (not the filtered rows) on each pass through the loop and then appends those instructions to the list temp. Since no action occurs inside the loop, the execution plan is only created, not executed. Then, reduce is called and Spark begins to process the dataframes. Each dataframe is processed across its partitions (in parallel), but the looping across the dataframes is sequential. So, February starts when January is done and so on. Finally, the reduce call appends the filtered monthly dfs into one and discards the individual pieces.

Is this logic correct, at least in simplified terms for a Spark beginner? Is this an efficient way to do it? I'm already seeing that I have to rethink my SAS-based approach when working w/ Spark, so this may be one of those cases. Thank you

from pyspark.sql import DataFrame
from pyspark.sql.functions import *
from functools import reduce

jan=spark.createDataFrame([(1,'A', 100),(2,'B', 200),(3,'C',300),(4,'D',400)], ['id', 'claim', 'amt'])
feb=spark.createDataFrame([(1,'A', 200),(2,'B', 300),(3,'C',500),(4,'D',500)], ['id', 'claim', 'amt'])
mar=spark.createDataFrame([(1,'A', 300),(2,'B', 400),(3,'C',600),(4,'D',600)], ['id', 'claim', 'amt'])

# Build a (lazy) filtered DataFrame for each month
temp = []
for mnth in (jan, feb, mar):
    mnth = mnth.filter(col('amt') >= 300)
    temp.append(mnth)

# Append the filtered monthly pieces into a single DataFrame
all = reduce(DataFrame.unionAll, temp)
all.show()
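
As a quick sanity check on that understanding, I also print the query plan before calling any action (explain() is the only extra call here); the per-month filters and the union should all show up in one combined, still-unexecuted plan:

# Inspect the lazy plan before any job is triggered;
# the filters and the union appear together in a single plan
all.explain()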


Solution

  • Your solution filters each month sequentially in the loop. Spark uses lazy evaluation, so reduce(DataFrame.unionAll, temp) only stitches the per-month plans together; nothing actually runs until an action such as show(). The filter is recorded separately on each DataFrame, and unionAll matches columns by position rather than by name, which can misalign columns if the schemas ever drift.

    Efficient way: load the data with spark.read.format for each month so Spark can read the sources in parallel, making the entire process faster. Combining the results with unionByName matches columns by name rather than position, which is safer when appending the datasets.

    from pyspark.sql import DataFrame
    from pyspark.sql.functions import *
    from functools import reduce
    
    months = ['2023-01', '2023-02', '2023-03']  # List of months to load
    
    # Use a loop to load each month's data, apply filter, and store in temp list
    temp = []
    for month in months:
        df = (spark.read.format(<format>)      # substitute your source format
              .option("year_month", month)     # source/format-specific option
              .load(<path>))                   # substitute your data path
        
        filtered_df = df.filter(col('amt') >= 300)
        temp.append(filtered_df)
    
    # Use unionByName instead of unionAll to combine all filtered dataframes into one
    all_data = reduce(DataFrame.unionByName, temp)
    all_data.show()
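
    Since the monthly datasets already live in the Catalog, a minimal alternative sketch (assuming Catalog access via spark.table; the table names below are hypothetical placeholders) reads each month as a table instead of using spark.read.format(...).load(...):

    from pyspark.sql import DataFrame
    from pyspark.sql.functions import col
    from functools import reduce

    # Hypothetical Catalog table names; substitute whatever your Catalog uses
    table_names = ['claims_2023_01', 'claims_2023_02', 'claims_2023_03']

    # Each element is a lazy, filtered DataFrame; nothing executes yet
    monthly = [spark.table(name).filter(col('amt') >= 300) for name in table_names]

    # Combine by column name; execution starts only at an action like show()
    all_data = reduce(DataFrame.unionByName, monthly)
    all_data.explain()   # the filter should appear under each branch of the union
    all_data.show()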