
AWS S3 Select ScanRange is not working as expected and returns the entire file


I am trying to use S3 Select with the AWS Java SDK to scan and retrieve a chunk of data from an S3 object. I am using the ScanRange class to provide the start and end byte range of the object. The object is a 9 MB Parquet file with an uncompressed size of 84 MB.
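For context, a typical reason to set a scan range is to split one object into contiguous, non-overlapping byte ranges and issue one request per range. The sketch below is plain Java with no SDK dependency: `ScanRangeSplitter`, `ByteRange` and `split` are hypothetical names, and treating the end offset as inclusive is an assumption. Each pair would be passed to `ScanRange.setStart`/`setEnd` as in the code further down; how much data each range actually returns depends on the input format.

```java
import java.util.ArrayList;
import java.util.List;

public class ScanRangeSplitter {
    /** Hypothetical holder for the Start/End pair a ScanRange carries. */
    public static final class ByteRange {
        public final long start;
        public final long end; // ASSUMPTION: end offset is inclusive

        ByteRange(long start, long end) {
            this.start = start;
            this.end = end;
        }

        @Override
        public String toString() {
            return "[" + start + ", " + end + "]";
        }
    }

    /** Splits bytes 0..objectSize-1 into chunks of at most chunkSize bytes. */
    public static List<ByteRange> split(long objectSize, long chunkSize) {
        if (objectSize <= 0 || chunkSize <= 0) {
            throw new IllegalArgumentException("objectSize and chunkSize must be positive");
        }
        List<ByteRange> ranges = new ArrayList<>();
        for (long start = 0; start < objectSize; start += chunkSize) {
            // Last chunk is clipped to the final byte of the object
            long end = Math.min(start + chunkSize, objectSize) - 1;
            ranges.add(new ByteRange(start, end));
        }
        return ranges;
    }

    public static void main(String[] args) {
        // A 9 MB object split into 4 MB chunks
        long objectSize = 9L * 1024 * 1024;
        for (ByteRange r : split(objectSize, 4L * 1024 * 1024)) {
            System.out.println(r); // [0, 4194303], [4194304, 8388607], [8388608, 9437183]
        }
    }
}
```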

When I query the S3 object without the scanRange option in SelectObjectContentRequest, I get the full 84 MB result, as expected.

However, when I set scanRange, I still get the entire 84 MB result, regardless of the start and end bytes I provide.

Following is the code snippet I am using:

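import static com.amazonaws.util.IOUtils.copy;

import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.model.CompressionType;
import com.amazonaws.services.s3.model.ExpressionType;
import com.amazonaws.services.s3.model.InputSerialization;
import com.amazonaws.services.s3.model.JSONOutput;
import com.amazonaws.services.s3.model.OutputSerialization;
import com.amazonaws.services.s3.model.ParquetInput;
import com.amazonaws.services.s3.model.RequestProgress;
import com.amazonaws.services.s3.model.ScanRange;
import com.amazonaws.services.s3.model.SelectObjectContentEvent;
import com.amazonaws.services.s3.model.SelectObjectContentEventStream;
import com.amazonaws.services.s3.model.SelectObjectContentEventVisitor;
import com.amazonaws.services.s3.model.SelectObjectContentRequest;
import com.amazonaws.services.s3.model.SelectObjectContentResult;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.concurrent.atomic.AtomicBoolean;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class S3SelectParquetParser {
  private static final Logger log = LoggerFactory.getLogger(S3SelectParquetParser.class);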
  public Object getS3SelectResponse(AmazonS3 s3Client, S3SelectParserRequest s3SelectParserRequest) throws IOException {
    SelectObjectContentRequest selectRequest = gets3SelectRequestObject(s3SelectParserRequest);
    SelectObjectContentResult result = s3Client.selectObjectContent(selectRequest);
    return massageResult(result.getPayload(), s3SelectParserRequest.getOutputFile());
  }

  private SelectObjectContentRequest gets3SelectRequestObject(S3SelectParserRequest s3SelectParserRequest) {
    SelectObjectContentRequest requestObject = new SelectObjectContentRequest();

    InputSerialization inputSerialization = new InputSerialization()
        .withParquet(new ParquetInput())
        .withCompressionType(CompressionType.NONE);

    OutputSerialization outputSerialization = new OutputSerialization()
        .withJson(new JSONOutput());

    RequestProgress requestProgress = new RequestProgress();
    requestProgress.setEnabled(s3SelectParserRequest.isShowProgress());

    requestObject.setBucketName(s3SelectParserRequest.getBucketName());
    requestObject.setKey(s3SelectParserRequest.getObjectPath());
    requestObject.setInputSerialization(inputSerialization);
    requestObject.setOutputSerialization(outputSerialization);
    requestObject.setExpressionType(ExpressionType.SQL);
    requestObject.setExpression(s3SelectParserRequest.getQuery());
    requestObject.setRequestProgress(requestProgress);

    if (s3SelectParserRequest.isRangedRequest()) {
      ScanRange scanRange = new ScanRange();
      scanRange.setStart(s3SelectParserRequest.getStartByteRange());
      scanRange.setEnd(s3SelectParserRequest.getEndByteRange());
      requestObject.setScanRange(scanRange);
    }
    return requestObject;
  }


  private Object massageResult(SelectObjectContentEventStream payload, String outputFile) throws IOException {
    log.info("Starting reading the inputStream");
    final AtomicBoolean isResultComplete = new AtomicBoolean(false);
    // try-with-resources closes both streams; a failure to open the output file
    // now propagates instead of leaving a null stream to be passed to copy()
    try (OutputStream outputStream = new FileOutputStream(outputFile);
         InputStream resultInputStream = payload.getRecordsInputStream(
             new SelectObjectContentEventVisitor() {
               @Override
               public void visit(SelectObjectContentEvent.StatsEvent event) {
                 // BytesScanned shows how much of the object the request actually scanned
                 log.info("Received Stats, Bytes Scanned: {} Bytes Processed: {}",
                     event.getDetails().getBytesScanned(), event.getDetails().getBytesProcessed());
               }

               /*
                * An End Event informs that the request has finished successfully.
                */
               @Override
               public void visit(SelectObjectContentEvent.EndEvent event) {
                 isResultComplete.set(true);
                 log.info("Received End Event. Result is complete.");
               }

               @Override
               public void visit(SelectObjectContentEvent.ProgressEvent event) {
                 // SLF4J uses {} placeholders, not %s
                 log.info("Progress: Bytes processed: {} \t Bytes returned: {}",
                     event.getDetails().getBytesProcessed(), event.getDetails().getBytesReturned());
               }
             })) {
      copy(resultInputStream, outputStream);
    }
    if (!isResultComplete.get()) {
      throw new IOException("S3 Select request was incomplete: no End Event was received.");
    }
    // The records stream has been consumed and closed, so return the output file path instead
    return outputFile;
  }
}

where S3SelectParserRequest is initialized as:

  S3SelectParserRequest s3SelectRequestRanged = S3SelectParserRequest.builder()
        .bucketName(bucketName)
        .objectPath(bucketPath)
        .query(fullQuery)
        .showProgress(false)
        .isRangedRequest(true)
        .startByteRange(0)
        .endByteRange(4098)
        .outputFile("ranged-0-400.json")
        .build();

and getS3SelectResponse is called as:

new S3SelectParquetParser().getS3SelectResponse(s3Client, s3SelectRequestRanged);

Solution

  • I figured it out. For a Parquet object, S3 Select returns every row group that starts within the provided scan range. The first 4 bytes of a Parquet file are the header (the magic number PAR1), and the first row group begins at the fifth byte (offset 4). Because my start byte was 0, any end byte past the 4-byte header covered the start of the first row group, so S3 Select returned that whole row group (plus any other row groups starting within the range). My Parquet file contained only a single row group, so no matter what range I provided, I got the entire file, i.e. the first and only row group.
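The rule described in the answer can be modelled in a few lines of plain Java. This is a sketch, not the S3 Select implementation: `RowGroupScanModel` and `rowGroupsProcessed` are hypothetical names, the row-group start offsets are made-up values, and the inclusive end offset is an assumption. Real offsets would have to be read from the Parquet footer metadata, which this sketch does not do.

```java
import java.util.ArrayList;
import java.util.List;

public class RowGroupScanModel {
    /**
     * Returns the indices of row groups whose first byte lies in [start, end].
     * ASSUMPTION: models "row groups that start within the scan range are
     * processed in full"; it does not call S3.
     */
    public static List<Integer> rowGroupsProcessed(long[] rowGroupStarts, long start, long end) {
        List<Integer> selected = new ArrayList<>();
        for (int i = 0; i < rowGroupStarts.length; i++) {
            long offset = rowGroupStarts[i];
            if (offset >= start && offset <= end) {
                selected.add(i); // the whole row group is returned, not just the bytes in range
            }
        }
        return selected;
    }

    public static void main(String[] args) {
        // Hypothetical single-row-group file: the row group starts right after the 4-byte header
        long[] singleRowGroup = {4L};
        System.out.println(rowGroupsProcessed(singleRowGroup, 0, 4098));      // [0]
        System.out.println(rowGroupsProcessed(singleRowGroup, 0, 5_000_000)); // [0]

        // Hypothetical file with three row groups (offsets are made up)
        long[] threeRowGroups = {4L, 3_000_000L, 6_000_000L};
        System.out.println(rowGroupsProcessed(threeRowGroups, 0, 4098));         // [0]
        System.out.println(rowGroupsProcessed(threeRowGroups, 4099, 6_500_000)); // [1, 2]
    }
}
```

Under this model, a file with a single row group always yields either that entire row group or nothing, which matches the behaviour observed in the question; only a file written with several row groups lets different scan ranges return different subsets of the data.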