spring-kafkaspring-kafka-test

EmbeddedKafka failing since Spring Boot 2.6.X : AccessDeniedException: ..\AppData\Local\Temp\spring.kafka*


e: this has been fixed through Spring Boot 2.6.5 (see https://github.com/spring-projects/spring-boot/issues/30243)

Since upgrading to Spring Boot 2.6.X (in my case: 2.6.1), I have multiple projects that now have failing unit-tests on Windows that cannot start EmbeddedKafka, that do run with Linux

There is multiple errors, but this is the first one thrown

...
  .   ____          _            __ _ _
 /\\ / ___'_ __ _ _(_)_ __  __ _ \ \ \ \
( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \
 \\/  ___)| |_)| | | | | || (_| |  ) ) ) )
  '  |____| .__|_| |_|_| |_\__, | / / / /
 =========|_|==============|___/=/_/_/_/
 :: Spring Boot ::                (v2.6.1)

2021-12-09 16:15:00.300  INFO 13864 --- [           main] k.utils.Log4jControllerRegistration$     : Registered kafka:type=kafka.Log4jController MBean
2021-12-09 16:15:00.420  INFO 13864 --- [           main] o.a.zookeeper.server.ZooKeeperServer     : 
2021-12-09 16:15:00.420  INFO 13864 --- [           main] o.a.zookeeper.server.ZooKeeperServer     :   ______                  _                                          
2021-12-09 16:15:00.420  INFO 13864 --- [           main] o.a.zookeeper.server.ZooKeeperServer     :  |___  /                 | |                                         
2021-12-09 16:15:00.420  INFO 13864 --- [           main] o.a.zookeeper.server.ZooKeeperServer     :     / /    ___     ___   | | __   ___    ___   _ __     ___   _ __   
2021-12-09 16:15:00.420  INFO 13864 --- [           main] o.a.zookeeper.server.ZooKeeperServer     :    / /    / _ \   / _ \  | |/ /  / _ \  / _ \ | '_ \   / _ \ | '__|
2021-12-09 16:15:00.420  INFO 13864 --- [           main] o.a.zookeeper.server.ZooKeeperServer     :   / /__  | (_) | | (_) | |   <  |  __/ |  __/ | |_) | |  __/ | |    
2021-12-09 16:15:00.420  INFO 13864 --- [           main] o.a.zookeeper.server.ZooKeeperServer     :  /_____|  \___/   \___/  |_|\_\  \___|  \___| | .__/   \___| |_|
2021-12-09 16:15:00.420  INFO 13864 --- [           main] o.a.zookeeper.server.ZooKeeperServer     :                                               | |                     
2021-12-09 16:15:00.420  INFO 13864 --- [           main] o.a.zookeeper.server.ZooKeeperServer     :                                               |_|                     
2021-12-09 16:15:00.420  INFO 13864 --- [           main] o.a.zookeeper.server.ZooKeeperServer     : 
2021-12-09 16:15:00.422  INFO 13864 --- [           main] o.a.zookeeper.server.ZooKeeperServer     : Server environment:zookeeper.version=3.6.3--6401e4ad2087061bc6b9f80dec2d69f2e3c8660a, built on 04/08/2021 16:35 GMT
2021-12-09 16:15:00.422  INFO 13864 --- [           main] o.a.zookeeper.server.ZooKeeperServer     : Server environment:host.name=host.docker.internal
2021-12-09 16:15:00.422  INFO 13864 --- [           main] o.a.zookeeper.server.ZooKeeperServer     : Server environment:java.version=11.0.11
2021-12-09 16:15:00.422  INFO 13864 --- [           main] o.a.zookeeper.server.ZooKeeperServer     : Server environment:java.vendor=AdoptOpenJDK
...
2021-12-09 16:15:01.015  INFO 13864 --- [nelReaper-Fetch] lientQuotaManager$ThrottledChannelReaper : [ThrottledChannelReaper-Fetch]: Starting
2021-12-09 16:15:01.015  INFO 13864 --- [lReaper-Produce] lientQuotaManager$ThrottledChannelReaper : [ThrottledChannelReaper-Produce]: Starting
2021-12-09 16:15:01.016  INFO 13864 --- [lReaper-Request] lientQuotaManager$ThrottledChannelReaper : [ThrottledChannelReaper-Request]: Starting
2021-12-09 16:15:01.017  INFO 13864 --- [trollerMutation] lientQuotaManager$ThrottledChannelReaper : [ThrottledChannelReaper-ControllerMutation]: Starting
2021-12-09 16:15:01.037  INFO 13864 --- [           main] kafka.log.LogManager                     : Loading logs from log dirs ArraySeq(C:\Users\ddrop\AppData\Local\Temp\spring.kafka.bf8e2b62-a1f2-4092-b292-a15e35bd31ad18378079390566696446)
2021-12-09 16:15:01.040  INFO 13864 --- [           main] kafka.log.LogManager                     : Attempting recovery for all logs in C:\Users\ddrop\AppData\Local\Temp\spring.kafka.bf8e2b62-a1f2-4092-b292-a15e35bd31ad18378079390566696446 since no clean shutdown file was found
2021-12-09 16:15:01.043  INFO 13864 --- [           main] kafka.log.LogManager                     : Loaded 0 logs in 6ms.
2021-12-09 16:15:01.043  INFO 13864 --- [           main] kafka.log.LogManager                     : Starting log cleanup with a period of 300000 ms.
2021-12-09 16:15:01.045  INFO 13864 --- [           main] kafka.log.LogManager                     : Starting log flusher with a default period of 9223372036854775807 ms.
2021-12-09 16:15:01.052  INFO 13864 --- [           main] kafka.log.LogCleaner                     : Starting the log cleaner
2021-12-09 16:15:01.059  INFO 13864 --- [leaner-thread-0] kafka.log.LogCleaner                     : [kafka-log-cleaner-thread-0]: Starting
2021-12-09 16:15:01.224  INFO 13864 --- [name=forwarding] k.s.BrokerToControllerRequestThread      : [BrokerToControllerChannelManager broker=0 name=forwarding]: Starting
2021-12-09 16:15:01.325  INFO 13864 --- [           main] kafka.network.ConnectionQuotas           : Updated connection-accept-rate max connection creation rate to 2147483647
2021-12-09 16:15:01.327  INFO 13864 --- [           main] kafka.network.Acceptor                   : Awaiting socket connections on localhost:63919.
2021-12-09 16:15:01.345  INFO 13864 --- [           main] kafka.network.SocketServer               : [SocketServer listenerType=ZK_BROKER, nodeId=0] Created data-plane acceptor and processors for endpoint : ListenerName(PLAINTEXT)
2021-12-09 16:15:01.350  INFO 13864 --- [0 name=alterIsr] k.s.BrokerToControllerRequestThread      : [BrokerToControllerChannelManager broker=0 name=alterIsr]: Starting
2021-12-09 16:15:01.364  INFO 13864 --- [eaper-0-Produce] perationPurgatory$ExpiredOperationReaper : [ExpirationReaper-0-Produce]: Starting
2021-12-09 16:15:01.364  INFO 13864 --- [nReaper-0-Fetch] perationPurgatory$ExpiredOperationReaper : [ExpirationReaper-0-Fetch]: Starting
2021-12-09 16:15:01.365  INFO 13864 --- [0-DeleteRecords] perationPurgatory$ExpiredOperationReaper : [ExpirationReaper-0-DeleteRecords]: Starting
2021-12-09 16:15:01.365  INFO 13864 --- [r-0-ElectLeader] perationPurgatory$ExpiredOperationReaper : [ExpirationReaper-0-ElectLeader]: Starting
2021-12-09 16:15:01.374  INFO 13864 --- [rFailureHandler] k.s.ReplicaManager$LogDirFailureHandler  : [LogDirFailureHandler]: Starting
2021-12-09 16:15:01.390  INFO 13864 --- [           main] kafka.zk.KafkaZkClient                   : Creating /brokers/ids/0 (is it secure? false)
2021-12-09 16:15:01.400  INFO 13864 --- [           main] kafka.zk.KafkaZkClient                   : Stat of the created znode at /brokers/ids/0 is: 25,25,1639062901396,1639062901396,1,0,0,72059919267528704,204,0,25

