apache-spark | spark-streaming | spark-streaming-kafka | apache-spark-standalone

Spark not giving equal tasks to all executors


I am reading from a Kafka topic which has 5 partitions. Since 5 cores are not sufficient to handle the load, I am repartitioning the input to 30 partitions. I have given 30 cores to my Spark process, with 6 cores on each executor. With this setup I was assuming that each executor would get 6 tasks. But more often than not we are seeing that one executor gets 4 tasks while others get 7 tasks. This skews the processing time of our job.
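
For context, this is roughly what that resource allocation looks like when set on the driver's configuration; a minimal sketch, assuming a standalone cluster and that the properties are set on SparkConf rather than passed to spark-submit (the app name is made up):

    import org.apache.spark.SparkConf;

    // Sketch of the allocation described above: 30 cores in total,
    // 6 per executor, i.e. 5 executors of 6 cores each.
    SparkConf conf = new SparkConf()
            .setAppName("kafka-stream-job")       // hypothetical app name
            .set("spark.cores.max", "30")         // total cores for the application (standalone)
            .set("spark.executor.cores", "6");    // cores per executor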

Can someone help me understand why the executors do not all get an equal number of tasks? Here are the executor metrics after the job has run for 12 hours.

| Address | Status | RDD Blocks | Storage Memory | Disk Used | Cores | Active Tasks | Failed Tasks | Complete Tasks | Total Tasks | Task Time (GC Time) | Input | Shuffle Read | Shuffle Write |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| ip1:36759 | Active | 7 | 1.6 MB / 144.7 MB | 0.0 B | 6 | 6 | 0 | 442506 | 442512 | 35.9 h (26 min) | 42.1 GB | 25.9 GB | 24.7 GB |
| ip2:36689 | Active | 0 | 0.0 B / 128 MB | 0.0 B | 0 | 0 | 0 | 0 | 0 | 0 ms (0 ms) | 0.0 B | 0.0 B | 0.0 B |
| ip5:44481 | Active | 7 | 1.6 MB / 144.7 MB | 0.0 B | 6 | 6 | 0 | 399948 | 399954 | 29.0 h (20 min) | 37.3 GB | 22.8 GB | 24.7 GB |
| ip1:33187 | Active | 7 | 1.5 MB / 144.7 MB | 0.0 B | 6 | 5 | 0 | 445720 | 445725 | 35.9 h (26 min) | 42.4 GB | 26 GB | 24.7 GB |
| ip3:34935 | Active | 7 | 1.6 MB / 144.7 MB | 0.0 B | 6 | 6 | 0 | 427950 | 427956 | 33.8 h (23 min) | 40.5 GB | 24.8 GB | 24.7 GB |
| ip4:38851 | Active | 7 | 1.7 MB / 144.7 MB | 0.0 B | 6 | 6 | 0 | 410276 | 410282 | 31.6 h (24 min) | 39 GB | 23.9 GB | 24.7 GB |

As you can see, there is a skew in the number of tasks completed by ip5:44481. I don't see abnormal GC activity either.

What metrics should I be looking at to understand this skew?

UPDATE

Upon further debugging I can see that the executors end up with unequal amounts of data, even though each task is given approximately the same number of records.

| Executor ID | Address | Task Time | Total Tasks | Failed Tasks | Killed Tasks | Succeeded Tasks | Shuffle Read Size / Records | Blacklisted |
|---|---|---|---|---|---|---|---|---|
| 0 | ip3:37049 | 0.8 s | 6 | 0 | 0 | 6 | 600.9 KB / 272 | FALSE |
| 1 | ip1:37875 | 0.6 s | 6 | 0 | 0 | 6 | 612.2 KB / 273 | FALSE |
| 2 | ip3:41739 | 0.7 s | 5 | 0 | 0 | 5 | 529.0 KB / 226 | FALSE |
| 3 | ip2:38269 | 0.5 s | 6 | 0 | 0 | 6 | 623.4 KB / 272 | FALSE |
| 4 | ip1:40083 | 0.6 s | 7 | 0 | 0 | 7 | 726.7 KB / 318 | FALSE |

These are the stats for the stage just after repartitioning. We can see that the number of records on each executor is proportional to its number of tasks. As a next step, I am trying to see how the partition function works.
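
To verify how records are spread across partitions, one option is to count them per partition for every micro-batch. Here is a minimal sketch; the helper method and log format are my own, and it assumes a JavaDStream<byte[]> like the one produced by the repartitioning code shown further below:

    import java.util.Collections;
    import java.util.Iterator;

    import org.apache.spark.streaming.api.java.JavaDStream;

    // Hypothetical helper: prints "partition <index> -> <count> records" for every batch.
    public static void logPartitionSizes(JavaDStream<byte[]> repartitioned) {
        repartitioned.foreachRDD(rdd ->
                rdd.mapPartitionsWithIndex((Integer idx, Iterator<byte[]> it) -> {
                    int count = 0;
                    while (it.hasNext()) { it.next(); count++; }
                    return Collections.singletonList("partition " + idx + " -> " + count + " records").iterator();
                }, true)
                .collect()                        // small result: one line per partition
                .forEach(System.out::println));
    }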

Update 2:

The only explanation I have come across is that Spark uses round-robin partitioning, and that it is executed independently on each partition. For example, if there are 5 records on node1 and 7 records on node2, node1's round robin will distribute approximately 3 records to node1 and approximately 2 records to node2. Node2's round robin will distribute approximately 4 records to node1 and approximately 3 records to node2. So there is the possibility of having 7 records on node1 and 5 records on node2, depending on the ordering of the nodes as interpreted within the framework code on each individual node. [source][1]
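
A small, purely conceptual illustration of that argument (this is not Spark's or DataStage's actual code; the numbers just mirror the 5/7 example above):

    // Conceptual only: each source partition deals its records out round-robin,
    // independently of the others, so the per-target totals need not come out equal.
    int numTargets = 2;
    int[] recordsPerSource = {5, 7};   // 5 records on node1, 7 records on node2
    int[] startTarget      = {0, 0};   // both sources happen to start dealing at node1
    int[] totals = new int[numTargets];

    for (int src = 0; src < recordsPerSource.length; src++) {
        int target = startTarget[src];
        for (int r = 0; r < recordsPerSource[src]; r++) {
            totals[target]++;
            target = (target + 1) % numTargets;   // round robin within this source only
        }
    }
    System.out.println(java.util.Arrays.toString(totals));   // prints [7, 5]

With different starting targets the totals could come out even, which is why the result depends on the ordering each node happens to use.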

NOTE: notice that the best performing executors are on the same IP. Is that because, after shuffling, transferring data on the same host is faster than transferring it to another IP?

Based on the above data we can see that the repartition is working fine, i.e. it assigns an equal number of records to the 30 partitions, but the question is why some executors get more partitions than others.

Update 3:

Adding our code snippet:

    protected JavaDStream<byte[]> getRepartitionedValueStream(JavaInputDStream<ConsumerRecord<String, byte[]>> stream) {
        // Key each Kafka record by its record key, group by that key into
        // `this.partitions` partitions, then flatten the groups back into a
        // plain stream of values.
        return stream.mapToPair(new PairFunction<ConsumerRecord<String, byte[]>, String, byte[]>() {
            public Tuple2<String, byte[]> call(ConsumerRecord<String, byte[]> x) {
                return new Tuple2<>(x.key(), x.value());
            }
        }).groupByKey(this.partitions)
          .flatMap(x -> x._2().iterator());
    }

We are getting records from Kafka, and then using groupByKey so that events from the same partition go to the same group.

[1]: https://www.ibm.com/support/pages/when-running-datastage-parallel-jobs-records-are-not-evenly-distributed-across-nodes
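
For completeness, this is roughly how that method gets wired up; a sketch assuming the spark-streaming-kafka-0-10 integration, with a made-up topic name, broker address and group id (jssc is the JavaStreamingContext):

    import java.util.Arrays;
    import java.util.HashMap;
    import java.util.Map;

    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.common.serialization.ByteArrayDeserializer;
    import org.apache.kafka.common.serialization.StringDeserializer;
    import org.apache.spark.streaming.api.java.JavaDStream;
    import org.apache.spark.streaming.api.java.JavaInputDStream;
    import org.apache.spark.streaming.kafka010.ConsumerStrategies;
    import org.apache.spark.streaming.kafka010.KafkaUtils;
    import org.apache.spark.streaming.kafka010.LocationStrategies;

    Map<String, Object> kafkaParams = new HashMap<>();
    kafkaParams.put("bootstrap.servers", "broker1:9092");            // hypothetical broker
    kafkaParams.put("key.deserializer", StringDeserializer.class);
    kafkaParams.put("value.deserializer", ByteArrayDeserializer.class);
    kafkaParams.put("group.id", "my-consumer-group");                // hypothetical group id

    JavaInputDStream<ConsumerRecord<String, byte[]>> stream =
            KafkaUtils.createDirectStream(
                    jssc,
                    LocationStrategies.PreferConsistent(),
                    ConsumerStrategies.<String, byte[]>Subscribe(
                            Arrays.asList("my-topic"), kafkaParams)); // hypothetical topic

    JavaDStream<byte[]> values = getRepartitionedValueStream(stream);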


Solution

  • This is because, by default, Spark tries to allocate tasks in a way that optimizes for data locality. This is a doc that talks about locality in detail.

    Quoting the original author; credit goes to Russell Spitzer:

    When to give up on locality: spark.locality.wait. Imagine we are pulling data from a co-located Kafka node in a 3-node Spark cluster, where Kafka is running on machine A of Spark nodes A, B, and C. All of the data will be marked as being NODE_LOCAL to node A. This means once every core on A is occupied we will be left with tasks whose preferred location is A, but we only have execution space on B and C. Spark has only two options: wait for cores to become available on A, or downgrade the locality level of the task, try to find space for it, and take whatever penalty there is for running non-local.

    The spark.locality.wait parameter describes how long to wait before downgrading tasks that could potentially be run at a higher locality level to a lower level. This parameter is basically our estimate of how much time waiting for locality is worth. The default is 3 seconds, which means that in our Kafka example, once our co-located node A is saturated with tasks, our other machines B and C will stand idle for 3 seconds before tasks which could have been NODE_LOCAL are downgraded to ANY* and run (code reference).

    • If ‘A’ and ‘C’ were specified to be in the same rack, the locality would be downgraded to RACK_LOCAL first and the task run on ‘C’ instead. Once C was full the task would wait another 3 seconds before doing the final downgrade to ANY and be run on B.

    Conclusion: In streaming applications, I've seen many users just set spark.locality.wait to 0 to ensure that all tasks are being worked on immediately. A batch window of 5 seconds makes the default wait of 3 seconds excruciatingly long; after all, waiting 60% of your batch time before starting work is probably inefficient. In other applications I've seen locality be extremely important and the wait time increased to near infinite. The correct setting will depend on the latency goals and locality benefit of your particular application. There are even finer-grained controls if you would like to weight the delays between different levels of locality differently.
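
    If you decide to go that route, it is a one-line change on the driver's configuration. A minimal sketch for a streaming job (the app name and the 5-second batch window are only placeholders):

        import org.apache.spark.SparkConf;
        import org.apache.spark.streaming.Durations;
        import org.apache.spark.streaming.api.java.JavaStreamingContext;

        // Schedule tasks on any free executor immediately instead of waiting
        // up to the default 3s for a NODE_LOCAL slot to open up.
        SparkConf conf = new SparkConf()
                .setAppName("locality-wait-demo")      // hypothetical app name
                .set("spark.locality.wait", "0s");     // default is 3s
        // Finer-grained per-level settings also exist, e.g.:
        // conf.set("spark.locality.wait.node", "0s");
        // conf.set("spark.locality.wait.rack", "0s");

        JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(5));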