I have to integrate Kafka to Cuba and I thought it would as easy as adding the spring kafka dependency and creating a Configuration
annotated class to initialize the Kafka Consumer since Cuba is based on Spring.
When I added a Configuration, I found out that it is not scanned when Cuba is started. When I switch to CUBA view, I noticed that only those classes annotated as Service
or Component
will be read. However, even If I add a Component
class, it is still not scanned properly (I added a field annotated with @Value
that looks for a non-existing property but Cuba did not throw any error when I start it)
There is a simple example on CUBA+Kafka integration, you can find it here: https://github.com/cuba-labs/kafka-sample
The configuration process is taken from the official Spring documentation.
@Bean
public ConsumerFactory<Integer, String> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerConfigs());
}
@Bean
public Map<String, Object> consumerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "sample-kafka");
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100");
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
return props;
}
@Inject
private KafkaTemplate<Integer, String> template;
@Override
public void sendMessage(String message) {
log.info("Sending {} using Kafka", message);
long id = uniqueNumbersService.getNextNumber("users");
ListenableFuture<SendResult<Integer, String>> send = template.send("users", (int) id, message);
send.addCallback(new ListenableFutureCallback<SendResult<Integer, String>>() {
@Override
public void onFailure(Throwable ex) {
log.info("Failed to send message {}, error {}", message, ex.getMessage());
}
@Override
public void onSuccess(SendResult<Integer, String> result) {
log.info("Message {} sent", message);
}
});
}
@KafkaListener
annotation. For example, the example below saves kafka messages to the database.@Component
@DependsOn("consumerFactory")
public class MessageListener {
@Inject
private DataManager dataManager;
@KafkaListener(id = "sample-kafka", topics = "users")
public void listen1(String foo, @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) int id) {
KafkaMessage kafkaMessage = dataManager.create(KafkaMessage.class);
kafkaMessage.setKafkaID(id);
kafkaMessage.setContent(foo);
dataManager.commit(kafkaMessage);
}