I am trying to set up a Kafka consumer using Reactor Kafka. The producer is integrated with the Kafka Schema Registry.

@Value("${spring.kafka.schemaRegistryUrls}")
private String schemaRegistryEnvVarValue;

@Bean
public ReceiverOptions<String, MyProto> kafkaReceiverOptionsFloor(
        KafkaProperties kafkaProperties) {
    final Map<String, Object> kafkaConsumerProperties =
            kafkaProperties.buildConsumerProperties();
    for (Map.Entry<String, KafkaProperties.Consumer> entry :
            kafkaConsumerPropertiesConfig.getConsumer().entrySet()) {
        if (kafkaTopics.contains(entry.getKey())) {
            kafkaConsumerProperties.putAll(entry.getValue().buildProperties());
        }
    }
    kafkaConsumerProperties.put("schema.registry.url", schemaRegistryEnvVarValue);
    final ReceiverOptions<String, MyProto> basicReceiverOptions =
            ReceiverOptions.<String, MyProto>create(kafkaConsumerProperties)
                    .withValueDeserializer(new MyProtoDeserializer())
                    // disabling auto commit, since we are managing committing
                    // once the record is processed
                    .commitInterval(Duration.ZERO)
                    .commitBatchSize(0);
    kafkaConsumerProperties.forEach((k, v) -> log.debug("k2 {} v2 {}", k, v));
    return basicReceiverOptions
            .subscription(kafkaTopicsFloor)
            .addAssignListener(partitions -> log.debug("onPartitionsAssigned {}", partitions))
            .addRevokeListener(partitions -> log.debug("onPartitionsRevoked {}", partitions));
}

@Bean
public ReactiveKafkaConsumerTemplate<String, MyProto> reactiveKafkaConsumerTemplate(
        ReceiverOptions<String, MyProto> kafkaReceiverOptions) {
    return new ReactiveKafkaConsumerTemplate<>(kafkaReceiverOptions);
}

I am getting the exception "Protocol message contained an invalid tag (zero)". The same deserializer parses messages fine in my unit tests (without the schema registry).
It looks like the schema registry is not being used. What am I doing wrong here?
The deserializer looks like this:

@Slf4j
public class MyProtoDeserializer implements Deserializer<MyProto> {
    public MyProtoDeserializer() {}

    /**
     * Deserializes the data to my_proto from byte array.
     *
     * @param topic
     * @param data
     * @return
     */
    @Override
    public MyProto deserialize(final String topic, final byte[] data) {
        if (data == null) {
            return null;
        }
        // TODO: Use schemaregistry and kpow
        try {
            return MyProto.getDefaultInstance()
                    .getParserForType()
                    .parseFrom(data);
        } catch (Exception ex) {
            log.debug("Exception in MyProto parse {}", ex.getMessage());
            return MyProto.getDefaultInstance();
        }
    }
}

Reactor isn't the issue.
schema.registry.url is only a property of the Confluent deserializer classes. Your deserializer does not implement the configure function, so that property is ignored. Similarly, calling parseFrom directly does not use any HTTP client to interact with a registry.
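To make the error message concrete: records written by Confluent's registry-aware serializers are not bare Protobuf. The value starts with a magic byte of 0, followed by a 4-byte big-endian schema ID (and, for Protobuf, a message-index list) before the actual message bytes. A plain parseFrom reads that leading 0 as a field tag, which is where "invalid tag (zero)" comes from. The following is only a minimal sketch for inspecting that header; the class and method names are hypothetical and it uses nothing but the JDK.

```java
import java.nio.ByteBuffer;

// Hypothetical helper, not part of any Confluent library.
public class ConfluentWireFormat {
    // Registry-framed records begin with this byte.
    static final byte MAGIC_BYTE = 0x0;

    /** True if the value is long enough for the header and starts with the magic byte. */
    public static boolean looksRegistryFramed(byte[] data) {
        return data != null && data.length >= 5 && data[0] == MAGIC_BYTE;
    }

    /** Reads the 4-byte big-endian schema ID that follows the magic byte. */
    public static int schemaId(byte[] data) {
        if (!looksRegistryFramed(data)) {
            throw new IllegalArgumentException("value is not registry-framed");
        }
        return ByteBuffer.wrap(data, 1, 4).getInt();
    }

    public static void main(String[] args) {
        // Made-up sample: magic byte, schema ID 42, message index 0, then payload bytes.
        byte[] sample = {0, 0, 0, 0, 42, 0, 0x0A, 0x01, 0x61};
        System.out.println("schema id = " + schemaId(sample)); // prints: schema id = 42
    }
}
```

Logging the first few bytes of a failing record this way is a quick check that the producer really is using the registry framing, which would also explain why unit tests with unframed bytes pass.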
Import the library rather than writing your own deserializer:
https://mvnrepository.com/artifact/io.confluent/kafka-protobuf-serializer/7.4.0
Also, this is how to set that property through Spring Boot auto-configuration:

spring:
  kafka:
    consumer:
      value-deserializer: io.confluent.kafka.serializers.protobuf.KafkaProtobufDeserializer
      properties:
        "[schema.registry.url]": http://...
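
If you would rather set this in the Java config from the question, the same settings can go into the property map handed to ReceiverOptions.create(...). This is a rough sketch under a few assumptions: the helper class is hypothetical, the registry URL and com.example.MyProto are placeholders, and specific.protobuf.value.type is, as far as I know, the Confluent setting that makes the deserializer return your generated class instead of a DynamicMessage, so check it against the version you use.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper; only builds the property map, no Kafka classes needed.
public class RegistryConsumerProps {

    public static Map<String, Object> withRegistry(
            Map<String, Object> base, String registryUrl, String protoClassName) {
        Map<String, Object> props = new HashMap<>(base);
        // Let the Confluent deserializer handle the registry framing.
        props.put("value.deserializer",
                "io.confluent.kafka.serializers.protobuf.KafkaProtobufDeserializer");
        props.put("schema.registry.url", registryUrl);
        // Assumption: asks for the generated class rather than DynamicMessage.
        props.put("specific.protobuf.value.type", protoClassName);
        return props;
    }

    public static void main(String[] args) {
        // Placeholder URL and class name, for illustration only.
        Map<String, Object> props =
                withRegistry(new HashMap<>(), "http://localhost:8081", "com.example.MyProto");
        props.forEach((k, v) -> System.out.println(k + " = " + v));
    }
}
```

With a map like this, drop .withValueDeserializer(new MyProtoDeserializer()) from the ReceiverOptions chain; otherwise the hand-written deserializer instance is still the one that gets used.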