Tags: java, spring, spring-integration, spring-integration-http

How to change the poller frequency at runtime for an HTTP outbound gateway?


My scenario is an HTTP outbound gateway where I ask an external service for the next transition, represented by a TransferRequest entity. The gateway is the endpoint of the "httpOutRequest" channel. The starting point of the "httpOutRequest" channel is an IntegrationFlow bean, source(), which sends an empty String message each time a poller fires. (By the way: is this necessary? Could I add the poller directly to the outbound gateway? How?)

I have also installed an error handler on the "errorChannel" to catch any problems. Once the number of problems (exceptions) reaches MAX_COUNT_TO_REDUCE_POLLING, say because the external service is not accessible, I would like to slow polling down at runtime, from the initial period of 5_000 ms to 60_000 ms.

Here is my code so far:

    public static final int MAX_COUNT_TO_REDUCE_POLLING = 3;

    private long period = 5000;
    private int problemCounter = 0;

    @Bean
    public IntegrationFlow outbound() {
        return IntegrationFlows.from("httpOutRequest")
                .handle(Http.outboundGateway("http://localhost:8080/harry-potter-service/next/request")
                        .httpMethod(HttpMethod.GET)
                        .expectedResponseType(TransferRequest.class)
                        )
                .channel("reply")
                .get();
    }

    @Bean
    public IntegrationFlow source() {
        return IntegrationFlows.from(
                () -> new GenericMessage<String>(""),
                        e -> e.poller(p -> p.fixedRate(period)))
                .channel("httpOutRequest")
                .get();
    }

    @Bean
    @ServiceActivator(inputChannel = "reply")
    public MessageHandler handler() {
        return new MessageHandler() {
            @Override
            public void handleMessage(Message<?> message) throws MessagingException {
                System.out.println("myHandler: " + message.getPayload());
                System.out.println("myHandler: " + message.getHeaders());
                TransferRequest req = (TransferRequest) message.getPayload();
                System.out.println("myHandler: " + req);
            }
        };
    }

    @Bean
    @ServiceActivator(inputChannel = "errorChannel")
    public MessageHandler errorHandler() {
        return new MessageHandler() {
            @Override
            public void handleMessage(Message<?> message) throws MessagingException {
                LOG.error("message.payload: " + message.getPayload());
                MessageHandlingException e = (MessageHandlingException) message.getPayload();
                LOG.error("Exception: " + e);
                LOG.debug("exception counter = " + (++problemCounter));

                if (problemCounter >= MAX_COUNT_TO_REDUCE_POLLING) {
                    LOG.debug("would like to reduce poller frequency or stop");
                    period = 60_000;
                //  outbound().stop()
                }
            }
        };
    }

How can I reduce the polling frequency at runtime once the threshold number of exceptions has been reached?

How could I even stop the Integration flow?

EDIT 1

More specifically: if I have a polled message source like this

    @Bean
    public IntegrationFlow source() {
        return IntegrationFlows.from(
                () -> new GenericMessage<String>(""),
                        e -> e.poller(p -> p.fixedRate(period)))
                .channel("httpOutRequest")
                .get();
    }

How do I access p within the second lambda?

How can I set p.fixedRate using a Control Channel?


Solution

  • I might have solved this problem myself by reading the manual.

    The manual describes how to change the polling rate at runtime. To do that, use the DynamicPeriodicTrigger from the org.springframework.integration.util package.

    To replace the fixed-rate poller, do:

        private final DynamicPeriodicTrigger dynamicPeriodicTrigger =
                new DynamicPeriodicTrigger(5_000);
    
        @Bean
        public IntegrationFlow normalStateEntryPoint() {
            return IntegrationFlows.from(
                    () -> new GenericMessage<String>(""),
                            e -> e.poller(p -> p.trigger(dynamicPeriodicTrigger))
                            .id("normalStateSourcePollingChannelAdapter")
                            .autoStartup(true))
                    .channel("httpOutRequest")
                    .get();
        }
    

    To slow polling down from a period of 5,000 to 60,000 milliseconds, call:

        dynamicPeriodicTrigger.setPeriod(60_000);
    

    That's it.
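
To tie this back to the exception counter from the question, the threshold logic could live in a small helper that only knows how to set a period. The following is a minimal sketch in plain Java, not part of Spring Integration: the class name PollingBackoff and its methods are hypothetical, and resetting to the normal period after a successful reply is an assumption that goes beyond what the question asked for.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.LongConsumer;

/**
 * Hypothetical helper (not a Spring Integration class): counts consecutive
 * failures and switches the polling period once a threshold is reached.
 */
final class PollingBackoff {

    private final int maxFailures;
    private final long normalPeriodMs;
    private final long reducedPeriodMs;
    // Receives the new period in milliseconds, e.g. a trigger's setter.
    private final LongConsumer periodSetter;
    // Atomic because error and reply handlers may run on different threads.
    private final AtomicInteger failures = new AtomicInteger();

    PollingBackoff(int maxFailures, long normalPeriodMs,
                   long reducedPeriodMs, LongConsumer periodSetter) {
        this.maxFailures = maxFailures;
        this.normalPeriodMs = normalPeriodMs;
        this.reducedPeriodMs = reducedPeriodMs;
        this.periodSetter = periodSetter;
    }

    /** Intended to be called from the error handler for each exception. */
    void onFailure() {
        // Switch exactly once, at the moment the threshold is hit.
        if (failures.incrementAndGet() == maxFailures) {
            periodSetter.accept(reducedPeriodMs);
        }
    }

    /** Assumption: a successful reply resets the counter and the period. */
    void onSuccess() {
        if (failures.getAndSet(0) >= maxFailures) {
            periodSetter.accept(normalPeriodMs);
        }
    }
}
```

With the configuration above, the helper could presumably be created as new PollingBackoff(MAX_COUNT_TO_REDUCE_POLLING, 5_000, 60_000, dynamicPeriodicTrigger::setPeriod), with onFailure() called from the "errorChannel" handler and onSuccess() from the "reply" handler.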