I'm trying to create a topic in my KafkaContainer using .withEnv("KAFKA_CREATE_TOPICS", "subs-topic:1:1"), but the topic is not created.
I have a ContainersConfiguration class:
    @Configuration
    public class ContainersConfiguration {

        public static final DockerImageName KAFKA_IMAGE_NAME = DockerImageName.parse("confluentinc/cp-kafka:5.2.1");

        private final Network network = Network.newNetwork();

        @Bean(initMethod = "start", destroyMethod = "stop")
        public ZookeeperContainer zookeeperContainer(ZookeeperContainerConfiguration zookeeperContainerConfiguration) {
            return new ZookeeperContainer()
                    .withNetwork(network)
                    .withNetworkAliases(zookeeperContainerConfiguration.getHost());
        }

        @Bean(initMethod = "start", destroyMethod = "stop")
        public KafkaContainer kafkaContainer(KafkaContainerConfiguration kafkaContainerConfiguration,
                                             ZookeeperContainer zookeeperContainer) {
            return new KafkaContainer(KAFKA_IMAGE_NAME)
                    .withNetwork(network)
                    .withEnv("KAFKA_CREATE_TOPICS", "subs-topic:1:1")
                    .withNetworkAliases(kafkaContainerConfiguration.getBootstrapHost())
                    .withStartupTimeout(Duration.ofSeconds(180))
                    .withExternalZookeeper(zookeeperContainer.getNetworkAliases().get(0) + ":" + CLIENT_PORT);
        }
    }
and a KafkaContainerConfiguration class:
    @Configuration
    @PropertySource(value = "classpath:services.properties")
    @Getter
    @Setter
    public class KafkaContainerConfiguration {

        @NotNull
        @Value("${kafka.bootstrap.server.host}")
        private String bootstrapHost;

        @NotNull
        @Value("${kafka.data.topic}")
        private String dataTopic;

        @NotNull
        @Value("${kafka.notification.topic}")
        private String notificationTopic;
    }
In services.properties I have the following:

    kafka.data.topic=subs-topic
    kafka.notification.topic=subs-topic-notifications
The KafkaProducerConfiguration class contains the following:
    @Configuration
    public class KafkaProducerConfiguration {

        @Bean(destroyMethod = "close")
        public KafkaProducer<String, String> kafkaProducer(Properties kafkaProperties) {
            return new KafkaProducer<>(kafkaProperties);
        }

        @Bean(destroyMethod = "close")
        public AdminClient kafkaAdminClient(Properties kafkaProperties) {
            return AdminClient.create(kafkaProperties);
        }

        @SneakyThrows
        @Bean
        public Properties kafkaProperties(KafkaContainer kafkaContainer) {
            Properties properties = new Properties();
            properties.load(new FileReader(new File(ClassLoader.getSystemResource("kafka.producer.properties").getFile())));
            properties.put("bootstrap.servers", kafkaContainer.getBootstrapServers());
            return properties;
        }
    }
Why wasn't the topic created? What should I do to create it? I also tried

    CreateTopicsResult result = admin.createTopics(
            Collections.singleton(newTopic)
    );

but it didn't help.
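Regarding .withEnv("KAFKA_CREATE_TOPICS", "subs-topic:1:1"): only the wurstmeister/kafka image supports the KAFKA_CREATE_TOPICS environment variable, and that image is no longer maintained. The confluentinc/cp-kafka image you are using does not create topics from it.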
As for kafka.data.topic=subs-topic in services.properties: that just defines a String in your config. It does not create anything on the broker.
You need a @Bean method that returns a NewTopic to be run: https://docs.spring.io/spring-kafka/reference/kafka/configuring-topics.html
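A minimal sketch of such a configuration, assuming Spring Kafka is on the classpath and the KafkaContainer bean from the question is available. The class name KafkaTopicConfiguration and the method names are hypothetical. The partition and replica counts of 1 mirror the "subs-topic:1:1" value from the question. NewTopic beans are only acted on when a KafkaAdmin bean exists in the context. Spring Boot normally auto-configures one; without Boot it has to be declared, as assumed here.

```java
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.TopicBuilder;
import org.springframework.kafka.core.KafkaAdmin;
import org.testcontainers.containers.KafkaContainer;

// Hypothetical configuration class; names are illustrative only.
@Configuration
public class KafkaTopicConfiguration {

    // KafkaAdmin creates the topics described by NewTopic beans
    // when the application context is initialized.
    @Bean
    public KafkaAdmin kafkaAdmin(KafkaContainer kafkaContainer) {
        Map<String, Object> configs = new HashMap<>();
        configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaContainer.getBootstrapServers());
        return new KafkaAdmin(configs);
    }

    // Describes the topic from the question: 1 partition, replication factor 1.
    @Bean
    public NewTopic dataTopic() {
        return TopicBuilder.name("subs-topic")
                .partitions(1)
                .replicas(1)
                .build();
    }
}
```

Because kafkaAdmin depends on the KafkaContainer bean, Spring starts the container first, so the broker should be reachable by the time the topic is created.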
You also don't really need Testcontainers with Spring Kafka, which provides an embedded broker for tests: https://docs.spring.io/spring-kafka/reference/testing.html
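As a rough sketch of the embedded-broker approach, assuming spring-kafka-test and JUnit 5 are on the test classpath: the test class name and the empty nested configuration are hypothetical, and the topic name is taken from the question. The @EmbeddedKafka annotation starts an in-process broker and creates the listed topics before the test runs.

```java
import static org.junit.jupiter.api.Assertions.assertTrue;

import java.util.Map;
import java.util.Set;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.test.EmbeddedKafkaBroker;
import org.springframework.kafka.test.context.EmbeddedKafka;
import org.springframework.test.context.junit.jupiter.SpringJUnitConfig;

// Hypothetical test; the embedded broker replaces the Kafka and Zookeeper containers.
@SpringJUnitConfig
@EmbeddedKafka(partitions = 1, topics = "subs-topic")
class EmbeddedKafkaSketchTest {

    // Empty configuration so the test context has something to load.
    @Configuration
    static class Config {
    }

    @Autowired
    private EmbeddedKafkaBroker embeddedKafka;

    @Test
    void topicIsCreatedByEmbeddedBroker() throws Exception {
        Map<String, Object> configs = Map.of(
                AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, embeddedKafka.getBrokersAsString());

        // List the topics on the embedded broker and check that ours exists.
        try (AdminClient admin = AdminClient.create(configs)) {
            Set<String> topicNames = admin.listTopics().names().get();
            assertTrue(topicNames.contains("subs-topic"));
        }
    }
}
```

In a real test the producer under test would be pointed at embeddedKafka.getBrokersAsString() instead of kafkaContainer.getBootstrapServers().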
And your properties should be loaded from Spring's own resource files (such as application.yml) rather than read manually from a properties file:

    spring:
      kafka:
        bootstrap-servers: ...
        producer:
          properties:
            ...