I have a Spring Cloud Stream Kafka Flow where I am chaining Functions together. My configuration is below:
spring:
  cloud:
    function:
      definition: filterEvent|logEvent|determineSomething
    stream:
      bindings:
        filterEventlogEventdetermineSomething-in-0:
          binder: kafka
          destination: my-event-stream
          group: ${environment.type}-${spring.application.name}
With my function beans being the following:
@Bean
public Function<MyEvent, MyEvent> filterEvent() {
    return event -> {
        log.debug("Received event {}", event);
        return Optional.of(event)
                .filter(hasValidName().and(isEventCurrent()))
                .orElse(null);
    };
}

@Bean
public Function<MyEvent, ValidatedEvent> logEvent(EventProcessor processor) {
    return processor::logEvent;
}

@Bean
public Consumer<ValidatedEvent> determineSomething(EventProcessor processor) {
    return processor::determineSomething;
}
I'm trying to have the filterEvent function skip the event if it doesn't meet the criteria. Understandably, I'm getting an exception because filterEvent passes a null to the logEvent function.
2024-01-10T22:27:18.280Z ERROR 1 --- [my-event-processor] [container-0-C-1] o.s.integration.handler.LoggingHandler : org.springframework.messaging.MessageHandlingException: error occurred in message handler [org.springframework.cloud.stream.function.FunctionConfiguration$FunctionToDestinationBinder$1@64ca5fd0], failedMessage=GenericMessage [payload=byte[293], headers={kafka_offset=2064021, scst_nativeHeadersPresent=true, kafka_consumer=org.springframework.kafka.core.DefaultKafkaConsumerFactory$ExtendedKafkaConsumer@777746bf, deliveryAttempt=3, kafka_timestampType=CREATE_TIME, kafka_receivedPartitionId=0, kafka_receivedMessageKey=[B@112f5e5d, kafka_receivedTopic=my-event-stream, kafka_receivedTimestamp=1704386505547, contentType=application/json, __TypeId__=[B@3d2b623, kafka_groupId=alpha-my-event-processor}]
2024-01-10T17:27:18.281-05:00 at org.springframework.integration.support.utils.IntegrationUtils.wrapInHandlingExceptionIfNecessary(IntegrationUtils.java:191)
2024-01-10T17:27:18.281-05:00 at org.springframework.integration.handler.AbstractMessageHandler.doHandleMessage(AbstractMessageHandler.java:108)
2024-01-10T17:27:18.281-05:00 at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:73)
2024-01-10T17:27:18.281-05:00 at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:132)
2024-01-10T17:27:18.281-05:00 at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:133)
2024-01-10T17:27:18.281-05:00 at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:106)
2024-01-10T17:27:18.281-05:00 at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:72)
2024-01-10T17:27:18.281-05:00 at org.springframework.integration.channel.AbstractMessageChannel.sendInternal(AbstractMessageChannel.java:378)
2024-01-10T17:27:18.281-05:00 at org.springframework.integration.channel.AbstractMessageChannel.sendWithMetrics(AbstractMessageChannel.java:349)
2024-01-10T17:27:18.281-05:00 at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:329)
2024-01-10T17:27:18.281-05:00 at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:302)
2024-01-10T17:27:18.281-05:00 at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:187)
2024-01-10T17:27:18.281-05:00 at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:166)
2024-01-10T17:27:18.281-05:00 at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47)
2024-01-10T17:27:18.281-05:00 at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:109)
2024-01-10T17:27:18.281-05:00 at org.springframework.integration.endpoint.MessageProducerSupport.lambda$sendMessage$1(MessageProducerSupport.java:262)
2024-01-10T17:27:18.281-05:00 at io.micrometer.observation.Observation.observe(Observation.java:499)
2024-01-10T17:27:18.281-05:00 at org.springframework.integration.endpoint.MessageProducerSupport.sendMessage(MessageProducerSupport.java:262)
2024-01-10T17:27:18.281-05:00 at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter.sendMessageIfAny(KafkaMessageDrivenChannelAdapter.java:391)
2024-01-10T17:27:18.281-05:00 at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter$IntegrationRecordMessageListener.lambda$onMessage$0(KafkaMessageDrivenChannelAdapter.java:460)
2024-01-10T17:27:18.281-05:00 at org.springframework.integration.kafka.inbound.KafkaInboundEndpoint.lambda$doWithRetry$0(KafkaInboundEndpoint.java:77)
2024-01-10T17:27:18.281-05:00 at org.springframework.retry.support.RetryTemplate.doExecute(RetryTemplate.java:329)
2024-01-10T17:27:18.281-05:00 at org.springframework.retry.support.RetryTemplate.execute(RetryTemplate.java:225)
2024-01-10T17:27:18.281-05:00 at org.springframework.integration.kafka.inbound.KafkaInboundEndpoint.doWithRetry(KafkaInboundEndpoint.java:70)
2024-01-10T17:27:18.281-05:00 at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter$IntegrationRecordMessageListener.onMessage(KafkaMessageDrivenChannelAdapter.java:457)
2024-01-10T17:27:18.281-05:00 at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter$IntegrationRecordMessageListener.onMessage(KafkaMessageDrivenChannelAdapter.java:422)
2024-01-10T17:27:18.281-05:00 at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeOnMessage(KafkaMessageListenerContainer.java:2857)
2024-01-10T17:27:18.281-05:00 at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeOnMessage(KafkaMessageListenerContainer.java:2835)
2024-01-10T17:27:18.281-05:00 at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.lambda$doInvokeRecordListener$56(KafkaMessageListenerContainer.java:2753)
2024-01-10T17:27:18.281-05:00 at io.micrometer.observation.Observation.observe(Observation.java:565)
2024-01-10T17:27:18.281-05:00 at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeRecordListener(KafkaMessageListenerContainer.java:2751)
2024-01-10T17:27:18.281-05:00 at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeWithRecords(KafkaMessageListenerContainer.java:2604)
2024-01-10T17:27:18.281-05:00 at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(KafkaMessageListenerContainer.java:2490)
2024-01-10T17:27:18.281-05:00 at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:2132)
2024-01-10T17:27:18.281-05:00 at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeIfHaveRecords(KafkaMessageListenerContainer.java:1487)
2024-01-10T17:27:18.281-05:00 at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1451)
2024-01-10T17:27:18.281-05:00 at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1322)
2024-01-10T17:27:18.281-05:00 at java.base/java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1804)
2024-01-10T17:27:18.281-05:00 at java.base/java.lang.Thread.run(Thread.java:840)
2024-01-10T17:27:18.281-05:00 Caused by: java.lang.NullPointerException: Cannot invoke "Object.getClass()" because "input" is null
2024-01-10T17:27:18.281-05:00 at org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry$FunctionInvocationWrapper.convertNonMessageInputIfNecessary(SimpleFunctionRegistry.java:1297)
2024-01-10T17:27:18.281-05:00 at org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry$FunctionInvocationWrapper.convertInputIfNecessary(SimpleFunctionRegistry.java:1135)
2024-01-10T17:27:18.281-05:00 at org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry$FunctionInvocationWrapper.doApply(SimpleFunctionRegistry.java:728)
2024-01-10T17:27:18.281-05:00 at org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry$FunctionInvocationWrapper.lambda$andThen$0(SimpleFunctionRegistry.java:652)
2024-01-10T17:27:18.281-05:00 at org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry$FunctionInvocationWrapper.doApply(SimpleFunctionRegistry.java:731)
2024-01-10T17:27:18.281-05:00 at org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry$FunctionInvocationWrapper.apply(SimpleFunctionRegistry.java:580)
2024-01-10T17:27:18.281-05:00 at org.springframework.cloud.stream.function.PartitionAwareFunctionWrapper.apply(PartitionAwareFunctionWrapper.java:92)
2024-01-10T17:27:18.281-05:00 at org.springframework.cloud.stream.function.FunctionConfiguration$FunctionWrapper.apply(FunctionConfiguration.java:832)
2024-01-10T17:27:18.281-05:00 at org.springframework.cloud.stream.function.FunctionConfiguration$FunctionToDestinationBinder$1.handleMessageInternal(FunctionConfiguration.java:661)
2024-01-10T17:27:18.281-05:00 at org.springframework.integration.handler.AbstractMessageHandler.doHandleMessage(AbstractMessageHandler.java:105)
I've also tried throwing an exception if the event is invalid. That doesn't work either, since it stops the listener instead of just skipping the invalid event.
Ideally, the filter would pass valid events on to logEvent and simply drop invalid events. I've seen examples of routing to other streams, but none of them drops the event on the floor and moves on to the next one. I could check whether the event is valid at each stage, either by passing Optionals or by adding an isValid method (roughly like the sketch below), but that would force all of my current and future functions to always perform the check.
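For illustration, this is roughly what that per-stage guard would look like; the isValid method is hypothetical, and the null problem just moves one step further down the chain:

@Bean
public Function<MyEvent, ValidatedEvent> logEvent(EventProcessor processor) {
    return event -> event != null && event.isValid()   // hypothetical isValid() check, repeated in every stage
            ? processor.logEvent(event)
            : null;                                     // the null then has to be handled by the next stage too
}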
Any suggestions on how to implement this using Spring Cloud Stream?
OK, so it looks like the problem is in Spring Cloud Function's composition: there is simply no null check on the result of the previous function call in the chain before it is passed to the next function.
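Conceptually, the composed chain just feeds each result into the next function without a guard, roughly like this (an illustration, not the actual framework code):

MyEvent filtered = filterEvent().apply(event);        // returns null for an invalid event
// before invoking logEvent, the framework calls input.getClass() on that null result
// while converting the input -- hence the NullPointerException in the stack trace
ValidatedEvent validated = logEvent(processor).apply(filtered);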
Please consider raising a GitHub issue at https://github.com/spring-cloud/spring-cloud-function to get some insight from the project maintainers.
The workaround is probably along the lines of what is explained in the docs: https://docs.spring.io/spring-cloud-function/docs/current/reference/html/spring-cloud-function.html#_function_filtering
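Until that is addressed, another pragmatic workaround is to drop composition for this case and collapse the chain into a single Consumer, so the filtering happens in plain Java and invalid events are simply dropped. A minimal sketch, reusing the beans and predicates from the question (the handleEvent name is made up):

@Bean
public Consumer<MyEvent> handleEvent(EventProcessor processor) {
    return event -> Optional.of(event)
            .filter(hasValidName().and(isEventCurrent()))   // invalid events stop here
            .map(processor::logEvent)                       // only valid events are logged
            .ifPresent(processor::determineSomething);      // ...and then processed
}

With that, spring.cloud.function.definition becomes just handleEvent and the input binding name simplifies to handleEvent-in-0.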