I have two endpoints:
@GET
@Produces(MediaType.TEXT_PLAIN)
@Path("/waitForEvent")
public Uni<Object> waitForEvent() {
    return Uni.createFrom().emitter(em -> {
        // wait for event from eventBus
        // eventBus.consumer("test", msg -> {
        //     System.out.printf("receive event: %s\n", msg.body());
        //     em.complete(msg);
        // });
    }).ifNoItem().after(Duration.ofSeconds(5)).failWith(new RuntimeException("timeout"));
}

@GET
@Path("/send")
public void test() {
    System.out.println("send event");
    eventBus.send("test", "send test event");
}
waitForEvent() should complete only when it receives the event from the eventBus. How can I achieve this using Vert.x and Mutiny?
In general, we avoid that kind of pattern and use the request/reply mechanism from the event bus:
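@GET
@Path("/send")
public Uni<String> test() {
    // request() completes with the reply, so a consumer registered on "test" must reply to the message
    return bus.<String>request("test", "send test event")
            .onItem().transform(Message::body)
            .ifNoItem().after(Duration.ofSeconds(5)).failWith(new RuntimeException("timeout"));
}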
Implementing this with two endpoints (as in the question) is a bit more complicated: if there are multiple concurrent calls to the /waitForEvent endpoint, you need to make sure that every "consumer" gets the message.
It is still possible, but you will need something like this:
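@GET
@Produces(MediaType.TEXT_PLAIN)
@Path("/waitForEvent")
public Uni<String> waitForEvent() {
    // The explicit <String> is needed so the chained call still yields a Uni<String>
    return Uni.createFrom().<String>emitter(emitter -> {
        MessageConsumer<String> consumer = bus.consumer("test");
        consumer.handler(m -> {
            // Complete the Uni with the first message, then drop this one-shot consumer
            emitter.complete(m.body());
            consumer.unregisterAndForget();
        });
    })
    .ifNoItem().after(Duration.ofSeconds(5)).failWith(new RuntimeException("timeout"));
}

@GET
@Path("/send")
public void test() {
    // publish (not send) so that every registered consumer receives the message
    bus.publish("test", "send test event");
}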
Be sure to use the io.vertx.mutiny.core.eventbus.EventBus variant of the event bus.
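To illustrate why the two-endpoint version uses publish rather than send, here is a minimal JDK-only sketch of one-shot waiters on a single address. It is a model, not the Vert.x or Mutiny API: the class OneShotWaiters and its methods are hypothetical names, CompletableFuture stands in for Uni, and send here simply picks the first live waiter, whereas the real event bus chooses one of the registered handlers itself.

```java
import java.time.Duration;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeUnit;

/** JDK-only model of one-shot waiters on one address (hypothetical, not the Vert.x API). */
public class OneShotWaiters {
    // Waiters currently registered, analogous to consumers registered on "test".
    private final List<CompletableFuture<String>> waiters = new CopyOnWriteArrayList<>();

    /** Registers a waiter that completes with the next delivered body, or fails after the timeout. */
    public CompletableFuture<String> waitForEvent(Duration timeout) {
        CompletableFuture<String> waiter = new CompletableFuture<>();
        waiters.add(waiter);
        // Fails this same future with a TimeoutException if nothing arrives in time.
        waiter.orTimeout(timeout.toMillis(), TimeUnit.MILLISECONDS);
        // Deregister on any outcome so completed or timed-out waiters do not accumulate.
        waiter.whenComplete((body, error) -> waiters.remove(waiter));
        return waiter;
    }

    /** Delivers the body to every live waiter; returns how many received it. */
    public int publish(String body) {
        int delivered = 0;
        for (CompletableFuture<String> waiter : waiters) {
            if (waiter.complete(body)) {
                delivered++;
            }
        }
        return delivered;
    }

    /** Delivers the body to at most one live waiter; returns 1 if delivered, else 0. */
    public int send(String body) {
        for (CompletableFuture<String> waiter : waiters) {
            if (waiter.complete(body)) {
                return 1;
            }
        }
        return 0;
    }
}
```

With two pending calls to waitForEvent, send completes only one of them and the other eventually times out, while publish completes both. That is the behaviour the /send endpoint needs when several clients are waiting at once.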