INPUT:
The input data set contains 10 million transactions in multiple files stored as parquet. The size of the entire data set including all files ranges from 6 to 8GB.
PROBLEM STATEMENT:
Partition the transactions by customer ID, creating one folder per customer ID, with each folder containing all the transactions made by that customer.
HDFS has a hard limit of 6.4 million on the number of subdirectories that can be created within a root directory, so I use the last two digits of the customer ID (00, 01, 02, ... 99) to create top-level directories. Each top-level directory contains all the customer IDs ending with those two digits.
Sample output directory structure:
00/cust_id=100900/part1.csv
00/cust_id=100800/part33.csv
01/cust_id=100801/part1.csv
03/cust_id=100803/part1.csv
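The mapping from a customer ID to its folder in the sample layout above can be sketched in plain Scala. This is only an illustration: the object and method names are hypothetical, and it assumes customer IDs are non-negative integers.

```scala
object CustomerPaths {
  // Top-level directory: the last two digits of the id, zero-padded
  // (assumes a non-negative numeric customer id).
  def topLevelDir(customerId: Long): String = f"${customerId % 100}%02d"

  // Relative folder for one customer, following the sample layout above.
  def customerDir(customerId: Long): String =
    s"${topLevelDir(customerId)}/cust_id=$customerId"
}
```

For example, `CustomerPaths.customerDir(100801L)` gives `01/cust_id=100801`, matching the third sample line.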
CODE:
// Reading input file and storing in cache
val parquetReader = sparksession.read
.parquet("/inputs")
.persist(StorageLevel.MEMORY_ONLY) // No spill will occur; there is enough memory
// Logic to partition
var customerIdEndingPattern = 0
while (customerIdEndingPattern < 100) {
var idEndPattern = customerIdEndingPattern + ""
if (customerIdEndingPattern < 10) {
idEndPattern = "0" + customerIdEndingPattern
}
parquetReader
.filter(col("customer_id").endsWith(idEndPattern))
.repartition(945, col("customer_id"))
.write
.partitionBy("customer_id")
.option("header", "true")
.mode("append")
.csv("/" + idEndPattern)
customerIdEndingPattern = customerIdEndingPattern + 1
}
Spark Configuration: Amazon EMR 5.29.0 (Spark 2.4.4 & Hadoop 2.8.5)
1 master and 10 slaves, each with 96 vCores and 768GB RAM (Amazon AWS R5.24xlarge instance). Hard disks are EBS with a burst of 3000 IOPS for 30 mins.
'spark.hadoop.dfs.replication': '3',
'spark.driver.cores':'5',
'spark.driver.memory':'32g',
'spark.executor.instances': '189',
'spark.executor.memory': '32g',
'spark.executor.cores': '5',
'spark.executor.memoryOverhead':'8192',
'spark.driver.memoryOverhead':'8192',
'spark.default.parallelism':'945',
'spark.sql.shuffle.partitions' :'945',
'spark.serializer':'org.apache.spark.serializer.KryoSerializer',
'spark.dynamicAllocation.enabled': 'false',
'spark.memory.fraction':'0.8',
'spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version':'2',
'spark.memory.storageFraction':'0.2',
'spark.task.maxFailures': '6',
'spark.driver.extraJavaOptions': '-XX:+UseG1GC -XX:+UnlockDiagnosticVMOptions -XX:+G1SummarizeConcMark -XX:InitiatingHeapOccupancyPercent=35 -XX:ConcGCThreads=12 -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:OnOutOfMemoryError="kill -9 %p"',
'spark.executor.extraJavaOptions': '-XX:+UseG1GC -XX:+UnlockDiagnosticVMOptions -XX:+G1SummarizeConcMark -XX:InitiatingHeapOccupancyPercent=35 -XX:ConcGCThreads=12 -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:OnOutOfMemoryError="kill -9 %p"'
SCALING ISSUES:
I experimented with anywhere from 10 up to 40 slaves (adjusting the Spark configs accordingly), but the results are the same: the job takes more than 2 hours to complete (as shown in the first pic, each job takes more than a minute and the while loop runs 99 times). Reads from remote executors are almost non-existent (which is good); most are process local.
Partitioning seems to work fine (refer to the second pic): I get 5 RDD blocks per instance and 5 tasks running at all times (each instance has 5 cores, with 19 instances per slave node). GC is optimized too.
Each partitionBy task, as written in the while loop, takes a minute or more to complete.
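The executor numbers above can be cross-checked with some simple arithmetic. The sketch below uses only figures from the configuration (10 slaves, 19 instances per node, 5 cores, 32g heap plus 8192 MB overhead). That one of the 190 slots is left for the driver is my assumption, not something stated in the post, and whether YARN actually grants this much memory per node depends on settings that are not shown.

```scala
object ClusterSizing {
  val workerNodes = 10
  val executorsPerNode = 19
  val coresPerExecutor = 5
  val executorMemoryGb = 32
  val memoryOverheadGb = 8 // spark.executor.memoryOverhead = 8192 MB

  // 10 nodes x 19 slots each
  val containerSlots: Int = workerNodes * executorsPerNode
  // Assumption: one slot is reserved for the driver, leaving 189 executors
  val executors: Int = containerSlots - 1
  // Tasks that can run at once; matches spark.default.parallelism = 945
  val totalTaskSlots: Int = executors * coresPerExecutor
  // Per-node footprint of 19 executors: 760 GB of 768 GB, 95 of 96 vCores
  val memoryPerNodeGb: Int = executorsPerNode * (executorMemoryGb + memoryOverheadGb)
  val coresPerNode: Int = executorsPerNode * coresPerExecutor
}
```

So the configuration already uses nearly all cores and memory on each node, which suggests the bottleneck is in how the job is structured rather than in the sizing.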
METRICS:
Sample duration of a few jobs (we have 99 jobs in total):
Summary from 1 job (basically one partitionBy execution):
Summary of a few instances after full job completion (hence RDD blocks is zero; the first row is the driver):
So the question is how to optimize it more and why it's not scaling up? Is there a better way to go about it? Have I reached the max performance already? Assuming I have access to more resources in terms of hardware is there anything I could do better? Any suggestions are welcome.
Touching every record 100 times is very inefficient, even if the data can be cached in memory and is not evicted downstream. Not to mention that persisting alone is expensive.
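To make the cost concrete, here is a toy comparison in plain Scala collections (no Spark involved). The 1,000 IDs are hypothetical stand-ins for the real data, and the counters only tally how many times a record is inspected. It is a rough model of the two strategies, not a benchmark.

```scala
object PassCount {
  // Hypothetical toy ids standing in for the 10 million transactions.
  val customerIds: Vector[String] = (100000 until 101000).map(_.toString).toVector

  // Loop approach: one full scan of the data per two-digit suffix.
  def loopApproach(ids: Vector[String]): (Map[String, Vector[String]], Long) = {
    var touched = 0L
    val groups = (0 until 100).map { i =>
      val suffix = f"$i%02d" // zero-padded, same as the manual "0" + i logic
      val matching = ids.filter { id => touched += 1; id.endsWith(suffix) }
      suffix -> matching
    }.filter(_._2.nonEmpty).toMap
    (groups, touched)
  }

  // Single-pass approach: derive the suffix once per record and group on it.
  def singlePass(ids: Vector[String]): (Map[String, Vector[String]], Long) = {
    var touched = 0L
    val groups = ids.groupBy { id => touched += 1; id.takeRight(2) }
    (groups, touched)
  }
}
```

Both functions return the same groups, but the loop inspects every record 100 times while the single pass inspects each record once.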
Instead, you could add a virtual column:
import org.apache.spark.sql.functions.substring
import sparksession.implicits._ // enables the $"..." column syntax
val df = sparksession.read
.parquet("/inputs")
.withColumn("partition_id", substring($"customer_id", -2, 2))
and use it later for partitioning
df
.write
.partitionBy("partition_id", "customer_id")
.option("header", "true")
.mode("append")
.csv("/")
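Note that `partitionBy` writes Hive-style `column=value` directories, so the resulting layout will not be exactly the `00/cust_id=...` structure from the question. A minimal plain-Scala sketch of the directory names to expect (the helper names here are hypothetical and only mimic the naming scheme; they are not a Spark API):

```scala
object HiveStyleLayout {
  // Joins (column, value) pairs into nested column=value directories,
  // in the same order the columns are passed to partitionBy.
  def partitionDir(columns: Seq[(String, String)]): String =
    columns.map { case (name, value) => s"$name=$value" }.mkString("/")

  // Directory for one customer with partitionBy("partition_id", "customer_id").
  def dirFor(customerId: String): String =
    partitionDir(Seq(
      "partition_id" -> customerId.takeRight(2),
      "customer_id" -> customerId
    ))
}
```

For example, `HiveStyleLayout.dirFor("100900")` gives `partition_id=00/customer_id=100900`. If the exact folder names from the question are required, the directories would need to be renamed afterwards.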
To avoid too many small files, you can repartition first using a longer suffix:
val nParts: Int = ???
val suffixLength: Int = ??? // >= suffix length used for write partitions
df
.repartitionByRange(
nParts,
substring($"customer_id", -suffixLength, suffixLength))
.write
.partitionBy("partition_id", "customer_id")
.option("header", "true")
.mode("append")
.csv("/")
Such changes will allow you to process all data in a single pass without any explicit caching.