My scenario is an HTTP outbound gateway that asks an external service for the next transition, represented by a TransferRequest
entity. The gateway is the endpoint of the "httpOutRequest" channel. The starting point of that channel is the IntegrationFlow bean source(),
which sends an empty String message triggered by a poller. (By the way: is this necessary? Could I attach the poller directly to the outbound gateway? How?)
I have also registered an errorHandler endpoint on the error channel to catch any problems. If the number of problems (exceptions) reaches MAX_COUNT_TO_REDUCE_POLLING,
say because the external service is not accessible, I would like to slow polling down at runtime, from a period of initially 5_000 ms to 60_000 ms.
Here is my code so far:
public static final int MAX_COUNT_TO_REDUCE_POLLING = 3;
private long period = 5000;
private int problemCounter = 0;

@Bean
public IntegrationFlow outbound() {
    return IntegrationFlows.from("httpOutRequest")
            .handle(Http.outboundGateway("http://localhost:8080/harry-potter-service/next/request")
                    .httpMethod(HttpMethod.GET)
                    .expectedResponseType(TransferRequest.class)
            )
            .channel("reply")
            .get();
}

@Bean
public IntegrationFlow source() {
    return IntegrationFlows.from(
            () -> new GenericMessage<String>(""),
            e -> e.poller(p -> p.fixedRate(period)))
            .channel("httpOutRequest")
            .get();
}

@Bean
@ServiceActivator(inputChannel = "reply")
public MessageHandler handler() {
    return new MessageHandler() {
        @Override
        public void handleMessage(Message<?> message) throws MessagingException {
            System.out.println("myHandler: " + message.getPayload());
            System.out.println("myHandler: " + message.getHeaders());
            TransferRequest req = (TransferRequest) message.getPayload();
            System.out.println("myHandler: " + req);
        }
    };
}

@Bean
@ServiceActivator(inputChannel = "errorChannel")
public MessageHandler errorHandler() {
    return new MessageHandler() {
        @Override
        public void handleMessage(Message<?> message) throws MessagingException {
            LOG.error("message.payload: " + message.getPayload());
            MessageHandlingException e = (MessageHandlingException) message.getPayload();
            LOG.error("Exception: " + e);
            LOG.debug("exception counter = " + (++problemCounter));
            if (problemCounter >= MAX_COUNT_TO_REDUCE_POLLING) {
                LOG.debug("would like to reduce poller frequence or stop");
                period = 60_000;
                // outbound().stop()
            }
        }
    };
}
How can I reduce the polling frequency at runtime once the threshold number of exceptions has been reached?
And how could I stop the integration flow altogether?
EDIT 1
More specific: If I have a Messaging Gateway
@Bean
public IntegrationFlow source() {
    return IntegrationFlows.from(
            () -> new GenericMessage<String>(""),
            e -> e.poller(p -> p.fixedRate(period)))
            .channel("httpOutRequest")
            .get();
}
How do I access the p within the second lambda?
How can I set p.fixedRate using a Control Channel?
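I might have solved this problem myself by reading the manual.
See here for how to change the polling rate at runtime. To do that, you have to use the DynamicPeriodicTrigger from the org.springframework.integration.util package.
The fixed-rate poller is given the value of period only once, when the flow is built, so assigning a new value to the field later has no effect on it.
To replace your fixed-rate poller with a trigger whose period can be changed, do: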
private final DynamicPeriodicTrigger dynamicPeriodicTrigger =
        new DynamicPeriodicTrigger(5_000);

@Bean
public IntegrationFlow normalStateEntryPoint() {
    return IntegrationFlows.from(
            () -> new GenericMessage<String>(""),
            e -> e.poller(p -> p.trigger(dynamicPeriodicTrigger))
                    .id("normalStateSourcePollingChannelAdapter")
                    .autoStartup(true))
            .channel("httpOutRequest")
            .get();
}
To slow polling down from a period of 5,000 to 60,000 milliseconds, call:
dynamicPeriodicTrigger.setPeriod(60_000);
That's it.
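To connect this to the error handler from the question, the counting logic could be pulled into a small helper that decides which period to use. The following is only a sketch in plain Java: the class PollingBackoff and its methods are hypothetical names, not part of Spring Integration, and resetting the counter after a successful reply is an assumption that the original code does not make.

```java
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Hypothetical helper (not a Spring Integration class): counts consecutive
 * failures and returns the polling period, in milliseconds, to use next.
 */
final class PollingBackoff {

    private final int threshold;        // e.g. MAX_COUNT_TO_REDUCE_POLLING
    private final long normalPeriodMs;  // e.g. 5_000
    private final long reducedPeriodMs; // e.g. 60_000

    // AtomicInteger, because error and reply handlers may run on different threads
    private final AtomicInteger failures = new AtomicInteger();

    PollingBackoff(int threshold, long normalPeriodMs, long reducedPeriodMs) {
        this.threshold = threshold;
        this.normalPeriodMs = normalPeriodMs;
        this.reducedPeriodMs = reducedPeriodMs;
    }

    /** Records one failure and returns the period to poll with from now on. */
    long recordFailure() {
        int count = failures.incrementAndGet();
        return count >= threshold ? reducedPeriodMs : normalPeriodMs;
    }

    /** Assumption: a successful reply resets the counter and restores the normal period. */
    long recordSuccess() {
        failures.set(0);
        return normalPeriodMs;
    }
}
```

Inside errorHandler() one would then presumably call dynamicPeriodicTrigger.setPeriod(backoff.recordFailure()), and inside the reply handler dynamicPeriodicTrigger.setPeriod(backoff.recordSuccess()), using the same setPeriod call shown above. Keeping the decision in a separate class means it can be unit-tested without starting the integration flow.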