apache-flink

Flink 1.20: Kafka sink job fails with endOfInput exception when restoring from a savepoint


I encountered a Kafka sink exception when the job was started from a savepoint. The message is as follows:
java.lang.IllegalStateException: Received element after endOfInput: Record @ (undef) : org.apache.flink.table.data.binary.BinaryRowData@3f797ef9

My environment:
Flink version: 1.20.1
Flink Kafka connector version: 3.3.0
JDK version: 1.8

The Flink SQL job:
----------------------------------------------------

CREATE TEMPORARY TABLE `t1` (
  `org_id` VARCHAR
) WITH (
  'connector' = 'datagen',
  'rows-per-second' = '1'
);
CREATE TEMPORARY TABLE `kafka_out` (

  `org_id` VARCHAR
) WITH (
  'connector' = 'kafka',
  'properties.bootstrap.servers' = 'xxx:9092',
  'topic' = 'test2',
  'value.format' = 'json',
  'properties.enable.idempotence' = 'false'
);
insert into kafka_out select * from t1;

------------------------------------------

Steps to reproduce:
1. Start this SQL job, then stop it with a savepoint.
2. Restart the job from that savepoint. The following failure occurs:


2025-12-08 17:14:44.002 [kafka_out[2]: Writer -> kafka_out[2]: Committer (1/1)#0] WARN org.apache.flink.runtime.taskmanager.Task - kafka_out[2]: Writer -> kafka_out[2]: Committer (1/1)#0 (54d08e14b76ac2f5d7f195c7732acee6_20ba6b65f97481d5570070de90e4e791_0_0) switched from RUNNING to FAILED with failure cause:
java.lang.IllegalStateException: Received element after endOfInput: Record @ (undef) : org.apache.flink.table.data.binary.BinaryRowData@3f797ef9
at org.apache.flink.util.Preconditions.checkState(Preconditions.java:215)
at org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperator.processElement(SinkWriterOperator.java:206)
at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:238)
at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:157)
at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:114)
at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:638)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231)
at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:973)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:917)
at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:970)
at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:949)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:763)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575)
at java.lang.Thread.run(Thread.java:748)

Digging into the Flink 1.20 source code of SinkWriterOperator, I found that:
1. When the savepoint was triggered, SourceOperator's stop method was called, and emitNextNotReading emitted the status DataInputStatus.END_OF_DATA.
2. END_OF_DATA eventually triggered SinkWriterOperator.endInput, which set the variable endOfInput = true. The call stack:

org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperator.endInput(SinkWriterOperator.java:232)
at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.endOperatorInput(StreamOperatorWrapper.java:101)
at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.lambda$finish$0(StreamOperatorWrapper.java:154)
at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.finish(StreamOperatorWrapper.java:154)
at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.finish(StreamOperatorWrapper.java:161)
at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.finishOperators(RegularOperatorChain.java:115)
at org.apache.flink.streaming.runtime.tasks.StreamTask.endData(StreamTask.java:695)
at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:653)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231)
at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:973)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:917)
at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:970)
at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:949)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:763)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575)
at java.lang.Thread.run(Thread.java:748)

3. The endOfInput flag was persisted as part of the operator's state in the savepoint.
4. When the job was restored from this savepoint, endOfInput was set back to true, so SinkWriterOperator throws an exception as soon as it processes a new incoming record. The source code is as follows:

@Override
public void processElement(StreamRecord<InputT> element) throws Exception {
    checkState(!endOfInput, "Received element after endOfInput: %s", element);
    context.element = element;
    sinkWriter.write(element.getValue(), context);
}
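The four steps above can be illustrated with a minimal, self-contained sketch. The class and method names below are hypothetical stand-ins, not the actual Flink API; the point is only the flawed state machine: the writer persists its endOfInput flag into the snapshot, so an instance restored from a stop-with-savepoint rejects every new record.

```java
import java.util.HashMap;
import java.util.Map;

public class SinkWriterSketch {
    private boolean endOfInput;

    // Called when the task drains its input during stop-with-savepoint.
    void endInput() {
        endOfInput = true;
    }

    // Mirrors the precondition in SinkWriterOperator.processElement.
    void processElement(String record) {
        if (endOfInput) {
            throw new IllegalStateException(
                "Received element after endOfInput: " + record);
        }
        // ... write the record to the sink ...
    }

    // The problem: endOfInput is included in the snapshot ...
    Map<String, Boolean> snapshotState() {
        Map<String, Boolean> state = new HashMap<>();
        state.put("endOfInput", endOfInput);
        return state;
    }

    // ... and restored verbatim, so the restored job starts "finished".
    void restoreState(Map<String, Boolean> state) {
        endOfInput = state.getOrDefault("endOfInput", false);
    }

    public static void main(String[] args) {
        SinkWriterSketch writer = new SinkWriterSketch();
        writer.processElement("row-1");           // fine while running
        writer.endInput();                        // stop-with-savepoint drains input
        Map<String, Boolean> savepoint = writer.snapshotState();

        SinkWriterSketch restored = new SinkWriterSketch();
        restored.restoreState(savepoint);
        restored.processElement("row-2");         // throws IllegalStateException
    }
}
```

A correct implementation would simply not carry endOfInput across a restore, since a restored job is by definition receiving input again.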

Is there any way to work around this issue?


Solution

  • This issue was fixed in Flink 1.20.3.

    For details, see: https://issues.apache.org/jira/browse/FLINK-37605