I have a huge PySpark dataframe that contains 1.5B rows, including the column fieldA
. I have a list of 8.8M unique fieldA
values, that I want to filter out of the 1.5B rows. However, I think due to the large data size, I keep getting errors like StackOverflowError
or OutOfMemoryError
.
I've tried to split the 8.8M list into smaller lists of 20K values, and also split the 1.5B dataframes into smaller dataframes of 15M rows each. Then for each dataframe of 15M rows, continuously (in a loop) filter away different 20K of the fieldA
values (temp_df = temp_df.filter(~col('fieldA').isin(fieldA_part_list))
) until all 8.8M values were filtered away, then write the final temp_df
to parquet files. Repeat for the next 15M rows of dataframes. However, I think this resulted in hundreds of .filter()
, and that might be what gave me the StackOverflowError
when I tried to write to parquet files on the first 15M dataframe.
I then tried to filter away the full 8.8M values from each 15M dataframe. For each 15M dataframe, I would write the filtered results to parquet files. However, when I tried to write to parquet files, I got the OutOfMemoryError
on the first 15M dataframe.
How can I filter away rows that match any of the 8.8M fieldA
values from the 1.5B rows of dataframe, in an efficient manner?
There are a couple of things you can do to improve the memory efficiency of our code.
Select only the columns you need at beginning: This will help reducing the memory consumption of your temp_df
DataFrame.
Use a join
instead isin
if your list of elements is too large: This is probably causing an overhead when filtering. My suggestion would be transforming the list of elements as a DataFrame and use a join technique like leftanti
to get all the elements from your temp_df
that does not exist in your fieldA_part_list
.
Trying DataFrame.exceptAll (pyspark.sql.DataFrame.exceptAll): This is also a nice native pyspark operation you can try that basically returns all the rows from temp_df
that are not in the fieldA_part_list
(considering that is now a dataframe).
Here are some examples you can use as a baseline:
from pyspark.sql.functions import *
from pyspark.sql.types import StringType
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
fieldA_part_list = ["someValue1", "someValue2", "someValue3"]
# transform the fieldA_list as a dataframe
fieldA_part_df = spark.createDataFrame(fieldA_part_list, StringType()).toDF("value")
join_condition_expr = col("fieldA") == col("value")
# Perform leftanti to get only temp_df records
temp_df = temp_df.join(fieldA_part_df, on= join_condition_expr, how="leftanti")
# Alternatively using exceptAll - use one or other given memory pessure is on
another_result_df = temp_df.exceptAll(fieldA_part_df)
Out of curiosity, are you using your local machine or a Databricks cluster to perform these operations?