?创建一个maven项目,pom.xml如下:
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>org.zookeeper</groupId> <artifactId>zookeeper</artifactId> <version>1.0-SNAPSHOT</version> <dependencies> <!-- zk依赖包 --> <dependency> <groupId>org.apache.zookeeper</groupId> <artifactId>zookeeper</artifactId> <version>3.4.6</version> </dependency> <!-- 单元测试包 --> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>4.12</version> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> <configuration> <source>8</source> <target>8</target> </configuration> </plugin> </plugins> </build> </project>
在包com.sorpion下面创建一个类:ZkClient.java
package com.zookeeper; import org.apache.zookeeper.*; import org.apache.zookeeper.data.Stat; import org.junit.Before; import org.junit.Test; import java.io.IOException; import java.util.List; import java.util.concurrent.TimeUnit; public class ZkClient { private ZooKeeper zkCli; private static final String CONNECT_STRING = "node1:2181,node2:2181,node3:2181"; private static final int SESSION_TIMEOUT = 5000; @Before public void before() throws IOException { zkCli = new ZooKeeper(CONNECT_STRING, SESSION_TIMEOUT, e -> { System.out.println("watcher默认回调"); }); } /** * 获取节点列 表 * * @throws KeeperException * @throws InterruptedException */ @Test public void ls() throws KeeperException, InterruptedException { // 回调默认的回调函数 List<String> nodes = zkCli.getChildren("/", true); for (String node : nodes) { System.out.println(node); } System.out.println("=========================================="); // 回调我们自己的细粒度的回调函数 nodes = zkCli.getChildren("/", e -> { System.out.println("watcher细粒度的回调"); }); for (String node : nodes) { System.out.println(node); } // 程序停在这里然后执行下面的create方法,会触发回调监听,输出: //watcher默认回调 //watch细粒度的回调 Thread.sleep(Long.MAX_VALUE); } /** * 创建节点 * <p> * OPEN_ACL_UNSAFE 任何人都可以访问的权限 * CreateMode 永久节点、序列节点、临时节点的组合 * * @throws KeeperException * @throws InterruptedException */ @Test public void create() throws KeeperException, InterruptedException { String path = zkCli.create("/myNode", "nodeData".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); System.out.println(path); } /** * 获取节点数据 * * @throws KeeperException * @throws InterruptedException */ @Test public void get() throws KeeperException, InterruptedException { byte[] data = zkCli.getData("/myNode", false, new Stat()); System.out.println(new String(data)); } /** * 设置节点值 * 注意第三个参数version必须是当前版本,不能错,错了设置会报错。 * 所以最好好的方式是先获取版本信息,或者直接设置version为-1即可设置当前值 * * @throws KeeperException * @throws InterruptedException */ @Test public void set() throws KeeperException, InterruptedException { Stat stat = zkCli.exists("/myNode", false); if (null != stat) { System.out.println("version1=" + stat.getVersion()); Stat newStat = zkCli.setData("/myNode", "666".getBytes(), stat.getVersion()); System.out.println("version2=" + stat.getVersion()); // 修改后版本增加1 System.out.println("version3=" + newStat.getVersion()); } //version指定为-1就修改当前值 // Stat stat = zkCli.setData("/myNode", "444".getBytes(), -1); // System.out.println(stat.getDataLength()); } /** * 查看节点状态,通过判断是否存在获取 * * @throws KeeperException * @throws InterruptedException */ @Test public void stat() throws KeeperException, InterruptedException { Stat stat = zkCli.exists("/myNode", true); if (null == stat) { System.out.println("节点不存在"); } else { System.out.println("dateLength=" + stat.getDataLength()); System.out.println("version=" + stat.getVersion()); System.out.println("createTime=" + stat.getCtime()); } } /** * 只能删除没有子节点的节点 * * @throws KeeperException * @throws InterruptedException */ @Test public void delete() throws KeeperException, InterruptedException { Stat exists = zkCli.exists("/myNode", false); if (null != exists) { zkCli.delete("/myNode", exists.getVersion()); } else { System.out.println("节点不存在"); } // zkCli.delete("/myNode", -1); } /** * 递归删除带有子节点的节点 * * @throws KeeperException * @throws InterruptedException */ @Test public void rmr() throws KeeperException, InterruptedException { zkCli.create("/myNode", "nodeData".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); zkCli.create("/myNode/n1", "1".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); zkCli.create("/myNode/n2", "2".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); zkCli.create("/myNode/n3", "3".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); revertDelete("/myNode"); } public void revertDelete(String path) throws KeeperException, InterruptedException { Stat exists = zkCli.exists(path, false); if (null != exists) { List<String> children = zkCli.getChildren(path, false); String childPath; for (String child : children) { if (path.equals("/")) { childPath = path + child; } else { childPath = path + "/" + child; } revertDelete(childPath); } System.out.println(path); zkCli.delete(path, -1); } else { System.out.println(path + "不存在"); } } /** * 循环注册监听节点变化,默认监听后触发监听就会将监听移除,如果要需要监听还要在执行一次监听 */ @Test public void watch() throws InterruptedException, KeeperException { zkCli.create("/myNode", "nodeData".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); register("/myNode"); System.out.println("===================================="); // 测试监听到变化,下面会打印四次监听到变化 zkCli.setData("/myNode", "1".getBytes(), -1); TimeUnit.SECONDS.sleep(1); zkCli.setData("/myNode", "2".getBytes(), -1); TimeUnit.SECONDS.sleep(1); zkCli.setData("/myNode", "3".getBytes(), -1); TimeUnit.SECONDS.sleep(1); zkCli.delete("/myNode", -1); TimeUnit.SECONDS.sleep(2); } public void register(String path) { try { zkCli.exists(path, e -> { register(path); }); System.out.println("监听到变化"); } catch (KeeperException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } } }
原文:https://www.cnblogs.com/wkaca7114/p/java-zk.html