Consider the following IntegrationFlow:
@Bean
public IntegrationFlow mongoFlow(MongoTemplate mongoTemplate) {
    return IntegrationFlow
            .from(MongoDb.inboundChannelAdapter(mongoTemplate, "{'status' : 'READY'}")
                            .update(Update.update("status", "DONE"))
                            .collectionName("test").entityClass(Document.class),
                    p -> p.poller(pm -> pm.fixedDelay(1000L).transactional()))
            .split()
            .handle((GenericHandler<Document>) (p, h) -> {
                if (p.get("message").toString().contains("james"))
                    throw new IllegalArgumentException("no talking to james allowed");
                // process message
                return p;
            }).nullChannel();
}
This gets a lock on the record and sets it to DONE. If the flow succeeds, the transaction is committed; otherwise it rolls back. In this case a message to my co-worker James will never succeed, so the message fails over and over again. My first attempt was to use an error channel to mark it FAILED after three failed attempts.
@Bean
public IntegrationFlow mongoFlow(MongoTemplate mongoTemplate) {
    return IntegrationFlow
            .from(MongoDb.inboundChannelAdapter(mongoTemplate, "{'status' : 'READY'}")
                            .update(Update.update("status", "DONE"))
                            .collectionName("test").entityClass(Document.class),
                    p -> p.poller(pm -> pm.fixedDelay(1000L).transactional()))
            .enrichHeaders(h -> h.errorChannel("testError"))
            .split()
            .handle((GenericHandler<Document>) (p, h) -> {
                if (p.get("message").toString().contains("james"))
                    throw new IllegalArgumentException("no talking to james allowed");
                // process message
                return p;
            }).nullChannel();
}
@Bean
public IntegrationFlow errorFlow(MongoTemplate mongoTemplate) {
    return IntegrationFlow.from("testError")
            .transform(MessageHandlingException::getFailedMessage)
            .handle((GenericHandler<Document>) (p, h) ->
                    p.append("fails", p.getInteger("fails", 0) + 1)
                            .append("status", p.getInteger("fails") > 2 ? "FAILED" : "READY"))
            .handle(MongoDb.outboundGateway(mongoTemplate).collectionName("test")
                    .entityClass(Document.class)
                    .collectionCallback((c, m) ->
                            c.findOneAndUpdate(Filters.eq("uuid",
                                            ((Document) m.getPayload()).get("uuid")),
                                    Updates.combine(
                                            Updates.set("status", ((Document) m.getPayload()).get("status")),
                                            Updates.set("fails", ((Document) m.getPayload()).get("fails"))
                                    )
                            )
                    )
            )
            .nullChannel();
}
This works, but the transaction is terminated when the message leaves the mongoFlow and is sent to the testError channel. I want to update the record to either increment the failure count with a status of READY or set the status to FAILED, and then commit.
My next attempt was to use ExpressionEvaluatingRequestHandlerAdvice:
@Bean
public ExpressionEvaluatingRequestHandlerAdvice handleErrorAdvice() {
    ExpressionEvaluatingRequestHandlerAdvice advice = new ExpressionEvaluatingRequestHandlerAdvice();
    advice.setReturnFailureExpressionResult(true);
    advice.setOnSuccessExpression(new FunctionExpression<GenericMessage<Document>>(
            gm -> gm.getPayload().append("status", "DONE")
    ));
    advice.setOnFailureExpression(new FunctionExpression<GenericMessage<Document>>(gm -> {
        // Bind the payload first; the lambda has no 'p' variable otherwise.
        Document p = gm.getPayload();
        return p.append("fails", p.getInteger("fails", 0) + 1)
                .append("status", p.getInteger("fails") > 2 ? "FAILED" : "READY");
    }));
    return advice;
}
@Bean
public IntegrationFlow mongoFlow(MongoTemplate mongoTemplate) {
    return IntegrationFlow
            .from(MongoDb.inboundChannelAdapter(mongoTemplate, "{'status' : 'READY'}")
                            .update(Update.update("status", "PROCESSING"))
                            .collectionName("test").entityClass(Document.class),
                    p -> p.poller(pm -> pm.fixedDelay(1000L).transactional()))
            .split()
            .handle((GenericHandler<Document>) (p, h) -> {
                if (p.get("message").toString().contains("james"))
                    throw new IllegalArgumentException("no talking to james allowed");
                // process message
                return p;
            }, e -> e.advice(handleErrorAdvice()))
            .handle(MongoDb.outboundGateway(mongoTemplate).collectionName("test")
                    .entityClass(Document.class)
                    .collectionCallback((c, m) ->
                            c.findOneAndUpdate(Filters.eq("uuid",
                                            ((Document) m.getPayload()).get("uuid")),
                                    Updates.combine(
                                            Updates.set("status", ((Document) m.getPayload()).get("status")),
                                            Updates.set("fails", ((Document) m.getPayload()).get("fails"))
                                    )
                            )
                    )
            )
            .nullChannel();
}
This updates the record within the same transaction and then saves it like I want. But I'm not sure if this is the right way of doing things. One odd thing about it that I don't really like is that I have to set the status to some intermediate value just for the sake of locking the record. Then I need the advice to set the status accordingly before the flow uses the outbound gateway to save it. I thought the error channel approach seemed more elegant but I'm not sure if it's possible to have the transaction boundary span both flows as long as the errorChannel is a DirectChannel.
Am I on the right track here or am I completely lost?
I should note that when testing the transactions, I had put some Thread.sleep() lines in there so I had time to try to steal the locked record. I removed them from the post just to make it cleaner.
EDIT: Latest iteration using RequestHandlerRetryAdvice combined with ExpressionEvaluatingRequestHandlerAdvice:
@Bean
public IntegrationFlow mongoFlow(MongoTemplate mongoTemplate) {
    return IntegrationFlow
            .from(MongoDb.inboundChannelAdapter(mongoTemplate, "{'status' : 'READY'}")
                            .collectionName("test").entityClass(Document.class)
                            .update(Update.update("status", "PROCESSING")),
                    p -> p.poller(pm -> pm.fixedDelay(1000L).transactional()))
            .split()
            .handle((GenericHandler<Document>) (p, h) -> {
                if (p.get("message").toString().contains("james"))
                    throw new IllegalArgumentException("no talking to james allowed");
                return p;
            }, e -> e.advice(statusAdvice(), retryAdvice()))
            .log(m -> "\n------AFTER ADVICE------\n" + m.getPayload())
            .handle(MongoDb.outboundGateway(mongoTemplate).collectionName("test").entityClass(Document.class)
                    .collectionCallback((c, m) ->
                            c.findOneAndUpdate(Filters.eq("uuid", ((Document) m.getPayload()).get("uuid")),
                                    Updates.set("status", ((Document) m.getPayload()).get("status"))
                            )
                    )
            )
            .nullChannel();
}
@Bean
public RequestHandlerRetryAdvice retryAdvice() {
    RequestHandlerRetryAdvice advice = new RequestHandlerRetryAdvice();
    advice.setRetryTemplate(RetryTemplate.builder()
            .maxAttempts(3)
            .exponentialBackoff(1000L, 2, 10000)
            .build());
    return advice;
}
@Bean
public ExpressionEvaluatingRequestHandlerAdvice statusAdvice() {
    ExpressionEvaluatingRequestHandlerAdvice advice = new ExpressionEvaluatingRequestHandlerAdvice();
    advice.setReturnFailureExpressionResult(true);
    advice.setOnSuccessExpression(new FunctionExpression<GenericMessage<Document>>(gm -> {
        System.out.println("============================== SUCCESS ADVICE ============================");
        return gm.getPayload().append("status", "DONE");
    }));
    advice.setOnFailureExpression(new FunctionExpression<GenericMessage<Document>>(gm -> {
        System.out.println("============================== FAILURE ADVICE ============================");
        return gm.getPayload().append("status", "FAILED");
    }));
    return advice;
}
That's correct. The error handling in the polling endpoint is done after the transaction is rolled back. The transaction interceptor is applied to the Callable<Message<?>> pollingTask as an AOP advice. That task is invoked from pollForMessage(), which the poller calls like this:
private Runnable createPoller() {
    return () ->
            this.taskExecutor.execute(() -> {
                int count = 0;
                while (this.initialized && (this.maxMessagesPerPoll <= 0 || count < this.maxMessagesPerPoll)) {
                    if (this.maxMessagesPerPoll == 0) {
                        logger.info("Polling disabled while 'maxMessagesPerPoll == 0'");
                        break;
                    }
                    if (pollForMessage() == null) {
                        break;
                    }
                    count++;
                }
            });
}
The error handling is applied as a decoration on the tasks submitted to this.taskExecutor.
Therefore it is not possible to keep the transaction open and handle such a business error via the errorChannel configuration.
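To make that ordering concrete, here is a minimal plain-Java sketch. It is not the Spring Integration source; the class PollerWrappingSketch and the helpers transactional() and errorHandling() are hypothetical stand-ins for the transaction interceptor and the executor's error-handling decoration. It only shows that when the error handler is the outer wrapper, the rollback has already happened by the time the error flow sees the exception.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.function.Consumer;

// Plain-Java model, no Spring involved. All names are hypothetical.
class PollerWrappingSketch {

    // Stand-in for the transaction advice around the polling task.
    static <T> Callable<T> transactional(Callable<T> task, List<String> events) {
        return () -> {
            events.add("tx begin");
            try {
                T result = task.call();
                events.add("tx commit");
                return result;
            } catch (Exception e) {
                events.add("tx rollback");
                throw e; // the exception leaves the transaction scope here
            }
        };
    }

    // Stand-in for the error-handling decoration applied to executor tasks.
    static Runnable errorHandling(Callable<?> task, Consumer<Exception> errorHandler) {
        return () -> {
            try {
                task.call();
            } catch (Exception e) {
                errorHandler.accept(e); // runs only after the inner wrapper has finished
            }
        };
    }

    // Wires the two wrappers the way the answer describes and records the order of events.
    static List<String> run() {
        List<String> events = new ArrayList<>();
        Callable<Object> pollingTask = () -> {
            events.add("handler throws");
            throw new IllegalArgumentException("no talking to james allowed");
        };
        Runnable poller = errorHandling(
                transactional(pollingTask, events),
                e -> events.add("error flow runs"));
        poller.run();
        return events;
    }

    public static void main(String[] args) {
        // prints [tx begin, handler throws, tx rollback, error flow runs]
        System.out.println(run());
    }
}
```

In this model the "error flow runs" event always comes after "tx rollback", which mirrors why an update made in the error flow cannot take part in the poller's transaction.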
Consider using a RequestHandlerRetryAdvice on your handle() to retry those business errors. This way the original transaction is held open and no new messages are pulled from MongoDB.
You might indeed also need to combine it with the ExpressionEvaluatingRequestHandlerAdvice as the first interceptor, so that it handles errors only after all retries are exhausted.
See more info in docs: https://docs.spring.io/spring-integration/reference/handler-advice/classes.html
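As a rough illustration of why the order of the two advices matters, here is a plain-Java sketch that models the chain as nested functions. It does not use Spring; AdviceOrderSketch, retrying() and statusMapping() are hypothetical names, and back-off delays are left out. It assumes the first advice in the chain is the outermost one, so the status-mapping wrapper sees a single outcome after the inner retry wrapper has used up its attempts.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Plain-Java model, no Spring involved. All names are hypothetical.
class AdviceOrderSketch {

    // Inner wrapper: re-invokes the handler up to maxAttempts times (no back-off here).
    static Function<String, String> retrying(Function<String, String> handler,
                                             int maxAttempts, List<String> events) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        return payload -> {
            RuntimeException last = null;
            for (int attempt = 1; attempt <= maxAttempts; attempt++) {
                try {
                    return handler.apply(payload);
                } catch (RuntimeException e) {
                    events.add("attempt " + attempt + " failed");
                    last = e;
                }
            }
            throw last; // all attempts used up: propagate to the outer wrapper
        };
    }

    // Outer wrapper: sees only the final outcome and turns it into a status.
    static Function<String, String> statusMapping(Function<String, String> next,
                                                  List<String> events) {
        return payload -> {
            try {
                next.apply(payload);
                events.add("status DONE");
                return "DONE";
            } catch (RuntimeException e) {
                events.add("status FAILED");
                return "FAILED";
            }
        };
    }

    // Builds the chain (status mapping outside, retry inside) and records what happens.
    static List<String> run(String message) {
        List<String> events = new ArrayList<>();
        Function<String, String> handler = payload -> {
            if (payload.contains("james")) {
                throw new IllegalArgumentException("no talking to james allowed");
            }
            return payload;
        };
        Function<String, String> chain = statusMapping(retrying(handler, 3, events), events);
        chain.apply(message);
        return events;
    }

    public static void main(String[] args) {
        System.out.println(run("hello james"));
        System.out.println(run("hello bob"));
    }
}
```

With the wrappers swapped, the retry wrapper would never see an exception, because the status-mapping wrapper would already have converted the first failure into a result.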