I am in the process of updating my application from Flink 1.15.2 to 1.20, and ran into a snag along the way. I have a simple unit test that creates a stream of data (Long values), applies a map function to increment each value by 1, and sinks the results to a list of Longs. I copied this unit test from the Flink documentation and tested it incrementally against each release: first 1.15.2, then 1.16.2, and so on. I found that it breaks starting with Flink 1.18.1.
I am building with Scala 2.12 through sbt, and the tests are run with the "sbt test" command.
/** Runs a small "add one" map pipeline on an embedded Flink mini cluster and checks the sink output. */
class MappingTest extends AnyFlatSpec with Matchers with BeforeAndAfter {

  // Embedded cluster used by this spec: one task manager offering two slots.
  val flinkCluster = {
    val clusterConfig = new MiniClusterResourceConfiguration.Builder()
      .setNumberTaskManagers(1)
      .setNumberSlotsPerTaskManager(2)
      .build
    new MiniClusterWithClientResource(clusterConfig)
  }

  // Start the cluster before each test and shut it down afterwards.
  before { flinkCluster.before() }

  after { flinkCluster.after() }

  "IncrementFlatMapFunction pipeline" should "incrementValues" in {
    // Results are gathered in the CollectSink companion object, so reset it first.
    CollectSink.values.clear()

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(2)

    // Source -> increment each element by one -> collecting sink.
    val incremented = env
      .fromElements(1, 21, 22)
      .map(n => n + 1L)
    incremented.addSink(new CollectSink())

    // Run the job; the assertion below executes once execute() has returned.
    env.execute()

    CollectSink.values should contain allOf (2, 22, 23)
  }
}
/** Sink that appends every received value to the list held by the `CollectSink` companion object. */
class CollectSink extends SinkFunction[Long] {

  /** Records `value` in `CollectSink.values`; `context` is not used. */
  override def invoke(value: Long, context: SinkFunction.Context): Unit =
    CollectSink.values.add(value)
}
/** Holder for the values collected by `CollectSink` instances. */
object CollectSink {

  /**
   * Collected values. Kept on the singleton object (the Scala equivalent of a static field)
   * and wrapped in a synchronized list.
   */
  val values: util.List[Long] =
    Collections.synchronizedList(new util.ArrayList[Long]())
}
The error message that I'm receiving:
should incrementValues *** FAILED ***
[info] org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
[info] at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:144)
[info] at org.apache.flink.runtime.minicluster.MiniClusterJobClient.lambda$getJobExecutionResult$3(MiniClusterJobClient.java:141)
[info] at java.base/java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:646)
[info] at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:510)
[info] at java.base/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2147)
[info] at org.apache.flink.runtime.rpc.pekko.PekkoInvocationHandler.lambda$invokeRpc$1(PekkoInvocationHandler.java:268)
[info] at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:863)
[info] at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:841)
[info] at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:510)
[info] at java.base/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2147)
[info] ...
[info] Cause: org.apache.flink.runtime.client.JobInitializationException: Could not start the JobMaster.
[info] at org.apache.flink.runtime.jobmaster.DefaultJobMasterServiceProcess.lambda$new$0(DefaultJobMasterServiceProcess.java:97)
[info] at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:863)
[info] at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:841)
[info] at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:510)
[info] at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1773)
[info] at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
[info] at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
[info] at java.base/java.lang.Thread.run(Thread.java:840)
[info] ...
[info] Cause: java.util.concurrent.CompletionException: java.lang.RuntimeException: java.lang.ClassNotFoundException: org.apache.flink.api.common.ExecutionConfig
[info] at java.base/java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:315)
[info] at java.base/java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:320)
[info] at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1770)
[info] at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
[info] at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
[info] at java.base/java.lang.Thread.run(Thread.java:840)
[info] ...
[info] Cause: java.lang.RuntimeException: java.lang.ClassNotFoundException: org.apache.flink.api.common.ExecutionConfig
[info] at org.apache.flink.util.ExceptionUtils.rethrow(ExceptionUtils.java:321)
[info] at org.apache.flink.util.function.FunctionUtils.lambda$uncheckedSupplier$4(FunctionUtils.java:114)
[info] at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1768)
[info] at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
[info] at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
[info] at java.base/java.lang.Thread.run(Thread.java:840)
[info] ...
[info] Cause: java.lang.ClassNotFoundException: org.apache.flink.api.common.ExecutionConfig
[info] at java.base/jdk.internal.loader.BuiltinClassLoader.loadClass(BuiltinClassLoader.java:641)
[info] at java.base/jdk.internal.loader.ClassLoaders$AppClassLoader.loadClass(ClassLoaders.java:188)
[info] at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:525)
[info] at java.base/java.lang.Class.forName0(Native Method)
[info] at java.base/java.lang.Class.forName(Class.java:467)
[info] at org.apache.flink.util.InstantiationUtil$ClassLoaderObjectInputStream.resolveClass(InstantiationUtil.java:78)
[info] at java.base/java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:2034)
[info] at java.base/java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1898)
[info] at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2224)
[info] at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1733)
Based on the research that I've done, ClassNotFoundException errors are typically caused by mismatched dependency versions. I've simplified the dependencies in my build.sbt file:
// Version shared by all Flink artifacts listed below.
val flinkVersion = "1.18.1"

