We are running Apache Flink 1.9 in Kubernetes. We have a few jobs that consume Kafka events and collect counts every minute. The jobs have been working quite well, but recently they have suddenly started throwing numerous errors:
java.lang.RuntimeException: Partition already finished.
at org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:110)
at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:89)
at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:45)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:705)
at org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollectWithTimestamp(StreamSourceContexts.java:310)
at org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collectWithTimestamp(StreamSourceContexts.java:409)
The code that throws the error is in a listener that receives events and emits watermarks:
// We use an underlying API lib to get a SourceContext from Flink; sorry, that source code is not available here
import org.apache.flink.streaming.api.functions.source.SourceFunction
import org.apache.flink.streaming.api.watermark.Watermark

protected var context: SourceFunction.SourceContext[T] = ...

validEventsSorted.foreach { event =>
  try {
    context.collectWithTimestamp(event, event.occurredAt.toEpochMilli)
    context.emitWatermark(new Watermark(event.occurredAt.minusSeconds(30).toEpochMilli))
  } catch {
    case e: Throwable =>
      logger.error(
        s"Failed to add to context. Event EID: ${event.nakadiMetaData.eid}." +
          s" Event: $event",
        e
      )
  }
}
Restarting the Flink JobManager and TaskManager stops the errors, but the issue may come back later.
As I understand (and guess), Partition already finished is thrown when an operator tries to deliver events to the next operator (partition) after that partition has already finished, but I do not understand how this could happen.
And here is our source implementation:
import java.util.concurrent.Executors

import scala.reflect.{ClassTag, classTag}

import org.apache.flink.streaming.api.functions.source.{RichParallelSourceFunction, SourceFunction}

class SourceImpl[T: ClassTag](
    listener: KafkaListener[T]
) extends RichParallelSourceFunction[T] {

  @volatile private var isCancelled: Boolean = false
  @volatile private var consumerFuture: java.util.concurrent.Future[_] = _

  override def run(ctx: SourceFunction.SourceContext[T]): Unit = {
    while (!isCancelled) {
      try {
        val runnable = KafkaClient
          .stream(subscription)
          .withStreamParameters(streamParameters)
          .runnable(classTag[T].runtimeClass.asInstanceOf[Class[T]], listener)
        val executorService = Executors.newSingleThreadExecutor()
        consumerFuture = executorService.submit(runnable)
        consumerFuture.get() // This is blocking
      } catch {
        case e: Throwable =>
          logger.warn(s"Unknown error consuming events", e)
      }
    }
  }

  override def cancel(): Unit = {
    isCancelled = true
    consumerFuture.cancel(true)
  }
}
Does anybody have an idea why this happens and how to solve it?
It turned out there is a bug in our SourceImpl. When the job is cancelled by the JobManager, the cancel method is called but may fail, so the executorService is never shut down and the runnable keeps running in the TaskManager, consuming events and emitting watermarks. Since the job is already marked as cancelled in the JobManager and TaskManager, the watermark emission leads to the Partition already finished exception.
So we made a fix to shut down the ExecutorService explicitly:
// Shutdown executorService
if (executorService != null && !executorService.isShutdown) {
  executorService.shutdownNow()
}
The full code is below:
import java.util.concurrent.Executors

import scala.reflect.{ClassTag, classTag}

import org.apache.flink.streaming.api.functions.source.{RichParallelSourceFunction, SourceFunction}

class SourceImpl[T: ClassTag](
    listener: KafkaListener[T]
) extends RichParallelSourceFunction[T] {

  @volatile private var isCancelled: Boolean = false
  @volatile private var consumerFuture: java.util.concurrent.Future[_] = _

  override def run(ctx: SourceFunction.SourceContext[T]): Unit = {
    val executorService = Executors.newSingleThreadExecutor()
    while (!isCancelled) {
      try {
        val runnable = KafkaClient
          .stream(subscription)
          .withStreamParameters(streamParameters)
          .runnable(classTag[T].runtimeClass.asInstanceOf[Class[T]], listener)
        consumerFuture = executorService.submit(runnable)
        consumerFuture.get() // This is blocking
      } catch {
        case e: Throwable =>
          logger.warn(s"Unknown error consuming events", e)
      }
    }

    // Shutdown executorService once the loop exits, so the consumer thread cannot
    // keep emitting into the SourceContext after the job has been cancelled
    if (executorService != null && !executorService.isShutdown) {
      executorService.shutdownNow()
    }
  }

  override def cancel(): Unit = {
    isCancelled = true
    consumerFuture.cancel(true)
  }
}
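Since cancel() itself is what may fail (for example, consumerFuture is still null if cancellation races with the first submit in run(), so consumerFuture.cancel(true) throws a NullPointerException), an additional safeguard is to make cancel() defensive. This is only a sketch on top of the fields above, not something from our actual fix:

override def cancel(): Unit = {
  isCancelled = true
  // consumerFuture may still be null if cancel() runs before the first submit in run()
  val future = consumerFuture
  if (future != null) {
    try {
      future.cancel(true)
    } catch {
      case e: Throwable => logger.warn("Failed to cancel the consumer future", e)
    }
  }
}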
BTW, the reason we create a separate ExecutorService is to run the listener in its own thread pool, so it does not interfere with Flink's own threads.
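For completeness, that single-thread executor can also be given a named daemon thread factory so the listener thread is easy to spot in thread dumps. The thread name below is just an illustration, not part of our actual code:

import java.util.concurrent.{Executors, ThreadFactory}

// Sketch only: the same single-thread executor as in run(), but with a named daemon thread
val executorService = Executors.newSingleThreadExecutor(new ThreadFactory {
  override def newThread(r: Runnable): Thread = {
    val t = new Thread(r, "kafka-listener-consumer") // hypothetical thread name
    t.setDaemon(true) // a daemon thread will not keep the TaskManager JVM alive on shutdown
    t
  }
})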