
客户端构造
使用ZooKeeper客户端和ZooKeeper集群交互时主要使用org.apache.zookeeper.ZooKeeper类,此类常用构造器签名如下:
1
   | public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher)throws IOException
   | 
 
ZooKeeper客户端建立连接为异步方式,其参数含义如下:
- connectString 逗号分隔的服务器列表,客户端会打乱顺序并逐个尝试连接直到可以成功连接
 
- sessionTimeout 会话超时时间(毫秒),服务端默认会将其限制在[2 * tickTime, 20 * tickTime]区间内(可通过minSessionTimeout、maxSessionTimeout调整),超出范围的值会被调整到区间边界
 
- watcher 默认监听器,连接状态发生变化时将会调用此监听器的process方法;此外,调用getData、exists、getChildren等方法并将watch参数指定为true时,也会使用此watcher监听节点变化
使用ZooKeeper客户端必须谨慎处理连接状态变化,部分场景下客户端可以通过重连恢复会话,部分场景则必须手动重新建立连接,所以如果使用原生ZooKeeper客户端建议将ZooKeeper客户端包装一层再使用,在包装类里进行自动重连操作。
更简单的做法是用Apache Curator(https://curator.apache.org/index.html)框架。
ZooKeeper包装类示例:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115
   | package com.will.zk;
  import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.ZooKeeper; import org.slf4j.Logger; import org.slf4j.LoggerFactory;
  import java.io.Closeable; import java.io.IOException; import java.util.concurrent.CountDownLatch;
 
 
 
  /**
   * Wrapper around a native {@link ZooKeeper} client that blocks in its constructor until the
   * first {@code SyncConnected} event and replaces the underlying client with a fresh one when
   * the session expires.
   *
   * <p>The instance registers itself as the default watcher of every client it creates, so
   * connection state changes are delivered to {@link #process(WatchedEvent)}.
   */
  public class ZKOperator implements Closeable, AutoCloseable, Watcher {
      private static final Logger LOGGER = LoggerFactory.getLogger(ZKOperator.class);

      /** Upper bound for the pause between two failed reconnect attempts, in milliseconds. */
      private static final long MAX_RECONNECT_BACKOFF_MS = 5_000L;

      private final String servers;
      private final int timeout;
      private final CountDownLatch latch;

      // Reassigned from the client's event thread in reconnect() and read by callers of
      // getZooKeeper()/close() on other threads, hence volatile.
      private volatile ZooKeeper zooKeeper;

      /**
       * Creates the client and waits until it reports {@code SyncConnected}.
       *
       * <p>If the waiting thread is interrupted, its interrupt flag is restored and the
       * constructor returns; the wrapped client may not be connected yet in that case.
       *
       * @param servers comma separated {@code host:port} list passed to the ZooKeeper client
       * @param timeout session timeout in milliseconds passed to the ZooKeeper client
       * @throws IllegalStateException if the client could not be created
       */
      public ZKOperator(String servers, int timeout) {
          this.servers = servers;
          this.timeout = timeout;
          this.latch = new CountDownLatch(1);

          try {
              connect();
          } catch (IOException e) {
              LOGGER.error("failed to connect to servers: " + servers, e);
              // Fail fast: without a client nothing will ever count the latch down, so
              // falling through to latch.await() would block the caller forever.
              throw new IllegalStateException("failed to init zookeeper connection", e);
          }

          try {
              latch.await();
          } catch (InterruptedException e) {
              Thread.currentThread().interrupt();
          }
      }

      /** Creates a new client with this instance as its default watcher. */
      private void connect() throws IOException {
          zooKeeper = new ZooKeeper(servers, timeout, this);
      }

      /**
       * Replaces a closed client with a new one, retrying with a growing pause while client
       * creation fails. Returns as soon as the current client is no longer in the
       * {@code CLOSED} state, or when the thread is interrupted.
       */
      private void reconnect() {
          int retries = 0;
          while (true) {
              retries++;
              try {
                  if (zooKeeper.getState() != ZooKeeper.States.CLOSED) {
                      // Either nothing to do or a previous iteration already created a new client.
                      return;
                  }

                  zooKeeper.close();
                  LOGGER.info("ZooKeeper Connection Closed, Reconnect");
                  connect();
              } catch (InterruptedException e) {
                  // Stop instead of looping again with the interrupt flag set.
                  Thread.currentThread().interrupt();
                  LOGGER.warn("interrupted while reconnecting, giving up");
                  return;
              } catch (IOException e) {
                  LOGGER.warn(String.format("failed to reconnect for %s times", retries), e);
                  if (!pauseBeforeRetry(retries)) {
                      return;
                  }
              }
          }
      }

      /**
       * Sleeps before the next reconnect attempt so that a persistent failure does not turn
       * into a busy loop.
       *
       * @param attempt number of attempts made so far; the pause grows linearly with it up to
       *                {@link #MAX_RECONNECT_BACKOFF_MS}
       * @return {@code false} if the sleep was interrupted (the interrupt flag is restored)
       */
      private boolean pauseBeforeRetry(int attempt) {
          long pauseMs = Math.min(1_000L * attempt, MAX_RECONNECT_BACKOFF_MS);
          try {
              Thread.sleep(pauseMs);
              return true;
          } catch (InterruptedException e) {
              Thread.currentThread().interrupt();
              LOGGER.warn("interrupted while waiting to reconnect, giving up");
              return false;
          }
      }

      /**
       * Handles connection state events: releases the constructor on {@code SyncConnected},
       * rebuilds the client on {@code Expired}, and only logs every other state.
       */
      @Override
      public void process(WatchedEvent event) {
          Event.KeeperState state = event.getState();

          switch (state) {
              case SyncConnected:
                  LOGGER.info("ZooKeeper SyncConnected");
                  latch.countDown();
                  break;
              case Expired:
                  LOGGER.warn("ZooKeeper Session Expired, Reconnect");
                  reconnect();
                  break;
              case Disconnected:
                  LOGGER.warn("ZooKeeper Client Disconnected From Servers, Waiting For Auto-Reconnect");
                  break;
              default:
                  LOGGER.info(String.format("ZooKeeper Client Current State: %s, Do Nothing", state));
                  break;
          }
      }

      /**
       * Returns the client currently in use. The instance changes after a session expiry, so
       * callers should not cache the returned reference.
       */
      public ZooKeeper getZooKeeper() {
          return zooKeeper;
      }

      /** Closes the current client, restoring the interrupt flag if the close is interrupted. */
      @Override
      public void close() {
          ZooKeeper copy = zooKeeper;
          if (copy != null) {
              try {
                  copy.close();
              } catch (InterruptedException e) {
                  Thread.currentThread().interrupt();
              }
          }
      }

      /** Demo entry point: connects to a local server and prints the session id. */
      public static void main(String[] args) {
          try (ZKOperator operator = new ZKOperator("localhost:2181", 1000)) {
              System.out.println(operator.getZooKeeper().getSessionId());
          }
      }
  }
   | 
 
监听节点数据变化
ZooKeeper客户端监听数据变化的方法为getData,一次只能监听一个节点,并且watcher只触发一次,数据变化事件触发之后需要重新注册watcher。此外当会话过期时watcher也不会恢复,需要在新建立的会话(新的ZooKeeper实例)上重新注册。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52
   | package com.will.zk;
  import org.apache.zookeeper.AsyncCallback; import org.apache.zookeeper.KeeperException.Code; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.ZooKeeper; import org.apache.zookeeper.data.Stat; import org.slf4j.Logger; import org.slf4j.LoggerFactory;
  /**
   * Base class that watches the data of a single znode through the asynchronous
   * {@code getData} API and forwards every successful read to
   * {@link #onChange(String, byte[], int)}.
   *
   * <p>The watch is re-registered after each {@code NodeDataChanged} event. Other event
   * types, such as a deletion of the node, do not re-register the watch.
   */
  public abstract class ZNodeDataMonitor implements AsyncCallback.DataCallback {
      private static final Logger LOGGER = LoggerFactory.getLogger(ZNodeDataMonitor.class);

      private final ZooKeeper zk;
      private final String path;

      // One watcher instance is reused for every registration so that calling watchData()
      // more than once does not pass a different watcher object each time.
      private final Watcher watcher = this::process;

      /**
       * @param zk   client used to read the node and register the watch
       * @param path absolute path of the znode to monitor
       */
      public ZNodeDataMonitor(ZooKeeper zk, String path) {
          this.zk = zk;
          this.path = path;
      }

      /**
       * Asynchronously reads the node data and registers the data watch. The outcome is
       * delivered to {@link #processResult(int, String, Object, byte[], Stat)}.
       */
      protected void watchData() {
          zk.getData(path, watcher, this, null);
      }

      /** Re-reads the node, and thereby re-registers the watch, on data change or session expiry. */
      private void process(WatchedEvent event) {
          // NOTE(review): after an Expired event the same handle will presumably reject the
          // request; a new ZooKeeper instance is likely required to resume monitoring.
          if (event.getType() == Watcher.Event.EventType.NodeDataChanged
                  || event.getState() == Watcher.Event.KeeperState.Expired) {
              watchData();
          }
      }

      /**
       * Callback of the asynchronous read. On success the data is handed to
       * {@link #onChange(String, byte[], int)}; a lost connection triggers a retry; every
       * other result code is logged.
       */
      @Override
      public void processResult(int rc, String path, Object ctx, byte[] data, Stat stat) {
          Code code = Code.get(rc);

          if (code == Code.OK) {
              try {
                  onChange(path, data, stat.getVersion());
              } catch (Exception e) {
                  LOGGER.error("failed to handle data change: ", e);
              }
              return;
          }

          if (code == Code.CONNECTIONLOSS) {
              // The request did not complete, so retry it; otherwise a transient network
              // problem would silently end the monitoring.
              LOGGER.warn("connection lost while reading data of {}, retrying", path);
              watchData();
              return;
          }

          LOGGER.warn("failed to read data of {}, result code: {}", path, code);
      }

      /**
       * Invoked with the current content of the node after every successful read.
       *
       * @param path    path of the monitored node
       * @param data    data returned by the read
       * @param version data version taken from the node's {@link Stat}
       */
      abstract void onChange(String path, byte[] data, int version);
  }
   | 
 
抽象类使用示例
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35
   | package com.will.zk;
  import org.apache.zookeeper.ZooKeeper; import org.slf4j.Logger; import org.slf4j.LoggerFactory;
  import java.io.IOException; import java.util.concurrent.CountDownLatch;
  /**
   * Example {@link ZNodeDataMonitor} that logs each reported data value of a node and lets
   * {@link #main(String[])} exit after three notifications.
   */
  public class NumberNodeDataMonitor extends ZNodeDataMonitor {
      private static final Logger LOGGER = LoggerFactory.getLogger(NumberNodeDataMonitor.class);

      /** Counts the notifications {@code main} waits for before it terminates. */
      private static final CountDownLatch latch = new CountDownLatch(3);

      public NumberNodeDataMonitor(ZooKeeper zk, String path) {
          super(zk, path);
      }

      /** Logs the new data and version, then counts the latch down by one. */
      @Override
      void onChange(String path, byte[] data, int version) {
          // Guard against a null payload and decode with an explicit charset instead of the
          // platform default.
          String text = data == null
                  ? null
                  : new String(data, java.nio.charset.StandardCharsets.UTF_8);
          LOGGER.info(String.format("路径: %s 数据发生变化,新数据: %s, 版本号: %s", path, text, version));
          latch.countDown();
      }

      /** Demo entry point: watches {@code /numbers} on a local server until three notifications arrive. */
      public static void main(String[] args) throws InterruptedException, IOException {
          String servers = "localhost:2181";
          int timeout = 1000;
          String path = "/numbers";
          ZooKeeper zk = new ZooKeeper(servers, timeout, event -> {});
          try {
              new NumberNodeDataMonitor(zk, path).watchData();
              latch.await();
          } finally {
              // Close the handle when the demo finishes or is interrupted.
              zk.close();
          }
      }
  }
   | 
 
监控子节点变化
ZooKeeper客户端方法getChildren可以接收到子节点创建和删除事件,子节点数据变化时不会通知。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44
   | package com.will.zk;
  import org.apache.zookeeper.*; import org.slf4j.Logger; import org.slf4j.LoggerFactory;
  import java.util.List;
  /**
   * Base class that watches the child list of a single znode through the asynchronous
   * {@code getChildren} API and forwards every successful read to
   * {@link #onChange(String, List)}.
   *
   * <p>The watch is re-registered after each {@code NodeChildrenChanged} event.
   */
  public abstract class ZNodeChildrenMonitor implements AsyncCallback.ChildrenCallback {
      private static final Logger LOGGER = LoggerFactory.getLogger(ZNodeChildrenMonitor.class);

      private final ZooKeeper zk;
      private final String path;

      // One watcher instance is reused for every registration so that calling watchChildren()
      // more than once does not pass a different watcher object each time.
      private final Watcher watcher = this::process;

      /**
       * @param zk   client used to list the children and register the watch
       * @param path absolute path of the parent znode to monitor
       */
      public ZNodeChildrenMonitor(ZooKeeper zk, String path) {
          this.zk = zk;
          this.path = path;
      }

      /**
       * Asynchronously lists the children and registers the child watch. The outcome is
       * delivered to {@link #processResult(int, String, Object, List)}.
       */
      public void watchChildren() {
          zk.getChildren(path, watcher, this, null);
      }

      /** Re-lists the children, and thereby re-registers the watch, on child change or session expiry. */
      private void process(WatchedEvent event) {
          // NOTE(review): after an Expired event the same handle will presumably reject the
          // request; a new ZooKeeper instance is likely required to resume monitoring.
          if (event.getType() == Watcher.Event.EventType.NodeChildrenChanged
                  || event.getState() == Watcher.Event.KeeperState.Expired) {
              watchChildren();
          }
      }

      /**
       * Callback of the asynchronous listing. On success the child names are handed to
       * {@link #onChange(String, List)}; a lost connection triggers a retry; every other
       * result code is logged.
       */
      @Override
      public void processResult(int rc, String path, Object ctx, List<String> children) {
          KeeperException.Code code = KeeperException.Code.get(rc);

          if (code == KeeperException.Code.OK) {
              try {
                  onChange(path, children);
              } catch (Exception e) {
                  LOGGER.error("failed to handle children change: ", e);
              }
              return;
          }

          if (code == KeeperException.Code.CONNECTIONLOSS) {
              // The request did not complete, so retry it; otherwise a transient network
              // problem would silently end the monitoring.
              LOGGER.warn("connection lost while listing children of {}, retrying", path);
              watchChildren();
              return;
          }

          LOGGER.warn("failed to list children of {}, result code: {}", path, code);
      }

      /**
       * Invoked with the current child names after every successful listing.
       *
       * @param path           path of the monitored parent node
       * @param childNodeNames names of the children returned by the listing
       */
      abstract void onChange(String path, List<String> childNodeNames);
  }
   | 
 
抽象类使用示例
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36
   | package com.will.zk;
  import org.apache.zookeeper.ZooKeeper; import org.slf4j.Logger; import org.slf4j.LoggerFactory;
  import java.io.IOException; import java.util.List; import java.util.concurrent.CountDownLatch;
  /**
   * Example {@link ZNodeChildrenMonitor} that logs the number of children on every
   * notification and lets {@link #main(String[])} exit after five notifications.
   */
  public class NumberChildrenMonitor extends ZNodeChildrenMonitor {
      private static final Logger LOGGER = LoggerFactory.getLogger(NumberChildrenMonitor.class);

      /** Counts the notifications {@code main} waits for before it terminates. */
      private static final CountDownLatch latch = new CountDownLatch(5);

      public NumberChildrenMonitor(ZooKeeper zk, String path) {
          super(zk, path);
      }

      /** Logs the current number of children, then counts the latch down by one. */
      @Override
      void onChange(String path, List<String> childNodeNames) {
          LOGGER.info(String.format("路径: %s 子节点发生变化, 当前节点数量: %s", path, childNodeNames.size()));
          latch.countDown();
      }

      /** Demo entry point: watches the children of {@code /numbers} on a local server until five notifications arrive. */
      public static void main(String[] args) throws InterruptedException, IOException {
          String servers = "localhost:2181";
          int timeout = 1000;
          String path = "/numbers";
          ZooKeeper zk = new ZooKeeper(servers, timeout, event -> {});

          try {
              NumberChildrenMonitor monitor = new NumberChildrenMonitor(zk, path);
              monitor.watchChildren();
              latch.await();
          } finally {
              // Close the handle when the demo finishes or is interrupted.
              zk.close();
          }
      }
  }
   |