package com.will.zk;
import org.apache.zookeeper.*; import org.apache.zookeeper.data.Stat; import org.apache.zookeeper.KeeperException.*; import org.slf4j.Logger; import org.slf4j.LoggerFactory;
import java.io.IOException; import java.nio.charset.StandardCharsets; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException;
/**
 * A one-shot barrier backed by a ZooKeeper znode.
 *
 * <p>While the znode at {@code path} exists the barrier is considered closed. As soon as the
 * znode is observed to be missing (either through an {@code exists} result of {@code NONODE} or
 * through a {@code NodeDeleted} watch event) the supplied future is completed with {@code null}.
 * Unrecoverable ZooKeeper errors complete the future exceptionally so that waiters do not block
 * forever.
 *
 * @param <T> unused by this class; retained so existing declarations keep compiling
 */
public class ZKBarrier<T> implements AsyncCallback.StatCallback {
    private static final Logger LOGGER = LoggerFactory.getLogger(ZKBarrier.class);

    private final ZooKeeper zk;
    private final String path;
    private final CompletableFuture<?> future;

    /**
     * Single watcher instance reused for every registration. Evaluating {@code this::process}
     * on each call may produce a distinct object every time, which would make repeated
     * registrations pile up instead of being idempotent.
     */
    private final Watcher watcher = this::process;

    /** Cleared once the barrier node has been reported deleted; no further watches are set. */
    private boolean needWatch;

    /**
     * @param zk     client used to query and watch the barrier node
     * @param path   znode path representing the barrier
     * @param future completed (with {@code null}) when the barrier node is gone, or completed
     *               exceptionally when ZooKeeper reports an unrecoverable error
     */
    public ZKBarrier(ZooKeeper zk, String path, CompletableFuture<?> future) {
        this.zk = zk;
        this.path = path;
        this.future = future;
        this.needWatch = true;
    }

    /**
     * Asynchronously checks whether the barrier node exists and registers the watcher on it.
     * The result is delivered to {@link #processResult(int, String, Object, Stat)} with the
     * future passed through as the callback context.
     */
    public void watchBarrier() {
        zk.exists(path, watcher, this, future);
    }

    /**
     * Watcher callback for events on the barrier node.
     *
     * @param event the event delivered by ZooKeeper
     */
    private void process(WatchedEvent event) {
        LOGGER.info("接收到zk事件回调");
        if (event.getType() == Watcher.Event.EventType.NodeDeleted) {
            LOGGER.info("node被删除");
            needWatch = false;
            // Release waiters directly. Re-issuing exists() and waiting for NONODE would miss
            // the release if the node were recreated before that call is processed.
            future.complete(null);
            return;
        }
        // Any other event: re-check the node and re-arm the watch, unless we are already done.
        if (needWatch && !future.isDone()) {
            watchBarrier();
        }
    }

    /**
     * Handles the result of the asynchronous {@code exists} call.
     *
     * <ul>
     *   <li>{@code OK}: the barrier node exists; nothing to do until the watcher fires.</li>
     *   <li>{@code NONODE}: the barrier is gone; the future is completed with {@code null}.</li>
     *   <li>{@code CONNECTIONLOSS}: the request is re-issued, since the watch may not have
     *       been registered.</li>
     *   <li>anything else: the future is completed exceptionally.</li>
     * </ul>
     *
     * @param rc   raw ZooKeeper result code
     * @param path path the request was issued for
     * @param ctx  callback context; ignored unless it is a {@link CompletableFuture}
     * @param stat node metadata when the node exists (unused)
     */
    @Override
    public void processResult(int rc, String path, Object ctx, Stat stat) {
        if (!(ctx instanceof CompletableFuture<?>)) {
            return;
        }
        CompletableFuture<?> target = (CompletableFuture<?>) ctx;
        Code code = Code.get(rc);

        if (code == Code.OK) {
            return;
        }
        if (code == Code.NONODE) {
            target.complete(null);
        } else if (code == Code.CONNECTIONLOSS) {
            LOGGER.warn("connection lost while checking barrier {}, retrying", path);
            if (!target.isDone()) {
                zk.exists(this.path, watcher, this, ctx);
            }
        } else if (code == null) {
            // Code.get returned no constant for this rc; KeeperException.create cannot be used.
            target.completeExceptionally(
                    new IllegalStateException("unknown ZooKeeper result code " + rc + " for " + path));
        } else {
            LOGGER.error("failed to check barrier {}: {}", path, code);
            target.completeExceptionally(KeeperException.create(code, path));
        }
    }

    /**
     * Builds a daemon thread that waits on a fresh barrier for {@code path} and counts the
     * latch down when the wait ends, whether it succeeded or not.
     *
     * @param zk    shared ZooKeeper client
     * @param path  barrier znode path
     * @param latch counted down exactly once when the thread finishes waiting
     * @return the unstarted daemon thread
     */
    private static Thread createThread(ZooKeeper zk, String path, CountDownLatch latch) {
        Thread t = new Thread(() -> {
            CompletableFuture<Integer> future = new CompletableFuture<>();
            ZKBarrier<Integer> barrier = new ZKBarrier<>(zk, path, future);
            LOGGER.info("watch barrier");
            barrier.watchBarrier();
            try {
                future.get();
                // Only announce work once the barrier has really been released.
                LOGGER.info("barrier missing, begin to work");
            } catch (InterruptedException e) {
                LOGGER.error("interrupted while waiting for barrier {}", path, e);
                Thread.currentThread().interrupt();
            } catch (ExecutionException e) {
                LOGGER.error("waiting for barrier {} failed", path, e);
            } finally {
                // Always release main(), otherwise a failed worker would block it forever.
                latch.countDown();
            }
        });
        t.setDaemon(true);
        return t;
    }

    /**
     * Demo entry point: creates the barrier node, starts two waiting threads and blocks until
     * both have finished waiting. Nothing in this method deletes the barrier node, so the wait
     * ends only when the node is removed by some other party or the workers fail.
     */
    public static void main(String[] args) throws InterruptedException, KeeperException, IOException {
        String servers = "localhost:2181";
        int timeout = 1000;
        String path = "/barriers/barrier-1";

        ZooKeeper zk = new ZooKeeper(servers, timeout, event -> {});
        try {
            // NOTE(review): ZKOperator is declared elsewhere; whether it needs closing is not
            // visible from here - confirm.
            ZKOperator zkOperator = new ZKOperator(servers, timeout);
            zkOperator.create(path, "".getBytes(StandardCharsets.UTF_8));

            CountDownLatch latch = new CountDownLatch(2);
            createThread(zk, path, latch).start();
            createThread(zk, path, latch).start();
            latch.await();
        } finally {
            // Release the client session and its background threads.
            zk.close();
        }
    }
}
|