Apache Flink FileSink in BATCH execution mode: in-progress files are not transitioned to finished state


What we are trying to do: we are evaluating Flink for batch processing using the DataStream API in BATCH mode.

Minimal application to reproduce the issue:

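import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class FlinkS3ProcessingDemoApplication {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.BATCH);

        DataStreamSource<String> source = env.readTextFile("file:///Users/user1/any-text-file.txt");

        source.sinkTo(FileSink.forRowFormat(new Path("file:///Users/user1/output/"), new SimpleStringEncoder<String>("UTF-8")).build());

        env.execute("Test Flink application");
    }
}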

Flink version: 1.12.2 or 1.13.0

Expected result: "finalized" files in the /Users/user1/output/ folder.

According to the FileSink documentation:

Given that Flink sinks and UDFs in general do not differentiate between normal job termination (e.g. finite input stream) and termination due to failure, upon normal termination of a job, the last in-progress files will not be transitioned to the “finished” state.

and the specific note for BATCH mode:

Pending files are committed, i.e. transition to Finished state, after the whole input has been processed.

Actual result:

.
└── 2021-07-13--10
    ├── .part-707a8590-04cb-4c2d-97b2-5652697d9c76-0.inprogress.7e99df6f-703d-44b3-875a-283e12b31c8e
    ├── .part-a82bcabd-065d-4263-bee0-72f8673f3fd3-0.inprogress.65067b75-ef6c-4185-ae87-fe59de95c86a
    ├── .part-c7c36fd5-fb31-4d55-b783-5373ce69e216-0.inprogress.3e953235-09f1-487b-8229-2cdfa0e2daf4
    └── .part-e66b004a-271f-4aae-9604-e035b2c2cfe3-0.inprogress.add8b0d9-aa89-491e-9a9d-f07b73ab8256

and the following exceptions:

Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
    at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:144)
    at org.apache.flink.runtime.minicluster.MiniClusterJobClient.lambda$getJobExecutionResult$2(MiniClusterJobClient.java:117)
    at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:616)
    at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:591)
    at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
    at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
    at org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.lambda$invokeRpc$0(AkkaInvocationHandler.java:237)
    at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
    at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
    at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
    at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
    at org.apache.flink.runtime.concurrent.FutureUtils$1.onComplete(FutureUtils.java:1046)
    at akka.dispatch.OnComplete.internal(Future.scala:264)
    at akka.dispatch.OnComplete.internal(Future.scala:261)
    at akka.dispatch.japi$CallbackBridge.apply(Future.scala:191)
    at akka.dispatch.japi$CallbackBridge.apply(Future.scala:188)
    at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36)
    at org.apache.flink.runtime.concurrent.Executors$DirectExecutionContext.execute(Executors.java:73)
    at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:44)
    at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:252)
    at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:572)
    at akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:22)
    at akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:21)
    at scala.concurrent.Future$$anonfun$andThen$1.apply(Future.scala:436)
    at scala.concurrent.Future$$anonfun$andThen$1.apply(Future.scala:435)
    at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36)
    at akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:55)
    at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply$mcV$sp(BatchingExecutor.scala:91)
    at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91)
    at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91)
    at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72)
    at akka.dispatch.BatchingExecutor$BlockableBatch.run(BatchingExecutor.scala:90)
    at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:44)
    at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
    at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:118)
    at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:80)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:233)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:224)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:215)
    at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:669)
    at org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:89)
    at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:447)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:305)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:212)
    at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158)
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
    at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
    at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
    at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
    at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
    at akka.actor.ActorCell.invoke(ActorCell.scala:561)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
    at akka.dispatch.Mailbox.run(Mailbox.scala:225)
    at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
    ... 4 more
Caused by: java.nio.channels.ClosedChannelException
    at sun.nio.ch.FileChannelImpl.ensureOpen(FileChannelImpl.java:110)
    at sun.nio.ch.FileChannelImpl.position(FileChannelImpl.java:253)
    at org.apache.flink.core.fs.local.LocalRecoverableFsDataOutputStream.getPos(LocalRecoverableFsDataOutputStream.java:103)
    at org.apache.flink.streaming.api.functions.sink.filesystem.OutputStreamBasedPartFileWriter.getSize(OutputStreamBasedPartFileWriter.java:74)
    at org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy.shouldRollOnCheckpoint(DefaultRollingPolicy.java:71)
    at org.apache.flink.connector.file.sink.writer.FileWriterBucket.prepareCommit(FileWriterBucket.java:196)
    at org.apache.flink.connector.file.sink.writer.FileWriter.prepareCommit(FileWriter.java:200)
    at org.apache.flink.streaming.runtime.operators.sink.AbstractSinkWriterOperator.endInput(AbstractSinkWriterOperator.java:97)
    at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.endOperatorInput(StreamOperatorWrapper.java:91)
    at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.lambda$close$0(StreamOperatorWrapper.java:128)
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
    at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.close(StreamOperatorWrapper.java:128)
    at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.close(StreamOperatorWrapper.java:135)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain.closeOperators(OperatorChain.java:439)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.afterInvoke(StreamTask.java:627)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:589)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570)
    at java.lang.Thread.run(Thread.java:748)

What we want to know: is it possible to use Flink in BATCH mode in combination with FileSink or StreamingFileSink?

Thanks in advance!


Solution

  • The source interfaces were reworked in FLIP-27 to provide support for BATCH execution mode in the DataStream API. For the FileSink to properly transition PENDING files to FINISHED when running in BATCH mode, you need to use a source that implements FLIP-27, such as FileSource (instead of readTextFile): https://ci.apache.org/projects/flink/flink-docs-release-1.13/api/java/org/apache/flink/connector/file/src/FileSource.html.

    As you discovered, that looks like this:

    DataStreamSource<String> source = 
      env.fromSource(
        FileSource.forRecordStreamFormat(
          new TextLineFormat(),
          new Path("file:///Users/user/file.txt")
        ).build(),
        WatermarkStrategy.noWatermarks(),
        "MySourceName"
      );
    

    If you need a bulk format instead, such as Parquet, you would do something more like this:

    // ParquetColumnarRowInputFormat produces RowData records, not String
    DataStreamSource<RowData> source = 
      env.fromSource(
        FileSource.forBulkFileFormat(
          new ParquetColumnarRowInputFormat(...),
          new Path("file:///Users/me/data.parquet")
        ).build(),
        WatermarkStrategy.noWatermarks(),
        "MySourceName"
      );
    

    For Parquet there's also the ParquetVectorizedInputFormat, and there are formats for ORC, etc.
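
To confirm that a run really finalized its output, one option is to scan the sink's output directory for leftover in-progress part files. The following is a minimal sketch using only the Java standard library, not anything provided by Flink. The class and method names (InProgressFileCheck, isInProgress, findInProgressFiles) are hypothetical, and the name check simply assumes the ".part-….inprogress.…" pattern visible in the directory listing in the question.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical helper, not part of Flink: reports part files that were never finalized.
public class InProgressFileCheck {

    // Assumption: in-progress files are hidden files whose name contains
    // ".inprogress.", as in the listing shown in the question.
    static boolean isInProgress(String fileName) {
        return fileName.startsWith(".") && fileName.contains(".inprogress.");
    }

    // Walks the output directory recursively (bucket folders included) and
    // returns every regular file whose name still looks in-progress.
    static List<Path> findInProgressFiles(Path outputDir) throws IOException {
        try (Stream<Path> paths = Files.walk(outputDir)) {
            return paths
                .filter(Files::isRegularFile)
                .filter(p -> isInProgress(p.getFileName().toString()))
                .sorted()
                .collect(Collectors.toList());
        }
    }

    public static void main(String[] args) throws IOException {
        // The default directory name is only a placeholder; pass the real sink path.
        Path outputDir = Paths.get(args.length > 0 ? args[0] : "output");
        if (!Files.isDirectory(outputDir)) {
            System.out.println("Not a directory: " + outputDir);
            return;
        }
        List<Path> leftovers = findInProgressFiles(outputDir);
        if (leftovers.isEmpty()) {
            System.out.println("No in-progress files under " + outputDir);
        } else {
            leftovers.forEach(p -> System.out.println("Still in progress: " + p));
        }
    }
}
```

Running it against /Users/user1/output after the job finishes should list the four files from the question when the readTextFile source is used. With a FLIP-27 source, and assuming the job completes without the exception, it should report no leftovers.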