springspring-bootapache-zookeeperapache-curator

Apache Curator + Spring Boot: Simple Observer pattern example


I am trying to start a basic project structure where multiple spring boot application will share resources using apache curator.

I am following the guides specified in documentation but changing the nodes doesnt trigger any events

Please, any help would be appreciated

pom.xml

        <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-framework</artifactId>
            <version>2.12.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-recipes</artifactId>
            <version>2.12.0</version>
        </dependency>

docker-compose.yaml

version: '3.1'

services:
  zoo1:
    image: zookeeper
    restart: always
    hostname: zoo1
    ports:
      - 2181:2181
    environment:
      ZOO_MY_ID: 1
      ZOO_SERVERS: server.1=0.0.0.0:2888:3888;2181 server.2=zoo2:2888:3888;2181 server.3=zoo3:2888:3888;2181

  zoo2:
    image: zookeeper
    restart: always
    hostname: zoo2
    ports:
      - 2182:2181
    environment:
      ZOO_MY_ID: 2
      ZOO_SERVERS: server.1=zoo1:2888:3888;2181 server.2=0.0.0.0:2888:3888;2181 server.3=zoo3:2888:3888;2181

  zoo3:
    image: zookeeper
    restart: always
    hostname: zoo3
    ports:
      - 2183:2181
    environment:
      ZOO_MY_ID: 3
      ZOO_SERVERS: server.1=zoo1:2888:3888;2181 server.2=zoo2:2888:3888;2181 server.3=0.0.0.0:2888:3888;2181

Creator

package com.training.zoo.sss;

import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.data.Stat;
import org.springframework.stereotype.Service;

import java.nio.charset.StandardCharsets;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import static java.lang.System.out;

@Service
public class Client {
    String connectionInfo = "127.0.0.1:2181";
    String ZK_PATH = "/someapp/somemodule/someroute";

    public Client() throws Exception {
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
        CuratorFramework client =
                CuratorFrameworkFactory.builder()
                        .connectString(connectionInfo)
                        .sessionTimeoutMs(5000)
                        .connectionTimeoutMs(5000)
                        .retryPolicy(retryPolicy)
                        .namespace("base")
                        .build();
        client.start();

        Stat stat1 = client.checkExists().creatingParentContainersIfNeeded().forPath(ZK_PATH);
        if (stat1 == null) {
            client.create().forPath(ZK_PATH, "sometdata".getBytes());
        }

        byte[] bytes = client.getData().forPath(ZK_PATH);
        out.println(new String(bytes, StandardCharsets.UTF_8));

        // Update value every half second
        final AtomicInteger i = new AtomicInteger(0);
        ScheduledExecutorService exec = Executors.newScheduledThreadPool(1);
        exec.scheduleAtFixedRate(new Runnable(){
            @Override
            public void run(){
                i.set(i.get()+1);
                System.out.println(i);
                try {
                    client.setData().forPath(ZK_PATH, ("init_" + i ).getBytes());
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }, 0, 500, TimeUnit.MILLISECONDS);
    }
}

Listener

package com.training.bookstore.request;

import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.ChildData;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.springframework.stereotype.Service;

@Service
public class Watcher2 {
    String connectionInfo = "127.0.0.1:2181";
    String ZK_PATH = "/someapp/somemodule/someroute";

    public Watcher2() throws Exception {
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
        CuratorFramework client =
                CuratorFrameworkFactory.builder()
                        .connectString(connectionInfo)
                        .sessionTimeoutMs(5000)
                        .connectionTimeoutMs(5000)
                        .retryPolicy(retryPolicy)
                        .namespace("base")
                        .build();
        client.start();

        PathChildrenCache watcher = new PathChildrenCache(
                client, ZK_PATH, true    // if cache data
        );

        watcher.getListenable().addListener((client1, event) -> {
            ChildData data = event.getData();
            if (data == null) {
                System.out.println("No data in event[" + event + "]");
            } else {
                System.out.println("Receive event: "
                        + "type=[" + event.getType() + "]"
                        + ", path=[" + data.getPath() + "]"
                        + ", data=[" + new String(data.getData()) + "]"
                        + ", stat=[" + data.getStat() + "]");
            }
        });
        watcher.start(PathChildrenCache.StartMode.NORMAL);
        System.out.println("Register zk watcher successfully!");
    }
}

Thank you


Solution

  • So yeah that class name PathChildrenCache sounded a bit off to me.

    Solution is If producer produces on specified path

        String connectionInfo = "127.0.0.1:2181";
        String PATH = "/someapp/somemodule/whatever";
    

    In Watcher class set path to "parent" of that node

        String connectionInfo = "127.0.0.1:2181";
        String PATH = "/someapp/somemodule";
    

    And in case you need to listen to subnodes/subfolders of your producer path, instead of using PathChildrenCache use TreeCache