I'm trying to send message to DLQ when the message i'm processing in my Function doesn't respect some logic.
For some context, I have this configuration in my application.yaml My function looks like this :
function:
definition: filterConsumption|EnrichConsumption;
#Functions and topic binding
stream:
bindings:
filterConsumptionEnrichConsumption-in-0:
destination: input
filterConsumptionEnrichConsumption-out-0:
destination: output
kafka:
streams:
bindings:
filterConsumptionEnrichConsumption-in-0:
consumer:
enable-dlq: true
dlqName: input_dlq
application-id: input-application-id
filterConsumptionEnrichConsumption-out-1:
consumer:
enable-dlq: false
application-id: output-application-id
binder:
#Kafka consumer config
replicationFactor: ${KAFKA_STREAM_REPLICATION_FACTOR:1}
brokers: ${BOOTSTRAP_SERVERS_CONFIG:localhost:9092}
deserialization-exception-handler: sendToDlq
And my function looks like this :
@Bean("EnrichConsumption")
public Function<KStream<String, ConsumptionSchema>, KStream<String, ConsumptionSchema>> EnrichConsumption() {
return input ->
input.filter((key, consumptions) -> !getSomething(consumptions).orElse("").isBlank())
.merge(
//filter consumptions having a tradingName
input.filter((key, consumptions) -> getSomething(consumptions).orElse("").isBlank())
//enrich consumptions with missing tradingName
.mapValues(this::setSomething)
);
}
During the "setSomething", some exceptions could occurr due to logic rules.
I tried two things : First using StreamBridge, but I keep getting the following error :
Connection to node -1 (localhost/127.0.0.1:9092) could not be established. Broker may not be available.
I tried to configure the streamBridge as such :
private final StreamBridge streamBridge;
@Bean("EnrichConsumption")
public Function<KStream<String, ConsumptionSchema>, KStream<String, ConsumptionSchema>> EnrichConsumption() {
return input ->
input.filter((key, consumptions) ->
!getSomething(consumptions).orElse("").isBlank())
.merge(
//filter consumptions having a tradingName
input.filter((key, consumptions) ->
getSomething(consumptions).orElse("").isBlank())
//enrich consumptions with missing tradingName
.mapValues((ConsumptionSchema value) -> {
try {
return setSomething(value);
} catch (DlqException e) {
Message<ConsumptionSchema> mess = MessageBuilder.withPayload(value).build();
streamBridge.send("filterConsumptionEnrichConsumption-in-1", mess);
return null;
}
})
);
}
I also tried using SendToDlqAndContinue, using a processor like this :
@Autowired
private final SendToDlqAndContinue dlqHandler;
@Bean("EnrichConsumption")
public Function<KStream<String, ConsumptionSchema>, KStream<String, ConsumptionSchema>> EnrichConsumption() {
return input ->
input.process(() -> new Processor<String, ConsumptionSchema, String, ConsumptionSchema>() {
ProcessorContextImpl context;
@Override
public void init(ProcessorContext context) {
this.context = (ProcessorContextImpl) context;
}
@Override
public void process(Record<String, ConsumptionSchema> processInput) {
input.filter((key, consumptions) ->
!getSomething(consumptions).orElse("").isBlank())
.merge(
//filter consumptions having a tradingName
input.filter((key, consumptions) ->
getSomething(consumptions).orElse("").isBlank())
//enrich consumptions with missing tradingName
.mapValues((ConsumptionSchema value) -> {
try {
return setSomething(value);
} catch (DlqException e) {
log.error("Exception during handling of consumption message : {}, message : {}",
processInput.key(), e.getMessage());
dlqHandler.sendToDlq(
new ConsumerRecord<>(
context.topic(),
context.partition(),
context.offset(),
processInput.key(),
processInput.value()), e);
return null;
}
})
);
}
In this case, and I don't understand why, the process method doesn't seems to be called.
Anyone could help me make it works using either SendToDlqAndContinue (preferred solution) or StreamBridge ?
EDIT :
Using the same application.yaml as in the first part, I tried the DltAwareProcessor :
@Configuration
@Data
@Slf4j
public class BillableConsumptionFilterStreaming {
@Bean("filterConsumption")
public Function<KStream<String, ConsumptionSchema>,
KStream<String, ConsumptionSchema>> filterConsumption(DltSenderContext dltSenderContext) {
return input ->
input.process(() ->
new DltAwareProcessor<>(
(BiFunction<String, ConsumptionSchema, KeyValue<String, ConsumptionSchema>>) (s, c) -> {
throw new RuntimeException("Exception that won't kill stream");
}, "input_dlq", dltSenderContext));
}
Using break point, the DltAwareProcessor is correctly called, until this line : streamBridge.send(this.dltDestination, r.value());
No exception is thrown or whatsoever but I get the following logs :
Using kafka topic for outbound: input_dlq
Node -1 disconnected.
Connection to node -1 (localhost/127.0.0.1:9092) could not be established. Broker may not be available.
In my application, BOOTSTRAP_SERVERS_CONFIG is overridden with the adress of our kafka, and when there is not exception, the messages are correctly routed to output topic. So maybe I'm missing some configuration in my application.yaml to configure the broker for StreamBridge.
The SendToDlqAndContinue
is built explicitly for deserialization exception handling purposes. You cannot use this for runtime error handling as the way you are trying. We recently (in the 4.1.0-SNAPSHOT
line) a new feature, that may help with this use case. Please see the following issues for more details on that.
https://github.com/spring-cloud/spring-cloud-stream/issues/2779
https://github.com/spring-cloud/spring-cloud-stream/issues/2802
Here are the docs for this feature: https://docs.spring.io/spring-cloud-stream/reference/kafka/kafka-streams-binder/error-handling.html#runtime-error-handling
See if that works for your use case. If you find any room for improvements on this feature, please comment on the issue (2802) above, and we can still make it, as we will do the 4.1.0-RC1
release later this month.
When using DltAwareProcessor
, you must include the regular Kafka binder in addition to the Kafka Streams binder. By regular, I meant the message channel-based binder (spring-cloud-stream-binder-kafka
) as StreamBridge
requires that. You also need to set the proper configuration against this binder - for e.g, if you are using a non-default server/port, you need to set it on spring.cloud.stream.kafka.binder.brokers
.