I have started an instance of EmbeddedKafka in a JUnit test. My application correctly reads the records that I push to my stream, but I have noticed that each topic has only one partition. Can anyone explain why?
In my application I have the following:
List<PartitionInfo> partitionInfos = consumer.partitionsFor(topic);
This returns a list with one item. When running against local Kafka with 3 partitions, it returns a list with 3 items as expected.
And my test looks like:
@EmbeddedKafka(partitions = 3)
@DirtiesContext(classMode = DirtiesContext.ClassMode.AFTER_EACH_TEST_METHOD)
@TestPropertySource(
        locations = "classpath:application-test.properties",
        properties = {"app.onlyMonitorIfDataUpdated=true"})
public class MonitorRestKafkaIntegrationTest {

    private EmbeddedKafkaBroker embeddedKafkaBroker;
    private String embeddedBrokers;
    private WebApplicationContext wac;
    private JsonUtility jsonUtility;
    private MockMvc mockMvc;

    public void setup() {
        mockMvc = webAppContextSetup(wac).build();
    }

    private ResultActions interactiveMonitoringREST(String eggID, String monitoringParams) throws Exception {
        return mockMvc.perform(post(String.format("/eggs/%s/interactive", eggID))
                .contentType(MediaType.APPLICATION_JSON_VALUE)
                .content(monitoringParams));
    }

    public void testEmbeddedKafka() throws Exception {
        Producer<String, String> producer = getKafkaProducer();
        sendRecords(producer, 3);
        interactiveMonitoringREST(EGG_KAFKA, monitoringParams);
    }

    // Sends `records` small JSON messages to testTopic, keyed "0", "1", ...
    private void sendRecords(Producer<String, String> producer, int records) {
        for (int i = 0; i < records; i++) {
            String val = "{\"auto_age\":" + (i + 10) + "}";
            producer.send(new ProducerRecord<>(testTopic, String.valueOf(i), val));
        }
    }

    private Producer<String, String> getKafkaProducer() {
        Map<String, Object> prodConfigs = new HashMap<>(KafkaTestUtils.producerProps(embeddedKafkaBroker));
        return new DefaultKafkaProducerFactory<>(prodConfigs, new StringSerializer(), new StringSerializer()).createProducer();
    }

    // Points the application's stored Kafka connection at the embedded broker
    private void updateConn() throws Exception {
        String conn = getConnectionREST(CONN_KAFKA).andReturn().getResponse().getContentAsString();
        ConnectionDetail connectionDetail = jsonUtility.fromJson(conn, ConnectionDetail.class);
        connectionDetail.getDetails().put(ConnectionDetailConstants.CONNECTION_SERVER, embeddedBrokers);
        String updatedConn = jsonUtility.toJson(connectionDetail);
        updateConnectionREST(CONN_KAFKA, updatedConn).andExpect(status().isOk());
    }
}
You need to tell the broker to pre-create the topics...
@EmbeddedKafka(topics = "foo", partitions = 3)
class So57481979ApplicationTests {

    void testPartitions(@Autowired KafkaAdmin admin) throws InterruptedException, ExecutionException {
        AdminClient client = AdminClient.create(admin.getConfig());
        Map<String, TopicDescription> map = client.describeTopics(Collections.singletonList("foo")).all().get();
    }
}
Or set the num.partitions broker property if you want the broker to auto-create the topics for you on first use.
We should probably automatically do that, based on the partitions property.
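For the num.partitions route, a minimal sketch might look like the following. It assumes a Spring Boot test and uses the brokerProperties attribute of @EmbeddedKafka to pass the setting to the embedded broker. The class name is made up for illustration, and @SpringBootTest is an assumption about how the test context is bootstrapped.

```java
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.kafka.test.context.EmbeddedKafka;

// Sketch only: no topics are pre-created here. Instead, the embedded broker's
// default partition count for auto-created topics is raised from 1 to 3.
@SpringBootTest // assumption: the application context is started by Spring Boot
@EmbeddedKafka(partitions = 3, brokerProperties = "num.partitions=3")
class AutoCreatedTopicPartitionsTests { // hypothetical class name

    // Test methods go here. A topic that is auto-created on first use
    // should then report 3 partitions from consumer.partitionsFor(topic).
}
```

With this in place, the partitions attribute still only applies to topics listed in topics, while num.partitions covers any topic the broker creates on demand.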