javaamazon-web-servicesamazon-s3aws-java-sdkamazon-s3-select

Getting partial json response for s3select with aws java sdk v2


I am trying to implement s3select in a spring boot app to query parquet file in s3 bucket, I am only getting partial result from the s3select output, Please help to identify the issue, i have used aws java sdk v2.

Upon checking the json output(printed in the console), overall characters in the output is 65k.

I am using eclipse and tried unchecking "Limit console output" in the console preference, which did not help.

Code is here:-

import java.util.List;
import java.util.concurrent.CompletableFuture;

import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
import software.amazon.awssdk.core.async.SdkPublisher;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.s3.S3AsyncClient;
import software.amazon.awssdk.services.s3.model.CompressionType;
import software.amazon.awssdk.services.s3.model.EndEvent;
import software.amazon.awssdk.services.s3.model.ExpressionType;
import software.amazon.awssdk.services.s3.model.InputSerialization;
import software.amazon.awssdk.services.s3.model.JSONOutput;
import software.amazon.awssdk.services.s3.model.OutputSerialization;
import software.amazon.awssdk.services.s3.model.ParquetInput;
import software.amazon.awssdk.services.s3.model.RecordsEvent;
import software.amazon.awssdk.services.s3.model.SelectObjectContentEventStream;
import software.amazon.awssdk.services.s3.model.SelectObjectContentEventStream.EventType;
import software.amazon.awssdk.services.s3.model.SelectObjectContentRequest;
import software.amazon.awssdk.services.s3.model.SelectObjectContentResponse;
import software.amazon.awssdk.services.s3.model.SelectObjectContentResponseHandler;

public class ParquetSelect {
    
    private static final String BUCKET_NAME = "<bucket-name>";
    private static final String KEY = "<object-key>";
    private static final String QUERY = "select * from S3Object s";
    public static S3AsyncClient s3;
    

    public static void selectObjectContent() {
        Handler handler = new Handler();
        
        SelectQueryWithHandler(handler).join();

        RecordsEvent recordsEvent = (RecordsEvent) handler.receivedEvents.stream()
                                                                         .filter(e -> e.sdkEventType() == EventType.RECORDS)
                                                                         .findFirst()
                                                                         .orElse(null);

        System.out.println(recordsEvent.payload().asUtf8String());

    }

    private static CompletableFuture<Void> SelectQueryWithHandler(SelectObjectContentResponseHandler handler) {
        InputSerialization inputSerialization = InputSerialization.builder()
                                                                  .parquet(ParquetInput.builder().build())
                                                                  .compressionType(CompressionType.NONE)
                                                                  .build();


        OutputSerialization outputSerialization = OutputSerialization.builder()
                                                                     .json(JSONOutput.builder().build())
                                                                     .build();


        SelectObjectContentRequest select = SelectObjectContentRequest.builder()
                                                                      .bucket(BUCKET_NAME)
                                                                      .key(KEY)
                                                                      .expression(QUERY)
                                                                      .expressionType(ExpressionType.SQL)
                                                                      .inputSerialization(inputSerialization)
                                                                      .outputSerialization(outputSerialization)
                                                                      .build();

        return s3.selectObjectContent(select, handler);
    }

    private static class Handler implements SelectObjectContentResponseHandler {
        private SelectObjectContentResponse response;
        private List<SelectObjectContentEventStream> receivedEvents = new ArrayList<>();
        private Throwable exception;

        @Override
        public void responseReceived(SelectObjectContentResponse response) {
            this.response = response;
        }

        @Override
        public void onEventStream(SdkPublisher<SelectObjectContentEventStream> publisher) {
            publisher.subscribe(receivedEvents::add);
        }

        @Override
        public void exceptionOccurred(Throwable throwable) {
            exception = throwable;
        }

        @Override
        public void complete() {
        }
    }
    
}

Solution

  • The problem is, that you may get multiple RecordsEvent events, each containing an InputStream with a part of the data - so you have to iterate through events and join them:

    fun selectS3ObjectContent(socRequest: SelectObjectContentRequest): InputStream {
        val handler = S3SelectObjectContentHandler()
        asyncS3.selectObjectContent(socRequest, handler).join()
    
        return if (handler.receivedEvents.filterIsInstance<RecordsEvent>().isEmpty())
            InputStream.nullInputStream()
        else
            handler.receivedEvents.filterIsInstance<RecordsEvent>().map {
                it.payload().asInputStream()
            }.reduce { l, r ->
                SequenceInputStream(l, r)
            }
    }
    
    class S3SelectObjectContentHandler : SelectObjectContentResponseHandler {
        private var response: SelectObjectContentResponse? = null
        var receivedEvents = ArrayList<SelectObjectContentEventStream>()
        private var exception: Throwable? = null
    
        override fun responseReceived(response: SelectObjectContentResponse?) {
            this.response = response
        }
    
        override fun onEventStream(publisher: SdkPublisher<SelectObjectContentEventStream>) {
            publisher.subscribe { event ->
                receivedEvents.add(event)
            }
        }
    
        override fun exceptionOccurred(throwable: Throwable?) {
            exception = throwable
        }
    
        override fun complete() { /* no-op */ }
    }