val flinkDependencies: Seq[ModuleID] = Seq(
  // Flink
  "org.apache.flink" %% "flink-streaming-scala" % flinkVersion,
  "org.apache.flink" % "flink-test-utils" % flinkVersion % Test,
  // Test framework
  "org.scalatest" %% "scalatest" % "3.2.19" % Test,
  // Configuration
  "com.typesafe" % "config" % "1.4.3"
)
Also, while looking through the release notes, I saw it mentioned that 1.18 was tested using Java 17. I have tried running the tests in both a Java 11 and a Java 17 environment; both produce the same error.
For additional context, here is the tutorial from Apache that I'm following: https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/dev/datastream/testing/
From mail-archive - Getting java.lang.ClassNotFoundException in Tests (Flink 1.18.0) - 03 Dec 2023
Forking in sbt solved the issue (`Test / fork := true`).
So, adding that line to your `build.sbt` config file solves the problem.
I was able to reproduce the problem locally using the code you provided. Once I added that line, the test succeeded. It also works for 1.19.1 and 1.20.0.
build.sbt
// Scala version applied to every project in this build.
ThisBuild / scalaVersion := "2.12.20"

// Version shared by all Flink artifacts listed below.
val flinkVersion = "1.20.0"

val flinkDependencies: Seq[ModuleID] = Seq(
  // Flink
  "org.apache.flink" %% "flink-streaming-scala" % flinkVersion,
  "org.apache.flink" % "flink-test-utils" % flinkVersion % Test,
  // Test framework
  "org.scalatest" %% "scalatest" % "3.2.19" % Test,
  // Configuration
  "com.typesafe" % "config" % "1.4.3"
)

lazy val root = project
  .in(file("."))
  .settings(
    name := "stackoverflow-poc-apache-flink",
    libraryDependencies ++= flinkDependencies,
    // Run tests in a forked JVM; without this line the test fails with the reported error.
    Test / fork := true
  )
MappingTest.scala
import org.apache.flink.streaming.api.scala.createTypeInformation
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration
import org.apache.flink.streaming.api.functions.sink.SinkFunction
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.test.util.MiniClusterWithClientResource
import org.scalatest.BeforeAndAfter
import org.scalatest.flatspec.AnyFlatSpec
import org.scalatest.matchers.should.Matchers
import java.util.Collections
/** Runs a small "add one" map pipeline on an embedded Flink mini cluster and checks the sink output. */
class MappingTest extends AnyFlatSpec with Matchers with BeforeAndAfter {

  // Embedded cluster used by this spec: one task manager offering two slots.
  val flinkCluster = {
    val clusterConfig = new MiniClusterResourceConfiguration.Builder()
      .setNumberTaskManagers(1)
      .setNumberSlotsPerTaskManager(2)
      .build
    new MiniClusterWithClientResource(clusterConfig)
  }

  // Start the cluster before each test and shut it down afterwards.
  before { flinkCluster.before() }

  after { flinkCluster.after() }

  "IncrementFlatMapFunction pipeline" should "incrementValues" in {
    // Results are gathered in the CollectSink companion object, so reset it first.
    CollectSink.values.clear()

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(2)

    // Source -> increment each element by one -> collecting sink.
    val incremented = env
      .fromElements(1, 21, 22)
      .map(n => n + 1L)
    incremented.addSink(new CollectSink())

    // Run the job; the assertion below executes once execute() has returned.
    env.execute()

    CollectSink.values should contain allOf (2, 22, 23)
  }
}
/** Sink that appends every received value to the list held by the `CollectSink` companion object. */
class CollectSink extends SinkFunction[Long] {

  /** Records `value` in `CollectSink.values`; `context` is not used. */
  override def invoke(value: Long, context: SinkFunction.Context): Unit =
    CollectSink.values.add(value)
}
/** Holder for the values collected by `CollectSink` instances. */
object CollectSink {

  /**
   * Collected values. Kept on the singleton object (the Scala equivalent of a static field)
   * and wrapped in a synchronized list.
   */
  val values: java.util.List[Long] =
    Collections.synchronizedList(new java.util.ArrayList[Long]())
}