Right now I'm trying to make my two microservices communicate with each other. The first one sends messages like this:
@PostMapping("/testPost")
public ResponseEntity<?> testPost(@RequestBody UserCreatedEvent userCreatedEvent) {
    try {
        ProducerRecord<String, Object> producerRecord =
                new ProducerRecord<>("user", "user-created", userCreatedEvent);
        producerRecord.headers().add("spring.kafka.serialization.selector", "userCreated".getBytes());
        SendResult<String, Object> result =
                kafkaTemplate.send(producerRecord).get();
        return ResponseEntity.ok(result.toString());
    } catch (Exception e) {
        return ResponseEntity.internalServerError().body("Broker error!");
    }
}
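(As I understand it, that spring.kafka.serialization.selector header is only consulted when the value serializer is a DelegatingSerializer; with the plain JsonSerializer in the config below it just travels along as an ordinary header. If the selector were meant to drive serialization, I believe the producer side would look roughly like this sketch, where the "userCreated" delegate key is my own selector token:)

// Sketch: a value serializer that picks its delegate from the
// "spring.kafka.serialization.selector" header (which I assume is the
// value of DelegatingSerializer.VALUE_SERIALIZATION_SELECTOR).
Map<String, Serializer<?>> delegates = new HashMap<>();
delegates.put("userCreated", new JsonSerializer<>());

return new DefaultKafkaProducerFactory<>(configProps,
        new StringSerializer(), new DelegatingSerializer(delegates));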
Here is the producer config:
@Configuration
public class ProducerConfig {

    @Value("${spring.kafka.bootstrap-servers}")
    private String serverAddress;

    @Bean
    public ProducerFactory<String, Object> producerFactory() {
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(
                org.apache.kafka.clients.producer.ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
                serverAddress);
        configProps.put(
                org.apache.kafka.clients.producer.ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                StringSerializer.class);
        configProps.put(
                org.apache.kafka.clients.producer.ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                JsonSerializer.class);
        configProps.put(
                org.apache.kafka.clients.producer.ProducerConfig.ACKS_CONFIG,
                "all");
        configProps.put(
                org.apache.kafka.clients.producer.ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG,
                true);
        configProps.put(
                org.apache.kafka.clients.producer.ProducerConfig.RETRIES_CONFIG,
                5);
        return new DefaultKafkaProducerFactory<>(configProps);
    }

    @Bean
    public KafkaTemplate<String, Object> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}
So when my consumer microservice receives the message, I'm getting a lot of exceptions, caused by:
Caused by: java.lang.IllegalArgumentException: The class 'org.example.testsender.dto.UserCreatedEvent' is not in the trusted packages: [java.util, java.lang, org.example.userservice.event, org.example.userservice.event.*]. If you believe this class is safe to deserialize, please provide its name. If the serialization is only done by a trusted source, you can also enable trust all (*).
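(The trusted-packages list in that message comes from the delegate JsonDeserializers themselves: a delegate built with new JsonDeserializer<>(UserCreatedEvent.class) trusts only its own package plus java.util/java.lang, and the incoming type header names the producer-side class in org.example.testsender.dto. If that class is safe, I believe the delegate can be widened explicitly; a sketch:)

// Sketch: widen the trusted packages on the delegate itself, since a
// programmatically created deserializer never sees the TRUSTED_PACKAGES
// entry from the consumer properties map.
JsonDeserializer<UserCreatedEvent> delegate = new JsonDeserializer<>(UserCreatedEvent.class);
delegate.addTrustedPackages("org.example.testsender.dto"); // or "*" for a fully trusted source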
Also, for some reason, my consumer can't see the ErrorHandlingDeserializer:
java.lang.IllegalStateException: This error handler cannot process 'SerializationException's directly; please consider configuring an 'ErrorHandlingDeserializer' in the value and/or key deserializer
Here is my consumer config:
@Configuration
@EnableKafka
public class KafkaConsumerConfig {

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapAddress;

    @Value("${spring.kafka.consumer.group-id}")
    private String groupId;

    @Bean
    public ConsumerFactory<String, Object> consumerFactory(DelegatingDeserializer delegatingDeserializer) {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        props.put(JsonDeserializer.TRUSTED_PACKAGES, "*");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
        props.put(ErrorHandlingDeserializer.KEY_DESERIALIZER_CLASS, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
        props.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, DelegatingDeserializer.class);
        return new DefaultKafkaConsumerFactory<>(props, new StringDeserializer(), delegatingDeserializer);
    }

    @Bean
    public DelegatingDeserializer delegatingDeserializer() {
        Map<String, Deserializer<?>> delegates = new HashMap<>();
        delegates.put("userCreated", new JsonDeserializer<>(UserCreatedEvent.class));
        return new DelegatingDeserializer(delegates);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, Object> kafkaListenerContainerFactory(
            ConsumerFactory<String, Object> consumerFactory,
            DefaultErrorHandler errorHandler) {
        ConcurrentKafkaListenerContainerFactory<String, Object> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory);
        factory.setConcurrency(3);
        factory.setCommonErrorHandler(errorHandler);
        return factory;
    }

    @Bean
    public DefaultErrorHandler errorHandler(KafkaTemplate<String, Object> kafkaTemplate) {
        DefaultErrorHandler errorHandler =
                new DefaultErrorHandler(
                        new DeadLetterPublishingRecoverer(kafkaTemplate), new FixedBackOff(3000, 3));
        errorHandler.addRetryableExceptions(ServiceUnavailable.class);
        errorHandler.addNotRetryableExceptions(
                ServerSideException.class,
                ClientSideException.class,
                DeserializationException.class,
                JsonProcessingException.class,
                MismatchedInputException.class);
        return errorHandler;
    }
}
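(A note on the second error: if I understand DefaultKafkaConsumerFactory correctly, deserializer instances passed to its constructor take precedence over the *_DESERIALIZER_CLASS_CONFIG entries in the properties map, so the ErrorHandlingDeserializer configured above is never actually created, and raw SerializationExceptions reach the DefaultErrorHandler directly. A sketch of wrapping the delegates programmatically instead:)

// Sketch: wrap the programmatic deserializers in ErrorHandlingDeserializer,
// since instances passed to the factory constructor override the
// deserializer classes set in the props map.
return new DefaultKafkaConsumerFactory<>(
        props,
        new ErrorHandlingDeserializer<>(new StringDeserializer()),
        new ErrorHandlingDeserializer<>(delegatingDeserializer));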
So, when I'm setting consumer properties like this:
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
props.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, JsonDeserializer.class);
props.put(JsonDeserializer.VALUE_DEFAULT_TYPE, UserCreatedEvent.class);
My message is being sent to the dead letter topic. Why? And why wasn't it sent to the DLT with the previous configuration? The main question, though, is: how do I make a generic consumer factory? For now I have only one event type, but later I will have more. Should I make a consumer factory for each? I don't think that's a good idea. According to the docs, as I understood them, there used to be a DelegatingDeserializer.VALUE_SERIALIZATION_SELECTOR_CONFIG, but it no longer exists, so I replaced it with the @Bean of DelegatingDeserializer (you can see it in my consumer configuration above). Maybe I did that wrong? What is the right way to make a generic consumer factory that supports multiple event types for one topic? By the way, here is my listener:
@Component
@Slf4j
public class UserCreatedKafkaEventListener {

    @KafkaHandler(isDefault = true)
    public void defaultHandler(Object o) {
        log.info("Received unknown event: {}", o);
    }

    @KafkaListener(topics = "user", groupId = "users")
    public void userCreatedEventHandler(ConsumerRecord<String, UserCreatedEvent> consumerRecord) {
        String key = consumerRecord.key();
        UserCreatedEvent event = consumerRecord.value();
        log.info("Handling event with key: {}", key);
    }
}
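(My current guess on the DLT question above: with the property-based setup the JsonDeserializer is actually created and honours the producer's type headers, so it tries to load org.example.testsender.dto.UserCreatedEvent, which doesn't exist in the consumer service. The ErrorHandlingDeserializer turns that into a DeserializationException, which is on my not-retryable list, so the record goes straight to the DLT. If that's right, one way to sidestep it until proper type mappings are in place would be:)

// Sketch: ignore the producer's type headers so the value is always
// deserialized into VALUE_DEFAULT_TYPE instead of the producer-side
// class name carried in the __TypeId__ header.
props.put(JsonDeserializer.USE_TYPE_INFO_HEADERS, false);
props.put(JsonDeserializer.VALUE_DEFAULT_TYPE, UserCreatedEvent.class);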
UPDATED: I created a GitHub repository with a test project to reproduce the problem: https://github.com/Denstran/TestKafkaProject
UPDATED 2: I fixed the issue with deserialization by adding containerFactory = "containerFactory" to @KafkaListener. But now I'm dealing with another problem. I have different classes annotated with @KafkaListener, and those classes have @KafkaHandler methods for specific event object types, like this:
@KafkaListener(topics = "users", groupId = "user")
@Component
@Slf4j
public class UserCreatedKafkaEventListener {

    @KafkaHandler
    public void handleEvent(UserCreatedEvent userCreatedEvent) {
        log.info("Handling user created event: {}, {}",
                userCreatedEvent.getClass().getCanonicalName(), userCreatedEvent);
    }

    @KafkaHandler(isDefault = true)
    public void listenDefault(Object object) {
        log.info("Handling default object: {}", object);
        log.info("Handling object with class name: {}, object: {}",
                object.getClass().getCanonicalName(), object);
    }
}
So the problem is that now all events I'm sending via the producer end up in the @KafkaHandler(isDefault = true) of the UserCreatedKafkaEventListener class, and not in the other classes with handlers for their types. Here are the updated consumer config and producer config of my new project (link provided above):
@Bean
public ConsumerFactory<String, Object> consumerFactory() {
    Map<String, Object> consumerProps = new HashMap<>();
    consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
    consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
    consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
    consumerProps.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, JsonDeserializer.class);
    consumerProps.put(JsonDeserializer.TRUSTED_PACKAGES, "*");
    consumerProps.put(JsonDeserializer.TYPE_MAPPINGS,
            "userCreated:org.example.userservice.dto.UserCreatedEvent, userUpdated:org.example.userservice.dto.UserUpdatedEvent");
    consumerProps.put(JsonDeserializer.USE_TYPE_INFO_HEADERS, true);
    consumerProps.put(JsonDeserializer.REMOVE_TYPE_INFO_HEADERS, false);
    return new DefaultKafkaConsumerFactory<>(consumerProps);
}
@Bean
public ProducerFactory<String, Object> producerFactory() {
    Map<String, Object> producerConfig = new HashMap<>();
    producerConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, serverAddress);
    producerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
    producerConfig.put(JsonSerializer.TYPE_MAPPINGS,
            "userCreated:org.example.producerservice.dto.UserCreatedEvent, userUpdated:org.example.producerservice.dto.UserUpdatedEvent");
    producerConfig.put(JsonSerializer.ADD_TYPE_INFO_HEADERS, true);
    producerConfig.put(ProducerConfig.ACKS_CONFIG, "all");
    producerConfig.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
    producerConfig.put(ProducerConfig.RETRIES_CONFIG, 5);
    return new DefaultKafkaProducerFactory<>(producerConfig);
}
So, following the advice of Artem Bilan, I checked whether my consumer factory was actually being picked up by Spring Boot, and found that it definitely wasn't. How so? Well, as Artem said, Spring Boot provides a ConcurrentKafkaListenerContainerFactory @Bean out of the box, and, as I googled, if you want to override it, the bean name must be "kafkaListenerContainerFactory" (according to this); otherwise you have to name your factory in the containerFactory attribute of @KafkaListener, like this: @KafkaListener(topics = "users", groupId = "user", containerFactory = "containerFactory").
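(In other words, something like this sketch should replace Boot's default factory outright, so the containerFactory attribute isn't needed on every listener; it assumes my consumerFactory bean from above:)

// Sketch: naming the bean "kafkaListenerContainerFactory" makes it replace
// the one Spring Boot auto-configures, so @KafkaListener picks it up
// without an explicit containerFactory attribute.
@Bean
public ConcurrentKafkaListenerContainerFactory<String, Object> kafkaListenerContainerFactory(
        ConsumerFactory<String, Object> consumerFactory) {
    ConcurrentKafkaListenerContainerFactory<String, Object> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory);
    return factory;
}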
Also, to fix the deserialization problem, it's important to specify the type mappings like this:
producerConfig.put(JsonSerializer.TYPE_MAPPINGS, "userCreated:org.example.producerservice.dto.UserCreatedEvent");
consumerConfig.put(JsonDeserializer.TYPE_MAPPINGS, "userCreated:org.example.userservice.dto.UserCreatedEvent");
This lets the consumer work out which type it should deserialize the payload into via the "userCreated" token, which is followed by the class name of your object in the mapping (see Mapping Types in the Spring docs).
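(With the tokens aligned on both sides, a single class-level @KafkaListener can then fan out by payload type through @KafkaHandler, so only truly unknown payloads hit the default handler. A sketch of how I expect routing to work now; class and method names are my own:)

@KafkaListener(topics = "users", groupId = "user", containerFactory = "containerFactory")
@Component
@Slf4j
public class UserKafkaEventListener {

    // Selected when the "userCreated" token maps to UserCreatedEvent.
    @KafkaHandler
    public void handle(UserCreatedEvent event) {
        log.info("User created: {}", event);
    }

    // Selected when the "userUpdated" token maps to UserUpdatedEvent.
    @KafkaHandler
    public void handle(UserUpdatedEvent event) {
        log.info("User updated: {}", event);
    }

    // Fallback for payloads no other handler matches.
    @KafkaHandler(isDefault = true)
    public void unknown(Object payload) {
        log.info("Unknown event: {}", payload);
    }
}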