Tags: exception, kubernetes, apache-flink, partition

Apache Flink throws "Partition already finished" exceptions


We are running Apache Flink 1.9 in Kubernetes. We have a few jobs that consume Kafka events and collect counts every minute. The jobs have been working quite well, but recently we suddenly see numerous errors like this:

java.lang.RuntimeException: Partition already finished.
    at org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:110)
    at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:89)
    at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:45)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:705)
    at org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollectWithTimestamp(StreamSourceContexts.java:310)
    at org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collectWithTimestamp(StreamSourceContexts.java:409)

The code that throws the error is in a listener that receives events and emits watermarks.

    // We use an underlying API lib to get a SourceContext from Flink, sorry not to have the source code here
    import org.apache.flink.streaming.api.functions.source.SourceFunction
    import org.apache.flink.streaming.api.watermark.Watermark

    protected var context: SourceFunction.SourceContext[T] = ...

    validEventsSorted.foreach { event =>
      try {
        context.collectWithTimestamp(event, event.occurredAt.toEpochMilli)
        context.emitWatermark(new Watermark(event.occurredAt.minusSeconds(30).toEpochMilli))
      } catch {
        case e: Throwable =>
          logger.error(
            s"Failed to add to context. Event EID: ${event.nakadiMetaData.eid}." +
              s" Event: $event",
            e
          )
      }
    }
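
For what it's worth, we do not hold the checkpoint lock while emitting. Flink's SourceContext exposes one via getCheckpointLock, and the SourceFunction contract asks sources to emit records and watermarks under it. A guarded version of the loop above would look like this (a sketch only, assuming the same context variable as above; we have not confirmed whether this is related to the error):

    // Sketch: emit under the checkpoint lock so emission cannot race
    // with checkpointing or task shutdown (same `context` as above)
    validEventsSorted.foreach { event =>
      context.getCheckpointLock.synchronized {
        context.collectWithTimestamp(event, event.occurredAt.toEpochMilli)
        context.emitWatermark(new Watermark(event.occurredAt.minusSeconds(30).toEpochMilli))
      }
    }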

Restarting the Flink JobManager and TaskManagers stops the errors, but the issue may come back later.

As far as I understand (and guess), Partition already finished occurs when an operator tries to deliver events to the next operator over a result partition that has already been finished, but I do not understand how this could happen.

And here is our source implementation:

import java.util.concurrent.Executors

import scala.reflect.{ClassTag, classTag}

import org.apache.flink.streaming.api.functions.source.{RichParallelSourceFunction, SourceFunction}

class SourceImpl[T: ClassTag](
    listener: KafkaListener[T]
) extends RichParallelSourceFunction[T] {

  @volatile private var isCancelled: Boolean = false

  @volatile private var consumerFuture: java.util.concurrent.Future[_] = _

  override def run(ctx: SourceFunction.SourceContext[T]): Unit = {

    while (!isCancelled) {
      try {
        val runnable = KafkaClient
          .stream(subscription)
          .withStreamParameters(streamParameters)
          .runnable(classTag[T].runtimeClass.asInstanceOf[Class[T]], listener)

        // Run the consumer in its own single-thread pool
        val executorService = Executors.newSingleThreadExecutor()
        consumerFuture = executorService.submit(runnable)
        consumerFuture.get() // Blocks until the runnable finishes or fails
      } catch {
        case e: Throwable =>
          logger.warn("Unknown error consuming events", e)
      }
    }
  }

  override def cancel(): Unit = {
    isCancelled = true
    consumerFuture.cancel(true)
  }
}

Does anybody have an idea why this happens and how to solve it?


Solution

  • It turned out there was a bug in our SourceImpl. When the job is cancelled by the JobManager, the cancel method is called, but it may fail, and the executorService is never shut down, so the runnable keeps running in the TaskManager, consuming events and emitting watermarks. Since the job is already marked as cancelled in the JobManager and TaskManager, the watermark emission leads to the Partition already finished exception.
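
    One concrete way cancel can fail (a hypothetical timing we did not verify): if the JobManager cancels the job before run has submitted the runnable, consumerFuture is still null and cancel throws a NullPointerException. A guarded version would look like this:

        override def cancel(): Unit = {
          isCancelled = true
          val future = consumerFuture  // may still be null at this point
          if (future != null) {
            future.cancel(true)        // cancel(true) only delivers an interrupt
          }
        }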

    So we made a fix to shut down the ExecutorService explicitly:

        // Shutdown executorService
        if (executorService != null && !executorService.isShutdown) {
          executorService.shutdownNow()
        }
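
    Note that shutdownNow does not kill the thread; it only delivers an interrupt, so stopping really depends on the runnable honoring that interrupt. Here is a minimal, self-contained sketch (the Runnable below is a made-up stand-in, not the real KafkaClient one) of how a catch-all that swallows InterruptedException keeps a leaked consumer alive:

        import java.util.concurrent.{Executors, TimeUnit}

        object InterruptDemo {
          def main(args: Array[String]): Unit = {
            val executor = Executors.newSingleThreadExecutor()
            executor.submit(new Runnable {
              override def run(): Unit = {
                while (true) {
                  try {
                    TimeUnit.MILLISECONDS.sleep(100) // stands in for a blocking poll
                    println("still consuming")
                  } catch {
                    // Mirrors a `case e: Throwable` catch-all: the interrupt
                    // from shutdownNow() is swallowed and the loop continues
                    case _: InterruptedException => println("interrupt ignored")
                  }
                }
              }
            })

            TimeUnit.MILLISECONDS.sleep(300)
            executor.shutdownNow()           // delivers an interrupt, nothing more
            TimeUnit.MILLISECONDS.sleep(300) // "still consuming" keeps printing
            System.exit(0)                   // forced exit; the leaked non-daemon
                                             // thread would otherwise keep the JVM alive
          }
        }

    In our case the explicit shutdownNow was enough to stop the leaked consumer, which suggests the real runnable does react to the interrupt.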
    

    The full code is below:

    import java.util.concurrent.Executors

    import scala.reflect.{ClassTag, classTag}

    import org.apache.flink.streaming.api.functions.source.{RichParallelSourceFunction, SourceFunction}

    class SourceImpl[T: ClassTag](
        listener: KafkaListener[T]
    ) extends RichParallelSourceFunction[T] {

      @volatile private var isCancelled: Boolean = false

      @volatile private var consumerFuture: java.util.concurrent.Future[_] = _

      override def run(ctx: SourceFunction.SourceContext[T]): Unit = {

        // One executor for the lifetime of the source, shut down on exit
        val executorService = Executors.newSingleThreadExecutor()

        while (!isCancelled) {
          try {
            val runnable = KafkaClient
              .stream(subscription)
              .withStreamParameters(streamParameters)
              .runnable(classTag[T].runtimeClass.asInstanceOf[Class[T]], listener)

            consumerFuture = executorService.submit(runnable)
            consumerFuture.get() // Blocks until the runnable finishes or fails
          } catch {
            case e: Throwable =>
              logger.warn("Unknown error consuming events", e)
          }
        }

        // Shut down executorService so the runnable cannot outlive the job
        if (!executorService.isShutdown) {
          executorService.shutdownNow()
        }
      }

      override def cancel(): Unit = {
        isCancelled = true
        consumerFuture.cancel(true)
      }
    }
    

    BTW, the reason we create a separate ExecutorService is to run the listener in its own thread pool, so that it does not interfere with Flink's thread pool.
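
    If you do this, it can also help to give the pool a named daemon thread. A hypothetical variant (the thread name is made up) that makes a leaked consumer easy to spot in thread dumps and unable to block JVM shutdown:

        import java.util.concurrent.{Executors, ThreadFactory}

        val executorService = Executors.newSingleThreadExecutor(new ThreadFactory {
          override def newThread(r: Runnable): Thread = {
            val t = new Thread(r, "kafka-listener-consumer") // hypothetical name
            t.setDaemon(true) // a leaked consumer cannot keep the JVM alive
            t
          }
        })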