I am currently using Apache Curator to externalize locking of a shared resource (a row within a database). To summarize the issue, I am running 2 instances of a service (using Spring Boot), lets call this Service A, and lets call the instances A1 and A2, which are deployed in different areas. I lock on the id (primary key) of a table on a shared database which represents a file.
Within the code of Service A, I have created a singleton (BaseLockService) which handles all the locking within the project. This also means that for the 2 running instances, they each contain a singleton for processing the locking. The recipe i'm using is Shared Reentrant Lock which is using the InterProcessMutex class, however there is never a case for a reentrant lock. Its the class with the description closest to my needs.
The main process that runs is a @Scheduled one, and there is a 30 second delay between execution times. In addition, I have created a bean for the ThreadPoolTaskScheduler which appends a UUID to the thread name and the pool size is 1. Reason for this UUID is because without it, when A1 and A2 run concurrently, they both contain a Thread with the name "task-scheduler-1". This originally caused my issue with locking because A1 might possess the lock and then at the same time while processing the file, A2 requests the lock and since they share the same name, Curator returns true on lock.acquire(), thus two instances possessing the same lock.
When running one instance, this isn't an issue. I see within ZooKeeper the ZNodes are being created, and I see the UUID which Curator generates for Ephemeral locks. When running two or more instances, the process sometimes gets into a race condition where A1 possesses the lock, then runs a lengthy process. Then A2 somehow obtains the lock, finishes the process quickly and releases the lock. Then when A1 is finished and tries to unlock, I get the following Exception:
[2019-07-09 21:53:54,485] ERROR [08c598b9-7254-408c-8ed2-0e5849ca2b19_task-scheduler-1] c.m.c.myApp.lock.BaseLockService.unlock - Can't unlock lock #com.myApp.lock.BaseLockService$LockableHandle@4ca8ddab
org.apache.zookeeper.KeeperException$NoNodeException: KeeperErrorCode = NoNode for /myapp/lock/files/1376112
at org.apache.zookeeper.KeeperException.create(KeeperException.java:111)
at org.apache.zookeeper.KeeperException.create(KeeperException.java:51)
at org.apache.zookeeper.ZooKeeper.delete(ZooKeeper.java:873)
at org.apache.curator.framework.imps.DeleteBuilderImpl$5.call(DeleteBuilderImpl.java:274)
at org.apache.curator.framework.imps.DeleteBuilderImpl$5.call(DeleteBuilderImpl.java:268)
at org.apache.curator.connection.StandardConnectionHandlingPolicy.callWithRetry(StandardConnectionHandlingPolicy.java:64)
at org.apache.curator.RetryLoop.callWithRetry(RetryLoop.java:100)
at org.apache.curator.framework.imps.DeleteBuilderImpl.pathInForeground(DeleteBuilderImpl.java:265)
at org.apache.curator.framework.imps.DeleteBuilderImpl.forPath(DeleteBuilderImpl.java:249)
at org.apache.curator.framework.imps.DeleteBuilderImpl.forPath(DeleteBuilderImpl.java:34)
at com.myApp.lock.BaseLockService.unlock(BaseLockService.java:174)
at com.myApp.lock.BaseLockService.lambda$unlockAllIDs$0(BaseLockService.java:143)
at java.util.ArrayList.forEach(ArrayList.java:1257)
at com.myApp.lock.BaseLockService.unlockAllIDs(BaseLockService.java:139)
Here is my Unit Test to replicate the situation:
@Test
public void baseLockTest() {
List<Lockable> filesToProcess = new ArrayList<>();
//For now only 1 to limit complexity
Lockable fileToLock = FileSource.builder()
.id(1)
.build();
filesToProcess.add(fileToLock);
Runnable task = () -> {
log.info("ATTEMPT LOCK");
Set<BaseLockService.LockableHandle> lockedBatch = lockService.lockBatch(filesToProcess, 1);
if (!lockedBatch.isEmpty()) {
try {
log.info("ATTEMPT FAKE PROCESS TIME SLEEP 100 MS");
TimeUnit.MILLISECONDS.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
log.info("ATTEMPT UNLOCK");
lockService.unlockAll(lockedBatch);
}
};
System.out.println("**********************************************************");
//Simulate two Service instances of 1 thread
int totalThreads = 2;
ExecutorService executorService = Executors.newFixedThreadPool(totalThreads);
List<Future> locksProcessed = new ArrayList<>(totalThreads);
for (int i = 0; i < 1000; i++) {
locksProcessed.add(executorService.submit(task));
}
Future f;
while(!locksProcessed.isEmpty()){
Iterator<Future> iterator = locksProcessed.iterator();
while(iterator.hasNext()){
f = iterator.next();
if(f.isDone()){
iterator.remove();
}
}
}
System.out.println("ALL DONE!!!");
}
Here is the lock and unlock methods within the BaseLockService:
public Set<LockableHandle> lockBatch(final List<Lockable> desiredLock, final int batchSize) {
Set<LockableHandle> effectivelyLocked = new HashSet<>();
Iterator<Lockable> desiredLockIterator = desiredLock.iterator();
while ((desiredLockIterator.hasNext()) && (effectivelyLocked.size() <= batchSize)) {
Lockable toLock = desiredLockIterator.next();
String lockPath = ZKPaths.makePath(getLockPath(), String.valueOf(toLock.getId()));
InterProcessMutex lock = createMutex(lockPath);
try {
if (lock.acquire(0, TimeUnit.SECONDS)) {
LockableHandle handle = new LockableHandle(toLock, lock);
effectivelyLocked.add(handle);
locks.put(handle.getId(), handle);
} else {
log.warn(String.format("Object was not locked. Object id is %d, lock path is %s.",
toLock.getId(),
lockPath));
}
} catch (Exception e) {
log.error("Cannot lock path " + lockPath, e);
}
}
log.info(String.format("%d object(s) were requested to lock. %d were effectively locked.",
desiredLock.size(),
effectivelyLocked.size()));
return effectivelyLocked;
}
public void unlock(final LockableHandle lockHandle) {
boolean success = false;
try {
InterProcessMutex lock = lockHandle.getMutex();
if (lock != null) {
lock.release();
client.delete()
.deletingChildrenIfNeeded()
.forPath(ZKPaths.makePath(getLockPath(), String.valueOf(lockHandle.getId())));
success = true;
}
} catch (Exception e) {
log.error("Can't unlock lock #" + lockHandle, e);
} finally {
locks.remove(lockHandle.getId());
}
log.info(String.format("The lock #%d was requested to be unlocked. Success = %b",
lockHandle.getId(),
success));
}
This is the init() method that is called after the service is instantiated:
public void init() {
log.info("Stating initialization of the Lock Service");
locks = new HashMap<>();
client = createClient();
client.start();
try {
client.blockUntilConnected();
if (client.isZk34CompatibilityMode()) {
log.info("The Curator Framework is running in ZooKeeper 3.4 compatibility mode.");
}
} catch (InterruptedException ie) {
log.error("Cannot connect to ZooKeeper.", ie);
}
log.info("Completed initialization of the Lock Service");
}
I'm not sure what is missing, but all out of options. Thanks for any comments/suggestions
I found a number of issues in the Locking Issue Example you sent. It could be these are particular to the example but if these are also in your code it will explain the problems you're seeing.
locks
field in BaseLockService
should be a ConcurrentHashMap
BaseLockService#unlock
is trying to clean up the lock path by calling client.delete()...
. This cannot work. There is an inherent race in this kind of code and is why Curator has the "Reaper" classes and also why I pushed Container Nodes into Zookeeper 3.5.x. Notice that it is this line of code that's producing the NoNode
exception and not the Curator lock code. I suggest you get rid of that code and just not worry about it or move to Zookeeper 3.5.x.BaseLockService
should keep re-creating the InterProcessMutex
. It should keep a map of them or something.When I applied 1-3 above the test passes successfully (I tried multiple times). I've opened a PR on your test project with the 3 changes.