javaspringapache-kafkaspring-cloud-streamspring-cloud-stream-binder-kafka

@StreamListener doesn't work for secured Kafka streaming, TopologyException


My demo application has:

application.yml

# some values to reduce duplicates in configuration
kafka-conf:
  server: kafka.host:9093
  topic-name: employee.test.incoming.json
  keystore: classpath:kafka_client_keystore.jks
  truststore: classpath:kafka_client_truststore.jks
  group: hr-test


spring:
  application:
    name: ${kafka-conf.group}-streaming
  kafka:
    security:
      protocol: SSL
    ssl:
      key-store-location: ${kafka-conf.keystore}
      key-store-password: ${KEY_STORE_PASSWORD}
      key-store-type: JKS
      key-password: ${KEY_PASSWORD}
      trust-store-location: ${kafka-conf.truststore}
      trust-store-password: ${TRUST_STORE_PASSWORD}
      trust-store-type: JKS
    topicName: ${kafka-conf.topic-name}
    consumer:
      bootstrap-servers: ${kafka-conf.server}
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
      group-id: ${kafka-conf.group}
      auto-offset-reset: earliest
    producer:
      bootstrap-servers: ${kafka-conf.server}
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
    bootstrap-servers: ${kafka-conf.server}
  cloud:
    stream:
      bindings:
        kafka-input-channel:
          contentType: application/json
          destination: ${kafka-conf.topic-name}
          group: ${kafka-conf.group}-streaming # todo check me
      kafka:
        # todo check this
        bindings:
          kafka-input-channel:
            consumer:
              keySerde: org.apache.kafka.common.serialization.Serdes.StringSerde
              valueSerde: org.springframework.kafka.support.serializer.JsonSerde
        binder:
          # confirmed: https://docs.spring.io/spring-cloud-stream-binder-kafka/docs/3.1.5/reference/html/spring-cloud-stream-binder-kafka.html#_example_security_configuration
          # + : https://kafka.apache.org/090/documentation.html#security_configclients
          configuration:
            [security.protocol]: SSL
            [ssl.truststore.location]: ${kafka-conf.truststore}
            [ssl.truststore.password]: ${TRUST_STORE_PASSWORD}
            [ssl.truststore.type]: JKS
            [ssl.keystore.location]: ${kafka-conf.keystore}
            [ssl.keystore.password]: ${KEY_STORE_PASSWORD}
            [ssl.key.password]: ${KEY_PASSWORD}
            [ssl.keystore.type]: JKS
            default:
              key.serde: org.apache.kafka.common.serialization.Serdes.StringSerde
              value.serde: org.springframework.kafka.support.serializer.JsonSerde
        streams:
          # confirmed: https://docs.spring.io/spring-cloud-stream/docs/current/reference/html/spring-cloud-stream-binder-kafka.html#_kafka_streams_consumer_properties
          default:
            consumer:
              application.id: ${kafka-conf.group}-default
          binder:
            brokers: ${kafka-conf.server}

KafkaProducer

import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

@Service
@Slf4j(topic = "Producer Logger")
public class KafkaProducer {

    @Value("${kafka-conf.topic-name}")
    private String topic;

    @Autowired
    private KafkaTemplate<String, Message> kafkaTemplate;

    public void sendMessage(String key, Message message) {
        log.debug("sending message. where key: <{}>, message: <{}>", key, message);
        kafkaTemplate.send(topic, key, message);
    }
}

KafkaConsumer

import lombok.extern.apachecommons.CommonsLog;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Service;

import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

@Service
@CommonsLog(topic = "Consumer Logger")
public class KafkaConsumer {

    private final List<Message> messages = new CopyOnWriteArrayList<>();

    public List<Message> getMessages() {
        return messages;
    }

    @KafkaListener(topics = "${kafka-conf.topic-name}")
    public void receive(@Payload Message message,
//                        @Header(KafkaHeaders.RECEIVED_KEY) Integer key,
                        @Header(KafkaHeaders.RECEIVED_PARTITION) int partition,
                        @Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
                        @Header(KafkaHeaders.RECEIVED_TIMESTAMP) long ts) {
        messages.add(message);
        log.info(String.format("topic: %s", topic));
        log.info(String.format("Message saved: %s", message));
    }

}

KafkaStreamingConsumer

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.streams.kstream.KStream;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.messaging.SubscribableChannel;
import org.springframework.stereotype.Service;

import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

@EnableBinding(KafkaSink.class)
@Service
@Slf4j(topic = "Streaming Consumer Logger")
public class KafkaStreamingConsumer {

