Update!
I have fixed minor bugs in the sample code and resolved some problems that were unrelated to the main question, which is still about non-blocking streaming between services.
Background info:
I'm porting a Spring WebFlux service to Quarkus. The service runs long searches on multiple huge data sets and returns the partial results in a Flux (text/event-stream) as they become available.
Problem:
Right now I'm trying to use Mutiny Multi with Vert.x under Quarkus, but I cannot figure out how the consumer service can receive this stream without blocking.
In every example I have found, either the consumer is a JavaScript front-end page, or the producer's content type is application/json, which seems to block until the Multi completes and then sends everything over in one JSON object (which makes no sense for my application).
Questions:
Here is a simplified example
Test entity
public class SearchResult implements Serializable {
    private String content;

    public SearchResult(String content) {
        this.content = content;
    }
    //.. toString, getters and setters
}
Producer 1. simple infinite stream -> hangs
@GET
@Path("/search")
@Produces(MediaType.SERVER_SENT_EVENTS)
@SseElementType(MediaType.APPLICATION_JSON)
public Multi<SearchResult> getResults() {
    // Emit a tick every 2 seconds and wrap each tick number in a SearchResult.
    return Multi.createFrom().ticks().every(Duration.ofSeconds(2))
            .onItem().transform(n -> new SearchResult(n.toString()));
}
Producer 2. with Vertx Paths infinite stream -> hangs
@Route(path = "/routed", methods = HttpMethod.GET)
public Multi<SearchResult> getSrStreamRouted(RoutingContext context) {
    log.info("routed run");
    // Wrap the infinite tick stream so that it is written as an event stream.
    return ReactiveRoutes.asEventStream(Multi.createFrom().ticks().every(Duration.ofSeconds(2))
            .onItem().transform(n -> new SearchResult(n.toString())));
}
Producer 3. simple finite stream -> blocks until completion
@GET
@Path("/search")
@Produces(MediaType.SERVER_SENT_EVENTS)
@SseElementType(MediaType.APPLICATION_JSON)
public Multi<SearchResult> getResults() {
    // Same tick stream as Producer 1, but limited to the first 5 items.
    return Multi.createFrom().ticks().every(Duration.ofSeconds(2))
            .transform().byTakingFirstItems(5)
            .onItem().transform(n -> new SearchResult(n.toString()));
}
Consumer:
I tried multiple different solutions on both the producer and consumer sides, but in every case the stream either blocks until it is complete or, for infinite streams, hangs indefinitely without transferring any data. I get the same results with httpie. Here is the latest iteration:
WebClientOptions webClientOptions = new WebClientOptions()
        .setDefaultHost("localhost")
        .setDefaultPort(8182);
WebClient client = WebClient.create(vertx, webClientOptions);
client.get("/string")
        .send()
        .onFailure().invoke(resp -> log.error("error: " + resp))
        .onItem().invoke(resp -> log.info("result: " + resp.statusCode()))
        .toMulti()
        .subscribe().with(r -> log.info(String.format("Subscribe: code:%d body:%s",
                r.statusCode(), r.bodyAsString())));
The Vert.x Web Client does not work with SSE without additional configuration. From https://vertx.io/docs/vertx-web-client/java/:
Responses are fully buffered, use BodyCodec.pipe to pipe the response to a write stream
In other words, the client waits until the response completes before handing it over. You can either use the raw Vert.x HTTP Client or use the pipe codec. Examples are given at https://vertx.io/docs/vertx-web-client/java/#_decoding_responses.
Alternatively, you can use an SSE client such as in: https://github.com/quarkusio/quarkus-quickstarts/blob/master/kafka-quickstart/src/test/java/org/acme/kafka/PriceResourceTest.java#L27-L34
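To illustrate what an unbuffered consumer looks like, here is a minimal sketch that uses the JDK's built-in java.net.http client (Java 11+) instead of the Vert.x or JAX-RS clients mentioned above. The class name SseLineSubscriber is hypothetical, and the URL assumes the producer from the question is reachable on localhost:8182 under /search. Lines are pushed to the subscriber as they arrive, so each event is handled before the response completes. The parsing is deliberately simplified: only data fields are handled, while event, id and retry fields are ignored.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.util.concurrent.Flow;
import java.util.function.Consumer;

// Hypothetical helper: receives the response body line by line and
// forwards the payload of each server-sent event to a callback.
public class SseLineSubscriber implements Flow.Subscriber<String> {
    private final Consumer<String> onEvent;
    private final StringBuilder data = new StringBuilder();
    private Flow.Subscription subscription;

    public SseLineSubscriber(Consumer<String> onEvent) {
        this.onEvent = onEvent;
    }

    @Override
    public void onSubscribe(Flow.Subscription subscription) {
        this.subscription = subscription;
        subscription.request(1); // ask for the first line
    }

    @Override
    public void onNext(String line) {
        if (line.isEmpty()) {
            // A blank line terminates an event: dispatch the buffered data.
            if (data.length() > 0) {
                onEvent.accept(data.toString());
                data.setLength(0);
            }
        } else if (line.startsWith("data:")) {
            String value = line.substring(5);
            if (value.startsWith(" ")) {
                value = value.substring(1); // one optional leading space is dropped
            }
            if (data.length() > 0) {
                data.append('\n'); // multiple data lines are joined with newlines
            }
            data.append(value);
        }
        // Comment lines (starting with ':') and other fields are ignored here.
        subscription.request(1); // ask for the next line
    }

    @Override
    public void onError(Throwable error) {
        System.err.println("stream failed: " + error);
    }

    @Override
    public void onComplete() {
        System.out.println("stream completed");
    }

    public static void main(String[] args) {
        HttpClient client = HttpClient.newHttpClient();
        // Assumed endpoint: the /search producer from the question on port 8182.
        HttpRequest request = HttpRequest.newBuilder(URI.create("http://localhost:8182/search"))
                .header("Accept", "text/event-stream")
                .GET()
                .build();
        // sendAsync returns immediately; join() is only here to keep the demo JVM alive.
        client.sendAsync(request,
                        HttpResponse.BodyHandlers.fromLineSubscriber(
                                new SseLineSubscriber(json -> System.out.println("event: " + json))))
                .join();
    }
}
```

In a real service you would drop the join() call and hand each event to your own pipeline instead of printing it. With an infinite producer such as Producer 1, the callback should fire roughly every two seconds rather than never.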