Problem Statement
We consume multiple CSV files into PCollections, apply Beam SQL to transform the data, and write the resulting PCollection out. This works fine as long as all the source PCollections contain data and the Beam SQL produces a non-empty result. However, when the transform produces an empty PCollection and we write it to NetApp StorageGRID, the pipeline fails with the exception below:
Exception in thread "main" org.apache.beam.sdk.Pipeline$PipelineExecutionException: java.io.IOException: Failed closing channel to s3://bucket-name/.temp-beam-847f362f-8884-454e-bfbe-baf9a4e32806/0b72948a5fcccb28-174f-426b-a225-ae3d3cbb126f
at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:373)
at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:341)
at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:218)
at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:67)
at org.apache.beam.sdk.Pipeline.run(Pipeline.java:323)
at org.apache.beam.sdk.Pipeline.run(Pipeline.java:309)
at ECSOperations.main(ECSOperations.java:53)
Caused by: java.io.IOException: Failed closing channel to s3://bucket-name/.temp-beam-847f362f-8884-454e-bfbe-baf9a4e32806/0b72948a5fcccb28-174f-426b-a225-ae3d3cbb126f
at org.apache.beam.sdk.io.FileBasedSink$Writer.close(FileBasedSink.java:1076)
at org.apache.beam.sdk.io.FileBasedSink$WriteOperation.createMissingEmptyShards(FileBasedSink.java:759)
at org.apache.beam.sdk.io.FileBasedSink$WriteOperation.finalizeDestination(FileBasedSink.java:639)
at org.apache.beam.sdk.io.WriteFiles$FinalizeTempFileBundles$FinalizeFn.process(WriteFiles.java:1040)
Caused by: java.io.IOException: com.amazonaws.services.s3.model.AmazonS3Exception: Your proposed upload is smaller than the minimum allowed object size. (Service: Amazon S3; Status Code: 400; Error Code: EntityTooSmall; Request ID: 1643869619144605; S3 Extended Request ID: null; Proxy: null), S3 Extended Request ID: null
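For context, the overall pipeline has roughly the shape below. This is only a minimal sketch: the two-column schema, the SQL query, and the file paths are illustrative placeholders, not the real ones.
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.extensions.sql.SqlTransform;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.sdk.values.TypeDescriptor;
import org.apache.beam.sdk.values.TypeDescriptors;

Pipeline p = Pipeline.create();

// Hypothetical two-column schema for the CSV records.
Schema schema = Schema.builder().addStringField("id").addStringField("amount").build();

// Read one of the source CSV files and parse each line into a Row.
PCollection<Row> rows =
    p.apply(TextIO.read().from("src/main/resources/source.csv"))
        .apply(MapElements.into(TypeDescriptor.of(Row.class))
            .via(line -> {
              String[] cols = line.split(",", -1);
              return Row.withSchema(schema).addValues(cols[0], cols[1]).build();
            }))
        .setRowSchema(schema);

// Transform with Beam SQL; a selective WHERE clause can legitimately
// produce an empty result.
PCollection<Row> transformed =
    rows.apply(SqlTransform.query("SELECT id, amount FROM PCOLLECTION WHERE amount <> '0'"));

// Convert the rows back to CSV lines and write them out; this is the step
// that fails on StorageGRID when `transformed` is empty.
transformed
    .apply(MapElements.into(TypeDescriptors.strings())
        .via(row -> row.getString("id") + "," + row.getString("amount")))
    .apply(TextIO.write().to("s3://bucket-name/output.csv").withoutSharding());

p.run();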
Following is a stripped-down sample that reproduces the issue:
ECSOptions options = PipelineOptionsFactory.fromArgs(args).as(ECSOptions.class);
setupConfiguration(options);
Pipeline p = Pipeline.create(options);

// empty.csv contains no records, so pSource is an empty PCollection.
PCollection<String> pSource = p.apply(TextIO.read().from("src/main/resources/empty.csv"));

// Writing the empty PCollection is what triggers the EntityTooSmall failure.
pSource.apply(TextIO.write().to("s3://bucket-name/empty.csv").withoutSharding());

p.run();
Observation
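The write succeeds whenever the PCollection contains at least one element and fails only when it is empty. The stack trace shows that FileBasedSink.createMissingEmptyShards still creates an (empty) shard file for the missing output, and the StorageGRID S3 endpoint rejects that zero-byte upload with EntityTooSmall.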
What I have tried
Ask
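Is there a way to detect that a PCollection is empty before writing it, or some other way to make the pipeline tolerate an empty result?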
I came up with two more options; the one sketched below guarantees that the PCollection being written is never empty:
import java.util.Collections;

import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.Flatten;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionList;
import org.apache.beam.sdk.values.PCollectionView;

PCollection<String> pcollToWrite = ...

// Count the elements of pcollToWrite at runtime and expose the count
// as a singleton side input.
PCollectionView<Long> toWriteSize =
    pcollToWrite.apply(Combine.globally(Count.<String>combineFn()).asSingletonView());

// A PCollection holding a single empty string if and only if pcollToWrite
// is empty, and nothing otherwise.
PCollection<String> emptyOrSingletonPCollection =
    p
        // Creates a PCollection with a single element.
        .apply(Create.of(Collections.singletonList("")))
        // Emits that element only when toWriteSize is zero.
        .apply(ParDo.of(new DoFn<String, String>() {
          @ProcessElement
          public void processElement(ProcessContext c) {
            if (c.sideInput(toWriteSize) == 0) {
              c.output("");
            }
          }
        }).withSideInputs(toWriteSize));

// Flatten pcollToWrite and emptyOrSingletonPCollection together. The
// right-hand side has exactly one element whenever the left-hand side is
// empty, so the result always has at least one element.
PCollectionList.of(pcollToWrite, emptyOrSingletonPCollection)
    .apply(Flatten.pCollections())
    .apply(TextIO.write(...));
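With this in place, the PCollection handed to TextIO.write() always contains at least one element, so the zero-byte upload that StorageGRID rejects with EntityTooSmall never happens. The trade-off is that an empty input now produces an output file containing a single blank line instead of no file at all.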