    private final List<Message> messages = new CopyOnWriteArrayList<>();

    public List<Message> getMessages() {
        return messages;
    }

    @StreamListener(KafkaSink.INPUT)
    public void consume(KStream<String, Message> message) {
        log.info("income detected.");
        message.peek((k, v) -> log.info("Key: {}, value: {}", k, v));
        log.info("Streamed message saved: {}", message);
    }

    public interface KafkaSink {

        String INPUT = "kafka-input-channel";

        @Input(INPUT)
        SubscribableChannel kafkaInputChannel();
    }
}

KafkaStreamsConfig - is an extraconfiguration

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.support.serializer.JsonSerde;

@Configuration
@Slf4j(topic = "Streaming Config Logger")
public class KafkaStreamsConfig {

    @Value("${kafka-conf.topic-name}")
    private String topic;

    @Bean
    public KStream<String, Message> kStream(StreamsBuilder streamsBuilder) {
        JsonSerde<Message> messageSerde = new JsonSerde<>(Message.class);

        KStream<String, Message> input = streamsBuilder.stream(topic,
                        Consumed.with(Serdes.String(), messageSerde))
                .peek((k, v) -> log.info("Received message <{}>, key: <{}>", k, v));

        return input;
    }
}

KafkaPocApplication

the enter point
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.kafka.annotation.EnableKafkaStreams;

@SpringBootApplication
@EnableKafkaStreams
public class KafkaPocApplication {

