I am reading from a Kafka topic which has 5 partitions. Since 5 cores are not sufficient to handle the load, I am repartitioning the input to 30. I have given 30 cores to my Spark process, with 6 cores on each executor. With this setup I was assuming that each executor would get 6 tasks, but more often than not we are seeing that one executor gets 4 tasks while others get 7. This skews the processing time of our job.
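For reference, a minimal sketch of the executor layout we are assuming (the numbers are inferred from the description above, the app name is illustrative, and this is not our actual job config):

```java
import org.apache.spark.SparkConf;

// Assumed layout: 5 executors x 6 cores = 30 cores, matching the 30 partitions
// we repartition to.
SparkConf conf = new SparkConf()
        .setAppName("kafka-repartition-job")
        .set("spark.executor.instances", "5")
        .set("spark.executor.cores", "6");
```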
Can someone help me understand why the executors do not get an equal number of tasks? Here are the executor metrics after the job has run for 12 hours.
Address | Status | RDD Blocks | Storage Memory | Disk Used | Cores | Active Tasks | Failed Tasks | Complete Tasks | Total Tasks | Task Time (GC Time) | Input | Shuffle Read | Shuffle Write |
---|---|---|---|---|---|---|---|---|---|---|---|---|---|
ip1:36759 | Active | 7 | 1.6 MB / 144.7 MB | 0.0 B | 6 | 6 | 0 | 442506 | 442512 | 35.9 h (26 min) | 42.1 GB | 25.9 GB | 24.7 GB |
ip2:36689 | Active | 0 | 0.0 B / 128 MB | 0.0 B | 0 | 0 | 0 | 0 | 0 | 0 ms (0 ms) | 0.0 B | 0.0 B | 0.0 B |
ip5:44481 | Active | 7 | 1.6 MB / 144.7 MB | 0.0 B | 6 | 6 | 0 | 399948 | 399954 | 29.0 h (20 min) | 37.3 GB | 22.8 GB | 24.7 GB |
ip1:33187 | Active | 7 | 1.5 MB / 144.7 MB | 0.0 B | 6 | 5 | 0 | 445720 | 445725 | 35.9 h (26 min) | 42.4 GB | 26 GB | 24.7 GB |
ip3:34935 | Active | 7 | 1.6 MB / 144.7 MB | 0.0 B | 6 | 6 | 0 | 427950 | 427956 | 33.8 h (23 min) | 40.5 GB | 24.8 GB | 24.7 GB |
ip4:38851 | Active | 7 | 1.7 MB / 144.7 MB | 0.0 B | 6 | 6 | 0 | 410276 | 410282 | 31.6 h (24 min) | 39 GB | 23.9 GB | 24.7 GB |
As you can see, there is a skew in the number of tasks completed by ip5:44481. I don't see abnormal GC activity either.
What metrics should I be looking at to understand this skew?
UPDATE
Upon further debugging I can see that the executors are processing unequal amounts of data, while each task receives approximately the same number of records.
Executor ID | Address | Task Time | Total Tasks | Failed Tasks | Killed Tasks | Succeeded Tasks | Shuffle Read Size / Records | Blacklisted |
---|---|---|---|---|---|---|---|---|
0 | ip3:37049 | 0.8 s | 6 | 0 | 0 | 6 | 600.9 KB / 272 | FALSE |
1 | ip1:37875 | 0.6 s | 6 | 0 | 0 | 6 | 612.2 KB / 273 | FALSE |
2 | ip3:41739 | 0.7 s | 5 | 0 | 0 | 5 | 529.0 KB / 226 | FALSE |
3 | ip2:38269 | 0.5 s | 6 | 0 | 0 | 6 | 623.4 KB / 272 | FALSE |
4 | ip1:40083 | 0.6 s | 7 | 0 | 0 | 7 | 726.7 KB / 318 | FALSE |
These are the stats of the stage just after repartitioning. We can see that the number of records read by each executor is proportional to its number of tasks. As a next step I am trying to see how the partitioning function is working.
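To check this, here is a minimal sketch (method and variable names are mine, not from our codebase) that logs how many records end up in each partition of the repartitioned stream:

```java
import org.apache.spark.streaming.api.java.JavaDStream;

// Counts the records in every partition of each micro-batch and prints the
// counts on the driver, so uneven partitions would show up directly in the logs.
static void logPartitionSizes(JavaDStream<byte[]> repartitionedStream) {
    repartitionedStream.foreachRDD(rdd ->
            rdd.mapPartitionsWithIndex((idx, it) -> {
                        long count = 0;
                        while (it.hasNext()) { it.next(); count++; }
                        return java.util.Collections
                                .singletonList("partition " + idx + " -> " + count + " records")
                                .iterator();
                    }, true)
               .collect()
               .forEach(System.out::println));
}
```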
Update 2:
The only explanation I have come across is that Spark uses round-robin partitioning, and it is executed independently on each input partition. For example, if there are 5 records on node1 and 7 records on node2, node1's round robin will distribute approximately 3 records to node1 and approximately 2 records to node2, while node2's round robin will distribute approximately 4 records to node1 and approximately 3 records to node2. So there is the possibility of having 7 records on node1 and 5 records on node2, depending on the ordering of the nodes as interpreted within the framework code on each individual node. [source][1]
NOTE: notice that the best-performing executors are on the same IP. Is it because, after shuffling, transferring data within the same host is faster than transferring it to another host?
Based on the above data we can see that repartitioning is working fine, i.e. it assigns a roughly equal number of records to each of the 30 partitions, but the question is why some executors are getting more partitions than others.
**Update 3**
Adding our code snippet:
protected JavaDStream<byte[]> getRepartitionedValueStream(JavaInputDStream<ConsumerRecord<String, byte[]>> stream) {
    return stream
            // Key each Kafka record by its message key
            .mapToPair(new PairFunction<ConsumerRecord<String, byte[]>, String, byte[]>() {
                public Tuple2<String, byte[]> call(ConsumerRecord<String, byte[]> x) {
                    return new Tuple2<>(x.key(), x.value());
                }
            })
            // Shuffle into this.partitions (30) partitions, grouping values by key
            .groupByKey(this.partitions)
            // Flatten each group back into a plain stream of values
            .flatMap(x -> x._2().iterator());
}
We are getting records from Kafka and then grouping by the record key,
so that events with the same key go to the same group.
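For what it's worth, my understanding (worth verifying against your Spark version) is that groupByKey(int) hash-partitions by key, so which of the 30 partitions a record lands in depends only on the key's hash, not on where it was read. A hypothetical sketch of the equivalent form with the partitioner spelled out explicitly:

```java
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.spark.HashPartitioner;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import scala.Tuple2;

// Hypothetical variant of the method above: same grouping behaviour, but with the
// hash partitioner made explicit. Records are routed to partition
// (non-negative key.hashCode()) % partitions, regardless of the source Kafka partition.
protected JavaDStream<byte[]> getRepartitionedValueStreamExplicit(
        JavaInputDStream<ConsumerRecord<String, byte[]>> stream) {
    return stream
            .mapToPair(rec -> new Tuple2<String, byte[]>(rec.key(), rec.value()))
            .groupByKey(new HashPartitioner(this.partitions))
            .flatMap(x -> x._2().iterator());
}
```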
[1]: https://www.ibm.com/support/pages/when-running-datastage-parallel-jobs-records-are-not-evenly-distributed-across-nodes
This is because, by default, Spark tries to allocate tasks in a way that optimizes for data locality. Here is a doc that talks about locality in detail.
Quoting the original author (credit goes to Russell Spitzer):
**When to give up on locality (spark.locality.wait):** Imagine we are pulling data from a co-located Kafka node in a 3-node Spark cluster, with Kafka running on machine A of Spark nodes A, B, and C. All of the data will be marked as being NODE_LOCAL to node A. This means that once every core on A is occupied, we are left with tasks whose preferred location is A but execution space only on B and C. Spark has only two options: wait for cores to become available on A, or downgrade the locality level of the task, find a space for it, and take whatever penalty there is for running non-local.
The spark.locality.wait parameter describes how long to wait before downgrading tasks that could potentially be run at a higher locality level to a lower level. This parameter is basically our estimate of how much waiting for locality is worth. The default is 3 seconds, which means that in our Kafka example, once our co-located node A is saturated with tasks, the other machines B and C will stand idle for 3 seconds before tasks which could have been NODE_LOCAL are downgraded to ANY* and run (code reference).
- (*) If ‘A’ and ‘C’ were specified to be in the same rack, the locality would first be downgraded to RACK_LOCAL and the task run on ‘C’ instead. Once C was full, the task would wait another 3 seconds before the final downgrade to ANY and be run on B.
**Conclusion:** In streaming applications, I’ve seen many users just set spark.locality.wait to 0 to ensure that all tasks are worked on immediately. With a batch window of 5 seconds, the default wait of 3 seconds is excruciatingly long; after all, waiting 60% of your batch time before starting work is probably inefficient. In other applications I’ve seen locality be extremely important and the wait time increased to near infinite. The correct setting will depend on the latency goals and the locality benefit of your particular application. There are even finer-grained controls if you would like to weight the delays between different locality levels differently.
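For example, a minimal sketch of how you might tune this for a streaming job (the values are illustrative, not a recommendation; the per-level keys are the finer-grained controls mentioned above):

```java
import org.apache.spark.SparkConf;

// Illustrative only: start tasks immediately instead of waiting for a preferred location.
SparkConf conf = new SparkConf()
        .set("spark.locality.wait", "0s");
// Finer-grained per-level overrides also exist:
// conf.set("spark.locality.wait.process", "3s");
// conf.set("spark.locality.wait.node", "3s");
// conf.set("spark.locality.wait.rack", "0s");
```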