2021-12-09 16:15:01.400  INFO 13864 --- [           main] kafka.zk.KafkaZkClient                   : Registered broker 0 at path /brokers/ids/0 with addresses: PLAINTEXT://localhost:63919, czxid (broker epoch): 25
2021-12-09 16:15:01.410 ERROR 13864 --- [           main] kafka.server.BrokerMetadataCheckpoint    : Failed to write meta.properties due to

java.nio.file.AccessDeniedException: C:\Users\ddrop\AppData\Local\Temp\spring.kafka.bf8e2b62-a1f2-4092-b292-a15e35bd31ad18378079390566696446
    at java.base/sun.nio.fs.WindowsException.translateToIOException(WindowsException.java:89) ~[na:na]
    at java.base/sun.nio.fs.WindowsException.rethrowAsIOException(WindowsException.java:103) ~[na:na]
    at java.base/sun.nio.fs.WindowsException.rethrowAsIOException(WindowsException.java:108) ~[na:na]

Reproduceable via spring Initializr + adding "Spring Kafka": https://start.spring.io/#!type=maven-project&language=java&platformVersion=2.6.1&packaging=jar&jvmVersion=11&groupId=com.example&artifactId=demo&name=demo&description=Demo%20project%20for%20Spring%20Boot&packageName=com.example.demo&dependencies=kafka

