I have a simple Java Spring Boot app that is supposed to consume the topic messages from Kafka as an object class, but it only gets called as a string. only the handleDefault is called.. why ?
@Component
@KafkaListener(topics="product-created-events-topic", groupId = "product-created-events")
public class ProductCreatedEventHandler {
private final Logger LOGGER = LoggerFactory.getLogger(this.getClass());
@KafkaHandler(isDefault = true)
public void handle(ProductCreatedEvent productCreatedEvent){
LOGGER.info("get msg from kafka:"+productCreatedEvent.title());
}
@KafkaHandler(isDefault = true)
public void handleDefault(String message) {
LOGGER.warn("Received unknown message type from Kafka: " + message);
}
}
I like it to be consumed as the ProductCreatedEvent object not as string.
This is the class:
package com.test.ws.core;
import java.math.BigDecimal;
public class ProductCreatedEvent {
private String productId;
private String title;
private BigDecimal price;
private Integer quantity;
public ProductCreatedEvent(String productId, String title, BigDecimal price, Integer quantity) {
this.productId = productId;
this.title = title;
this.price = price;
this.quantity = quantity;
}
public String productId() {
return this.productId;
}
public ProductCreatedEvent setProductId(String productId) {
this.productId = productId;
return this;
}
public String title() {
return this.title;
}
public ProductCreatedEvent setTitle(String title) {
this.title = title;
return this;
}
public BigDecimal price() {
return this.price;
}
public ProductCreatedEvent setPrice(BigDecimal price) {
this.price = price;
return this;
}
public Integer quantity() {
return this.quantity;
}
public ProductCreatedEvent setQuantity(Integer quantity) {
this.quantity = quantity;
return this;
}
}
and the bootstrap class:
@SpringBootApplication
public class ConsumersApplication {
public static void main(String[] args) {
SpringApplication.run(ConsumersApplication.class, args);
}
}
The configuration of the Spring Boot app:
spring.application.name=consumers
server.port=8083
spring.kafka.consumer.bootstrap-servers=localhost:9092
spring.kafka.consumer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.consumer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer
spring.kafka.consumer.group-id=product-created-events
spring.kafka.consumer.properties.spring.json.trusted.packages=*
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.enable-auto-commit=false
This is the topic:
~/kafka/kafka_2.13-3.7.0/bin$ ./kafka-console-consumer.sh --topic product-created-events-topic --from-beginning --bootstrap-server localhost:9092 --property print.key=true --property print.value=true
ea59cd46-b205-4769-84d7-69cf37b3ba79 {"productId":"ea59cd46-b205-4769-84d7-69cf37b3ba79","title":"iphone1","price":222,"quantity":19}
ea59cd46-b205-4769-84d7-69cf37b3ba79 {"productId":"ea59cd46-b205-4769-84d7-69cf37b3ba79","title":"iphone1","price":222,"quantity":19}
b1f9dc43-58b4-44e5-9184-afe9159cd757 {"productId":"b1f9dc43-58b4-44e5-9184-afe9159cd757","title":"iphone2","price":212,"quantity":19}
I think you are missing the property spring.kafka.consumer.value-deserializer
, as the consumer deserializes a record, while the producer serializes it. I have normally used Apache Avro rather that JSON, but I found an example provided by Spring that does the following:
The producer uses a JsonSerializer; the consumer uses the ByteArrayDeserializer, together with a JsonMessageConverter which converts to the type of the listener method argument.
Then you have another more advanced example where the producer uses JsonSerializer
and there are several consumer listeners expecting different types. The consumer listener is selected dynamically based on a producer-consumer type mapping.
The producer uses a JsonSerializer; the consumer uses a ByteArrayDeserializer, together with a ByteArrayJsonMessageConverter which converts to the required type of the listener method argument. We can’t infer the type in this case (because the type is used to choose the method to call). We therefore configure type mapping on the producer and consumer side. See the application.yml for the producer side and the converter bean on the consumer side.