My demo application has:
application.yml
# some values to reduce duplicates in configuration
kafka-conf:
server: kafka.host:9093
topic-name: employee.test.incoming.json
keystore: classpath:kafka_client_keystore.jks
truststore: classpath:kafka_client_truststore.jks
group: hr-test
spring:
application:
name: ${kafka-conf.group}-streaming
kafka:
security:
protocol: SSL
ssl:
key-store-location: ${kafka-conf.keystore}
key-store-password: ${KEY_STORE_PASSWORD}
key-store-type: JKS
key-password: ${KEY_PASSWORD}
trust-store-location: ${kafka-conf.truststore}
trust-store-password: ${TRUST_STORE_PASSWORD}
trust-store-type: JKS
topicName: ${kafka-conf.topic-name}
consumer:
bootstrap-servers: ${kafka-conf.server}
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
group-id: ${kafka-conf.group}
auto-offset-reset: earliest
producer:
bootstrap-servers: ${kafka-conf.server}
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
bootstrap-servers: ${kafka-conf.server}
cloud:
stream:
bindings:
kafka-input-channel:
contentType: application/json
destination: ${kafka-conf.topic-name}
group: ${kafka-conf.group}-streaming # todo check me
kafka:
# todo check this
bindings:
kafka-input-channel:
consumer:
keySerde: org.apache.kafka.common.serialization.Serdes.StringSerde
valueSerde: org.springframework.kafka.support.serializer.JsonSerde
binder:
# confirmed: https://docs.spring.io/spring-cloud-stream-binder-kafka/docs/3.1.5/reference/html/spring-cloud-stream-binder-kafka.html#_example_security_configuration
# + : https://kafka.apache.org/090/documentation.html#security_configclients
configuration:
[security.protocol]: SSL
[ssl.truststore.location]: ${kafka-conf.truststore}
[ssl.truststore.password]: ${TRUST_STORE_PASSWORD}
[ssl.truststore.type]: JKS
[ssl.keystore.location]: ${kafka-conf.keystore}
[ssl.keystore.password]: ${KEY_STORE_PASSWORD}
[ssl.key.password]: ${KEY_PASSWORD}
[ssl.keystore.type]: JKS
default:
key.serde: org.apache.kafka.common.serialization.Serdes.StringSerde
value.serde: org.springframework.kafka.support.serializer.JsonSerde
streams:
# confirmed: https://docs.spring.io/spring-cloud-stream/docs/current/reference/html/spring-cloud-stream-binder-kafka.html#_kafka_streams_consumer_properties
default:
consumer:
application.id: ${kafka-conf.group}-default
binder:
brokers: ${kafka-conf.server}
KafkaProducer
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;
@Service
@Slf4j(topic = "Producer Logger")
public class KafkaProducer {
@Value("${kafka-conf.topic-name}")
private String topic;
@Autowired
private KafkaTemplate<String, Message> kafkaTemplate;
public void sendMessage(String key, Message message) {
log.debug("sending message. where key: <{}>, message: <{}>", key, message);
kafkaTemplate.send(topic, key, message);
}
}
KafkaConsumer
import lombok.extern.apachecommons.CommonsLog;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Service;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
@Service
@CommonsLog(topic = "Consumer Logger")
public class KafkaConsumer {
private final List<Message> messages = new CopyOnWriteArrayList<>();
public List<Message> getMessages() {
return messages;
}
@KafkaListener(topics = "${kafka-conf.topic-name}")
public void receive(@Payload Message message,
// @Header(KafkaHeaders.RECEIVED_KEY) Integer key,
@Header(KafkaHeaders.RECEIVED_PARTITION) int partition,
@Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
@Header(KafkaHeaders.RECEIVED_TIMESTAMP) long ts) {
messages.add(message);
log.info(String.format("topic: %s", topic));
log.info(String.format("Message saved: %s", message));
}
}
KafkaStreamingConsumer
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.streams.kstream.KStream;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.messaging.SubscribableChannel;
import org.springframework.stereotype.Service;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
@EnableBinding(KafkaSink.class)
@Service
@Slf4j(topic = "Streaming Consumer Logger")
public class KafkaStreamingConsumer {
private final List<Message> messages = new CopyOnWriteArrayList<>();
public List<Message> getMessages() {
return messages;
}
@StreamListener(KafkaSink.INPUT)
public void consume(KStream<String, Message> message) {
log.info("income detected.");
message.peek((k, v) -> log.info("Key: {}, value: {}", k, v));
log.info("Streamed message saved: {}", message);
}
public interface KafkaSink {
String INPUT = "kafka-input-channel";
@Input(INPUT)
SubscribableChannel kafkaInputChannel();
}
}
KafkaStreamsConfig
- is an extraconfigurationimport lombok.extern.slf4j.Slf4j;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.support.serializer.JsonSerde;
@Configuration
@Slf4j(topic = "Streaming Config Logger")
public class KafkaStreamsConfig {
@Value("${kafka-conf.topic-name}")
private String topic;
@Bean
public KStream<String, Message> kStream(StreamsBuilder streamsBuilder) {
JsonSerde<Message> messageSerde = new JsonSerde<>(Message.class);
KStream<String, Message> input = streamsBuilder.stream(topic,
Consumed.with(Serdes.String(), messageSerde))
.peek((k, v) -> log.info("Received message <{}>, key: <{}>", k, v));
return input;
}
}
KafkaPocApplication
the enter point
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.kafka.annotation.EnableKafkaStreams;
@SpringBootApplication
@EnableKafkaStreams
public class KafkaPocApplication {
public static void main(String[] args) {
SpringApplication.run(KafkaPocApplication.class, args);
}
}
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>3.1.5</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>com.demo</groupId>
<artifactId>kafka-poc</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>kafka-poc</name>
<description>PoC project that demonstrates how to work with kafka using java app</description>
<properties>
<java.version>17</java.version>
<spring-cloud.version>2020.0.4</spring-cloud.version>
<spring-boot.version>3.1.5</spring-boot.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka-streams</artifactId>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<scope>compile</scope>
</dependency>
</dependencies>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-dependencies</artifactId>
<version>${spring-cloud.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
<!-- Spring Boot Dependencies -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-dependencies</artifactId>
<version>${spring-boot.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
<repositories>
<repository>
<id>spring-releases</id>
<url>https://repo.spring.io/libs-release</url>
</repository>
</repositories>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<configuration>
<image>
<builder>paketobuildpacks/builder-jammy-base:latest</builder>
</image>
</configuration>
</plugin>
</plugins>
</build>
</project>
KafkaStreamingConsumer
doesn't work correctly, no logged information about problem, no input stream data. Checked documentation and yml configguration @StreamListener(KafkaSink.INPUT)
should working in my case but it doesn't.
To reduce research cases i've created KafkaStreamsConfig
and it works.
The main question is - how to make working secured kafka stream for case when topology configured in yml
without extra java configurations (expect @StreamListener
)?
The important points (i think important), i highlighted using header font.
Experimented to start only secured streaming kafka consumer.
`yml reduced to
# some values to reduce duplicates in configuration
kafka-conf:
server: kafka.host:9093
topic-name: employee.test.incoming.json
keystore: classpath:kafka_client_keystore.jks
truststore: classpath:kafka_client_truststore.jks
group: hr-test
spring:
application:
name: ${kafka-conf.group}-streaming
cloud:
stream:
bindings:
kafka-input-channel:
destination: ${kafka-conf.topic-name}
group: ${kafka-conf.group}-streaming # todo check me
kafka:
# todo check this
bindings:
kafka-input-channel:
consumer:
keySerde: org.apache.kafka.common.serialization.Serdes.StringSerde
valueSerde: org.springframework.kafka.support.serializer.JsonSerde
binder:
# confirmed: https://docs.spring.io/spring-cloud-stream-binder-kafka/docs/3.1.5/reference/html/spring-cloud-stream-binder-kafka.html#_example_security_configuration
# + : https://kafka.apache.org/090/documentation.html#security_configclients
configuration:
[security.protocol]: SSL
[ssl.truststore.location]: ${kafka-conf.truststore}
[ssl.truststore.password]: ${TRUST_STORE_PASSWORD}
[ssl.truststore.type]: JKS
[ssl.keystore.location]: ${kafka-conf.keystore}
[ssl.keystore.password]: ${KEY_STORE_PASSWORD}
[ssl.key.password]: ${KEY_PASSWORD}
[ssl.keystore.type]: JKS
[ssl.enabled.protocols]: TLSv1.2
default:
key.serde: org.apache.kafka.common.serialization.Serdes.StringSerde
value.serde: org.springframework.kafka.support.serializer.JsonSerde
streams:
# confirmed: https://docs.spring.io/spring-cloud-stream/docs/current/reference/html/spring-cloud-stream-binder-kafka.html#_kafka_streams_consumer_properties
default:
consumer:
application.id: ${kafka-conf.group}-default
binder:
brokers: ${kafka-conf.server}
KafkaStreamingConsumer
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.streams.kstream.KStream;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.messaging.SubscribableChannel;
import org.springframework.stereotype.Service;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
@EnableBinding(KafkaStreamingConsumer.KafkaSink.class)
@Service
@Slf4j(topic = "Streaming Consumer Logger")
public class KafkaStreamingConsumer {
private final List<Message> messages = new CopyOnWriteArrayList<>();
public List<Message> getMessages() {
return messages;
}
@StreamListener(KafkaSink.INPUT)
public void consume(KStream<String, Message> message) {
// public void consume(KStream<String, String> message) {
log.info("income detected.");
message.peek((k, v) -> log.info("Key: {}, value: {}", k, v));
log.info("Streamed message saved: {}", message);
}
public interface KafkaSink {
String INPUT = "kafka-input-channel";
@Input(INPUT)
SubscribableChannel kafkaInputChannel();
}
}
and failed with TopologyException
(similar configuration worked for me if kafka streams are not secured):
01:34:14.776 [main] INFO o.s.b.a.l.ConditionEvaluationReportLogger -
Error starting ApplicationContext. To display the condition evaluation report re-run your application with 'debug' enabled.
01:34:14.806 [main] ERROR o.s.boot.SpringApplication - Application run failed
org.springframework.context.ApplicationContextException: Failed to start bean 'defaultKafkaStreamsBuilder'
at org.springframework.context.support.DefaultLifecycleProcessor.doStart(DefaultLifecycleProcessor.java:182)
at org.springframework.context.support.DefaultLifecycleProcessor$LifecycleGroup.start(DefaultLifecycleProcessor.java:357)
at java.base/java.lang.Iterable.forEach(Iterable.java:75)
at org.springframework.context.support.DefaultLifecycleProcessor.startBeans(DefaultLifecycleProcessor.java:156)
at org.springframework.context.support.DefaultLifecycleProcessor.onRefresh(DefaultLifecycleProcessor.java:124)
at org.springframework.context.support.AbstractApplicationContext.finishRefresh(AbstractApplicationContext.java:966)
at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:619)
at org.springframework.boot.web.servlet.context.ServletWebServerApplicationContext.refresh(ServletWebServerApplicationContext.java:146)
at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:738)
at org.springframework.boot.SpringApplication.refreshContext(SpringApplication.java:440)
at org.springframework.boot.SpringApplication.run(SpringApplication.java:316)
at org.springframework.boot.SpringApplication.run(SpringApplication.java:1306)
at org.springframework.boot.SpringApplication.run(SpringApplication.java:1295)
at com.bayer.hrkafkapoc.KafkaPocApplication.main(KafkaPocApplication.java:12)
Caused by: org.springframework.kafka.KafkaException: Could not start stream:
at org.springframework.kafka.config.StreamsBuilderFactoryBean.start(StreamsBuilderFactoryBean.java:354)
at org.springframework.context.support.DefaultLifecycleProcessor.doStart(DefaultLifecycleProcessor.java:179)
... 13 common frames omitted
Caused by: org.apache.kafka.streams.errors.TopologyException: Invalid topology: Topology has no stream threads and no global threads, must subscribe to at least one source topic or global table.
at org.apache.kafka.streams.processor.internals.TopologyMetadata.getNumStreamThreads(TopologyMetadata.java:377)
at org.apache.kafka.streams.KafkaStreams.<init>(KafkaStreams.java:930)
at org.apache.kafka.streams.KafkaStreams.<init>(KafkaStreams.java:856)
at org.apache.kafka.streams.KafkaStreams.<init>(KafkaStreams.java:762)
at org.springframework.kafka.config.StreamsBuilderFactoryBean.start(StreamsBuilderFactoryBean.java:335)
... 14 common frames omitted
Indeed @StreamListener
does not work. It's been deprecated for over 5 years and has now been completely removed. Please upgrade to functional style.