apache-spark java-8 spark-core

Apache Spark map function org.apache.spark.SparkException: Task not serializable


I am learning Apache Spark and I am using Java 8 and Spark Core 2.3.2.

I am finding that when I use the map function on an RDD, it only works when I pass a lambda expression.

So this works:

JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4));
JavaRDD<Integer> result = rdd.map(x -> x*x );

But this does not, and it throws org.apache.spark.SparkException: Task not serializable:

JavaRDD<Integer> result = rdd.map(new Function<Integer, Integer>() {
    public Integer call(Integer x) { return x*x; }
});

Can someone please explain why? Thanks


Solution

  • When you declare that new Function as an anonymous inner class, it holds an implicit reference to the instance of the class that contains it. When Spark tries to send the anonymous Function instance to the workers, it has to serialize that enclosing instance too, but apparently its class doesn't implement Serializable, or it has other members that are not serializable. You probably got an error like object not serializable (class: YourClass, value: YourClass@e49bf8a), where "YourClass" is the class containing the Function declaration. A lambda, by contrast, only captures the enclosing instance if its body actually uses it, which is why your first version works.

    If you instead declared the Function as a static member of your class:

    static Function<Integer, Integer> f = new Function<Integer, Integer>() {
        public Integer call(Integer x) {
            return x * x;
        }
    };
    

    and passed it to your map function:

    JavaRDD<Integer> result = rdd.map(f);
    

    then you'd probably be fine. I usually declare the functions I'm going to use in transformations as static members (when they're too big for the lambda form), so I don't accidentally wind up serializing a whole object when I only want one function. A self-contained sketch of all three variants is below.
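
    For reference, here is a minimal, self-contained sketch of all three variants against a local master. It's a sketch, not your actual code: the class name SquareJob and the method name brokenSquare are invented for illustration, and the assumption is that the failing anonymous Function was created inside an instance method of a non-serializable class.

    import java.util.Arrays;

    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.api.java.function.Function;

    public class SquareJob {   // deliberately does NOT implement Serializable

        // Works: a static field has no reference to an enclosing instance,
        // so only the Function object itself is serialized and shipped.
        static Function<Integer, Integer> square = new Function<Integer, Integer>() {
            public Integer call(Integer x) {
                return x * x;
            }
        };

        // Throws "Task not serializable" when the job runs: an anonymous
        // Function created inside an instance method captures `this`
        // (a SquareJob), and SquareJob is not Serializable.
        JavaRDD<Integer> brokenSquare(JavaRDD<Integer> rdd) {
            return rdd.map(new Function<Integer, Integer>() {
                public Integer call(Integer x) {
                    return x * x;
                }
            });
        }

        public static void main(String[] args) {
            SparkConf conf = new SparkConf().setAppName("square").setMaster("local[*]");
            JavaSparkContext sc = new JavaSparkContext(conf);

            JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4));

            // Works: this lambda captures nothing, so there is no outer reference.
            System.out.println(rdd.map(x -> x * x).collect());   // [1, 4, 9, 16]

            // Works: the static field declared above.
            System.out.println(rdd.map(square).collect());       // [1, 4, 9, 16]

            // Fails with org.apache.spark.SparkException: Task not serializable:
            // System.out.println(new SquareJob().brokenSquare(rdd).collect());

            sc.stop();
        }
    }
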