apache-zookeeper, apache-curator

Why doesn't Apache Curator fire all updates?


Please run the following against your ZooKeeper server(s) after creating an empty /test/a node.

import static java.lang.String.valueOf;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.framework.recipes.cache.PathChildrenCache.StartMode;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
import org.apache.curator.retry.RetryForever;

public class CacheUpdateTest {
    static final String connectString = "127.0.0.1:2181,127.0.0.1:2191,127.0.0.1:2201";

    public static void main(String[] args) throws Exception {
        new Listener().start();
        Thread.sleep(1000);
        new Updater().start();
    }

    private static class Listener extends Thread {
        @SuppressWarnings("resource")
        @Override
        public void run() {
            CuratorFramework client = CuratorFrameworkFactory.builder().connectString(connectString).retryPolicy(new RetryForever(100)).build();
            client.start();

            PathChildrenCache cache = new PathChildrenCache(client, "/test", true);
            cache.getListenable().addListener(new PathChildrenCacheListener() {

                @Override
                public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {
                    if (event.getData() == null || event.getData().getData() == null) return;
                    int newI = Integer.parseInt(new String(event.getData().getData()));
                    System.err.println("Sensed update: " + newI);
                }
            });
            try {
                cache.start(StartMode.BUILD_INITIAL_CACHE);
                Thread.sleep(2000); // give the cache time to deliver events before this thread ends
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    private static class Updater extends Thread {
        @Override
        public void run() {
            try {
                CuratorFramework client = CuratorFrameworkFactory.builder().connectString(connectString).retryPolicy(new RetryForever(100)).build();
                client.start();

                for (int i = 0; i < 10; i++) {
                    // Thread.sleep(100);
                    System.out.println("Updated child: " + i);
                    client.setData().forPath("/test/a", valueOf(i).getBytes());
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
}

If I uncomment the Thread.sleep(100) line, I usually get the following output:

Updated child: 0
Sensed update: 0
Updated child: 1
Sensed update: 1
Updated child: 2
Sensed update: 2
Updated child: 3
Sensed update: 3
Updated child: 4
Sensed update: 4
Updated child: 5
Sensed update: 5
Updated child: 6
Sensed update: 6
Updated child: 7
Sensed update: 7
Updated child: 8
Sensed update: 8
Updated child: 9
Sensed update: 9

When I leave it commented out, I get the following output instead:

Updated child: 0
Updated child: 1
Sensed update: 1 --> Missed 0
Updated child: 2
Updated child: 3
Updated child: 4
Sensed update: 3 --> Missed 2
Updated child: 5
Updated child: 6
Sensed update: 5 --> Missed 4
Updated child: 7
Updated child: 8
Sensed update: 7 --> Missed 6
Updated child: 9
Sensed update: 9 --> Missed 8

Why am I not always getting all notifications? And why didn't I miss the first one?


Solution

  • Curator is a library built to make it easier to work with Apache ZooKeeper. PathChildrenCache works by using ZooKeeper watches.

    A watch is a one-time trigger. When a Watcher is notified about a change (or about any other event it subscribed to), the watch is consumed, and the client must set a new watch to keep receiving notifications.

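    In your case, PathChildrenCache is watching the children of /test and, because you passed cacheData = true, their data. After it receives a notification from ZooKeeper, it reads the node again and sets a new watch as part of that read so that it keeps seeing further changes. The notification itself carries only the event type and path, not the new data, so the value the cache reports is whatever the node holds at the moment of that read.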

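    Since everything is asynchronous, the data can change several times between the moment a notification is sent and the moment the cache re-reads the node and sets its new watch. That is why you see every change when the Updater sleeps: the cache has enough time to detect a change and re-create the watch before the next setData call. Without the sleep, updates happen so quickly that some of them land while no watch is set, and the cache only sees the latest value when it reads the node again. The same race most likely explains why the first value sensed in your second run is 1 rather than 0: the write of 0 triggered the notification, but by the time the cache read the node it already held 1.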

    For further reading, see the official ZooKeeper documentation on watches, in particular this passage:

    Because watches are one time triggers and there is latency between getting the event and sending a new request to get a watch you cannot reliably see every change that happens to a node in ZooKeeper. Be prepared to handle the case where the znode changes multiple times between getting the event and setting the watch again. (You may not care, but at least realize it may happen.)
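The race described above can be reproduced without a server. The sketch below is a hypothetical, single-threaded model, not Curator or ZooKeeper code: `CoalescingModel` and its `latency` parameter are invented for illustration, where `latency` is the assumed number of writes that land between a notification and the reader's combined read-and-rewatch. It applies only the two rules from the documentation: a watch fires once, and the reader learns the current value only when it reads the node again.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, deterministic model of a one-time watch plus re-read latency.
// Not Curator or ZooKeeper code; "latency" is an assumed number of writes that
// happen between a notification and the reader's getData-and-rewatch call.
class CoalescingModel {
    static List<Integer> observedValues(int writes, int latency) {
        List<Integer> seen = new ArrayList<>();
        boolean watchArmed = true;   // set by the initial read of the node
        int rereadAfterWrite = -1;   // write index after which the pending re-read runs
        int current = -1;            // value currently stored in the node
        for (int i = 0; i < writes; i++) {
            current = i;             // models setData("/test/a", i)
            if (watchArmed) {
                watchArmed = false;  // the one-time watch is consumed by this write
                rereadAfterWrite = i + latency;
            }
            if (i == rereadAfterWrite) {
                seen.add(current);   // the re-read returns the latest value...
                watchArmed = true;   // ...and sets a new watch at the same time
                rereadAfterWrite = -1;
            }
        }
        if (rereadAfterWrite != -1) {
            seen.add(current);       // a pending re-read still runs after the last write
        }
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(observedValues(10, 0)); // [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
        System.out.println(observedValues(10, 1)); // [1, 3, 5, 7, 9]
    }
}
```

With `latency` 0 the model sees every value, like the run with `Thread.sleep(100)`; with `latency` 1 it reproduces the 1, 3, 5, 7, 9 pattern of the run without the sleep. In a real cluster the latency varies from run to run, which fits the output being only "usually" the same. Note that the last value is always observed; only intermediate ones are lost.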
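If you want to follow the documentation's advice and at least notice when updates were coalesced, one option (a suggestion, not something Curator does for you) is to compare data versions: every setData on a znode increments its data version, which a PathChildrenCacheListener can read as `event.getData().getStat().getVersion()`. The hypothetical `VersionGapTracker` below only does the arithmetic. It assumes /test/a was freshly created (version 0) and is never deleted and re-created, since re-creation resets the version.

```java
// Hypothetical helper, not part of Curator or ZooKeeper.
// Remembers the last data version seen for one znode and reports how many
// updates happened in between without being observed.
class VersionGapTracker {
    private int lastVersion;

    // initialVersion: the version already known when tracking starts
    // (a freshly created, never-updated znode has data version 0).
    VersionGapTracker(int initialVersion) {
        this.lastVersion = initialVersion;
    }

    // Returns the number of skipped updates between the previous version and this one.
    // Stale or repeated versions report 0 and do not move lastVersion backwards.
    int observe(int version) {
        int missed = Math.max(0, version - lastVersion - 1);
        lastVersion = Math.max(lastVersion, version);
        return missed;
    }

    public static void main(String[] args) {
        // Assumed versions for the run without the sleep: /test/a starts at version 0,
        // setData of 0..9 produces versions 1..10, so the sensed values
        // 1, 3, 5, 7, 9 carry versions 2, 4, 6, 8, 10.
        VersionGapTracker tracker = new VersionGapTracker(0);
        int totalMissed = 0;
        for (int version : new int[] {2, 4, 6, 8, 10}) {
            totalMissed += tracker.observe(version);
        }
        System.out.println("Missed updates: " + totalMissed); // Missed updates: 5
    }
}
```

This does not recover the lost values; it only tells the listener that they existed. If every intermediate value matters, the data has to be stored somewhere other than a single overwritten znode.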