unit-testing apache-kafka apache-kafka-streams topology

Kafka Streams: stream thread failed to lock state directory


I am trying to test my Kafka Streams application. I have built a simple topology where I read from an input topic and store the same data in a state store.

I tried writing unit tests for this topology using TopologyTestDriver. When I run the test, I get the following error:

org.apache.kafka.streams.errors.LockException: stream-thread [main] task [0_0] Failed to lock the state directory for task 0_0
    at org.apache.kafka.streams.processor.internals.AbstractTask.registerStateStores(AbstractTask.java:197)
    at org.apache.kafka.streams.processor.internals.StreamTask.initializeStateStores(StreamTask.java:275)
    at org.apache.kafka.streams.TopologyTestDriver.<init>(TopologyTestDriver.java:403)
    at org.apache.kafka.streams.TopologyTestDriver.<init>(TopologyTestDriver.java:257)
    at org.apache.kafka.streams.TopologyTestDriver.<init>(TopologyTestDriver.java:228)
    at streams.checkStreams.checkStreamsTest.setup(checkStreamsTest.java:99)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:566)
    at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
    at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
    at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
    at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:24)
    at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
    at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
    at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
    at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
    at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
    at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
    at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
    at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
    at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
    ....

I can see the state store being created locally in /tmp/kafka-streams, but somehow the stream thread is unable to acquire a lock on it. I searched and found that this error can occur when two stream threads try to access the same directory: one holds the lock, so the other has to wait. But I don't see two stream threads being created in my code. I am new to Kafka Streams and its testing; am I missing anything here?


Solution

  • The TopologyTestDriver does not create any background threads, so multi-threading within KafkaStreams itself should not be the cause. However, as @BartoszWardziński pointed out, if your testing framework executes tests in parallel and you use the same application.id in different tests, those tests share the same state directory, which may lead to locking issues.

    The recommendation for tests is to generate a random application.id for each test to avoid this issue.
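
As a minimal sketch of that recommendation, the helper below builds a test configuration with a fresh application.id on every call. The class name `TestStreamsConfig`, the method `uniqueTestConfig`, and the `dummy:1234` placeholder are assumptions made for illustration, not part of your code. It uses the plain string keys so that it compiles without the Kafka dependency; in a real test you would more likely use the `StreamsConfig` constants.

```java
import java.util.Properties;
import java.util.UUID;

// Hypothetical helper; the class and method names are illustrative only.
final class TestStreamsConfig {

    private TestStreamsConfig() {}

    // Returns a config whose application.id differs on every call, so each
    // test gets its own sub-directory under the state directory.
    static Properties uniqueTestConfig(String prefix) {
        Properties props = new Properties();
        // Same key as StreamsConfig.APPLICATION_ID_CONFIG.
        props.put("application.id", prefix + "-" + UUID.randomUUID());
        // Same key as StreamsConfig.BOOTSTRAP_SERVERS_CONFIG. The value is a
        // placeholder: TopologyTestDriver does not connect to a broker.
        props.put("bootstrap.servers", "dummy:1234");
        return props;
    }
}
```

In your setup method this could then be passed to the driver as `new TopologyTestDriver(topology, TestStreamsConfig.uniqueTestConfig("check-streams-test"))`, where `topology` is the topology under test and the prefix is any name you like.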