    public static void main(String[] args) {
        SpringApplication.run(KafkaPocApplication.class, args);
    }

}

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>3.1.5</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.demo</groupId>
    <artifactId>kafka-poc</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>kafka-poc</name>
    <description>PoC project that demonstrates how to work with kafka using java app</description>

    <properties>
        <java.version>17</java.version>
        <spring-cloud.version>2020.0.4</spring-cloud.version>
        <spring-boot.version>3.1.5</spring-boot.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream-binder-kafka-streams</artifactId>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-streams</artifactId>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <scope>compile</scope>
        </dependency>
    </dependencies>

    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-dependencies</artifactId>
                <version>${spring-cloud.version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>

            <!-- Spring Boot Dependencies -->
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-dependencies</artifactId>
                <version>${spring-boot.version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>

    <repositories>
        <repository>
            <id>spring-releases</id>
            <url>https://repo.spring.io/libs-release</url>
        </repository>
    </repositories>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
                <configuration>
                    <image>
                        <builder>paketobuildpacks/builder-jammy-base:latest</builder>
                    </image>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>

Issue

KafkaStreamingConsumer doesn't work correctly, no logged information about problem, no input stream data. Checked documentation and yml configguration @StreamListener(KafkaSink.INPUT) should working in my case but it doesn't. To reduce research cases i've created KafkaStreamsConfig and it works. The main question is - how to make working secured kafka stream for case when topology configured in yml without extra java configurations (expect @StreamListener)?

P.S.

The important points (i think important), i highlighted using header font.

P.S.2

Experimented to start only secured streaming kafka consumer.

`yml reduced to

# some values to reduce duplicates in configuration
kafka-conf:
  server: kafka.host:9093
  topic-name: employee.test.incoming.json
  keystore: classpath:kafka_client_keystore.jks
  truststore: classpath:kafka_client_truststore.jks
  group: hr-test


spring:
  application:
    name: ${kafka-conf.group}-streaming
  cloud:
    stream:
      bindings:
        kafka-input-channel:
          destination: ${kafka-conf.topic-name}
          group: ${kafka-conf.group}-streaming # todo check me
      kafka:
        # todo check this
        bindings:
          kafka-input-channel:
            consumer:
              keySerde: org.apache.kafka.common.serialization.Serdes.StringSerde
              valueSerde: org.springframework.kafka.support.serializer.JsonSerde
        binder:
          # confirmed: https://docs.spring.io/spring-cloud-stream-binder-kafka/docs/3.1.5/reference/html/spring-cloud-stream-binder-kafka.html#_example_security_configuration
          # + : https://kafka.apache.org/090/documentation.html#security_configclients
          configuration:
            [security.protocol]: SSL
            [ssl.truststore.location]: ${kafka-conf.truststore}
            [ssl.truststore.password]: ${TRUST_STORE_PASSWORD}
            [ssl.truststore.type]: JKS
            [ssl.keystore.location]: ${kafka-conf.keystore}
            [ssl.keystore.password]: ${KEY_STORE_PASSWORD}
            [ssl.key.password]: ${KEY_PASSWORD}
            [ssl.keystore.type]: JKS
            [ssl.enabled.protocols]: TLSv1.2
            default:
              key.serde: org.apache.kafka.common.serialization.Serdes.StringSerde
              value.serde: org.springframework.kafka.support.serializer.JsonSerde
        streams:
          # confirmed: https://docs.spring.io/spring-cloud-stream/docs/current/reference/html/spring-cloud-stream-binder-kafka.html#_kafka_streams_consumer_properties
          default:
            consumer:
              application.id: ${kafka-conf.group}-default
          binder:
            brokers: ${kafka-conf.server}

KafkaStreamingConsumer

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.streams.kstream.KStream;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.messaging.SubscribableChannel;
import org.springframework.stereotype.Service;

import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

@EnableBinding(KafkaStreamingConsumer.KafkaSink.class)
@Service
@Slf4j(topic = "Streaming Consumer Logger")
public class KafkaStreamingConsumer {

    private final List<Message> messages = new CopyOnWriteArrayList<>();

    public List<Message> getMessages() {
        return messages;
    }

    @StreamListener(KafkaSink.INPUT)
    public void consume(KStream<String, Message> message) {
//    public void consume(KStream<String, String> message) {
        log.info("income detected.");
        message.peek((k, v) -> log.info("Key: {}, value: {}", k, v));
        log.info("Streamed message saved: {}", message);
    }

    public interface KafkaSink {

        String INPUT = "kafka-input-channel";

        @Input(INPUT)
        SubscribableChannel kafkaInputChannel();
    }
}

and failed with TopologyException (similar configuration worked for me if kafka streams are not secured):

01:34:14.776 [main] INFO  o.s.b.a.l.ConditionEvaluationReportLogger - 

Error starting ApplicationContext. To display the condition evaluation report re-run your application with 'debug' enabled.
01:34:14.806 [main] ERROR o.s.boot.SpringApplication - Application run failed
org.springframework.context.ApplicationContextException: Failed to start bean 'defaultKafkaStreamsBuilder'
    at org.springframework.context.support.DefaultLifecycleProcessor.doStart(DefaultLifecycleProcessor.java:182)
    at org.springframework.context.support.DefaultLifecycleProcessor$LifecycleGroup.start(DefaultLifecycleProcessor.java:357)
    at java.base/java.lang.Iterable.forEach(Iterable.java:75)
    at org.springframework.context.support.DefaultLifecycleProcessor.startBeans(DefaultLifecycleProcessor.java:156)
    at org.springframework.context.support.DefaultLifecycleProcessor.onRefresh(DefaultLifecycleProcessor.java:124)
    at org.springframework.context.support.AbstractApplicationContext.finishRefresh(AbstractApplicationContext.java:966)
    at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:619)
    at org.springframework.boot.web.servlet.context.ServletWebServerApplicationContext.refresh(ServletWebServerApplicationContext.java:146)
    at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:738)
    at org.springframework.boot.SpringApplication.refreshContext(SpringApplication.java:440)
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:316)
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:1306)
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:1295)
    at com.bayer.hrkafkapoc.KafkaPocApplication.main(KafkaPocApplication.java:12)
Caused by: org.springframework.kafka.KafkaException: Could not start stream: 
    at org.springframework.kafka.config.StreamsBuilderFactoryBean.start(StreamsBuilderFactoryBean.java:354)
    at org.springframework.context.support.DefaultLifecycleProcessor.doStart(DefaultLifecycleProcessor.java:179)
    ... 13 common frames omitted
Caused by: org.apache.kafka.streams.errors.TopologyException: Invalid topology: Topology has no stream threads and no global threads, must subscribe to at least one source topic or global table.
    at org.apache.kafka.streams.processor.internals.TopologyMetadata.getNumStreamThreads(TopologyMetadata.java:377)
    at org.apache.kafka.streams.KafkaStreams.<init>(KafkaStreams.java:930)
    at org.apache.kafka.streams.KafkaStreams.<init>(KafkaStreams.java:856)
    at org.apache.kafka.streams.KafkaStreams.<init>(KafkaStreams.java:762)
    at org.springframework.kafka.config.StreamsBuilderFactoryBean.start(StreamsBuilderFactoryBean.java:335)
    ... 14 common frames omitted

Solution

  • Indeed @StreamListener does not work. It's been deprecated for over 5 years and has now been completely removed. Please upgrade to functional style.