I would like to create an asynchronous flow. My main flow, "jiraVersionWebhook", should return a response quickly, and through a wire tap I want to run a long process involving a splitter and other components. The second flow (behind the wire tap) does not need to send a reply.
public IntegrationFlow jiraVersionWebhook() {
    return IntegrationFlow.from(
            WebFlux.inboundGateway("/jira/version")
                    .requestMapping(r -> r.methods(HttpMethod.POST)
                            .consumes("application/json"))
                    .requestPayloadType(String.class)
                    .replyChannel(replyChannel)
                    .errorChannel(errorChannel)
                    .mappedRequestHeaders(parameter.getJiraHeaderSignature()))
            .wireTap(versionQueueChannel())
            .get();
}

@Bean
public QueueChannel versionQueueChannel() {
    return new QueueChannel();
}

@Bean
public IntegrationFlow versionFlow() {
    return IntegrationFlow.from(versionQueueChannel())
            .<VersionIssuesConnector>handle((p, h) -> p,
                    e -> e.poller(Pollers.fixedDelay(1000).maxMessagesPerPoll(1))
                            .handleMessageAdvice(ignoreExceptionRequestHandlerAdvice)
                            .requiresReply(false))
            .split(new VersionIssueSplitter())
            .handle(longProcess)
            .aggregate()
            .get();
}

@Bean
public MethodInterceptor ignoreExceptionRequestHandlerAdvice() {
    final var advice = new ExpressionEvaluatingRequestHandlerAdvice();
    advice.setTrapException(true);
    return advice;
}
I don't think I'm doing this the right way. I set requiresReply(false), but I still get an exception because there is no subscriber, so I added ignoreExceptionRequestHandlerAdvice. Is there another way to achieve this? Trapping the exception just to ignore the reply can't be correct, I'm sure. Thanks.
UPDATE #1: Is there another way to add a poller without adding a useless pass-through handler like this one?
.<VersionIssuesConnector>handle((p, h) -> p,
        e -> e.poller(Pollers.fixedDelay(1000).maxMessagesPerPoll(1))
So, you use `.wireTap(versionQueueChannel())`, which essentially means send-and-forget. Therefore, the flow subscribed to this channel has to be one-way.
It looks like yours ends with `.aggregate()`. An aggregator is a reply-producing handler, and if there is no channel to send its result to, an exception like `throw new DestinationResolutionException("no output-channel or replyChannel header available");` is thrown.
Consider using the `nullChannel()` operator after `.aggregate()`.
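A minimal sketch of what that could look like, assuming Spring Integration 6.x (the `IntegrationFlow.from(...)` style used in the question). The `splitter` and `longProcess` parameters stand in for the question's `VersionIssueSplitter` and `longProcess` beans; their exact types are not shown in the question, so `AbstractMessageSplitter` and `GenericHandler<Object>` are assumptions here. The `bridge()` with a poller is one possible way to poll the queue without a pass-through handler, not the only one.

```java
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.channel.QueueChannel;
import org.springframework.integration.core.GenericHandler;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.Pollers;
import org.springframework.integration.splitter.AbstractMessageSplitter;

@Configuration
public class VersionFlowConfig {

    // Same queue channel as in the question; the wire tap sends a copy of each message here.
    @Bean
    public QueueChannel versionQueueChannel() {
        return new QueueChannel();
    }

    // Parameter types are assumptions: the question does not show how
    // VersionIssueSplitter and longProcess are declared.
    @Bean
    public IntegrationFlow versionFlow(QueueChannel versionQueueChannel,
                                       AbstractMessageSplitter splitter,
                                       GenericHandler<Object> longProcess) {
        return IntegrationFlow.from(versionQueueChannel)
                // A bridge only forwards the message, so it can carry the poller
                // configuration instead of a (p, h) -> p handler.
                .bridge(e -> e.poller(Pollers.fixedDelay(1000).maxMessagesPerPoll(1)))
                .split(splitter)
                .handle(longProcess)
                .aggregate()
                // Discards the aggregated result, so no output channel or
                // replyChannel header is needed. nullChannel() ends the flow,
                // so there is no trailing .get().
                .nullChannel();
    }
}
```

With the flow terminated this way, the `requiresReply(false)` setting and the exception-trapping advice should no longer be necessary. If the aggregated result is never used, dropping `.aggregate()` and ending the flow with a one-way handler may be simpler still.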
Although it is not clear why one would use an aggregator at the end of a one-way flow...
Please share a stack trace if I have misunderstood what exactly the problem is in your case.