I have some issues after the old imperative programming type was deprecated.
I have two microservices (one as publisher and the other as subscriber) and in the old way, with the annotation @StreamListener(target = "events", condition = "headers['type']=='consumerPermissionEvent'")
i was able to have two functions listening only that records and now i don't know how to do it.
I was reading all the documentation event routing and trying with the routing-expression but the two consumers are reading all the records.
The application yaml of the first microservies:
spring:
cloud:
stream:
bindings:
output:
destination: topicEvents
The seconds application yaml is:
spring:
cloud:
function:
routing-expression: headers['type']
definition: consumerPermissionEvent;consumerApiEvent
stream:
bindings:
consumerPermissionEvent-in-0:
destination: topicUsers
consumerApiEvent-in-0:
destination: topicUsers
I'm sending from the first microservice like that:
@Autowired
private StreamBridge bridge;
public void send(PermissionEvent event){
Message<PermissionEvent> message = MessageBuilder.withPayload(event)
.setHeader("type","consumerPermissionEvent").build();
bridge.send("output", message);
}
And the second microservice has two consumers:
@Bean
public Consumer<Message<ApiEvent>> consumerApiEvent() {
return e -> log.debug("READED API EVENT: {}", e.getPayload());
}
@Bean
public Consumer<Message<PermissionEvent>> consumerPermissionEvent() {
return e -> log.debug("READED PERMISSION EVENT: {}", e.getPayload());
}
And the output logs from the second microservice:
[KafkaConsumerDestination{consumerDestinationName='topicUsers', partitions=1, dlqName='null'}.container-0-C-1] [20b662594644cf2e] DEBUG c.m.o.v.eda.subscribers.NotificationSuscriber - READED API EVENT: ApiEvent(apiId=null)
[KafkaConsumerDestination{consumerDestinationName='topicUsers', partitions=1, dlqName='null'}.container-0-C-1] [20b662594644cf2e] DEBUG c.m.o.v.eda.subscribers.NotificationSuscriber - READED PERMISSION EVENT: PermissionEvent(userRole=roleUseradsf)
Any ideas how to do it?
Thanks in advance
you will need to enable routing first by using following property:
--spring.cloud.stream.function.routing.enabled=true
for more details, refer to https://cloud.spring.io/spring-cloud-stream/reference/html/spring-cloud-stream.html#spring_cloud_function