I am attempting to use Spark for a very simple use case: given a large set of files (90k) with device time-series data for millions of devices, group all of the time-series reads for a given device into a single set of files (partition). For now let's say we are targeting 100 partitions, and it is not critical that a given device's data shows up in the same output file, just the same partition.
Given this problem we've come up with two ways to do this - repartition then write, or write with partitionBy applied to the Writer. The code for either of these is very simple:
repartition (a hash column is added to ensure that the comparison to the partitionBy code below is one-to-one):
from pyspark.sql.functions import col, hash

df = spark.read.format("xml") \
.options(rowTag="DeviceData") \
.load(file_path, schema=meter_data) \
.withColumn("partition", hash(col("_DeviceName")).cast("Long") % num_partitions) \
.repartition("partition") \
.write.format("json") \
.option("codec", "org.apache.hadoop.io.compress.GzipCodec") \
.mode("overwrite") \
.save(output_path)
partitionBy:
from pyspark.sql.functions import col, hash

df = spark.read.format("xml") \
.options(rowTag="DeviceData") \
.load(file_path, schema=meter_data) \
.withColumn("partition", hash(col("_DeviceName")).cast("Long") % num_partitions) \
.write.format("json") \
.partitionBy("partition") \
.option("codec", "org.apache.hadoop.io.compress.GzipCodec") \
.mode("overwrite") \
.save(output_path)
In our testing repartition is 10x faster than partitionBy. Why is this?

Based on my understanding repartition will incur a shuffle, which my Spark learnings have told me to avoid whenever possible. On the other hand, partitionBy (based on my understanding) only incurs a sort operation local to each node - no shuffle is needed. Am I misunderstanding something that is causing me to think partitionBy would be faster?
I think @Oli has explained the issue perfectly in his comments to the main answer. I just want to add my 2 cents and try to explain the same.
Let's say when you are reading the XML files [90K files], Spark reads them into N partitions. This is decided based on a number of factors like spark.sql.files.maxPartitionBytes, the file format, the compression type, etc.
Let's assume it to be 10K partitions. This is happening in the below part of the code:
df = spark.read.format("xml") \
.options(rowTag="DeviceData") \
.load(file_path, schema=meter_data) \
Assuming you are using num_partitions = 100, you are adding a new column called partition with values 0-99. Spark is just adding a new column to the existing dataframe [or rdd] which is split across the 10K partitions.
.withColumn("partition", hash(col("_DeviceName")).cast("Long") % num_partitions) \
Till this point, both the codes are the same.
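As a quick sanity check, here is a rough sketch (assuming df is the dataframe after the read and withColumn steps above, before the write) to verify how many input partitions Spark actually created and how the derived "partition" column is distributed:

# Number of partitions Spark chose while reading the 90K files
# (driven by spark.sql.files.maxPartitionBytes, file sizes, compression, ...)
print(df.rdd.getNumPartitions())

# Row count per value of the derived "partition" column
# (note: hash() can return negative values, so some "partition" values may be negative)
df.groupBy("partition").count().orderBy("partition").show(200, truncate=False)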
Now, let's compare what is happening with repartition vs. partitionBy.
Case 1: repartition
.repartition("partition") \
.write.format("json") \
Here, you are repartitioning the existing dataframe based on the column "partition" which has 100 distinct values. So the existing dataframe will incur a full shuffle bringing down the number of partitions from 10K to 100. This stage will be compute-heavy since a full shuffle is involved. This could also fail if the size of one particular partition is really huge [skewed partition].
But the advantage here is that in the next stage, where the write happens, Spark has to write only 100 files to the output_path. Each file will have data corresponding to only one value of the column "partition".
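One small detail worth knowing here (a hedged sketch, reusing the names from the question): repartition("partition") with only a column argument shuffles into spark.sql.shuffle.partitions buckets (200 by default), hashed on that column. Passing the count explicitly makes the intent of "100 output partitions" unambiguous:

# Explicitly request num_partitions shuffle partitions, keyed on the derived column,
# so the subsequent write produces at most that many files.
df.repartition(num_partitions, "partition") \
    .write.format("json") \
    .option("codec", "org.apache.hadoop.io.compress.GzipCodec") \
    .mode("overwrite") \
    .save(output_path)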
Case 2: partitionBy
.write.format("json") \
.partitionBy("partition") \
Here, you are asking spark to write the existing dataframe into output_path partitioned by the distinct values of the column "partition". You are nowhere asking spark to reduce the existing partition count of the dataframe.
So Spark will create new folders inside the output_path and write the data corresponding to each partition inside them:
output_path + "/partition=0/"
output_path + "/partition=1/"
...
output_path + "/partition=99/"
Now since you have 10K Spark partitions in the existing dataframe, and assuming the worst case where each of these 10K partitions contains all the distinct values of the column "partition", Spark will have to write 10K * 100 = 1M files. i.e., some part of each of the 10K partitions will be written to each of the 100 folders created for the column "partition". This way Spark ends up writing 1M files to the output_path, spread across the sub-directories it creates. The advantage is that we are skipping a full shuffle with this method.
Now, compared to the in-memory, compute-intensive shuffle in Case 1, this will be much slower since Spark has to create 1M files and write them to persistent storage - and it first writes them to a temporary folder and only then commits them to the output_path.
This will be even slower if the write is happening to an object store like AWS S3 or Google Cloud Storage.
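You can estimate this file explosion up front, because without a prior shuffle each (task partition, "partition" value) combination produces at least one output file. A rough sketch using spark_partition_id():

from pyspark.sql.functions import spark_partition_id

# Count the distinct (spark partition, "partition" value) pairs - a lower bound
# on the number of files partitionBy("partition") would write without a shuffle.
estimated_files = df \
    .select(spark_partition_id().alias("task_id"), "partition") \
    .distinct() \
    .count()
print(estimated_files)   # approaches 10K * 100 = 1M in the worst case described above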
Case 3: coalesce + partitionBy
.coalesce(num_partitions) \
.write.format("json") \
.partitionBy("partition") \
In this case, you will be reducing the number of spark partitions from 10K to 100 with coalesce() and writing it to output_path partitioned by column "partition".
So, assuming the worst case where each of these 100 partitions has all the distinct values of the column "partition", Spark will have to write 100 * 100 = 10K files.
This will still be faster than Case 2, but will be slower than Case 1. This is because you are doing a partial shuffle with coalesce() but still end up writing 10K files to output_path.
Case 4: repartition+ partitionBy
.repartition("partition") \
.write.format("json") \
.partitionBy("partition") \
In this case, you will be reducing the number of spark partitions from 10K to 100 [distinct values of column "partition"] with repartition() and writing it to output_path partitioned by column "partition".
Since each of these 100 partitions now has only one distinct value of the column "partition", Spark will have to write 100 * 1 = 100 files. Each sub-folder created by partitionBy() will have only 1 file inside it.
This will take the same time as Case 1 since both the cases involve a full-shuffle and then writing 100 files. The only difference here will be that 100 files will be inside sub-folders under the output_path.
This setup will be useful for partition pruning (predicate push-down on the partition column) when reading back from the output_path via Spark or Hive.
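For example, when reading the output back, a filter on the partition column lets Spark prune the partition=... sub-folders it does not need instead of listing and scanning all of them (a minimal sketch; output_path as above):

from pyspark.sql.functions import col

# Only the files under output_path/partition=42/ are read;
# the other sub-folders are pruned at planning time.
subset = spark.read.format("json").load(output_path).filter(col("partition") == 42)
subset.show()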
Conclusion:
Even though partitionBy by itself is cheaper than repartition (it avoids the full shuffle), depending on the number of dataframe partitions and the distribution of data inside those partitions, using partitionBy alone can end up being costlier overall because of the sheer number of files it has to write.