I am trying to scan and get a chunk of data from my object in S3 using S3 Select with the Java SDK.
I am using the ScanRange class to provide the start and end byte range of the S3 object.
The S3 object is a 9 MB Parquet file with an uncompressed size of 84 MB.
When I get the S3 object without the scanRange option in SelectObjectContentRequest, I get an 84 MB file, as expected.
But when I use scanRange, I still get the entire 84 MB file, irrespective of the start or end bytes I provide.
Following is the code snippet I am using:
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.concurrent.atomic.AtomicBoolean;

import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.model.*;

public class S3SelectParquetParser {

    public Object getS3SelectResponse(AmazonS3 s3Client, S3SelectParserRequest s3SelectParserRequest) throws IOException {
        SelectObjectContentRequest selectRequest = gets3SelectRequestObject(s3SelectParserRequest);
        SelectObjectContentResult result = s3Client.selectObjectContent(selectRequest);
        return massageResult(result.getPayload(), s3SelectParserRequest.getOutputFile());
    }

    private SelectObjectContentRequest gets3SelectRequestObject(S3SelectParserRequest s3SelectParserRequest) {
        SelectObjectContentRequest requestObject = new SelectObjectContentRequest();

        // Parquet in, JSON out.
        InputSerialization inputSerialization = new InputSerialization()
                .withParquet(new ParquetInput())
                .withCompressionType(CompressionType.NONE);
        OutputSerialization outputSerialization = new OutputSerialization()
                .withJson(new JSONOutput());

        RequestProgress requestProgress = new RequestProgress();
        requestProgress.setEnabled(s3SelectParserRequest.isShowProgress());

        requestObject.setBucketName(s3SelectParserRequest.getBucketName());
        requestObject.setKey(s3SelectParserRequest.getObjectPath());
        requestObject.setInputSerialization(inputSerialization);
        requestObject.setOutputSerialization(outputSerialization);
        requestObject.setExpressionType(ExpressionType.SQL);
        requestObject.setExpression(s3SelectParserRequest.getQuery());
        requestObject.setRequestProgress(requestProgress);

        // Optionally restrict the scan to the requested byte range.
        if (s3SelectParserRequest.isRangedRequest()) {
            ScanRange scanRange = new ScanRange();
            scanRange.setStart(s3SelectParserRequest.getStartByteRange());
            scanRange.setEnd(s3SelectParserRequest.getEndByteRange());
            requestObject.setScanRange(scanRange);
        }
        return requestObject;
    }

    private Object massageResult(SelectObjectContentEventStream payload, String outputFile) throws IOException {
        log.info("Starting reading the inputStream");
        final AtomicBoolean isResultComplete = new AtomicBoolean(false);

        InputStream resultInputStream = payload.getRecordsInputStream(
                new SelectObjectContentEventVisitor() {
                    @Override
                    public void visit(SelectObjectContentEvent.StatsEvent event) {
                        System.out.println(
                                "Received Stats, Bytes Scanned: " + event.getDetails().getBytesScanned()
                                        + " Bytes Processed: " + event.getDetails().getBytesProcessed());
                    }

                    /*
                     * An End Event informs that the request has finished successfully.
                     */
                    @Override
                    public void visit(SelectObjectContentEvent.EndEvent event) {
                        isResultComplete.set(true);
                        System.out.println("Received End Event. Result is complete.");
                    }

                    @Override
                    public void visit(SelectObjectContentEvent.ProgressEvent event) {
                        // {} placeholders, so the values are actually substituted into the message.
                        log.info("Progress: Bytes processed: {} \t Bytes returned: {}",
                                event.getDetails().getBytesProcessed(),
                                event.getDetails().getBytesReturned());
                    }
                }
        );

        // Let a failure to open the output file propagate instead of copying into a null stream,
        // and close the file once the copy is done.
        try (OutputStream outputStream = new FileOutputStream(outputFile)) {
            copy(resultInputStream, outputStream);
        }
        return resultInputStream;
    }
}
Where the S3SelectParserRequest is initialized as:
S3SelectParserRequest s3SelectRequestRanged = S3SelectParserRequest.builder()
        .bucketName(bucketName)
        .objectPath(bucketPath)
        .query(fullQuery)
        .showProgress(false)
        .isRangedRequest(true)
        .startByteRange(0)
        .endByteRange(4098)
        .outputFile("ranged-0-400.json")
        .build();
and getS3SelectResponse is called as:
new S3SelectParquetParser().getS3SelectResponse(s3Client, s3SelectRequestRanged);
I figured it out. For a Parquet object, S3 Select returns the data for every row group that starts within the provided byte scanRange.
The first 4 bytes of a Parquet object contain the header, and the first row group begins at the fifth byte.
Since my first row group starts right after my provided startByteRange of 0, any endByteRange I provide beyond 4 returns the first row group (and any other row groups that start in the specified range).
My Parquet file contained only a single row group, so no matter what byte range I provided, it returned the entire file, i.e. the first row group.
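To make the rule concrete, here is a minimal sketch of the selection logic as I understand it. It is only a model, not S3 Select's actual implementation. The class name, method name, and row group offsets are all hypothetical, and treating the end byte as inclusive is an assumption.

```java
import java.util.ArrayList;
import java.util.List;

class ScanRangeSketch {

    // A Parquet file begins with the 4-byte magic "PAR1", so the first row group starts at offset 4.
    static final long HEADER_BYTES = 4;

    /**
     * Returns the indexes of the row groups whose first byte lies within [start, end].
     * rowGroupStarts holds the (hypothetical) starting byte offset of each row group.
     */
    static List<Integer> selectedRowGroups(long[] rowGroupStarts, long start, long end) {
        List<Integer> selected = new ArrayList<>();
        for (int i = 0; i < rowGroupStarts.length; i++) {
            if (rowGroupStarts[i] >= start && rowGroupStarts[i] <= end) {
                selected.add(i);
            }
        }
        return selected;
    }

    public static void main(String[] args) {
        // One row group, as in my file: any range that covers offset 4 selects the whole file.
        long[] singleGroup = {HEADER_BYTES};
        System.out.println(selectedRowGroups(singleGroup, 0, 4098)); // prints [0]

        // Made-up offsets for a file with three row groups.
        long[] threeGroups = {HEADER_BYTES, 3_000_000L, 6_000_000L};
        System.out.println(selectedRowGroups(threeGroups, 0, 4098)); // prints [0]
        System.out.println(selectedRowGroups(threeGroups, 2_000_000L, 7_000_000L)); // prints [1, 2]
    }
}
```

So for scanRange to return only part of the data, the Parquet file would need to be written with several smaller row groups, and the ranges chosen so that each one covers the start offsets of the row groups you want.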