python, pandas, pyspark, out-of-memory

Unable to filter out rows from a huge dataset in PySpark


I have a huge PySpark dataframe containing 1.5B rows, including a column fieldA. I have a list of 8.8M unique fieldA values that I want to filter out of the 1.5B rows. However, I think that due to the large data size, I keep getting errors like StackOverflowError or OutOfMemoryError.

I've tried splitting the 8.8M list into smaller lists of 20K values each, and also splitting the 1.5B-row dataframe into smaller dataframes of 15M rows each. Then, for each 15M-row dataframe, I repeatedly (in a loop) filter away a different 20K chunk of the fieldA values (temp_df = temp_df.filter(~col('fieldA').isin(fieldA_part_list))) until all 8.8M values have been filtered away, write the final temp_df to parquet files, and repeat for the next 15M-row dataframe. However, I think this resulted in hundreds of chained .filter() calls, which might be what gave me the StackOverflowError when I tried to write the first 15M-row dataframe to parquet files.
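
Here is a simplified sketch of that first approach (fieldA_chunks and the output path are placeholders for my actual variables):

    from pyspark.sql.functions import col
    
    # fieldA_chunks: the 8.8M values split into lists of 20K values each (placeholder name)
    # temp_df: one 15M-row slice of the full 1.5B-row dataframe
    for fieldA_part_list in fieldA_chunks:
        temp_df = temp_df.filter(~col('fieldA').isin(fieldA_part_list))
    
    # writing is where the StackOverflowError shows up
    temp_df.write.parquet('output_path')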

I then tried filtering away the full 8.8M values from each 15M-row dataframe in one go, writing the filtered results to parquet files for each one. However, I got an OutOfMemoryError when writing the first 15M-row dataframe to parquet files.

How can I efficiently filter away the rows that match any of the 8.8M fieldA values from the 1.5B-row dataframe?


Solution

  • There are a couple of things you can do to improve the memory efficiency of your code.

    Here are some examples you can use as a baseline:

    from pyspark.sql.functions import col
    from pyspark.sql.types import StringType
    from pyspark.sql import SparkSession
    
    spark = SparkSession.builder.getOrCreate()
    
    fieldA_part_list = ["someValue1", "someValue2", "someValue3"]
    
    # turn the list of fieldA values into a single-column dataframe
    fieldA_part_df = spark.createDataFrame(fieldA_part_list, StringType()).toDF("value")
    
    join_condition_expr = col("fieldA") == col("value")
    
    # temp_df is your existing dataframe (e.g. one 15M-row slice).
    # A left anti join keeps only the temp_df rows whose fieldA does NOT appear in fieldA_part_df.
    result_df = temp_df.join(fieldA_part_df, on=join_condition_expr, how="leftanti")
    
    # Alternative: exceptAll requires both dataframes to have the same schema,
    # so it can only compare the fieldA values themselves, not whole rows.
    # Use one approach or the other, given that memory pressure is an issue.
    another_result_df = temp_df.select(col("fieldA").alias("value")).exceptAll(fieldA_part_df)
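
    The loop is what inflates the query plan, so it may also be worth trying a single left anti join against the full 8.8M-value list over the whole 1.5B-row dataframe, rather than chunking either side. Below is a rough sketch of that idea; big_df, fieldA_full_list, and the output path are placeholders for your own names, and the broadcast() hint is optional, advisable only if the 8.8M values fit comfortably in executor memory:

    from pyspark.sql.functions import broadcast, col
    from pyspark.sql.types import StringType
    from pyspark.sql import SparkSession
    
    spark = SparkSession.builder.getOrCreate()
    
    # Placeholder: fieldA_full_list holds all 8.8M values
    fieldA_full_df = spark.createDataFrame(fieldA_full_list, StringType()).toDF("value")
    
    # One left anti join over the full dataframe avoids hundreds of chained
    # filters in the query plan; Spark handles the shuffle for you.
    result_df = big_df.join(
        broadcast(fieldA_full_df),   # drop broadcast() if the list is too large to broadcast
        on=col("fieldA") == col("value"),
        how="leftanti",
    )
    
    result_df.write.mode("overwrite").parquet("/placeholder/output/path")

    If the write still runs out of memory, raising spark.sql.shuffle.partitions (or repartitioning result_df before the write) may help spread the work across more, smaller tasks.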
    

    Out of curiosity, are you using your local machine or a Databricks cluster to perform these operations?