scala · apache-spark · accumulator

How to use an Accumulator correctly in Spark to get the right answer?


I am new to Scala and Spark. I want to use an Accumulator in Spark inside a function, incrementing it each time the function is called from a map.

I have an RDD named vertices, where each row is a Tuple2 of an ID and its property (key, value), for example:

(1,1)
(2,1)
.
.
.
(34,1)

I want the accumulator to be incremented whenever key % 2 equals zero. If it equals zero, the accumulator is incremented by 1 and we emit a Tuple2 whose key is the ID and whose value is the incremented accumulator; if key % 2 is not equal to zero, we emit a Tuple2 whose key is the ID and whose value is the last accumulator value.

val label_counter = sc.accumulator(0,"label_counter")  

def do_someThing (vertex:VertexId): (VertexId, Accumulator[Int]) = {
    if (vertex % 2 == 0) {
        label_counter +=1
        return (vertex,label_counter)
    } else return (vertex, label_counter)
}

val accum_test = vertices.map(x => x._1).map(do_someThing)
accum_test.foreach(println)

In this case the result is something like this:

 (2,1)
 (13,1)
 (19,1)
 (34,2)
 (15,2)
 (4,3)
 ..... 

This result is what I expect: the key is a node ID and the value is the accumulator value at each map call.

But when I use accum_test.collect().foreach(println) instead of the last line of code, the result looks like this:

 (2,17)
 (13,17)
 (19,17)
 (34,17)
 (15,17)
 (4,17)
 ....

In this case, when I use collect, the value part of every Tuple2 is 17, the last value the accumulator takes, and that is not the answer I expect.

I don't know why this happens or where the mistake is. How should I write this code correctly? I think I have some misunderstanding of the accumulator concept.

I have another question: the node IDs come out unordered, for example 2, 13, 19, 34, 15, 4, and so on. Does that mean these numbers are distributed over different executors, and that distributing them is what made them unordered? They are ordered numbers in the text file.

Please help me with these problems. Thank you.


Solution

  • You are using accumulators for something they are not meant for. Accumulators are meant to accumulate something over an RDD and bring that accumulated value back to the driver. The value is not meant to be read inside the executors, and the result when you do so is undefined, which is why you get different results depending on seemingly unrelated details. Keep in mind that RDDs are distributed and the accumulation runs in parallel on the various partitions; this is why reading the value inside the executors gives unexpected results.

    To illustrate what I was saying before, the correct use case is simply:

    vertices.map(_._1).foreach(do_someThing) // accumulate on the executors
    println(label_counter.value)             // read the result on the driver
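
    For completeness, here is a minimal self-contained sketch of that pattern (the object name, local master, and sample data are made up for illustration; it uses the Spark 2.x LongAccumulator API, which replaces the deprecated sc.accumulator used in the question, but the older API behaves the same way):

        import org.apache.spark.sql.SparkSession

        object AccumulatorSketch {
          def main(args: Array[String]): Unit = {
            val spark = SparkSession.builder()
              .appName("accumulator-sketch")
              .master("local[*]")
              .getOrCreate()
            val sc = spark.sparkContext

            // Sample input shaped like the question's vertices RDD: (ID, property)
            val vertices = sc.parallelize(Seq((1L, 1), (2L, 1), (13L, 1), (34L, 1)))

            // Built-in long accumulator, registered with the SparkContext
            val labelCounter = sc.longAccumulator("label_counter")

            // Only ADD to the accumulator inside the closure; never read its value here
            vertices.foreach { case (id, _) =>
              if (id % 2 == 0) labelCounter.add(1)
            }

            // Read the accumulated total back on the driver
            println(s"IDs divisible by 2: ${labelCounter.value}")

            spark.stop()
          }
        }

    Note that this only gives you the final count on the driver. If you really need a deterministic per-row counter like the (ID, counter) pairs in the question, an accumulator cannot provide it; something like sorting the RDD and using zipWithIndex would be a better fit.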