quarkusvert.xmutinyvertx-eventbus

Uni wait for Vertx eventBus message


I have two endpoints:

    @GET
    @Produces(MediaType.TEXT_PLAIN)
    @Path("/waitForEvent")
    public Uni<Object> waitForEvent() {
        return Uni.createFrom().emitter(em -> {
            //wait for event from eventBus
    //            eventBus.consumer("test", msg -> {
    //                System.out.printf("receive event: %s\n", msg.body());
    //                em.complete(msg);
    //            });
        }).ifNoItem().after(Duration.ofSeconds(5)).failWith(new RuntimeException("timeout"));
    }
    
    @GET
    @Path("/send")
    public void test() {
        System.out.println("send event");
        eventBus.send("test", "send test event");
    }

The waitForEvent() should only complete if it receives the event from the eventBus. How can I achieve this using vertx and mutiny?


Solution

  • In general, we avoid that kind of pattern and use the request/reply mechanism from the event bus:

    @GET
    @Path("/send")
    public Uni<String> test() {
       return bus.<String>request("test", name)        
            .onItem().transform(Message::body)
            .ifNoItem().after(Duration.ofSeconds(5)).failWith(new RuntimeException("timeout"));
    }
    

    When implementing with two endpoints (as in the question), it can become a bit more complicated as if you have multiple calls to the /waitForEvent endpoint, you need to be sure that every "consumer" get the message.

    It is still possible, but would will need something like this:

    @GET
    @Produces(MediaType.TEXT_PLAIN)
    @Path("/waitForEvent")
    public Uni<String> waitForEvent() {
      return Uni.createFrom().emitter(emitter -> {
        MessageConsumer<String> consumer = bus.consumer("test");
          consumer.handler(m -> {
          emitter.complete(m.body());
          consumer.unregisterAndForget();
       })
            .ifNoItem().after(Duration.ofSeconds(5)).failWith(new RuntimeException("timeout"));
    
    }
        
    @GET
    @Path("/send")
    public void test() {
      bus.publish("test", "send test event");
    }
    

    Be sure to use the io.vertx.mutiny.core.eventbus.EventBus variant of the event bus.