When using spring-webflux with a reactor-kafka receiver, how can I manually move/commit the offset when a RecordDeserializationException occurs? From the RecordDeserializationException I can get the partition and offset, but I cannot manually create a ReceiverOffset object that would allow me to commit (its implementation is private).
    reactiveKafkaReceiver
        .receiveBatch()
        .onErrorResume(e -> {
            RecordDeserializationException rde = (RecordDeserializationException) e;
            TopicPartition topicPartition = rde.topicPartition();
            long offset = rde.offset();
            // how can I commit this offset?
            return Flux.empty();
        })
        .delayUntil(flux -> flux
            .collectList()
            .delayUntil(this::process)
            .doOnNext(records -> records.forEach(record -> record.receiverOffset()
                .commit()
                .subscribeOn(Schedulers.boundedElastic())
                .subscribe())))
        .retryWhen((Retry.backoff(3, Duration.ofSeconds(2)).transientErrors(true)))
        .repeat()
        .subscribe();
Is there any solution here?
Following Gary Russell's answer and its link to https://docs.spring.io/spring-kafka/docs/current/reference/html/#error-handling-deserializer, here is a solution that turns deserialization errors into null values (it also automatically adds the error information to the Kafka record headers). With this in place, we only need to add a null check when processing records in the main receiver stream.
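    import java.util.Collections;
    import java.util.HashMap;
    import java.util.Map;
    import lombok.RequiredArgsConstructor;
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.common.serialization.StringDeserializer;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.kafka.support.serializer.ErrorHandlingDeserializer;
    import org.springframework.kafka.support.serializer.JsonDeserializer;
    import reactor.kafka.receiver.KafkaReceiver;
    import reactor.kafka.receiver.ReceiverOptions;

    @Configuration
    @RequiredArgsConstructor
    public class KafkaConsumerConfig {
        private final KafkaConsumerProperty kafkaConsumerProperty;

        @Bean
        public KafkaReceiver<String, KafkaMessageDto> reactiveKafkaReceiver() {
            Map<String, Object> config = new HashMap<>();
            config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaConsumerProperty.getBootstrapServers());
            // ...
            config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
            // Wrap JsonDeserializer so a failure yields the function's result instead of throwing.
            config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
            config.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, JsonDeserializer.class);
            config.put(ErrorHandlingDeserializer.VALUE_FUNCTION, FailedDeserializationFunction.class);
            config.put(JsonDeserializer.VALUE_DEFAULT_TYPE, KafkaMessageDto.class);
            ReceiverOptions<String, KafkaMessageDto> receiverOptions = ReceiverOptions.<String, KafkaMessageDto>create(config)
                .subscription(Collections.singletonList(kafkaConsumerProperty.getTopicName()));
            return KafkaReceiver.create(receiverOptions);
        }
    }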
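    import java.util.function.Function;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.kafka.support.serializer.FailedDeserializationInfo;

    @Slf4j
    public class FailedDeserializationFunction implements Function<FailedDeserializationInfo, Object> {

        @Override
        public Object apply(FailedDeserializationInfo info) {
            // Log the failure and return null so the record still reaches the receiver stream.
            log.warn("Failed to deserialize Kafka message from topic={}. Setting value to null.",
                    info.getTopic(), info.getException());
            return null;
        }
    }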
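
For the null check mentioned above, here is a minimal sketch of the idea in plain Java, so it runs without a broker. The names NullValueSkipSketch, Rec, Result and handle are hypothetical stand-ins, not part of reactor-kafka. In the real stream the same check would be on record.value(), and the offset would be committed or acknowledged through record.receiverOffset(). The point is that a record whose value failed to deserialize now arrives as an ordinary record with a null value, so its offset can be committed like any other.

```java
import java.util.ArrayList;
import java.util.List;

public class NullValueSkipSketch {

    // Hypothetical stand-in for a received record; value is null when deserialization failed.
    static final class Rec {
        final long offset;
        final String value;

        Rec(long offset, String value) {
            this.offset = offset;
            this.value = value;
        }
    }

    // Holds what was processed and which offsets were acknowledged.
    static final class Result {
        final List<String> processed = new ArrayList<>();
        final List<Long> acknowledged = new ArrayList<>();
    }

    static Result handle(List<Rec> batch) {
        Result result = new Result();
        for (Rec rec : batch) {
            // Null check: skip business processing for records that failed to deserialize.
            if (rec.value != null) {
                result.processed.add(rec.value);
            }
            // Every offset is acknowledged, including the failed one,
            // so the consumer moves past the bad record.
            result.acknowledged.add(rec.offset);
        }
        return result;
    }

    public static void main(String[] args) {
        Result r = handle(List.of(new Rec(10L, "a"), new Rec(11L, null), new Rec(12L, "c")));
        System.out.println(r.processed);     // prints [a, c]
        System.out.println(r.acknowledged);  // prints [10, 11, 12]
    }
}
```

Whether to only skip the bad record or also route it somewhere (a log, a dead-letter topic) is a design choice; the sketch just skips it.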