vert.xvert.x-webclient

Use vert.x RecordParser with WebClient streaming


I am trying to use RecordParser for large response from WebClient.

Vert.x documentation says:

When large response are expected, use the BodyCodec.pipe. This body codec pumps the response body buffers to a WriteStream and signals the success or the failure of the operation in the async result response

But I don't see an easy way to pass that WriteStream to RecordParser. I have used simplified code pasted below which works but implementing such is a source of potential bugs as async protocols are easier to mess up. Does vert.x offers out of box such integration.

RecordParser parser = RecordParser.newDelimited("\n", b -> log.info("r={}", b.toString()));
RecordParserWriteStream bridge = new RecordParserWriteStream(parser);
client
    .get(sut.actualPort(), "localhost", "/stream?file=stream2.txt")
    .as(BodyCodec.pipe(bridge))
    .send(
        ar -> {
          if (ar.succeeded()) {
            ctx.completeNow();
          } else {
            ctx.failNow(ar.cause());
          }
        });


@Slf4j
@RequiredArgsConstructor
public class RecordParserWriteStream implements WriteStream<Buffer> {

    private final RecordParser recordParser;

    @Override
    public WriteStream<Buffer> exceptionHandler(@Nullable Handler<Throwable> handler) {
        recordParser.exceptionHandler(handler);
        return this;
    }

    @Override
    public void write(Buffer data, Handler<AsyncResult<Void>> handler) {
        log.info("write {}", data.length());
        recordParser.handle(data);
        Promise<Void> promise = Promise.promise();
        promise.complete();
        handler.handle(promise.future());
    }

    @Override
    public void end(Handler<AsyncResult<Void>> handler) {
        Promise<Void> promise = Promise.promise();
        promise.complete();
        handler.handle(promise.future());
    }

    @Override
    public boolean writeQueueFull() {
        return false;
    }

    @Override
    public WriteStream<Buffer> drainHandler(@Nullable Handler<Void> handler) {
        return this;
    }

    @Override
    public Future<Void> write(Buffer data) {
        throw new UnsupportedOperationException();
    }

    @Override
    public WriteStream<Buffer> setWriteQueueMaxSize(int maxSize) {
        throw new UnsupportedOperationException();
    }
}

I found few older SO answers where it was suggested to use HttpClient but official documentation still recommends WebClient with BodyCodec.pipe/WriteStream.

With using HttpClient it looks as

RecordParser parser = RecordParser.newDelimited("\n", h -> log.info("r={}", h.toString()));
client
    .request(HttpMethod.GET, sut.actualPort(), "localhost", "/stream?file=stream1.txt")
    .compose(HttpClientRequest::send)
    .onComplete(
        ar -> {
          if (ar.succeeded()) {
            HttpClientResponse response = ar.result();
            response.handler(parser);
            response.endHandler(e -> ctx.completeNow());
          } else {
            ctx.failNow(ar.cause());
          }
        });

What is a best way to call HTTP service and pass the response to RecordParser?


Solution

  • @tsegismont posted a comment that a answer for Vert.x httpClient/webClient process response chunk by chunk or as stream is still up to date and HttpClient should be used when HTTP streaming must be connected with RecordParser. It means second solution from the question is preferred:

    RecordParser parser = RecordParser.newDelimited("\n", h -> log.info("r={}", h.toString()));
    client
        .request(HttpMethod.GET, sut.actualPort(), "localhost", "/stream?file=stream1.txt")
        .compose(HttpClientRequest::send)
        .onComplete(
            ar -> {
              if (ar.succeeded()) {
                HttpClientResponse response = ar.result();
                response.handler(parser);
                response.endHandler(e -> ctx.completeNow());
              } else {
                ctx.failNow(ar.cause());
              }
            });
    

    Ideally PR for Vert.x documentation should clarify it.