Tags: apache-beam, apache-beam-io, netapp, beam-sql, apache-beam-internals

Exception while writing an empty CSV file from Apache Beam to NetApp StorageGRID (multipart upload)


Problem Statement

We consume multiple CSV files into PCollections, apply Beam SQL to transform the data, and write the resulting PCollection. This works fine as long as every source PCollection has some data and the Beam SQL transform produces a non-empty PCollection. However, when the transform produces an empty PCollection and we write it to NetApp StorageGRID, the write fails with the exception below:

Exception in thread "main" org.apache.beam.sdk.Pipeline$PipelineExecutionException: java.io.IOException: Failed closing channel to s3://bucket-name/.temp-beam-847f362f-8884-454e-bfbe-baf9a4e32806/0b72948a5fcccb28-174f-426b-a225-ae3d3cbb126f
    at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:373)
    at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:341)
    at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:218)
    at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:67)
    at org.apache.beam.sdk.Pipeline.run(Pipeline.java:323)
    at org.apache.beam.sdk.Pipeline.run(Pipeline.java:309)
    at ECSOperations.main(ECSOperations.java:53)
Caused by: java.io.IOException: Failed closing channel to s3://bucket-name/.temp-beam-847f362f-8884-454e-bfbe-baf9a4e32806/0b72948a5fcccb28-174f-426b-a225-ae3d3cbb126f
    at org.apache.beam.sdk.io.FileBasedSink$Writer.close(FileBasedSink.java:1076)
    at org.apache.beam.sdk.io.FileBasedSink$WriteOperation.createMissingEmptyShards(FileBasedSink.java:759)
    at org.apache.beam.sdk.io.FileBasedSink$WriteOperation.finalizeDestination(FileBasedSink.java:639)
    at org.apache.beam.sdk.io.WriteFiles$FinalizeTempFileBundles$FinalizeFn.process(WriteFiles.java:1040)
Caused by: java.io.IOException: com.amazonaws.services.s3.model.AmazonS3Exception: Your proposed upload is smaller than the minimum allowed object size. (Service: Amazon S3; Status Code: 400; Error Code: EntityTooSmall; Request ID: 1643869619144605; S3 Extended Request ID: null; Proxy: null), S3 Extended Request ID: null

Following is sample code that reproduces the issue:

ECSOptions options = PipelineOptionsFactory.fromArgs(args).as(ECSOptions.class);
setupConfiguration(options);
Pipeline p = Pipeline.create(options);

// Read an empty CSV file, then write it unsharded to StorageGRID through the S3 filesystem.
PCollection<String> pSource = p.apply(TextIO.read().from("src/main/resources/empty.csv"));
pSource.apply(TextIO.write().to("s3://bucket-name/empty.csv").withoutSharding());

p.run();
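
For context, here is a condensed, self-contained sketch of the wider flow described above (read CSV, transform with Beam SQL, write to S3-compatible storage). The schema, query, and class name are illustrative placeholders, not taken from the original pipeline; it assumes the Beam SQL extension's SqlTransform:

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.extensions.sql.SqlTransform;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.sdk.values.TypeDescriptor;
import org.apache.beam.sdk.values.TypeDescriptors;

public class EmptyCsvSqlPipeline {
  public static void main(String[] args) {
    Pipeline p = Pipeline.create();

    // Assumed single-column schema; a real pipeline would parse each CSV line into proper fields.
    Schema schema = Schema.builder().addStringField("line").build();

    PCollection<Row> rows = p
        .apply(TextIO.read().from("src/main/resources/empty.csv"))
        .apply(MapElements.into(TypeDescriptor.of(Row.class))
            .via(line -> Row.withSchema(schema).addValue(line).build()))
        .setRowSchema(schema);

    // Beam SQL transform; an empty input yields an empty output PCollection.
    PCollection<Row> transformed =
        rows.apply(SqlTransform.query("SELECT line FROM PCOLLECTION WHERE line <> ''"));

    // Writing the (possibly empty) result to S3-compatible storage is where the
    // EntityTooSmall failure surfaces.
    transformed
        .apply(MapElements.into(TypeDescriptors.strings()).via(row -> row.getString("line")))
        .apply(TextIO.write().to("s3://bucket-name/output.csv").withoutSharding());

    p.run().waitUntilFinish();
  }
}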

Observation

What I have tried

Ask


Solution

  • I came up with two more options:

    1. Use TextIO.write().withFooter(...) to always write a single empty line (or a space, or similar) at the end of your file so it is never empty (see the sketch after this list).
    2. You could flatten your PCollection with a PCollection that contains a single empty line if and only if the given PCollection is empty. (This is more complicated, but can be used more generally.) Specifically:
    PCollection<String> pcollToWrite = ...
    
    // This will count the number of elements in pcollToWrite at runtime
    // and make the count available as a side input.
    PCollectionView<Long> toWriteSize = pcollToWrite.apply(Count.globally().asSingletonView());
    
    PCollection<String> emptyOrSingletonPCollection = 
        p
          // Creates a PCollection with a single (empty-string) element.
          .apply(Create.of(Collections.singletonList("")))
          // Applies a ParDo that re-emits this single element if and only if
          // toWriteSize is zero.
          .apply(ParDo.of(new DoFn<String, String>() {
              @ProcessElement
              public void processElement(@Element String mainElement, OutputReceiver<String> out, ProcessContext c) {
                if (c.sideInput(toWriteSize) == 0) {
                  out.output("");
                }
              }
          }).withSideInputs(toWriteSize));
    
    // We now flatten pcollToWrite and emptyOrSingletonPCollection together.
    // The right-hand side has exactly one element whenever the left-hand side
    // is empty, so there will always be at least one element to write.
    PCollectionList.of(pcollToWrite).and(emptyOrSingletonPCollection)
        .apply(Flatten.pCollections())
        .apply(TextIO.write().to(...));
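
For the first option, a minimal sketch is below. The output path and the empty footer value are illustrative assumptions, not from the original post:

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.values.PCollection;

public class WriteWithFooter {
  public static void main(String[] args) {
    Pipeline p = Pipeline.create();

    PCollection<String> lines = p.apply(TextIO.read().from("src/main/resources/empty.csv"));

    // withFooter() appends the given string as a final line to every shard,
    // so even an empty PCollection produces a non-empty object on S3.
    lines.apply(TextIO.write()
        .to("s3://bucket-name/output.csv")
        .withFooter("")
        .withoutSharding());

    p.run().waitUntilFinish();
  }
}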