scala, apache-flink, flink-streaming

Unit Tests Failing when upgrading Flink to 1.18


I am in the process of upgrading my application from Flink 1.15.2 to 1.20 and ran into a snag along the way. I have a simple unit test that creates a stream of data (Long values), applies a map function to increment each value by 1, and sinks the results to a list of Longs. I copied this unit test from the Flink documentation and tested it incrementally: first on 1.15.2, then 1.16.2, and so on, and found that it breaks on Flink 1.18.1.

I am building with Scala 2.12 through sbt; tests are run with the "sbt test" command.

import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration
import org.apache.flink.streaming.api.functions.sink.SinkFunction
import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, createTypeInformation}
import org.apache.flink.test.util.MiniClusterWithClientResource
import org.scalatest.BeforeAndAfter
import org.scalatest.flatspec.AnyFlatSpec
import org.scalatest.matchers.should.Matchers

import java.util
import java.util.Collections

class MappingTest extends AnyFlatSpec with Matchers with BeforeAndAfter {

  val flinkCluster = new MiniClusterWithClientResource(new MiniClusterResourceConfiguration.Builder()
    .setNumberSlotsPerTaskManager(2)
    .setNumberTaskManagers(1)
    .build)

  before {
    flinkCluster.before()
  }

  after {
    flinkCluster.after()
  }

  "IncrementFlatMapFunction pipeline" should "incrementValues" in {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // configure your test environment
    env.setParallelism(2)

    // values are collected in a static variable
    CollectSink.values.clear()

    // create a stream of custom elements and apply transformations
    env.fromElements(1, 21, 22)
      .map(x => x + 1L)
      .addSink(new CollectSink())

    // execute
    env.execute()

    // verify your results
    CollectSink.values should contain allOf(2, 22, 23)
  }
}

class CollectSink extends SinkFunction[Long] {
  override def invoke(value: Long, context: SinkFunction.Context): Unit = {
    CollectSink.values.add(value)
  }
}

object CollectSink {
  // must be static
  val values: util.List[Long] = Collections.synchronizedList(new util.ArrayList())
}

The error message that I'm receiving:

should incrementValues *** FAILED *** 
[info] org.apache.flink.runtime.client.JobExecutionException: Job execution failed. 
[info] at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:144) 
[info] at org.apache.flink.runtime.minicluster.MiniClusterJobClient.lambda$getJobExecutionResult$3(MiniClusterJobClient.java:141) 
[info] at java.base/java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:646) 
[info] at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:510) 
[info] at java.base/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2147) 
[info] at org.apache.flink.runtime.rpc.pekko.PekkoInvocationHandler.lambda$invokeRpc$1(PekkoInvocationHandler.java:268) 
[info] at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:863) 
[info] at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:841) 
[info] at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:510) 
[info] at java.base/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2147) 
[info] ... 
[info] Cause: org.apache.flink.runtime.client.JobInitializationException: Could not start the JobMaster. 
[info] at org.apache.flink.runtime.jobmaster.DefaultJobMasterServiceProcess.lambda$new$0(DefaultJobMasterServiceProcess.java:97) 
[info] at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:863) 
[info] at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:841) 
[info] at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:510) 
[info] at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1773) 
[info] at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136) 
[info] at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635) 
[info] at java.base/java.lang.Thread.run(Thread.java:840) 
[info] ... 
[info] Cause: java.util.concurrent.CompletionException: java.lang.RuntimeException: java.lang.ClassNotFoundException: org.apache.flink.api.common.ExecutionConfig 
[info] at java.base/java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:315) 
[info] at java.base/java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:320) 
[info] at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1770) 
[info] at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136) 
[info] at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635) 
[info] at java.base/java.lang.Thread.run(Thread.java:840) 
[info] ... 
[info] Cause: java.lang.RuntimeException: java.lang.ClassNotFoundException: org.apache.flink.api.common.ExecutionConfig 
[info] at org.apache.flink.util.ExceptionUtils.rethrow(ExceptionUtils.java:321) 
[info] at org.apache.flink.util.function.FunctionUtils.lambda$uncheckedSupplier$4(FunctionUtils.java:114) 
[info] at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1768) 
[info] at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136) 
[info] at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635) 
[info] at java.base/java.lang.Thread.run(Thread.java:840) 
[info] ... 
[info] Cause: java.lang.ClassNotFoundException: org.apache.flink.api.common.ExecutionConfig 
[info] at java.base/jdk.internal.loader.BuiltinClassLoader.loadClass(BuiltinClassLoader.java:641) 
[info] at java.base/jdk.internal.loader.ClassLoaders$AppClassLoader.loadClass(ClassLoaders.java:188) 
[info] at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:525) 
[info] at java.base/java.lang.Class.forName0(Native Method) 
[info] at java.base/java.lang.Class.forName(Class.java:467) 
[info] at org.apache.flink.util.InstantiationUtil$ClassLoaderObjectInputStream.resolveClass(InstantiationUtil.java:78) 
[info] at java.base/java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:2034) 
[info] at java.base/java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1898) 
[info] at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2224) 
[info] at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1733)

Based on the research I've done, ClassNotFoundException errors are typically related to mismatched versions, so I've simplified the dependencies in my build.sbt file:

val flinkVersion = "1.18.1"

val flinkDependencies = Seq(
  "org.apache.flink" %% "flink-streaming-scala" % flinkVersion,
  "org.apache.flink"  % "flink-test-utils"      % flinkVersion % Test, 
  "org.scalatest"    %% "scalatest"             % "3.2.19"     % Test,
  "com.typesafe"      % "config"                % "1.4.3"
)

Also, the release notes mention that 1.18 was tested with Java 17. I have tried running the tests in both a Java 11 and a Java 17 environment; both produce the same error.

For additional context, here is the Apache tutorial I'm following: https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/dev/datastream/testing/


Solution

  • From mail-archive - Getting java.lang.ClassNotFoundException in Tests (Flink 1.18.0) - 03 Dec 2023

    Forking in sbt solved the issue (Test / fork := true).

    So adding that line to your build.sbt file solves the problem. The likely explanation is that sbt, by default, runs tests in-process using its own layered classloaders, while the stack trace shows Flink deserializing the job's ExecutionConfig through the JVM's application classloader, which cannot see classes loaded through sbt's layers; forking runs the tests in a fresh JVM where the Flink jars sit on the regular application classpath. A quick way to observe the mismatch is sketched below.

    I was able to reproduce the problem locally using the code you provided; once I added that line, the test succeeded. The same fix also works for 1.19.1 and 1.20.0.
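    A minimal diagnostic sketch, assuming nothing beyond the dependencies above (the test class name and println labels are mine, not part of the original post): it prints which classloader actually loaded a Flink class, then asks the application classloader to resolve that same class by name, mirroring the Class.forName call in the reported stack trace. Run in-process (without Test / fork := true) the lookup typically fails; in a forked JVM it succeeds.

    import org.apache.flink.api.common.ExecutionConfig
    import org.scalatest.flatspec.AnyFlatSpec

    class ClassLoaderDiagnosticTest extends AnyFlatSpec {
      "ExecutionConfig" should "be resolvable by the application classloader" in {
        // The loader that actually loaded the Flink class in this test JVM.
        println(s"Flink class loaded by: ${classOf[ExecutionConfig].getClassLoader}")
        // The loader the failing Class.forName call in the stack trace went through.
        println(s"System classloader:    ${ClassLoader.getSystemClassLoader}")
        // Resolve the class the same way Flink's deserializer did: this throws
        // ClassNotFoundException when the Flink jars live only on sbt's layered
        // classloaders (no forking), and succeeds when the tests run forked.
        Class.forName(classOf[ExecutionConfig].getName, false, ClassLoader.getSystemClassLoader)
      }
    }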

    ThisBuild / scalaVersion := "2.12.20"
    
    val flinkVersion = "1.20.0"
    
    val flinkDependencies: Seq[ModuleID] = Seq(
      "org.apache.flink" %% "flink-streaming-scala" % flinkVersion,
      "org.apache.flink"  % "flink-test-utils"      % flinkVersion % Test,
      "org.scalatest"    %% "scalatest"             % "3.2.19"     % Test,
      "com.typesafe"      % "config"                % "1.4.3"
    )
    
    lazy val root = (project in file("."))
      .settings(
        name := "stackoverflow-poc-apache-flink",
        libraryDependencies ++= flinkDependencies,
        Test / fork := true // without this line, the test fails with the error you reported
      )
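
    One side effect to be aware of: a forked test JVM does not inherit the options of sbt's own JVM, so heap sizes or module flags must be set explicitly if your tests need them. A sketch with an illustrative value (adjust or omit to suit your setup):

    Test / javaOptions ++= Seq(
      "-Xmx2g" // example heap size for the forked test JVM, not a required setting
    )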
    
    
    import org.apache.flink.streaming.api.scala.createTypeInformation
    import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration
    import org.apache.flink.streaming.api.functions.sink.SinkFunction
    import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
    import org.apache.flink.test.util.MiniClusterWithClientResource
    import org.scalatest.BeforeAndAfter
    import org.scalatest.flatspec.AnyFlatSpec
    import org.scalatest.matchers.should.Matchers
    
    import java.util.Collections
    
    class MappingTest extends AnyFlatSpec with Matchers with BeforeAndAfter {
    
      val flinkCluster = new MiniClusterWithClientResource(new MiniClusterResourceConfiguration.Builder()
        .setNumberSlotsPerTaskManager(2)
        .setNumberTaskManagers(1)
        .build)
    
      before {
        flinkCluster.before()
      }
    
      after {
        flinkCluster.after()
      }
    
      "IncrementFlatMapFunction pipeline" should "incrementValues" in {
        val env = StreamExecutionEnvironment.getExecutionEnvironment
    
        // configure your test environment
        env.setParallelism(2)
    
        // values are collected in a static variable
        CollectSink.values.clear()
    
        // create a stream of custom elements and apply transformations
        env.fromElements(1, 21, 22)
          .map(x => x + 1L)
          .addSink(new CollectSink())
    
        // execute
        env.execute()
    
        // verify your results
        CollectSink.values should contain allOf(2, 22, 23)
      }
    }
    
    class CollectSink extends SinkFunction[Long] {
      override def invoke(value: Long, context: SinkFunction.Context): Unit = {
        CollectSink.values.add(value)
      }
    }
    
    object CollectSink {
      // must be static
      val values: java.util.List[Long] = Collections.synchronizedList(new java.util.ArrayList())
    }