I am trying to set up a basic project structure in which multiple Spring Boot applications share resources using Apache Curator.
I am following the guides in the documentation, but changing the nodes doesn't trigger any events.
Any help would be appreciated.
pom.xml
<!-- Curator core client API (org.apache.curator.framework.*), e.g. CuratorFramework. -->
<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-framework</artifactId>
    <version>2.12.0</version>
</dependency>
<!-- Curator recipes (org.apache.curator.framework.recipes.*), e.g. the cache classes. -->
<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-recipes</artifactId>
    <version>2.12.0</version>
</dependency>
docker-compose.yaml
# Three ZooKeeper containers (zoo1..zoo3) configured as one ensemble.
# Each container's client port 2181 is published on a different host port
# (2181, 2182, 2183), so a client on the host can reach any of them.
version: '3.1'
services:
  zoo1:
    image: zookeeper
    restart: always
    hostname: zoo1
    ports:
      # Quoted so YAML always treats the mapping as a string.
      - "2181:2181"
    environment:
      ZOO_MY_ID: 1
      # The entry for this server itself uses 0.0.0.0; peers are addressed by hostname.
      ZOO_SERVERS: server.1=0.0.0.0:2888:3888;2181 server.2=zoo2:2888:3888;2181 server.3=zoo3:2888:3888;2181
  zoo2:
    image: zookeeper
    restart: always
    hostname: zoo2
    ports:
      - "2182:2181"
    environment:
      ZOO_MY_ID: 2
      ZOO_SERVERS: server.1=zoo1:2888:3888;2181 server.2=0.0.0.0:2888:3888;2181 server.3=zoo3:2888:3888;2181
  zoo3:
    image: zookeeper
    restart: always
    hostname: zoo3
    ports:
      - "2183:2181"
    environment:
      ZOO_MY_ID: 3
      ZOO_SERVERS: server.1=zoo1:2888:3888;2181 server.2=zoo2:2888:3888;2181 server.3=0.0.0.0:2888:3888;2181
Creator
package com.training.zoo.sss;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.data.Stat;
import org.springframework.stereotype.Service;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import static java.lang.System.out;
/**
 * Demo producer: connects to ZooKeeper through Curator, makes sure the znode at
 * {@link #ZK_PATH} exists, prints its current content, and then overwrites its
 * data every 500 ms with {@code init_<counter>}.
 *
 * <p>All paths are relative to the Curator namespace {@code "base"}.
 *
 * <p>Note: the Curator client and the scheduler are started in the constructor
 * and are never shut down by this class.
 */
@Service
public class Client {
    String connectionInfo = "127.0.0.1:2181";
    String ZK_PATH = "/someapp/somemodule/someroute";

    /**
     * Starts the Curator client, creates the node if it is missing and schedules
     * the periodic update.
     *
     * @throws Exception if any of the initial ZooKeeper operations fails
     */
    public Client() throws Exception {
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
        CuratorFramework client =
                CuratorFrameworkFactory.builder()
                        .connectString(connectionInfo)
                        .sessionTimeoutMs(5000)
                        .connectionTimeoutMs(5000)
                        .retryPolicy(retryPolicy)
                        .namespace("base")
                        .build();
        client.start();

        // checkExists() also creates missing parent nodes, so the plain create() below
        // only has to add the leaf.
        Stat stat1 = client.checkExists().creatingParentContainersIfNeeded().forPath(ZK_PATH);
        if (stat1 == null) {
            // Encode explicitly as UTF-8: the value is decoded as UTF-8 a few lines below.
            client.create().forPath(ZK_PATH, "sometdata".getBytes(StandardCharsets.UTF_8));
        }

        byte[] bytes = client.getData().forPath(ZK_PATH);
        out.println(new String(bytes, StandardCharsets.UTF_8));

        // Update value every half second
        final AtomicInteger i = new AtomicInteger(0);
        ScheduledExecutorService exec = Executors.newScheduledThreadPool(1);
        exec.scheduleAtFixedRate(() -> {
            // Single atomic step instead of a separate get() followed by set().
            int next = i.incrementAndGet();
            System.out.println(next);
            try {
                client.setData().forPath(ZK_PATH, ("init_" + next).getBytes(StandardCharsets.UTF_8));
            } catch (Exception e) {
                // Must not propagate: scheduleAtFixedRate suppresses all further runs
                // once a run throws.
                e.printStackTrace();
            }
        }, 0, 500, TimeUnit.MILLISECONDS);
    }
}
Listener
package com.training.bookstore.request;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.ChildData;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.springframework.stereotype.Service;
import java.nio.charset.StandardCharsets;
/**
 * Demo listener: connects to ZooKeeper through Curator and prints every
 * {@link PathChildrenCache} event seen for the node at {@link #ZK_PATH}.
 *
 * <p>All paths are relative to the Curator namespace {@code "base"}.
 *
 * <p>Note: the Curator client and the cache are started in the constructor and
 * are never closed by this class.
 */
@Service
public class Watcher2 {
    String connectionInfo = "127.0.0.1:2181";
    String ZK_PATH = "/someapp/somemodule/someroute";

    /**
     * Starts the Curator client and registers the cache listener.
     *
     * @throws Exception if the cache cannot be started
     */
    public Watcher2() throws Exception {
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
        CuratorFramework client =
                CuratorFrameworkFactory.builder()
                        .connectString(connectionInfo)
                        .sessionTimeoutMs(5000)
                        .connectionTimeoutMs(5000)
                        .retryPolicy(retryPolicy)
                        .namespace("base")
                        .build();
        client.start();

        // PathChildrenCache reports changes to the CHILDREN of the path it is given,
        // never to that node itself. To see data updates on ZK_PATH the cache therefore
        // has to be attached to its parent. Siblings of ZK_PATH are reported as well.
        String watchedParent = parentOf(ZK_PATH);
        PathChildrenCache watcher = new PathChildrenCache(
                client, watchedParent, true // if cache data
        );
        watcher.getListenable().addListener((client1, event) -> {
            ChildData data = event.getData();
            if (data == null) {
                System.out.println("No data in event[" + event + "]");
            } else {
                // Guard against a node without payload instead of failing inside new String(...).
                byte[] payload = data.getData();
                String text = payload == null ? "" : new String(payload, StandardCharsets.UTF_8);
                System.out.println("Receive event: "
                        + "type=[" + event.getType() + "]"
                        + ", path=[" + data.getPath() + "]"
                        + ", data=[" + text + "]"
                        + ", stat=[" + data.getStat() + "]");
            }
        });
        watcher.start(PathChildrenCache.StartMode.NORMAL);
        System.out.println("Register zk watcher successfully!");
    }

    /**
     * Returns the parent of an absolute znode path, e.g. {@code "/a/b"} for
     * {@code "/a/b/c"}; a top-level node (or a path without any slash) yields {@code "/"}.
     */
    private static String parentOf(String path) {
        int lastSlash = path.lastIndexOf('/');
        return lastSlash <= 0 ? "/" : path.substring(0, lastSlash);
    }
}
Thank you
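So yes, the class name PathChildrenCache sounded a bit off to me: it only reports changes to the children of the path it watches, not to that node itself.
The solution: if the producer writes to a specific path, for example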
String connectionInfo = "127.0.0.1:2181";
String PATH = "/someapp/somemodule/whatever";
then in the Watcher class set the path to the parent of that node:
String connectionInfo = "127.0.0.1:2181";
String PATH = "/someapp/somemodule";
And if you also need to listen to sub-nodes/sub-folders of your producer path, use TreeCache instead of PathChildrenCache.