What we are trying to do: we are evaluating Flink to perform batch processing using DataStream API in BATCH
mode.
Minimal application to reproduce the issue:
public class FlinkS3ProcessingDemoApplication {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRuntimeMode(RuntimeExecutionMode.BATCH);
DataStreamSource<String> source = env.readTextFile("file:///Users/user1/any-text-file.txt");
source.sinkTo(FileSink.forRowFormat(new Path("file:///Users/user1/output/"), new SimpleStringEncoder<String>("UTF-8")).build());
env.execute("Test Flink application");
}
}
Flink version: 1.12.2 or 1.13.0
Expected result: "finalized" files in the /Users/user1/output/
folder.
According to FileSink
documentation:
Given that Flink sinks and UDFs in general do not differentiate between normal job termination (e.g. finite input stream) and termination due to failure, upon normal termination of a job, the last in-progress files will not be transitioned to the “finished” state.
specific note for BATCH
mode:
Pending files are committed, i.e. transition to Finished state, after the whole input has been processed.
Actual result:
.
└── 2021-07-13--10
├── .part-707a8590-04cb-4c2d-97b2-5652697d9c76-0.inprogress.7e99df6f-703d-44b3-875a-283e12b31c8e
├── .part-a82bcabd-065d-4263-bee0-72f8673f3fd3-0.inprogress.65067b75-ef6c-4185-ae87-fe59de95c86a
├── .part-c7c36fd5-fb31-4d55-b783-5373ce69e216-0.inprogress.3e953235-09f1-487b-8229-2cdfa0e2daf4
└── .part-e66b004a-271f-4aae-9604-e035b2c2cfe3-0.inprogress.add8b0d9-aa89-491e-9a9d-f07b73ab8256
and the following exceptions:
Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:144)
at org.apache.flink.runtime.minicluster.MiniClusterJobClient.lambda$getJobExecutionResult$2(MiniClusterJobClient.java:117)
at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:616)
at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:591)
at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
at org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.lambda$invokeRpc$0(AkkaInvocationHandler.java:237)
at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
at org.apache.flink.runtime.concurrent.FutureUtils$1.onComplete(FutureUtils.java:1046)
at akka.dispatch.OnComplete.internal(Future.scala:264)
at akka.dispatch.OnComplete.internal(Future.scala:261)
at akka.dispatch.japi$CallbackBridge.apply(Future.scala:191)
at akka.dispatch.japi$CallbackBridge.apply(Future.scala:188)
at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36)
at org.apache.flink.runtime.concurrent.Executors$DirectExecutionContext.execute(Executors.java:73)
at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:44)
at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:252)
at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:572)
at akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:22)
at akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:21)
at scala.concurrent.Future$$anonfun$andThen$1.apply(Future.scala:436)
at scala.concurrent.Future$$anonfun$andThen$1.apply(Future.scala:435)
at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36)
at akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:55)
at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply$mcV$sp(BatchingExecutor.scala:91)
at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91)
at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91)
at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72)
at akka.dispatch.BatchingExecutor$BlockableBatch.run(BatchingExecutor.scala:90)
at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:44)
at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:118)
at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:80)
at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:233)
at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:224)
at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:215)
at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:669)
at org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:89)
at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:447)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:305)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:212)
at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158)
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
at akka.actor.ActorCell.invoke(ActorCell.scala:561)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
at akka.dispatch.Mailbox.run(Mailbox.scala:225)
at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
... 4 more
Caused by: java.nio.channels.ClosedChannelException
at sun.nio.ch.FileChannelImpl.ensureOpen(FileChannelImpl.java:110)
at sun.nio.ch.FileChannelImpl.position(FileChannelImpl.java:253)
at org.apache.flink.core.fs.local.LocalRecoverableFsDataOutputStream.getPos(LocalRecoverableFsDataOutputStream.java:103)
at org.apache.flink.streaming.api.functions.sink.filesystem.OutputStreamBasedPartFileWriter.getSize(OutputStreamBasedPartFileWriter.java:74)
at org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy.shouldRollOnCheckpoint(DefaultRollingPolicy.java:71)
at org.apache.flink.connector.file.sink.writer.FileWriterBucket.prepareCommit(FileWriterBucket.java:196)
at org.apache.flink.connector.file.sink.writer.FileWriter.prepareCommit(FileWriter.java:200)
at org.apache.flink.streaming.runtime.operators.sink.AbstractSinkWriterOperator.endInput(AbstractSinkWriterOperator.java:97)
at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.endOperatorInput(StreamOperatorWrapper.java:91)
at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.lambda$close$0(StreamOperatorWrapper.java:128)
at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.close(StreamOperatorWrapper.java:128)
at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.close(StreamOperatorWrapper.java:135)
at org.apache.flink.streaming.runtime.tasks.OperatorChain.closeOperators(OperatorChain.java:439)
at org.apache.flink.streaming.runtime.tasks.StreamTask.afterInvoke(StreamTask.java:627)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:589)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570)
at java.lang.Thread.run(Thread.java:748)
What we want to know: if it's possible to use Flink in batch mode in combination with FileSink
or StreamingFileSink
.
Thanks in advance!
The source interfaces where reworked in FLIP-27 to provide support for BATCH execution mode in the DataStream API. In order to get the FileSink
to properly transition PENDING files to FINISHED when running in BATCH mode, you need to use a source that implements FLIP-27, such as the FileSource
(instead of readTextFile
): https://ci.apache.org/projects/flink/flink-docs-release-1.13/api/java/org/apache/flink/connector/file/src/FileSource.html.
As you discovered, that looks like this:
DataStreamSource<String> source =
env.fromSource(
FileSource.forRecordStreamFormat(
new TextLineFormat(),
new Path("file:///Users/user/file.txt")
).build(),
WatermarkStrategy.noWatermarks(),
"MySourceName"
);
If instead you needed a bulk format, such as parquet, then you would do something more like this:
DataStreamSource<String> source =
env.fromSource(
FileSource.forBulkFileFormat(
new ParquetColumnarRowInputFormat(...),
new Path("file:///Users/me/data.parquet")
).build(),
WatermarkStrategy.noWatermarks(),
"MySourceName"
);
For parquet there's also the ParquetVectorizedInputFormat
, and there are formats for orc, etc.