javaspringjava-17spring-4spring2.x

Migration Spring v2 to Spring v4 for org.springframework.cloud.stream.annotation


Introduction

I'm currently using EnableBinding , StreamListener from Spring v2: https://www.javadoc.io/doc/org.springframework.cloud/spring-cloud-stream/2.0.0.RELEASE/org/springframework/cloud/stream/annotation/package-summary.html

The code of my project that uses these annotations:


import java.time.Clock;
import java.time.Duration;
import java.time.Instant;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.messaging.Message;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Service;

import com.enterprise.production.model.exceptions.ProductionStoreException;
import com.enterprise.production.model.message.EnergyProductionMessage;
import com.enterprise.production.stream.service.ProductionStoreService;

import lombok.extern.slf4j.Slf4j;

@EnableBinding(Sink.class)
@Slf4j
@Service
public class ProductionMessageConsumer {

    @Autowired
    private ProductionStoreService productionService;
    @Autowired
    private Clock clock;

    @StreamListener(target = Sink.INPUT)
    public void handleEnergyProductionMessage(@Payload EnergyProductionMessage energyProductionMessage) throws ProductionStoreException {
        Instant start = clock.instant();

        log.debug("Processing energy productions message with original interval: {}|{}|{}", energyProductionMessage.getTenantId(), energyProductionMessage.getUsername(),
                    energyProductionMessage.getDeviceId());
        log.info("Processing {} energy productions ", energyProductionMessage.getSolarEnergies().size());

        productionService.saveProductions(energyProductionMessage);
        log.debug("Ending energy productions message with original interval: {}|{}|{}: ended in {}ms", energyProductionMessage.getTenantId(),
                 energyProductionMessage.getUsername(), energyProductionMessage.getDeviceId(), Duration
                         .between(start, clock.instant()).toMillis());
        Instant startNormalization = clock.instant();
        log.debug("Processing energy productions message with normalization 30m: {}|{}|{}", energyProductionMessage.getTenantId(), energyProductionMessage.getUsername(),
                 energyProductionMessage.getDeviceId());
        productionService.saveProductions30m(energyProductionMessage);
        log.debug("Ending energy productions message with normalization 30m: {}|{}|{}: ended in {}ms", energyProductionMessage.getTenantId(),
                 energyProductionMessage.getUsername(), energyProductionMessage.getDeviceId(), Duration
                         .between(startNormalization, clock.instant()).toMillis());
    }

    @StreamListener("errorChannel")
    public void error(Message<?> message) {
        log.error("Fail to read message with error '{}'", message.getPayload());
    }

}


Problem

I need to migrate to Spring v4 but these annotations are not available in Spring v4: https://www.javadoc.io/doc/org.springframework.cloud/spring-cloud-stream/4.0.0/org/springframework/cloud/stream/annotation/package-summary.html

Question

Someone does know how to migrate these annotation Spring v2 to Spring v4 ?


Solution

  • The annotation you are using are deprecated and mentionned in the appropriate section of 3.2.x of Spring Cloud Stream here

    Annotation-based programming model. Basically the @EnableBInding, @StreamListener and all related annotations are now deprecated in favor of the functional programming model. See Spring Cloud Function support for more details.

    When following the link above, you'll see that

    Since Spring Cloud Stream v2.1, another alternative for defining stream handlers and sources is to use build-in support for Spring Cloud Function where they can be expressed as beans of type java.util.function.[Supplier/Function/Consumer].

    In order to move to v4, you need to declare your actual configuration of @StreamListener to actual Spring beans defined in a specific class with @org.springframework.context.annotationConfiguration annotation

    @Configuration
    public class ProductionMessageConsumerConfiguration {
    
        @Autowired
        private ProductionStoreService productionService;
        @Autowired
        private Clock clock;
    
        @Bean
        public Consumer<EnergyProductionMessage> consumeEnergyProductionMessage() {
            return energyProductionMessage -> {
                Instant start = clock.instant();
    
                log.debug("Processing energy productions message with original interval: {}|{}|{}", energyProductionMessage.getTenantId(), energyProductionMessage.getUsername(),
                        energyProductionMessage.getDeviceId());
                log.info("Processing {} energy productions ", energyProductionMessage.getSolarEnergies().size());
    
                productionService.saveProductions(energyProductionMessage);
                log.debug("Ending energy productions message with original interval: {}|{}|{}: ended in {}ms", energyProductionMessage.getTenantId(),
                     energyProductionMessage.getUsername(), energyProductionMessage.getDeviceId(), Duration
                             .between(start, clock.instant()).toMillis());
                Instant startNormalization = clock.instant();
                log.debug("Processing energy productions message with normalization 30m: {}|{}|{}", energyProductionMessage.getTenantId(), energyProductionMessage.getUsername(),
                     energyProductionMessage.getDeviceId());
                productionService.saveProductions30m(energyProductionMessage);
                log.debug("Ending energy productions message with normalization 30m: {}|{}|{}: ended in {}ms", energyProductionMessage.getTenantId(),
                     energyProductionMessage.getUsername(), energyProductionMessage.getDeviceId(), Duration
                             .between(startNormalization, clock.instant()).toMillis());
            }
        }
    
    }
    

    For the error handling part, by defaut it creates a log with the message payload but you can follow this section on how to handle it manually