apache-spark, apache-flink, apache-beam, data-pipeline

Beam pipeline SparkRunner issue


I have a Beam pipeline that reads from a Kinesis stream, deserializes the protobuf data inside, converts it back to a byte array, and writes it to another Kinesis stream (just a dummy pipeline).

This pipeline executes successfully if I run

mvn compile exec:java -Dexec.mainClass=org.example.StarterPipeline -Dexec.args="--runner=DirectRunner"

or

mvn compile exec:java -Dexec.mainClass=org.example.StarterPipeline -Dexec.args="--runner=FlinkRunner"

In both cases I see the data written to the destination Kinesis stream.

However, if I run

mvn compile exec:java -Dexec.mainClass=org.example.StarterPipeline -Dexec.args="--runner=SparkRunner"

I get this StackOverflowError:



[INFO] Scanning for projects...
[INFO] 
[INFO] ------------------------< org.example:bs_test >-------------------------
[INFO] Building bs_test 1.0-SNAPSHOT
[INFO]   from pom.xml
[INFO] --------------------------------[ jar ]---------------------------------
[INFO] 
[INFO] --- os:1.7.1:detect (set-property) @ bs_test ---
[INFO] ------------------------------------------------------------------------
[INFO] Detecting the operating system and CPU architecture
[INFO] ------------------------------------------------------------------------
[INFO] os.detected.name: osx
[INFO] os.detected.arch: x86_64
[INFO] os.detected.bitness: 64
[INFO] os.detected.version: 13.2
[INFO] os.detected.version.major: 13
[INFO] os.detected.version.minor: 2
[INFO] os.detected.classifier: osx-x86_64
[INFO] 
[INFO] --- protobuf:0.6.1:compile (default) @ bs_test ---
[INFO] Compiling 1 proto file(s) to /Users/viswajith/IdeaProjects/bs_test/target/generated-sources/protobuf/java
[INFO] 
[INFO] --- resources:3.3.0:resources (default-resources) @ bs_test ---
[INFO] skip non existing resourceDirectory /Users/viswajith/IdeaProjects/bs_test/src/main/resources
[INFO] Copying 1 resource
[INFO] 
[INFO] --- compiler:3.10.1:compile (default-compile) @ bs_test ---
[INFO] Changes detected - recompiling the module!
[INFO] Compiling 3 source files to /Users/viswajith/IdeaProjects/bs_test/target/classes
[INFO] /Users/viswajith/IdeaProjects/bs_test/src/main/java/org/example/StarterPipeline.java: /Users/viswajith/IdeaProjects/bs_test/src/main/java/org/example/StarterPipeline.java uses or overrides a deprecated API.
[INFO] /Users/viswajith/IdeaProjects/bs_test/src/main/java/org/example/StarterPipeline.java: Recompile with -Xlint:deprecation for details.
[INFO] 
[INFO] --- exec:3.1.0:java (default-cli) @ bs_test ---
Mar 08, 2023 3:24:40 PM org.example.StarterPipeline createPipeline
INFO: Creating SparkRunner pipeline
Mar 08, 2023 3:24:41 PM org.apache.beam.sdk.io.kinesis.KinesisIO$Read expand
WARNING: You are using a deprecated IO for Kinesis. Please migrate to module 'org.apache.beam:beam-sdks-java-io-amazon-web-services2'.
Mar 08, 2023 3:24:41 PM org.apache.beam.sdk.coders.SerializableCoder checkEqualsMethodDefined
WARNING: Can't verify serialized elements of type KinesisReaderCheckpoint have well defined equals method. This may produce incorrect results on some PipelineRunner implementations
Mar 08, 2023 3:24:41 PM org.apache.beam.sdk.io.kinesis.KinesisIO$Write expand
WARNING: You are using a deprecated IO for Kinesis. Please migrate to module 'org.apache.beam:beam-sdks-java-io-amazon-web-services2'.
Mar 08, 2023 3:24:41 PM org.apache.beam.runners.spark.SparkRunner run
INFO: Executing pipeline using the SparkRunner.
Mar 08, 2023 3:24:41 PM org.apache.beam.runners.spark.SparkRunner$TranslationModeDetector visitValue
INFO: Found unbounded PCollection KinesisIO.Read/Read(KinesisSource)/ParDo(UnboundedSourceAsSDFWrapper)/ParMultiDo(UnboundedSourceAsSDFWrapper).output. Switching to streaming execution.
Mar 08, 2023 3:24:41 PM org.apache.beam.runners.spark.translation.streaming.Checkpoint$CheckpointDir <init>
WARNING: The specified checkpoint dir /tmp/starterpipeline-viswajith-0308232441-cebb258c does not match a reliable filesystem so in case of failures this job may not recover properly or even at all.
Mar 08, 2023 3:24:41 PM org.apache.beam.runners.spark.translation.streaming.Checkpoint$CheckpointDir <init>
INFO: Checkpoint dir set to: /tmp/starterpipeline-viswajith-0308232441-cebb258c
[WARNING] 
java.lang.RuntimeException: java.lang.reflect.InvocationTargetException
    at org.apache.hadoop.util.ReflectionUtils.newInstance (ReflectionUtils.java:135)
    at org.apache.hadoop.security.Groups.<init> (Groups.java:106)
    at org.apache.hadoop.security.Groups.<init> (Groups.java:102)
    at org.apache.hadoop.security.Groups.getUserToGroupsMappingService (Groups.java:451)
    at org.apache.hadoop.security.UserGroupInformation.initialize (UserGroupInformation.java:337)
    at org.apache.hadoop.security.UserGroupInformation.setConfiguration (UserGroupInformation.java:365)
    at org.apache.spark.deploy.SparkHadoopUtil.<init> (SparkHadoopUtil.scala:50)
    at org.apache.spark.deploy.SparkHadoopUtil$.instance$lzycompute (SparkHadoopUtil.scala:397)
    at org.apache.spark.deploy.SparkHadoopUtil$.instance (SparkHadoopUtil.scala:397)
    at org.apache.spark.deploy.SparkHadoopUtil$.get (SparkHadoopUtil.scala:418)
    at org.apache.spark.streaming.StreamingContext$.getOrCreate$default$3 (StreamingContext.scala:836)
    at org.apache.spark.streaming.api.java.JavaStreamingContext$.getOrCreate (JavaStreamingContext.scala:627)
    at org.apache.spark.streaming.api.java.JavaStreamingContext.getOrCreate (JavaStreamingContext.scala)
    at org.apache.beam.runners.spark.SparkRunner.run (SparkRunner.java:175)
    at org.apache.beam.runners.spark.SparkRunner.run (SparkRunner.java:89)
    at org.apache.beam.sdk.Pipeline.run (Pipeline.java:323)
    at org.apache.beam.sdk.Pipeline.run (Pipeline.java:309)
    at org.example.StarterPipeline.main (StarterPipeline.java:64)
    at org.codehaus.mojo.exec.ExecJavaMojo$1.run (ExecJavaMojo.java:279)
    at java.lang.Thread.run (Thread.java:834)
Caused by: java.lang.reflect.InvocationTargetException
    at jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance0 (Native Method)
    at jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance (NativeConstructorAccessorImpl.java:62)
    at jdk.internal.reflect.DelegatingConstructorAccessorImpl.newInstance (DelegatingConstructorAccessorImpl.java:45)
    at java.lang.reflect.Constructor.newInstance (Constructor.java:490)
    at org.apache.hadoop.util.ReflectionUtils.newInstance (ReflectionUtils.java:133)
    at org.apache.hadoop.security.Groups.<init> (Groups.java:106)
    at org.apache.hadoop.security.Groups.<init> (Groups.java:102)
    at org.apache.hadoop.security.Groups.getUserToGroupsMappingService (Groups.java:451)
    at org.apache.hadoop.security.UserGroupInformation.initialize (UserGroupInformation.java:337)
    at org.apache.hadoop.security.UserGroupInformation.setConfiguration (UserGroupInformation.java:365)
    at org.apache.spark.deploy.SparkHadoopUtil.<init> (SparkHadoopUtil.scala:50)
    at org.apache.spark.deploy.SparkHadoopUtil$.instance$lzycompute (SparkHadoopUtil.scala:397)
    at org.apache.spark.deploy.SparkHadoopUtil$.instance (SparkHadoopUtil.scala:397)
    at org.apache.spark.deploy.SparkHadoopUtil$.get (SparkHadoopUtil.scala:418)
    at org.apache.spark.streaming.StreamingContext$.getOrCreate$default$3 (StreamingContext.scala:836)
    at org.apache.spark.streaming.api.java.JavaStreamingContext$.getOrCreate (JavaStreamingContext.scala:627)
    at org.apache.spark.streaming.api.java.JavaStreamingContext.getOrCreate (JavaStreamingContext.scala)
    at org.apache.beam.runners.spark.SparkRunner.run (SparkRunner.java:175)
    at org.apache.beam.runners.spark.SparkRunner.run (SparkRunner.java:89)
    at org.apache.beam.sdk.Pipeline.run (Pipeline.java:323)
    at org.apache.beam.sdk.Pipeline.run (Pipeline.java:309)
    at org.example.StarterPipeline.main (StarterPipeline.java:64)
    at org.codehaus.mojo.exec.ExecJavaMojo$1.run (ExecJavaMojo.java:279)
    at java.lang.Thread.run (Thread.java:834)
Caused by: java.lang.StackOverflowError
    at java.time.Clock$SystemClock.instant (Clock.java:529)
    at java.time.Instant.now (Instant.java:273)
    at java.util.logging.LogRecord.<init> (LogRecord.java:229)
    at org.slf4j.jul.JDK14LoggerAdapter.innerNormalizedLoggingCallHandler (JDK14LoggerAdapter.java:147)
    at org.slf4j.jul.JDK14LoggerAdapter.log (JDK14LoggerAdapter.java:172)
    at org.slf4j.bridge.SLF4JBridgeHandler.callLocationAwareLogger (SLF4JBridgeHandler.java:221)
    at org.slf4j.bridge.SLF4JBridgeHandler.publish (SLF4JBridgeHandler.java:303)
    at java.util.logging.Logger.log (Logger.java:979)
.
.
.
.   <same error repeated until stack overflows >
.
.
.
    at org.slf4j.jul.JDK14LoggerAdapter.innerNormalizedLoggingCallHandler (JDK14LoggerAdapter.java:156)
[INFO] ------------------------------------------------------------------------
[INFO] BUILD FAILURE
[INFO] ------------------------------------------------------------------------
[INFO] Total time:  20.361 s
[INFO] Finished at: 2023-03-08T15:24:43-08:00
[INFO] ------------------------------------------------------------------------
[ERROR] Failed to execute goal org.codehaus.mojo:exec-maven-plugin:3.1.0:java (default-cli) on project bs_test: An exception occurred while executing the Java class. java.lang.reflect.InvocationTargetException: StackOverflowError -> [Help 1]
[ERROR] 
[ERROR] To see the full stack trace of the errors, re-run Maven with the -e switch.
[ERROR] Re-run Maven using the -X switch to enable full debug logging.
[ERROR] 
[ERROR] For more information about the errors and possible solutions, please read the following articles:
[ERROR] [Help 1] http://cwiki.apache.org/confluence/display/MAVEN/MojoExecutionException
[WARNING] 
java.lang.NoClassDefFoundError: org/apache/hadoop/util/ShutdownHookManager$2
    at org.apache.hadoop.util.ShutdownHookManager.getShutdownHooksInOrder (ShutdownHookManager.java:273)
    at org.apache.hadoop.util.ShutdownHookManager.executeShutdown (ShutdownHookManager.java:121)
    at org.apache.hadoop.util.ShutdownHookManager$1.run (ShutdownHookManager.java:95)
Caused by: java.lang.ClassNotFoundException: org.apache.hadoop.util.ShutdownHookManager$2
    at org.codehaus.mojo.exec.URLClassLoaderBuilder$ExecJavaClassLoader.loadClass (URLClassLoaderBuilder.java:198)
    at java.lang.ClassLoader.loadClass (ClassLoader.java:521)
    at org.apache.hadoop.util.ShutdownHookManager.getShutdownHooksInOrder (ShutdownHookManager.java:273)
    at org.apache.hadoop.util.ShutdownHookManager.executeShutdown (ShutdownHookManager.java:121)
    at org.apache.hadoop.util.ShutdownHookManager$1.run (ShutdownHookManager.java:95)

Attaching the Beam pipeline code and POM file below.

package org.example;

import com.amazonaws.regions.Regions;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream;
import com.google.protobuf.InvalidProtocolBufferException;
import org.apache.beam.runners.flink.FlinkPipelineOptions;
import org.apache.beam.runners.spark.SparkPipelineOptions;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.kinesis.KinesisIO;
import org.apache.beam.sdk.io.kinesis.KinesisPartitioner;
import org.apache.beam.sdk.io.kinesis.KinesisRecord;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;
import org.checkerframework.checker.initialization.qual.Initialized;
import org.checkerframework.checker.nullness.qual.NonNull;
import org.checkerframework.checker.nullness.qual.UnknownKeyFor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Arrays;

public class StarterPipeline {
    private static final Logger LOG = LoggerFactory.getLogger(StarterPipeline.class);

    public static void main(String[] args) throws Exception {
        PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();

        // Create pipeline
        Pipeline pipeline = createPipeline(options);

        // Read from Kinesis data stream
        PCollection<KinesisRecord> kinesisData = pipeline.apply(KinesisIO.read()
                .withStreamName("input-kinesis-stream")
                .withAWSClientsProvider(new DefaultCredentialsProviderClientsProvider(Regions.US_WEST_2))
                .withInitialPositionInStream(InitialPositionInStream.TRIM_HORIZON)
                .withRequestRecordsLimit(5)
        );

        // Process elements using a custom DoFn
        PCollection<byte[]> processedData = kinesisData.apply(ParDo.of(new DoFn<KinesisRecord, byte[]>() {
            @ProcessElement
            public void processElement(ProcessContext c) throws InvalidProtocolBufferException {
                KinesisRecord record = c.element();
                byte[] data = record.getData().array();
                AllDataStreaming.StreamMessageWrapper message = AllDataStreaming.StreamMessageWrapper.parseFrom(data);
                c.output(message.toByteArray());
            }
        }));

        processedData.apply("Writing to Kinesis",
                KinesisIO
                        .write()
                        .withStreamName("output-kinesis-stream")
                        .withAWSClientsProvider(new DefaultCredentialsProviderClientsProvider(Regions.US_WEST_2))
                        .withPartitionKey(new SimpleHashPartitioner().toString())
        );

        // Run pipeline
        pipeline.run();
    }

    private static Pipeline createPipeline(PipelineOptions options) {
        if (options.as(FlinkPipelineOptions.class).getRunner().getSimpleName().equals("FlinkRunner")) {
            LOG.info("Creating FlinkRunner pipeline");
            return Pipeline.create(options.as(FlinkPipelineOptions.class));
        } else if (options.as(SparkPipelineOptions.class).getRunner().getSimpleName().equals("SparkRunner")) {
            LOG.info("Creating SparkRunner pipeline");

            // Set Spark configuration
            SparkPipelineOptions sparkPipelineOptions = options.as(SparkPipelineOptions.class);
            sparkPipelineOptions.setSparkMaster("local[2]");
            sparkPipelineOptions.setStreaming(true);
            return Pipeline.create(sparkPipelineOptions);
        } else {
            throw new IllegalArgumentException("Unsupported runner specified: " + options.getRunner().getSimpleName());
        }
    }

    private static final class SimpleHashPartitioner implements KinesisPartitioner {
        @Override
        public @UnknownKeyFor @NonNull @Initialized String getPartitionKey(@UnknownKeyFor @NonNull @Initialized byte @UnknownKeyFor @NonNull @Initialized [] value) {
            return String.valueOf(Arrays.hashCode(value));
        }

        @Override
        public @UnknownKeyFor @NonNull @Initialized String getExplicitHashKey(@UnknownKeyFor @NonNull @Initialized byte @UnknownKeyFor @NonNull @Initialized [] value) {
            return null;
        }
    }
}

POM file:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example</groupId>
    <artifactId>bs_test</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <beam.version>2.45.0</beam.version>
        <spark.version>3.1.3</spark.version>
        <java.version>11</java.version>
        <maven-compiler-plugin.version>3.10.1</maven-compiler-plugin.version>
        <maven.compiler.source>11</maven.compiler.source>
        <maven.compiler.target>11</maven.compiler.target>
        <maven-exec-plugin.version>3.1.0</maven-exec-plugin.version>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <slf4j.version>2.0.5</slf4j.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.apache.beam</groupId>
            <artifactId>beam-sdks-java-core</artifactId>
            <version>${beam.version}</version>
        </dependency>

        <!-- Direct Runner -->
        <dependency>
            <groupId>org.apache.beam</groupId>
            <artifactId>beam-runners-direct-java</artifactId>
            <version>${beam.version}</version>
        </dependency>

        <!-- Flink Runner -->
        <dependency>
            <groupId>org.apache.beam</groupId>
            <artifactId>beam-runners-flink-1.15</artifactId>
            <version>${beam.version}</version>
            <exclusions>
                <exclusion>
                    <groupId>javax.annotation</groupId>
                    <artifactId>javax.annotation-api</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>com.google.guava</groupId>
                    <artifactId>failureaccess</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-parent</artifactId>
            <version>1.15.2</version>
            <type>pom</type>
        </dependency>

        <!-- Spark Runner -->
        <dependency>
            <groupId>org.apache.beam</groupId>
            <artifactId>beam-runners-spark-3</artifactId>
            <version>${beam.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.12</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.12</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.12</artifactId>
            <version>${spark.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.beam</groupId>
            <artifactId>beam-sdks-java-io-hadoop-file-system</artifactId>
            <version>${beam.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.beam</groupId>
            <artifactId>beam-sdks-java-io-hadoop-common</artifactId>
            <version>${beam.version}</version>
        </dependency>


        <!-- slf4j API frontend binding with JUL backend -->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>${slf4j.version}</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-jdk14</artifactId>
            <version>${slf4j.version}</version>
        </dependency>

        <!-- AWS -->
        <dependency>
            <groupId>software.amazon.kinesis</groupId>
            <artifactId>amazon-kinesis-client</artifactId>
            <version>2.4.5</version>
        </dependency>
        <dependency>
            <groupId>org.apache.beam</groupId>
            <artifactId>beam-sdks-java-io-kinesis</artifactId>
            <version>${beam.version}</version>
            <exclusions>
                <exclusion>
                    <groupId>com.google.guava</groupId>
                    <artifactId>failureaccess</artifactId>
                </exclusion>
            </exclusions>
        </dependency>


        <!-- Protobuf -->
        <dependency>
            <groupId>com.google.protobuf</groupId>
            <artifactId>protobuf-java</artifactId>
            <version>3.22.0</version>
        </dependency>
        <dependency>
            <groupId>com.google.protobuf</groupId>
            <artifactId>protobuf-java-util</artifactId>
            <version>3.22.0</version>
        </dependency>

    </dependencies>
    <dependencyManagement>
        <dependencies>
            <!-- enforce jackson-databind version -->
            <dependency>
                <groupId>com.fasterxml.jackson.core</groupId>
                <artifactId>jackson-databind</artifactId>
                <version>2.10.5</version>
            </dependency>
        </dependencies>
    </dependencyManagement>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>${maven-compiler-plugin.version}</version>
                <configuration>
                    <source>11</source>
                    <target>11</target>
                </configuration>
            </plugin>

            <plugin>
                <groupId>org.xolstice.maven.plugins</groupId>
                <artifactId>protobuf-maven-plugin</artifactId>
                <version>0.6.1</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                            <goal>test-compile</goal>
                        </goals>
                    </execution>
                </executions>
                <configuration>
                    <!--suppress UnresolvedMavenProperty -->
                    <protocArtifact>com.google.protobuf:protoc:3.22.0:exe:${os.detected.classifier}</protocArtifact>
                    <pluginId>grpc-java</pluginId>
                    <!--suppress UnresolvedMavenProperty -->
                    <pluginArtifact>io.grpc:protoc-gen-grpc-java:1.53.0:exe:${os.detected.classifier}</pluginArtifact>
                    <outputDirectory>${project.build.directory}/generated-sources/protobuf/java</outputDirectory>
                    <clearOutputDirectory>false</clearOutputDirectory>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.4.1</version>
                <configuration>
                    <createDependencyReducedPom>false</createDependencyReducedPom>
                    <filters>
                        <filter>
                            <artifact>*:*</artifact>
                            <excludes>
                                <exclude>META-INF/*.SF</exclude>
                                <exclude>META-INF/*.DSA</exclude>
                                <exclude>META-INF/*.RSA</exclude>
                            </excludes>
                        </filter>
                    </filters>
                </configuration>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <shadedArtifactAttached>true</shadedArtifactAttached>
                            <shadedClassifierName>shaded</shadedClassifierName>
                            <transformers>
                                <transformer
                                        implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                </transformer>
                                <transformer
                                        implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer">
                                </transformer>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>kr.motd.maven</groupId>
                <artifactId>os-maven-plugin</artifactId>
                <version>1.7.1</version>
                <executions>
                    <execution>
                        <id>set-property</id>
                        <phase>initialize</phase>
                        <goals>
                            <goal>detect</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-surefire-plugin</artifactId>
                <version>3.0.0-M9</version>
                <configuration>
                    <systemPropertyVariables>
                        <org.slf4j.simpleLogger.defaultLogLevel>INFO</org.slf4j.simpleLogger.defaultLogLevel>
                    </systemPropertyVariables>
                </configuration>
            </plugin>
        </plugins>
        <pluginManagement>
            <plugins>
                <plugin>
                    <groupId>org.codehaus.mojo</groupId>
                    <artifactId>exec-maven-plugin</artifactId>
                    <version>${maven-exec-plugin.version}</version>
                    <dependencies>
                        <dependency>
                            <groupId>org.codehaus.plexus</groupId>
                            <artifactId>plexus-utils</artifactId>
                            <version>3.5.0</version>
                        </dependency>
                    </dependencies>
                    <configuration>
                        <cleanupDaemonThreads>false</cleanupDaemonThreads>
                    </configuration>
                </plugin>
            </plugins>
        </pluginManagement>
    </build>

</project>

As you can see in the code, I tried adding a Spark configuration when the SparkRunner is selected and added the Hadoop-related dependencies, but the StackOverflowError won't go away. How can I resolve it?

Beam version: 2.45.0, Java version: 11, Spark version: 3.1.3


Solution

  • Fix

    To fix the issue, remove slf4j-jdk14 from your POM and, if necessary, exclude it from other dependencies that pull it in transitively.
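
    For concreteness, a minimal sketch of the change (the block to delete is the one already in your POM above; the exclusion is shown on a placeholder artifact, since which dependency pulls it in transitively depends on your tree):

    <!-- Delete this binding from the <dependencies> section: -->
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-jdk14</artifactId>
        <version>${slf4j.version}</version>
    </dependency>

    <!-- If another dependency drags slf4j-jdk14 in transitively, exclude it there.
         some.group:some-artifact is a placeholder, not a real coordinate: -->
    <dependency>
        <groupId>some.group</groupId>
        <artifactId>some-artifact</artifactId>
        <version>1.0</version>
        <exclusions>
            <exclusion>
                <groupId>org.slf4j</groupId>
                <artifactId>slf4j-jdk14</artifactId>
            </exclusion>
        </exclusions>
    </dependency>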

    Why

    There's a cycle on your classpath. Spark forwards any logs sent to JUL on to SLF4J (see jul-to-slf4j). If you then use JUL as the logging backend for SLF4J, log records are sent back and forth between the two indefinitely. That endless hand-off is the StackOverflowError you're seeing: note the SLF4JBridgeHandler.publish and JDK14LoggerAdapter frames repeating in the trace above.
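
    To check which SLF4J artifacts actually land on the classpath (the bindings and bridges all live under the org.slf4j group, including jul-to-slf4j and slf4j-jdk14), you can inspect the dependency tree:

    mvn dependency:tree -Dincludes=org.slf4j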

    Spark uses Log4j for logging, so on the classpath you will also find slf4j-log4j12, which makes SLF4J emit its logs via Log4j. When using Spark I recommend sticking with Log4j so that everything also works seamlessly when running on a cluster.
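
    Spark's transitive dependencies normally provide that binding already, so removing slf4j-jdk14 is usually all that's needed. If you'd rather declare the binding explicitly, a sketch for the SLF4J 1.7.x line that Spark 3.1.x ships with follows (the exact version is an assumption; align it with the slf4j-api version Spark resolves, and note that slf4j-log4j12 belongs to the 1.x line, so the slf4j.version 2.0.5 property above would need to go):

    <!-- Optional: pin the Log4j binding explicitly.
         Version 1.7.30 is an assumption; match the slf4j-api that Spark 3.1.x resolves. -->
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-log4j12</artifactId>
        <version>1.7.30</version>
    </dependency>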