apache-spark, spark-core

Spark closures behavior


I have a basic question about Spark closures. I am not able to distinguish the code behavior between scenarios 2 & 3: both produce the same output, but based on my understanding, scenario 3 should not work as expected.

The code below is common to all scenarios:

class A implements Serializable {
    String t;
    A(String t) {
        this.t = t;
    }
}

// Initialize the Spark context
JavaSparkContext context = ....
// Create an RDD with 3 partitions
JavaRDD<String> rdd = context.parallelize(Arrays.asList("a", "b", "c", "d", "e"), 3);

Scenario 1: don't do this, because A is initialized in the driver and not visible on the executors.

A a = new A("pqr");
rdd.map(i -> i + a.t).collect();

Scenario 2: recommended way of sharing an object.

Broadcast<A> broadCast = context.broadcast(new A("pqr"));
rdd.map(i -> broadCast.getValue().t + i).collect();
// Output: [pqra, pqrb, pqrc, pqrd, pqre]

Scenario 3: why does this code work as expected even though I initialize A in the driver?

class TestFunction implements Function<String, String>, Serializable {
    private A val;

    public TestFunction() { }

    public TestFunction(A a) {
        this.val = a;
    }

    @Override
    public String call(String s) throws Exception {
        return val.t + s;
    }
}

TestFunction mapFunction = new TestFunction(new A("pqr"));
System.out.println(rdd.map(mapFunction).collect());
// Output: [pqra, pqrb, pqrc, pqrd, pqre]

Note: I am running the program in cluster mode.


Solution

  • The generated Java bytecode for Scenarios 1 & 3 is almost the same. The benefit of using Broadcast (Scenario 2) is that the broadcast object is sent to each executor only once and is then reused by other tasks on that executor, whereas Scenarios 1 & 3 ship the object A to the executors with every task.
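
One way to observe this difference is to log deserialization on the executor side. Below is a minimal sketch using a hypothetical LoggingA class (not from the original post) whose readObject hook prints a line each time an instance is deserialized; the print output would appear in the executor logs (e.g. via the Spark UI), not on the driver.

import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.Serializable;

// Hypothetical variant of A that logs every deserialization, so the executor
// logs show how often the object is shipped inside a task closure.
class LoggingA implements Serializable {
    String t;

    LoggingA(String t) {
        this.t = t;
    }

    // Java serialization calls this on the receiving (executor) side each time
    // an instance is deserialized from a task closure or a broadcast value.
    private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
        in.defaultReadObject();
        System.out.println("LoggingA deserialized: " + t);
    }
}

With this sketch, mapping over the RDD with a captured LoggingA (as in Scenarios 1 or 3) should log once per task, while wrapping it in a Broadcast and calling getValue() inside the function should log only when the value is first read on each executor.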