I am trying to use RecordParser for a large response from WebClient.
Vert.x documentation says:
When large response are expected, use the BodyCodec.pipe. This body codec pumps the response body buffers to a WriteStream and signals the success or the failure of the operation in the async result response
But I don't see an easy way to pass that WriteStream to RecordParser. I have used the simplified code pasted below, which works, but implementing such a bridge by hand is a potential source of bugs, since async protocols are easy to get wrong. Does Vert.x offer such an integration out of the box?
RecordParser parser = RecordParser.newDelimited("\n", b -> log.info("r={}", b.toString()));
RecordParserWriteStream bridge = new RecordParserWriteStream(parser);
client
    .get(sut.actualPort(), "localhost", "/stream?file=stream2.txt")
    .as(BodyCodec.pipe(bridge))
    .send(
        ar -> {
          if (ar.succeeded()) {
            ctx.completeNow();
          } else {
            ctx.failNow(ar.cause());
          }
        });
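import io.vertx.codegen.annotations.Nullable;
import io.vertx.core.AsyncResult;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.parsetools.RecordParser;
import io.vertx.core.streams.WriteStream;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;

// Adapts a RecordParser to the WriteStream expected by BodyCodec.pipe.
// Simplified: no backpressure, every write is acknowledged immediately.
@Slf4j
@RequiredArgsConstructor
public class RecordParserWriteStream implements WriteStream<Buffer> {
  private final RecordParser recordParser;

  @Override
  public WriteStream<Buffer> exceptionHandler(@Nullable Handler<Throwable> handler) {
    recordParser.exceptionHandler(handler);
    return this;
  }

  @Override
  public void write(Buffer data, Handler<AsyncResult<Void>> handler) {
    log.info("write {}", data.length());
    recordParser.handle(data);
    if (handler != null) {
      handler.handle(Future.succeededFuture());
    }
  }

  @Override
  public void end(Handler<AsyncResult<Void>> handler) {
    if (handler != null) {
      handler.handle(Future.succeededFuture());
    }
  }

  @Override
  public boolean writeQueueFull() {
    // The parser consumes each buffer synchronously, so nothing is ever queued.
    return false;
  }

  @Override
  public WriteStream<Buffer> drainHandler(@Nullable Handler<Void> handler) {
    // Never invoked, because writeQueueFull() always returns false.
    return this;
  }

  @Override
  public Future<Void> write(Buffer data) {
    recordParser.handle(data);
    return Future.succeededFuture();
  }

  @Override
  public WriteStream<Buffer> setWriteQueueMaxSize(int maxSize) {
    // Ignored: there is no write queue to size.
    return this;
  }
}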
I found a few older SO answers suggesting HttpClient instead, but the official documentation still recommends WebClient with BodyCodec.pipe/WriteStream.
With HttpClient it looks like this:
RecordParser parser = RecordParser.newDelimited("\n", h -> log.info("r={}", h.toString()));
client
    .request(HttpMethod.GET, sut.actualPort(), "localhost", "/stream?file=stream1.txt")
    .compose(HttpClientRequest::send)
    .onComplete(
        ar -> {
          if (ar.succeeded()) {
            HttpClientResponse response = ar.result();
            response.handler(parser);
            response.endHandler(e -> ctx.completeNow());
          } else {
            ctx.failNow(ar.cause());
          }
        });
What is the best way to call an HTTP service and pass the response to RecordParser?
@tsegismont posted a comment saying that the answer to "Vert.x httpClient/webClient process response chunk by chunk or as stream" is still up to date, and that HttpClient should be used when HTTP streaming must be connected to RecordParser. This means the second solution from the question is preferred:
RecordParser parser = RecordParser.newDelimited("\n", h -> log.info("r={}", h.toString()));
client
    .request(HttpMethod.GET, sut.actualPort(), "localhost", "/stream?file=stream1.txt")
    .compose(HttpClientRequest::send)
    .onComplete(
        ar -> {
          if (ar.succeeded()) {
            HttpClientResponse response = ar.result();
            response.handler(parser);
            response.endHandler(e -> ctx.completeNow());
          } else {
            ctx.failNow(ar.cause());
          }
        });
Ideally, a PR to the Vert.x documentation should clarify this.
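For anyone wondering why a parser is needed between the HTTP response and the record handler at all: response chunks arrive at arbitrary boundaries, so a record can be split across two buffers. The sketch below is plain Java, not Vert.x code, and only illustrates that buffering idea under simplifying assumptions (String chunks, a fixed "\n" delimiter). The class name LineAssembler is hypothetical and is not how RecordParser is actually implemented.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical stand-in for a delimiter-based parser; not part of Vert.x.
class LineAssembler {
    // Holds data received so far that has not yet been terminated by "\n".
    private final StringBuilder pending = new StringBuilder();
    private final Consumer<String> recordHandler;

    LineAssembler(Consumer<String> recordHandler) {
        this.recordHandler = recordHandler;
    }

    // Accepts one chunk and emits every complete line now available.
    void handle(String chunk) {
        pending.append(chunk);
        int newline;
        while ((newline = pending.indexOf("\n")) >= 0) {
            recordHandler.accept(pending.substring(0, newline));
            pending.delete(0, newline + 1); // drop the record and its delimiter
        }
    }

    public static void main(String[] args) {
        List<String> records = new ArrayList<>();
        LineAssembler assembler = new LineAssembler(records::add);
        // Chunk boundaries deliberately do not line up with record boundaries.
        assembler.handle("alpha\nbr");
        assembler.handle("avo\ncharlie");
        assembler.handle("\n");
        System.out.println(records); // [alpha, bravo, charlie]
    }
}
```

In the HttpClient version above, response.handler(parser) plays the role of calling handle once per incoming chunk.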