spring-boot, maven, apache-spark, kubernetes, minikube

Spring Boot Spark on K8S (Minikube): cannot assign instance of java.lang.invoke.SerializedLambda


I've seen others dealing with this same issue, but since none of the proposed solutions or workarounds worked for me, and I've already spent hours on this, I figured I would share my specific case in detail in the hope that someone can point out what I'm missing.

I wanted to experiment with running a very simple Spark Spring Boot application on a Minikube k8s cluster. When I run the app locally (using SparkSession.builder().master("local")) everything works as expected. However, when I deploy the app to Minikube, the driver pod does spin up the executor pods when the job is triggered, but then I get this exception on the executor pods:

ERROR Executor: Exception in task 0.1 in stage 0.0 (TID 1)
cannot assign instance of java.lang.invoke.SerializedLambda to field org.apache.spark.sql.execution.MapPartitionsExec.func of type scala.Function1 in instance of org.apache.spark.sql.execution.MapPartitionsExec
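
For comparison, here is roughly the local configuration that does work (a minimal sketch; everything else is identical to the controller shown below):

// same word-count pipeline, only the master URL differs from the k8s run
SparkSession spark = SparkSession.builder()
        .appName("count.words.in.file")
        .master("local")
        .getOrCreate();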

Here is my Spring Boot app. For the sake of simplicity, I kept all the logic in the controller:

WordcountController

@RestController
public class WordCountController implements Serializable {
    @PostMapping("/wordcount")
    public ResponseEntity<String> handleFileUpload(@RequestParam("file") MultipartFile file) throws IOException {
        String hostIp;
        try {
            hostIp = InetAddress.getLocalHost().getHostAddress();
        } catch (UnknownHostException e) {
            throw new RuntimeException(e);
        }

        SparkConf conf = new SparkConf();
        conf.setAppName("count.words.in.file")
                .setMaster("k8s://https://kubernetes.default.svc:443")
                .setJars(new String[]{"/app/wordcount.jar"})
                .set("spark.driver.host", hostIp)
                .set("spark.driver.port", "8080")
                .set("spark.kubernetes.namespace", "default")
                .set("spark.kubernetes.container.image", "spark:3.3.2h.1")
                .set("spark.executor.cores", "2")
                .set("spark.executor.memory", "1g")
                .set("spark.kubernetes.authenticate.executor.serviceAccountName", "spark")
                .set("spark.kubernetes.dynamicAllocation.deleteGracePeriod", "20")
                .set("spark.cores.max", "4")
                .set("spark.executor.instances", "2");

        SparkSession spark = SparkSession.builder()
                .config(conf)
                .getOrCreate();

        byte[] byteArray = file.getBytes();
        String contents = new String(byteArray, StandardCharsets.UTF_8);
        Dataset<String> text = spark.createDataset(Arrays.asList(contents), Encoders.STRING());

        Dataset<String> wordsDataset = text.flatMap((FlatMapFunction<String, String>) line -> {
            List<String> words = new ArrayList<>();
            for (String word : line.split(" ")) {
                words.add(word);
            }
            return words.iterator();
        }, Encoders.STRING());

        // Count the number of occurrences of each word
        Dataset<Row> wordCounts = wordsDataset.groupBy("value")
                .agg(count("*").as("count"))
                .orderBy(desc("count"));

        // Convert the word count results to a List of Rows
        List<Row> wordCountsList = wordCounts.collectAsList();

        StringBuilder resultStringBuffer = new StringBuilder();

        // Build the final string representation
        for (Row row : wordCountsList) {
            resultStringBuffer.append(row.getString(0)).append(": ").append(row.getLong(1)).append("\n");
        }
        return ResponseEntity.ok(resultStringBuffer.toString());

    }
}

Here is my Maven pom.xml:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.7.8</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.example</groupId>
    <artifactId>wordcount</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>wordcount</name>
    <description>wordcount</description>
    <properties>
        <java.version>11</java.version>
        <spark.version>3.3.2</spark.version>
        <scala.version>2.12</scala.version>
    </properties>
    <dependencyManagement>
        <dependencies>
            <!--Spark java.lang.NoClassDefFoundError: org/codehaus/janino/InternalCompilerException-->
            <dependency>
                <groupId>org.codehaus.janino</groupId>
                <artifactId>commons-compiler</artifactId>
                <version>3.0.8</version>
            </dependency>
            <dependency>
                <groupId>org.codehaus.janino</groupId>
                <artifactId>janino</artifactId>
                <version>3.0.8</version>
            </dependency>
        </dependencies>
    </dependencyManagement>
    <dependencies>
        <dependency>
            <groupId>org.codehaus.janino</groupId>
            <artifactId>commons-compiler</artifactId>
            <version>3.0.8</version>
        </dependency>
        <dependency>
            <groupId>org.codehaus.janino</groupId>
            <artifactId>janino</artifactId>
            <version>3.0.8</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_${scala.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency> <!-- Spark dependency -->
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_${scala.version}</artifactId>
            <version>${spark.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-kubernetes_${scala.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>

And here is the Dockerfile I'm using to package my Spring Boot application before deploying it to Minikube:

# Use an existing image as the base image
FROM openjdk:11-jdk

# Set the working directory
WORKDIR /app

# Copy the compiled JAR file to the image
COPY target/wordcount-0.0.1-SNAPSHOT.jar /app/wordcount.jar

RUN useradd -u 185 sparkuser

# Set the entrypoint command to run the JAR file
ENTRYPOINT ["java", "-jar", "wordcount.jar"]

For spark.kubernetes.container.image, I built a Docker image from the Dockerfile shipped with my local Spark distribution (spark-3.3.2-bin-hadoop3, the same Spark version my Spring Boot app uses), following these instructions, and loaded it into Minikube.

Here are some of the things I tried with no luck so far:

public static class SplitLine implements FlatMapFunction<String, String> {
   @Override
   public Iterator<String> call(String line) throws Exception {
       List<String> words = new ArrayList<>();
       for (String word : line.split(" ")) {
           words.add(word);
       }
       return words.iterator();
   }
}

...

Dataset<String> wordsDataset = text.flatMap(new SplitLine(), Encoders.STRING());

Any tips or hints regarding my setup, or suggestions on how I can refactor my code so it works with this setup, would be greatly appreciated.


Solution

  • In the end, I managed to get around this issue by turning this example into a Maven multi-module project.

    This allowed me to create a separate jar for the executor code and then let Spark know about it via the Spark config:

    .setJars(new String[]{"word-count-spark-job.jar"})
    

    More on this solution in this blog post.
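
    As a rough illustration of that layout (module and class names below are assumed for the sketch, not taken from the blog post), the executor-side code lives in its own module that builds a plain jar:

    // word-count-spark-job module: contains only the code that runs on the executors
    import org.apache.spark.api.java.function.FlatMapFunction;

    import java.util.Arrays;
    import java.util.Iterator;

    public class SplitLine implements FlatMapFunction<String, String> {
        @Override
        public Iterator<String> call(String line) {
            return Arrays.asList(line.split(" ")).iterator();
        }
    }

    The Spring Boot driver module then depends on that module, copies the plain jar into the driver image, and registers it with setJars (for example /app/word-count-spark-job.jar, assuming that is where the Dockerfile places it), so the executors can find the class when they deserialize the function.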