java apache-kafka junit testcontainers

Kafka topic not created in KafkaContainer


I'm trying to create a topic in my KafkaContainer using .withEnv("KAFKA_CREATE_TOPICS", "subs-topic:1:1"), but the topic is not created.

I have a ContainersConfiguration class:

@Configuration
public class ContainersConfiguration {
    public static final DockerImageName KAFKA_IMAGE_NAME = DockerImageName.parse("confluentinc/cp-kafka:5.2.1");
    private final Network network = Network.newNetwork();

    @Bean(initMethod = "start", destroyMethod = "stop")
    public ZookeeperContainer zookeeperContainer(ZookeeperContainerConfiguration zookeeperContainerConfiguration) {
        return new ZookeeperContainer()
                .withNetwork(network)
                .withNetworkAliases(zookeeperContainerConfiguration.getHost());
    }


    @Bean(initMethod = "start", destroyMethod = "stop")
    public KafkaContainer kafkaContainer(KafkaContainerConfiguration kafkaContainerConfiguration,
                                         ZookeeperContainer zookeeperContainer) {
        return new KafkaContainer(KAFKA_IMAGE_NAME)
                .withNetwork(network)
                .withEnv("KAFKA_CREATE_TOPICS", "subs-topic:1:1")
                .withNetworkAliases(kafkaContainerConfiguration.getBootstrapHost())
                .withStartupTimeout(Duration.ofSeconds(180))
                .withExternalZookeeper(zookeeperContainer.getNetworkAliases().get(0) + ":" + CLIENT_PORT);
    }
}

and a KafkaContainerConfiguration class:


@Configuration
@PropertySource(value = "classpath:services.properties")
@Getter
@Setter
public class KafkaContainerConfiguration {

    @NotNull
    @Value("${kafka.bootstrap.server.host}")
    private String bootstrapHost;

    @NotNull
    @Value("${kafka.data.topic}")
    private String dataTopic;

    @NotNull
    @Value("${kafka.notification.topic}")
    private String notificationTopic;

}

In service.properties I have the following:

kafka.data.topic=subs-topic
kafka.notification.topic=subs-topic-notifications

The KafkaProducerConfiguration class contains the following:

@Configuration
public class KafkaProducerConfiguration {

    @Bean(destroyMethod = "close")
    public KafkaProducer<String, String> kafkaProducer(Properties kafkaProperties) {
        return new KafkaProducer<>(kafkaProperties);
    }

    @Bean(destroyMethod = "close")
    public AdminClient kafkaAdminClient(Properties kafkaProperties) {
        return AdminClient.create(kafkaProperties);
    }

    @SneakyThrows
    @Bean
    public Properties kafkaProperties(KafkaContainer kafkaContainer) {
        Properties properties = new Properties();
        // Read the defaults from the classpath; try-with-resources closes the stream.
        try (InputStream in = ClassLoader.getSystemResourceAsStream("kafka.producer.properties")) {
            properties.load(in);
        }
        properties.put("bootstrap.servers", kafkaContainer.getBootstrapServers());
        return properties;
    }

}

Why wasn't the topic created, and what should I do to create it? I also tried:

CreateTopicsResult result = admin.createTopics(
  Collections.singleton(newTopic)
);

but it didn't help.


Solution

  • using .withEnv("KAFKA_CREATE_TOPICS"

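    KAFKA_CREATE_TOPICS is a feature of the wurstmeister/kafka image, not of Kafka itself, so the confluentinc/cp-kafka image you are using does not create topics from it. The wurstmeister image is also no longer maintained.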
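For reference, the wurstmeister image reads each comma-separated entry of that variable as name:partitions:replicas. If you want to keep that notation in your test setup, a minimal sketch of parsing it yourself could look like the following. The TopicSpec class and its method names are hypothetical, not part of Kafka or Testcontainers.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: holds one "name:partitions:replicas" entry.
final class TopicSpec {
    final String name;
    final int partitions;
    final short replicationFactor;

    private TopicSpec(String name, int partitions, short replicationFactor) {
        this.name = name;
        this.partitions = partitions;
        this.replicationFactor = replicationFactor;
    }

    // Parses a single entry such as "subs-topic:1:1".
    // Any further fields (for example a cleanup policy) are ignored in this sketch.
    static TopicSpec parse(String entry) {
        String[] parts = entry.trim().split(":");
        if (parts.length < 3 || parts[0].isEmpty()) {
            throw new IllegalArgumentException("Expected name:partitions:replicas but got: " + entry);
        }
        return new TopicSpec(parts[0], Integer.parseInt(parts[1]), Short.parseShort(parts[2]));
    }

    // Parses a comma-separated list of entries, skipping empty ones.
    static List<TopicSpec> parseAll(String value) {
        List<TopicSpec> specs = new ArrayList<>();
        for (String entry : value.split(",")) {
            if (!entry.trim().isEmpty()) {
                specs.add(parse(entry));
            }
        }
        return specs;
    }
}
```

Each parsed entry can then be turned into a new NewTopic(name, partitions, replicationFactor) and passed to AdminClient.createTopics(...). That call is asynchronous, so calling .all().get() on the returned CreateTopicsResult waits for the broker and surfaces any error instead of silently dropping it.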

    In service.properties I have the following: kafka.data.topic=subs-topic

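    Okay, but that just defines a String in your config; nothing creates a topic from it.

    You need a @Bean method that returns a NewTopic. Spring Kafka's KafkaAdmin creates the topic from it when the application context starts. https://docs.spring.io/spring-kafka/reference/kafka/configuring-topics.html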
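A minimal sketch of such a configuration, assuming spring-kafka is on the classpath and reusing the KafkaContainer bean and KafkaContainerConfiguration from the question. The class name KafkaTopicConfiguration and the partition and replica counts are assumptions chosen for illustration.

```java
import java.util.Map;

import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.TopicBuilder;
import org.springframework.kafka.core.KafkaAdmin;
import org.testcontainers.containers.KafkaContainer;

// Hypothetical class; assumed to live in the same package as KafkaContainerConfiguration.
@Configuration
public class KafkaTopicConfiguration {

    // KafkaAdmin looks up all NewTopic beans and creates the missing topics on startup.
    @Bean
    public KafkaAdmin kafkaAdmin(KafkaContainer kafkaContainer) {
        Map<String, Object> config = Map.of(
                AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaContainer.getBootstrapServers());
        return new KafkaAdmin(config);
    }

    // Declares the topic named by kafka.data.topic (subs-topic in the question).
    @Bean
    public NewTopic dataTopic(KafkaContainerConfiguration kafkaContainerConfiguration) {
        return TopicBuilder.name(kafkaContainerConfiguration.getDataTopic())
                .partitions(1)
                .replicas(1)
                .build();
    }
}
```

Because the KafkaContainer bean is declared with initMethod = "start", the container should already be running by the time KafkaAdmin connects to it.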


    You also don't really need Testcontainers with Spring Kafka, since the spring-kafka-test module provides an embedded broker:

    https://docs.spring.io/spring-kafka/reference/testing.html
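As a rough sketch of that approach, assuming spring-kafka-test and JUnit 5 are on the test classpath, the embedded broker can create the topic for you. The test class name and the empty nested configuration are hypothetical.

```java
import static org.junit.jupiter.api.Assertions.assertTrue;

import java.util.Map;
import java.util.Set;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.test.EmbeddedKafkaBroker;
import org.springframework.kafka.test.context.EmbeddedKafka;
import org.springframework.test.context.junit.jupiter.SpringJUnitConfig;

// Starts an in-process broker and creates subs-topic before the test runs.
@SpringJUnitConfig
@EmbeddedKafka(partitions = 1, topics = "subs-topic")
class SubsTopicEmbeddedKafkaTest {

    // Empty configuration so the test context has something to load.
    @Configuration
    static class TestConfig {
    }

    @Autowired
    private EmbeddedKafkaBroker broker;

    @Test
    void topicExists() throws Exception {
        Map<String, Object> config = Map.of(
                AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, broker.getBrokersAsString());
        try (AdminClient admin = AdminClient.create(config)) {
            // listTopics() is asynchronous; get() blocks until the broker answers.
            Set<String> names = admin.listTopics().names().get();
            assertTrue(names.contains("subs-topic"));
        }
    }
}
```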

    Your properties should also be loaded from Spring's own configuration files (for example application.yml) rather than read manually:

    spring:
      kafka:
        bootstrap-servers: ...
        producer:
          properties:
            ...
    

    https://docs.spring.io/spring-boot/reference/messaging/kafka.html#messaging.kafka.additional-properties