I'm currently using EnableBinding and StreamListener from Spring Cloud Stream v2: https://www.javadoc.io/doc/org.springframework.cloud/spring-cloud-stream/2.0.0.RELEASE/org/springframework/cloud/stream/annotation/package-summary.html
import java.time.Clock;
import java.time.Duration;
import java.time.Instant;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.messaging.Message;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Service;
import com.enterprise.production.model.exceptions.ProductionStoreException;
import com.enterprise.production.model.message.EnergyProductionMessage;
import com.enterprise.production.stream.service.ProductionStoreService;
import lombok.extern.slf4j.Slf4j;
@EnableBinding(Sink.class)
@Slf4j
@Service
public class ProductionMessageConsumer {

    @Autowired
    private ProductionStoreService productionService;

    @Autowired
    private Clock clock;

    @StreamListener(target = Sink.INPUT)
    public void handleEnergyProductionMessage(@Payload EnergyProductionMessage energyProductionMessage) throws ProductionStoreException {
        Instant start = clock.instant();
        log.debug("Processing energy productions message with original interval: {}|{}|{}",
                energyProductionMessage.getTenantId(), energyProductionMessage.getUsername(),
                energyProductionMessage.getDeviceId());
        log.info("Processing {} energy productions ", energyProductionMessage.getSolarEnergies().size());
        productionService.saveProductions(energyProductionMessage);
        log.debug("Ending energy productions message with original interval: {}|{}|{}: ended in {}ms",
                energyProductionMessage.getTenantId(), energyProductionMessage.getUsername(),
                energyProductionMessage.getDeviceId(), Duration.between(start, clock.instant()).toMillis());

        Instant startNormalization = clock.instant();
        log.debug("Processing energy productions message with normalization 30m: {}|{}|{}",
                energyProductionMessage.getTenantId(), energyProductionMessage.getUsername(),
                energyProductionMessage.getDeviceId());
        productionService.saveProductions30m(energyProductionMessage);
        log.debug("Ending energy productions message with normalization 30m: {}|{}|{}: ended in {}ms",
                energyProductionMessage.getTenantId(), energyProductionMessage.getUsername(),
                energyProductionMessage.getDeviceId(), Duration.between(startNormalization, clock.instant()).toMillis());
    }

    @StreamListener("errorChannel")
    public void error(Message<?> message) {
        log.error("Fail to read message with error '{}'", message.getPayload());
    }
}
I need to migrate to Spring Cloud Stream v4, but these annotations are no longer available there: https://www.javadoc.io/doc/org.springframework.cloud/spring-cloud-stream/4.0.0/org/springframework/cloud/stream/annotation/package-summary.html
Does anyone know how to migrate these annotations from Spring Cloud Stream v2 to v4?
The annotations you are using are deprecated, as mentioned in the relevant section of the Spring Cloud Stream 3.2.x documentation:
Annotation-based programming model. Basically the @EnableBinding, @StreamListener and all related annotations are now deprecated in favor of the functional programming model. See Spring Cloud Function support for more details.
When you follow the link above, you'll see that:
Since Spring Cloud Stream v2.1, another alternative for defining stream handlers and sources is to use built-in support for Spring Cloud Function where they can be expressed as beans of type java.util.function.[Supplier/Function/Consumer].
To move to v4, you need to convert each @StreamListener method into a Spring bean of the matching functional type, declared in a class annotated with @org.springframework.context.annotation.Configuration:
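import java.time.Clock;
import java.time.Duration;
import java.time.Instant;
import java.util.function.Consumer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import com.enterprise.production.model.message.EnergyProductionMessage;
import com.enterprise.production.stream.service.ProductionStoreService;
import lombok.extern.slf4j.Slf4j;
@Slf4j
@Configuration
public class ProductionMessageConsumerConfiguration {
    @Autowired
    private ProductionStoreService productionService;
    @Autowired
    private Clock clock;
    // Replaces the @StreamListener method; the bean name is used to derive the binding name.
    // Consumer.accept declares no checked exceptions: if ProductionStoreException is checked,
    // catch it inside the lambda and rethrow it as an unchecked exception.
    @Bean
    public Consumer<EnergyProductionMessage> consumeEnergyProductionMessage() {
        return energyProductionMessage -> {
            Instant start = clock.instant();
            log.debug("Processing energy productions message with original interval: {}|{}|{}", energyProductionMessage.getTenantId(), energyProductionMessage.getUsername(),
                    energyProductionMessage.getDeviceId());
            log.info("Processing {} energy productions ", energyProductionMessage.getSolarEnergies().size());
            productionService.saveProductions(energyProductionMessage);
            log.debug("Ending energy productions message with original interval: {}|{}|{}: ended in {}ms", energyProductionMessage.getTenantId(),
                    energyProductionMessage.getUsername(), energyProductionMessage.getDeviceId(),
                    Duration.between(start, clock.instant()).toMillis());
            Instant startNormalization = clock.instant();
            log.debug("Processing energy productions message with normalization 30m: {}|{}|{}", energyProductionMessage.getTenantId(), energyProductionMessage.getUsername(),
                    energyProductionMessage.getDeviceId());
            productionService.saveProductions30m(energyProductionMessage);
            log.debug("Ending energy productions message with normalization 30m: {}|{}|{}: ended in {}ms", energyProductionMessage.getTenantId(),
                    energyProductionMessage.getUsername(), energyProductionMessage.getDeviceId(),
                    Duration.between(startNormalization, clock.instant()).toMillis());
        }; // the return statement must end with a semicolon
    }
}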
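One practical consequence of the functional model is that the handler is just a java.util.function.Consumer, so it can be exercised without any binder or Spring context. The sketch below is only an illustration in plain Java: ProductionMessage and RecordingStore are hypothetical stand-ins for EnergyProductionMessage and ProductionStoreService, and the factory method mirrors the shape of the @Bean method above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

class FunctionalConsumerSketch {

    // Hypothetical stand-in for EnergyProductionMessage; only what the sketch needs.
    static final class ProductionMessage {
        final String deviceId;
        final List<Double> solarEnergies;

        ProductionMessage(String deviceId, List<Double> solarEnergies) {
            this.deviceId = deviceId;
            this.solarEnergies = solarEnergies;
        }
    }

    // Hypothetical stand-in for ProductionStoreService: records what it was asked to save.
    static final class RecordingStore {
        final List<String> calls = new ArrayList<>();

        void saveProductions(ProductionMessage m) {
            calls.add("raw:" + m.deviceId);
        }

        void saveProductions30m(ProductionMessage m) {
            calls.add("30m:" + m.deviceId);
        }
    }

    // Same shape as the @Bean method: a factory that returns the handler as a Consumer.
    static Consumer<ProductionMessage> consumeEnergyProductionMessage(RecordingStore store) {
        return message -> {
            store.saveProductions(message);
            store.saveProductions30m(message);
        };
    }

    public static void main(String[] args) {
        RecordingStore store = new RecordingStore();
        Consumer<ProductionMessage> handler = consumeEnergyProductionMessage(store);

        // Invoke the handler directly, as the framework would do for each incoming message.
        handler.accept(new ProductionMessage("device-1", List.of(1.5, 2.0)));

        System.out.println(store.calls); // prints [raw:device-1, 30m:device-1]
    }
}
```

Also note that in the functional model the binding is named after the bean: here it would be consumeEnergyProductionMessage-in-0. Assuming the old configuration used the default Sink binding name input, properties under spring.cloud.stream.bindings.input would move to spring.cloud.stream.bindings.consumeEnergyProductionMessage-in-0. If the application declares more than one functional bean, spring.cloud.function.definition=consumeEnergyProductionMessage tells the framework which one to bind.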
For the error handling part, by default the framework logs the error together with the message payload, but you can follow this section on how to handle it manually.
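As far as I recall from the 4.x reference documentation, a manual error handler is itself a functional bean, typically a Consumer<ErrorMessage> (org.springframework.messaging.support.ErrorMessage), attached to a binding through the spring.cloud.stream.bindings.<binding-name>.error-handler-definition property; please check this against the version you use. The plain-Java sketch below only shows the shape of such a handler: FailedDelivery is a hypothetical stand-in for ErrorMessage so that the example compiles without Spring, and the list stands in for the logger.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

class ErrorHandlerSketch {

    // Hypothetical stand-in for Spring's ErrorMessage: the failure plus the payload that caused it.
    static final class FailedDelivery {
        final Throwable cause;
        final Object originalPayload;

        FailedDelivery(Throwable cause, Object originalPayload) {
            this.cause = cause;
            this.originalPayload = originalPayload;
        }
    }

    // In a Spring application this would be a @Bean method returning Consumer<ErrorMessage>.
    // The list parameter is an assumption of this sketch, used in place of a real logger.
    static Consumer<FailedDelivery> productionErrorHandler(List<String> errorLog) {
        return failure -> errorLog.add(
                "Fail to read message with error '" + failure.cause.getMessage() + "'");
    }

    public static void main(String[] args) {
        List<String> errorLog = new ArrayList<>();
        Consumer<FailedDelivery> handler = productionErrorHandler(errorLog);

        // Simulate the framework handing a failed message to the error handler.
        handler.accept(new FailedDelivery(new IllegalStateException("store unavailable"), "raw payload"));

        System.out.println(errorLog); // prints [Fail to read message with error 'store unavailable']
    }
}
```

With the real ErrorMessage type, getPayload() returns the Throwable and getOriginalMessage() gives access to the message that failed, which covers what the old @StreamListener("errorChannel") method logged.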