And then have following test-class to execute:

package com.example.demo;

import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.kafka.test.context.EmbeddedKafka;

@SpringBootTest
@EmbeddedKafka
class ApplicationTest {

    @Test
    void run() {
        int i = 1 + 1; // just a line of code to set a debug-point
    }
}

I do not have this error when pinning kafka.version to 2.8.1 in pom.xml's properties.

It seems like the cause is in Kafka itself, but I have a hard time figuring out if it is spring-kafka intitializing Kafka via EmbeddedKafka incorrectly or if Kafka itself is the culrit here.

Anyone has an idea? Am I missing a test-parameter to set?


Solution

  • While I will wait till kafka 3.0.1 is released, for anyone who would just switch to Testcontainers, but is not familiar how they can be set up:

    Sample based on this initlzr: https://start.spring.io/#!type=maven-project&language=java&platformVersion=2.6.1&packaging=jar&jvmVersion=11&groupId=com.example&artifactId=demo&name=demo&description=Demo%20project%20for%20Spring%20Boot&packageName=com.example.demo&dependencies=kafka,testcontainers

    Runnable app

    package com.example.demo;
    
    import org.apache.kafka.clients.admin.NewTopic;
    import org.springframework.boot.ApplicationRunner;
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    import org.springframework.context.annotation.Bean;
    import org.springframework.kafka.annotation.KafkaListener;
    import org.springframework.kafka.core.KafkaTemplate;
    
    import java.time.LocalDateTime;
    import java.util.stream.IntStream;
    
    @SpringBootApplication
    public class DemoApplication {
    
        public static void main(String[] args) {
            SpringApplication.run(DemoApplication.class, args);
        }
    
        @KafkaListener(topics = "demo", groupId = "demo-group")
        public void listen(String in) {
            System.out.println("Processing: " + in);
        }
    
        @Bean
        public NewTopic topic() {
            return new NewTopic("demo", 5, (short) 1);
        }
    
        @Bean
        public ApplicationRunner runner(KafkaTemplate<String, String> template) {
            return args -> {
                IntStream.range(0, 10).forEach(i -> {
                            String event = "foo" + i;
                            System.out.println("Sending " + event);
                            template.send("demo", i + "", event);
    
                        }
                );
    
            };
        }
    }
    

    Testcode with testcontainers, where Kafka will be spun up in Docker

    package com.example.demo;
    
    import org.junit.jupiter.api.BeforeAll;
    import org.junit.jupiter.api.Test;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.boot.ApplicationRunner;
    import org.springframework.boot.test.context.SpringBootTest;
    import org.springframework.test.context.DynamicPropertyRegistry;
    import org.springframework.test.context.DynamicPropertySource;
    import org.testcontainers.containers.KafkaContainer;
    import org.testcontainers.junit.jupiter.Container;
    import org.testcontainers.junit.jupiter.Testcontainers;
    import org.testcontainers.utility.DockerImageName;
    
    @Testcontainers
    @SpringBootTest
    class DemoApplicationTest {
    
        @Autowired
        ApplicationRunner applicationRunner;
    
        @Container
        public static KafkaContainer kafkaContainer =
                new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:latest"));
    
        @BeforeAll
        static void setUp() {
            kafkaContainer.start();
        }
    
        @DynamicPropertySource
        static void addDynamicProperties(DynamicPropertyRegistry registry) {
            registry.add("spring.kafka.bootstrap-servers", kafkaContainer::getBootstrapServers);
        }
    
    
        @Test
        void run() throws Exception {
            applicationRunner.run(null);
        }
    }
    

    Necessary additions to your pom.xml

        <dependencies>
    ...
            </dependency>
            <dependency>
                <groupId>org.testcontainers</groupId>
                <artifactId>junit-jupiter</artifactId>
                <scope>test</scope>
            </dependency>
            <dependency>
                <groupId>org.testcontainers</groupId>
                <artifactId>kafka</artifactId>
                <scope>test</scope>
            </dependency>
    ...
        </dependencies>
    
    
        <dependencyManagement>
            <dependencies>
                <dependency>
                    <groupId>org.testcontainers</groupId>
                    <artifactId>testcontainers-bom</artifactId>
                    <version>1.16.2</version>
                    <type>pom</type>
                    <scope>import</scope>
                </dependency>
            </dependencies>
        </dependencyManagement>