To test how our Spring Boot application behaves when the Kafka cluster is not yet up, I would like to spin up an embedded Kafka cluster in a JUnit test some time after the application has started. How could I approach this?
As I understand it, spring-kafka-test's @EmbeddedKafka starts up the cluster before creating the application context of a SpringBootTest. Is there any way to configure that timing?
When defined as a bean (via @EmbeddedKafka) or as a JUnit condition (again via @EmbeddedKafka, when there is no test Spring ApplicationContext), the broker is started in afterPropertiesSet().
You should be able to create the broker manually and call afterPropertiesSet() whenever you are ready.
Here is the code from the JUnit 5 EmbeddedKafkaCondition:
@SuppressWarnings("unchecked")
private EmbeddedKafkaBroker createBroker(EmbeddedKafka embedded) {
    EmbeddedKafkaBroker broker;
    int[] ports = setupPorts(embedded);
    broker = new EmbeddedKafkaBroker(embedded.count(), embedded.controlledShutdown(),
            embedded.partitions(), embedded.topics())
                    .zkPort(embedded.zookeeperPort())
                    .kafkaPorts(ports)
                    .zkConnectionTimeout(embedded.zkConnectionTimeout())
                    .zkSessionTimeout(embedded.zkSessionTimeout());
    Properties properties = new Properties();
    for (String pair : embedded.brokerProperties()) {
        if (!StringUtils.hasText(pair)) {
            continue;
        }
        try {
            properties.load(new StringReader(pair));
        }
        catch (Exception ex) {
            throw new IllegalStateException("Failed to load broker property from [" + pair + "]",
                    ex);
        }
    }
    if (StringUtils.hasText(embedded.brokerPropertiesLocation())) {
        Resource propertiesResource = new PathMatchingResourcePatternResolver()
                .getResource(embedded.brokerPropertiesLocation());
        if (!propertiesResource.exists()) {
            throw new IllegalStateException(
                    "Failed to load broker properties from [" + propertiesResource
                            + "]: resource does not exist.");
        }
        try (InputStream in = propertiesResource.getInputStream()) {
            Properties p = new Properties();
            p.load(in);
            p.forEach(properties::putIfAbsent);
        }
        catch (IOException ex) {
            throw new IllegalStateException(
                    "Failed to load broker properties from [" + propertiesResource + "]", ex);
        }
    }
    broker.brokerProperties((Map<String, String>) (Map<?, ?>) properties);
    if (StringUtils.hasText(embedded.bootstrapServersProperty())) {
        broker.brokerListProperty(embedded.bootstrapServersProperty());
    }
    broker.afterPropertiesSet();
    return broker;
}
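To get the "some time after the application starts" part, one option is to defer the afterPropertiesSet() call with a scheduler. The following is only a rough sketch using the JDK alone: DelayedStart and its methods are hypothetical names, not part of spring-kafka-test, and the Runnable stands in for broker::afterPropertiesSet (which, judging by the excerpt above, can be called without handling a checked exception).

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

/**
 * Hypothetical helper (not part of spring-kafka-test): runs a start action
 * once, after a fixed delay, on a background thread. In a test the action
 * would be something like broker::afterPropertiesSet.
 */
final class DelayedStart implements AutoCloseable {

    private final ScheduledExecutorService scheduler =
            Executors.newSingleThreadScheduledExecutor();
    private final CountDownLatch finished = new CountDownLatch(1);
    private final AtomicReference<Throwable> failure = new AtomicReference<>();

    DelayedStart(Runnable startAction, long delay, TimeUnit unit) {
        scheduler.schedule(() -> {
            try {
                startAction.run();
            }
            catch (Throwable t) {
                // Remember the failure so the test thread can inspect it.
                failure.set(t);
            }
            finally {
                finished.countDown();
            }
        }, delay, unit);
    }

    /** Returns true if the start action completed without error within the timeout. */
    boolean awaitStarted(long timeout, TimeUnit unit) {
        try {
            return finished.await(timeout, unit) && failure.get() == null;
        }
        catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    /** The exception thrown by the start action, or null if there was none. */
    Throwable failure() {
        return failure.get();
    }

    @Override
    public void close() {
        scheduler.shutdownNow();
    }
}
```

In the test you would build the broker as in createBroker(...) but leave out the afterPropertiesSet() call, pass broker::afterPropertiesSet to DelayedStart with whatever delay you want to simulate, and then assert on the application's behaviour before and after awaitStarted(...) returns. You will probably want fixed ports via kafkaPorts(...) so the application can be configured with the broker address before the broker exists, and you should call destroy() on the broker yourself when the test finishes, since nothing manages its lifecycle for you in this setup.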