javaspringproject-reactorspring-webfluxreactive-streams

How to correctly read Flux<DataBuffer> and convert it to a single inputStream


I'm using WebClient and custom BodyExtractorclass for my spring-boot application

WebClient webLCient = WebClient.create();
webClient.get()
   .uri(url, params)
   .accept(MediaType.APPLICATION.XML)
   .exchange()
   .flatMap(response -> {
     return response.body(new BodyExtractor());
   })

BodyExtractor.java

@Override
public Mono<T> extract(ClientHttpResponse response, BodyExtractor.Context context) {
  Flux<DataBuffer> body = response.getBody();
  body.map(dataBuffer -> {
    try {
      JaxBContext jc = JaxBContext.newInstance(SomeClass.class);
      Unmarshaller unmarshaller = jc.createUnmarshaller();

      return (T) unmarshaller.unmarshal(dataBuffer.asInputStream())
    } catch(Exception e){
       return null;
    }
  }).next();
}

Above code works with small payload but not on a large payload, I think it's because I'm only reading a single flux value with next and I'm not sure how to combine and read all dataBuffer.

I'm new to reactor, so I don't know a lot of tricks with flux/mono.


Solution

  • I was able to make it work by using Flux#collect and SequenceInputStream

    @Override
    public Mono<T> extract(ClientHttpResponse response, BodyExtractor.Context context) {
      Flux<DataBuffer> body = response.getBody();
      return body.collect(InputStreamCollector::new, (t, dataBuffer)-> t.collectInputStream(dataBuffer.asInputStream))
        .map(inputStream -> {
          try {
            JaxBContext jc = JaxBContext.newInstance(SomeClass.class);
            Unmarshaller unmarshaller = jc.createUnmarshaller();
    
            return (T) unmarshaller.unmarshal(inputStream);
          } catch(Exception e){
            return null;
          }
      }).next();
    }
    

    InputStreamCollector.java

    public class InputStreamCollector {
      private InputStream is;
    
      public void collectInputStream(InputStream is) {
        if (this.is == null) this.is = is;
        this.is = new SequenceInputStream(this.is, is);
      }
    
      public InputStream getInputStream() {
        return this.is;
      }
    }