quarkus amqp smallrye-reactive-messaging

How to publish messages to a topic in Solace broker using AMQP


We have a Quarkus app that uses the smallrye-amqp connector. I have the following code for publishing messages to a topic.

@ApplicationScoped
public class InsightLifecycleObserver {

    public static final String INSIGHTS_DELETED_TOPIC = "insightscentral/insight/deleted";

    @Inject
    @Channel(INSIGHTS_DELETED_TOPIC)
    @OnOverflow(value = Strategy.NONE)
    Emitter<byte[]> insightsEmitter;

    void onInsightRemoved(@Observes(during = AFTER_SUCCESS) InsightDeletedEvent event) {
        final var tenantId = tenantResolver.getTenantId();
        try {
          var insightLifeCycleEvent = convertToLifeCycleEvent(event);
          var eventEnvelope =
              EventEnvelope.builder()
                  .withId(UUID.randomUUID())
                  .withData(
                      PojoCloudEventData.wrap(insightLifeCycleEvent, objectMapper::writeValueAsBytes))
                  .build();

          // send the event to the Insights topic
          insightsEmitter
              .send(eventEnvelope.serialize())
              .whenComplete(
                  (result, throwable) -> {

                    if (throwable != null) {
                      LOGGER.error("Failed to emit InsightDeleted event", throwable);
                      metricsService.incrementErrorMessageCounter(
                          MessageType.INSIGHT_DELETED_EVENT, tenantId);
                    } else {
                      LOGGER.debug(
                          "Published InsightDeleted event for insight with id: {} and sri: {}",
                          event.insight().id(),
                          event.insight().sri());
                      metricsService.incrementMessageCounter(
                          MessageType.INSIGHT_DELETED_EVENT, tenantId);
                    }
                  });
        } catch (Exception e) {
          LOGGER.error("Failed to publish InsightDeleted event", e);
          metricsService.incrementErrorMessageCounter(MessageType.INSIGHT_DELETED_EVENT, tenantId);
        }
    }
}

When this hook is called, I get the following exception:

io.vertx.core.impl.NoStackTraceThrowable: message rejected (REJECTED): Error{condition=amqp:not-allowed, description='SMF AD ack response error', info={solace.response_code=400, solace.response_text=Queue Not Found}}

What am I doing wrong?


Solution

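  • The destination in the question has no prefix, and the "Queue Not Found" response suggests the broker is treating it as a queue name. Per the Solace AMQP reference, a client specifies that messages are to be published to a topic by formatting the destination as follows: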

    topic://topic-string
    

    Docs: Solace AMQP reference.

    Thanks to Ozan Günalp for pointing this out.
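
For the code in the question, one way to apply this is to keep a short channel name and set the AMQP address in application.properties. A minimal sketch, assuming the channel is renamed to insights-deleted (a hypothetical name, so the @Channel value in the Java class would need to match) and relying on the smallrye-amqp connector's address attribute:

```properties
# Hypothetical channel name "insights-deleted"; it must match the @Channel value in the Java class.
mp.messaging.outgoing.insights-deleted.connector=smallrye-amqp
# The topic:// prefix asks the Solace broker to treat the destination as a topic, not a queue.
mp.messaging.outgoing.insights-deleted.address=topic://insightscentral/insight/deleted
```

When no address is configured, the connector falls back to the channel name as the AMQP address, which is how the bare string insightscentral/insight/deleted ended up being sent to the broker.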