springspring-integration

No reply in wiretap


I would like to create an asynchrone flow. My main flow "jiraVersionWebhook" will return a response quickly, and with a wiretap I will execute long process with split and another component. For the second flow (in wiretrap) I have no need a reply.

public IntegrationFlow jiraVersionWebhook() {
        return IntegrationFlow.from(
                        WebFlux.inboundGateway("/jira/version")
                                .requestMapping(r -> r.methods(HttpMethod.POST)
                                        .consumes("application/json"))
                                .requestPayloadType(String.class)
                                .replyChannel(replyChannel)
                                .errorChannel(errorChannel)
                                .mappedRequestHeaders(parameter.getJiraHeaderSignature()))
                .wireTap(versionQueueChannel())
                .get();
    }

@Bean
public QueueChannel versionQueueChannel() {
    return new QueueChannel();
}

@Bean
public IntegrationFlow versionFlow() {
    return IntegrationFlow.from(versionQueueChannel())
            .<VersionIssuesConnector>handle((p, h) -> p,
                    e -> e.poller(Pollers.fixedDelay(1000).maxMessagesPerPoll(1))
                            .handleMessageAdvice(ignoreExceptionRequestHandlerAdvice)
                            .requiresReply(false))
            .split(new VersionIssueSplitter())
            .handle(longProcess)
            .aggregate()
            .get();
}

@Bean
public MethodInterceptor ignoreExceptionRequestHandlerAdvice() {
    final var advice = new ExpressionEvaluatingRequestHandlerAdvice();
    advice.setTrapException(true);
    return advice;
}

I think I'm not doing things the right way. I indicated requiresReply false but I get exception because there is no subscriber, so I add and ignoreExceptionRequestHandlerAdvice. Is there another way to achieve this ?? Trap exception to ignore the reply ... it's not correct i'm sure. Thanks

UPDATE #1 is there another way to add a poller without handle a useless handler like this :

<VersionIssuesConnector>handle((p, h) -> p,
                        e -> e.poller(Pollers.fixedDelay(1000).maxMessagesPerPoll(1))

Solution

  • So, you do .wireTap(versionQueueChannel()), which, essentially, means send-and-forget. Therefore the flow subscribed to this channel has to be one-way.

    Looks like that one ends for you with the .aggregate(). This one is replying handler and if there is no channel where to send result, the exception like throw new DestinationResolutionException("no output-channel or replyChannel header available"); would be thrown.

    Consider to use nullChannel() operation after an .aggregate(). Although it is not clear why would one use an aggregator in the end of one-way flow...

    Please, share a stack trace with, if I misunderstood what exactly the problem in your case.