I have a large dataset in Parquet format (~1 TB in size) that is partitioned into two hierarchies: CLASS and DATE. There are only 7 classes, but DATE is ever increasing from 2020-01-01 onwards. My data is partitioned by CLASS first and then by DATE.
So something like:
CLASS1 --- DATE 1
       --- DATE 2
       --- ...
       --- DATE N
CLASS2 --- DATE 1
       --- DATE 2
       --- ...
       --- DATE N
I load my data by CLASS in a for-loop. If I load the entire parquet dataset at once, YARN kills the job since it overloads the memory on the instances. But I do load all the days, since I am doing a percentile calculation in my modeling. This method takes about 23 hrs to complete.
However, if I repartition so that I only have the CLASS partition, the job takes about 10 hrs.
Does having too many sub-partitions slow down the Spark executor jobs?
I keep the partition hierarchy as CLASS -> DATE only because I need to append new data by DATE every day. If having only one level of partitioning (CLASS) is more efficient, then I would have to repartition to just the CLASS partition every day after loading the new data.
Could someone explain why having a single partition level works faster? And if so, what would be the best method to add data on a daily basis by appending, without repartitioning the entire dataset?
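For reference, a daily append that preserves the CLASS -> DATE layout would look roughly like the sketch below (the DataFrame name and bucket path are placeholders, written against the standard DataFrame writer API):
// Sketch only: appending one day of new data while keeping the CLASS -> DATE layout;
// df_new_day and the bucket path are placeholders.
df_new_day.write.mode("append").partitionBy("CLASS", "DATE").parquet("s3://bucket/file.parquet/")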
Thank You
EDIT:
I use a for-loop over the file structure to iterate over the CLASS partitions, like so:
import s3fs

fs = s3fs.S3FileSystem(anon=False)
inpath = "s3://bucket/file.parquet/"
dirs = fs.ls(inpath)                      # one entry per CLASS=... prefix
for path in dirs:
    customPath = 's3://' + path + '/'
    className = path.split('=')[1]        # 'class' is a reserved word in Python
    df = spark.read.parquet(customPath)
    outpath = "s3://bucket/Output_" + className + ".parquet"
    # Perform calculations
    df.write.mode('overwrite').parquet(outpath)
The loaded df will have all the dates for CLASS=1. I then write the result as a separate parquet file for each CLASS, so that I have 7 parquet files:
Output_1.parquet
Output_2.parquet
Output_3.parquet
Output_4.parquet
Output_5.parquet
Output_6.parquet
Output_7.parquet
Merging the 7 parquet files into a single parquet afterwards is not a problem, as the resulting files are much smaller.
I have data partitioned by three columns: year, month, and id. The folder path hierarchy is
year=2020/month=08/id=1/*.parquet
year=2020/month=08/id=2/*.parquet
year=2020/month=08/id=3/*.parquet
...
year=2020/month=09/id=1/*.parquet
year=2020/month=09/id=2/*.parquet
year=2020/month=09/id=3/*.parquet
and I can read the DataFrame by loading the root path.
val df = spark.read.parquet("s3://mybucket/")
Then, the partition columns are automatically added to the DataFrame. Now you can filter your data on the partition columns, like this:
val df_filtered = df.filter("year = '2020' and month = '09'")
and do something with df_filtered; Spark will then read only the matching partitions!
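If you want to confirm the pruning, one optional check (my addition, not needed for the filter itself) is to print the physical plan; the FileScan node lists the pruned predicates under PartitionFilters:
// Optional check: the FileScan node should show the year/month predicates
// under PartitionFilters, i.e. only those directories are actually read.
df_filtered.explain()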
For your repeated processing, you can use Spark's fair scheduler. Add a fair.xml file to src/main/resources of your project with the content below,
<?xml version="1.0"?>
<allocations>
  <pool name="fair">
    <schedulingMode>FAIR</schedulingMode>
    <weight>10</weight>
    <minShare>0</minShare>
  </pool>
</allocations>
and set the scheduler configuration when the Spark session is created (spark.scheduler.mode and spark.scheduler.allocation.file are read when the scheduler starts, so they belong in the session config rather than in local properties); the pool is then selected per thread with a local property.
val spark = SparkSession.builder()
  .config("spark.scheduler.mode", "FAIR")
  .config("spark.scheduler.allocation.file", getClass.getResource("/fair.xml").getPath)
  .getOrCreate()
spark.sparkContext.setLocalProperty("spark.scheduler.pool", "fair")
Then you can do your job in parallel. You may want to parallelize the job by CLASS, so
val classes = (1 to 7).par
val date = "2020-09-25"
classes.foreach { i =>
  val df_filtered = df.filter(s"CLASS = '$i' and DATE = '$date'")
  // Do your job
}
This code will process the different CLASS values at the same time.
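As a sketch of what the "Do your job" step could look like for the percentile use case from the question (the value column, the 0.95 quantile, and the output paths are assumptions of mine, not part of the original code):
import org.apache.spark.sql.functions.expr

classes.foreach { i =>
  // All dates for one class; the other CLASS partitions are pruned.
  val dfClass = df.filter(s"CLASS = '$i'")
  // Placeholder aggregation: approximate 95th percentile of a hypothetical 'value' column.
  val result = dfClass.agg(expr("percentile_approx(value, 0.95)").as("p95"))
  result.write.mode("overwrite").parquet(s"s3://bucket/Output_$i.parquet")
}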