I am trying to implement s3select in a spring boot app to query parquet file in s3 bucket, I am only getting partial result from the s3select output, Please help to identify the issue, i have used aws java sdk v2.
Upon checking the json output(printed in the console), overall characters in the output is 65k.
I am using eclipse and tried unchecking "Limit console output" in the console preference, which did not help.
Code is here:-
import java.util.List;
import java.util.concurrent.CompletableFuture;
import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
import software.amazon.awssdk.core.async.SdkPublisher;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.s3.S3AsyncClient;
import software.amazon.awssdk.services.s3.model.CompressionType;
import software.amazon.awssdk.services.s3.model.EndEvent;
import software.amazon.awssdk.services.s3.model.ExpressionType;
import software.amazon.awssdk.services.s3.model.InputSerialization;
import software.amazon.awssdk.services.s3.model.JSONOutput;
import software.amazon.awssdk.services.s3.model.OutputSerialization;
import software.amazon.awssdk.services.s3.model.ParquetInput;
import software.amazon.awssdk.services.s3.model.RecordsEvent;
import software.amazon.awssdk.services.s3.model.SelectObjectContentEventStream;
import software.amazon.awssdk.services.s3.model.SelectObjectContentEventStream.EventType;
import software.amazon.awssdk.services.s3.model.SelectObjectContentRequest;
import software.amazon.awssdk.services.s3.model.SelectObjectContentResponse;
import software.amazon.awssdk.services.s3.model.SelectObjectContentResponseHandler;
public class ParquetSelect {
private static final String BUCKET_NAME = "<bucket-name>";
private static final String KEY = "<object-key>";
private static final String QUERY = "select * from S3Object s";
public static S3AsyncClient s3;
public static void selectObjectContent() {
Handler handler = new Handler();
SelectQueryWithHandler(handler).join();
RecordsEvent recordsEvent = (RecordsEvent) handler.receivedEvents.stream()
.filter(e -> e.sdkEventType() == EventType.RECORDS)
.findFirst()
.orElse(null);
System.out.println(recordsEvent.payload().asUtf8String());
}
private static CompletableFuture<Void> SelectQueryWithHandler(SelectObjectContentResponseHandler handler) {
InputSerialization inputSerialization = InputSerialization.builder()
.parquet(ParquetInput.builder().build())
.compressionType(CompressionType.NONE)
.build();
OutputSerialization outputSerialization = OutputSerialization.builder()
.json(JSONOutput.builder().build())
.build();
SelectObjectContentRequest select = SelectObjectContentRequest.builder()
.bucket(BUCKET_NAME)
.key(KEY)
.expression(QUERY)
.expressionType(ExpressionType.SQL)
.inputSerialization(inputSerialization)
.outputSerialization(outputSerialization)
.build();
return s3.selectObjectContent(select, handler);
}
private static class Handler implements SelectObjectContentResponseHandler {
private SelectObjectContentResponse response;
private List<SelectObjectContentEventStream> receivedEvents = new ArrayList<>();
private Throwable exception;
@Override
public void responseReceived(SelectObjectContentResponse response) {
this.response = response;
}
@Override
public void onEventStream(SdkPublisher<SelectObjectContentEventStream> publisher) {
publisher.subscribe(receivedEvents::add);
}
@Override
public void exceptionOccurred(Throwable throwable) {
exception = throwable;
}
@Override
public void complete() {
}
}
}
The problem is, that you may get multiple RecordsEvent events, each containing an InputStream with a part of the data - so you have to iterate through events and join them:
fun selectS3ObjectContent(socRequest: SelectObjectContentRequest): InputStream {
val handler = S3SelectObjectContentHandler()
asyncS3.selectObjectContent(socRequest, handler).join()
return if (handler.receivedEvents.filterIsInstance<RecordsEvent>().isEmpty())
InputStream.nullInputStream()
else
handler.receivedEvents.filterIsInstance<RecordsEvent>().map {
it.payload().asInputStream()
}.reduce { l, r ->
SequenceInputStream(l, r)
}
}
class S3SelectObjectContentHandler : SelectObjectContentResponseHandler {
private var response: SelectObjectContentResponse? = null
var receivedEvents = ArrayList<SelectObjectContentEventStream>()
private var exception: Throwable? = null
override fun responseReceived(response: SelectObjectContentResponse?) {
this.response = response
}
override fun onEventStream(publisher: SdkPublisher<SelectObjectContentEventStream>) {
publisher.subscribe { event ->
receivedEvents.add(event)
}
}
override fun exceptionOccurred(throwable: Throwable?) {
exception = throwable
}
override fun complete() { /* no-op */ }
}