首页 > 编程语言 > 详细

java操作ZooKeeper

时间:2021-02-24 10:18:11      阅读:33      评论:0      收藏:0      [点我收藏+]

?创建一个maven项目,pom.xml如下:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.zookeeper</groupId>
    <artifactId>zookeeper</artifactId>
    <version>1.0-SNAPSHOT</version>

    <dependencies>
        <!-- zk依赖包 -->
        <dependency>
            <groupId>org.apache.zookeeper</groupId>
            <artifactId>zookeeper</artifactId>
            <version>3.4.6</version>
        </dependency>

        <!-- 单元测试包 -->
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.12</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>8</source>
                    <target>8</target>
                </configuration>
            </plugin>
        </plugins>
    </build>

</project>

 

在包com.sorpion下面创建一个类:ZkClient.java

package com.zookeeper;

import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;
import org.junit.Before;
import org.junit.Test;

import java.io.IOException;
import java.util.List;
import java.util.concurrent.TimeUnit;

public class ZkClient {

    private ZooKeeper zkCli;
    private static final String CONNECT_STRING = "node1:2181,node2:2181,node3:2181";
    private static final int SESSION_TIMEOUT = 5000;

    @Before
    public void before() throws IOException {
        zkCli = new ZooKeeper(CONNECT_STRING, SESSION_TIMEOUT, e -> {
            System.out.println("watcher默认回调");
        });
    }

    /**
     * 获取节点列 表
     *
     * @throws KeeperException
     * @throws InterruptedException
     */
    @Test
    public void ls() throws KeeperException, InterruptedException {
        // 回调默认的回调函数
        List<String> nodes = zkCli.getChildren("/", true);

        for (String node : nodes) {
            System.out.println(node);
        }

        System.out.println("==========================================");

        // 回调我们自己的细粒度的回调函数
        nodes = zkCli.getChildren("/", e -> {
            System.out.println("watcher细粒度的回调");
        });

        for (String node : nodes) {
            System.out.println(node);
        }

        // 程序停在这里然后执行下面的create方法,会触发回调监听,输出:
        //watcher默认回调
        //watch细粒度的回调
        Thread.sleep(Long.MAX_VALUE);
    }

    /**
     * 创建节点
     * <p>
     * OPEN_ACL_UNSAFE 任何人都可以访问的权限
     * CreateMode 永久节点、序列节点、临时节点的组合
     *
     * @throws KeeperException
     * @throws InterruptedException
     */
    @Test
    public void create() throws KeeperException, InterruptedException {
        String path = zkCli.create("/myNode", "nodeData".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        System.out.println(path);
    }

    /**
     * 获取节点数据
     *
     * @throws KeeperException
     * @throws InterruptedException
     */
    @Test
    public void get() throws KeeperException, InterruptedException {
        byte[] data = zkCli.getData("/myNode", false, new Stat());
        System.out.println(new String(data));
    }

    /**
     * 设置节点值
     * 注意第三个参数version必须是当前版本,不能错,错了设置会报错。
     * 所以最好好的方式是先获取版本信息,或者直接设置version为-1即可设置当前值
     *
     * @throws KeeperException
     * @throws InterruptedException
     */
    @Test
    public void set() throws KeeperException, InterruptedException {
        Stat stat = zkCli.exists("/myNode", false);
        if (null != stat) {
            System.out.println("version1=" + stat.getVersion());
            Stat newStat = zkCli.setData("/myNode", "666".getBytes(), stat.getVersion());
            System.out.println("version2=" + stat.getVersion());
            // 修改后版本增加1
            System.out.println("version3=" + newStat.getVersion());
        }

        //version指定为-1就修改当前值
//        Stat stat = zkCli.setData("/myNode", "444".getBytes(), -1);
//        System.out.println(stat.getDataLength());
    }

    /**
     * 查看节点状态,通过判断是否存在获取
     *
     * @throws KeeperException
     * @throws InterruptedException
     */
    @Test
    public void stat() throws KeeperException, InterruptedException {
        Stat stat = zkCli.exists("/myNode", true);
        if (null == stat) {
            System.out.println("节点不存在");
        } else {
            System.out.println("dateLength=" + stat.getDataLength());
            System.out.println("version=" + stat.getVersion());
            System.out.println("createTime=" + stat.getCtime());
        }
    }

    /**
     * 只能删除没有子节点的节点
     *
     * @throws KeeperException
     * @throws InterruptedException
     */
    @Test
    public void delete() throws KeeperException, InterruptedException {
        Stat exists = zkCli.exists("/myNode", false);
        if (null != exists) {
            zkCli.delete("/myNode", exists.getVersion());
        } else {
            System.out.println("节点不存在");
        }

//        zkCli.delete("/myNode", -1);
    }

    /**
     * 递归删除带有子节点的节点
     *
     * @throws KeeperException
     * @throws InterruptedException
     */
    @Test
    public void rmr() throws KeeperException, InterruptedException {
        zkCli.create("/myNode", "nodeData".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        zkCli.create("/myNode/n1", "1".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        zkCli.create("/myNode/n2", "2".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        zkCli.create("/myNode/n3", "3".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        revertDelete("/myNode");
    }

    public void revertDelete(String path) throws KeeperException, InterruptedException {
        Stat exists = zkCli.exists(path, false);
        if (null != exists) {
            List<String> children = zkCli.getChildren(path, false);
            String childPath;
            for (String child : children) {
                if (path.equals("/")) {
                    childPath = path + child;
                } else {
                    childPath = path + "/" + child;
                }
                revertDelete(childPath);
            }
            System.out.println(path);
            zkCli.delete(path, -1);
        } else {
            System.out.println(path + "不存在");
        }
    }

    /**
     * 循环注册监听节点变化,默认监听后触发监听就会将监听移除,如果要需要监听还要在执行一次监听
     */
    @Test
    public void watch() throws InterruptedException, KeeperException {
        zkCli.create("/myNode", "nodeData".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        register("/myNode");
        System.out.println("====================================");
        // 测试监听到变化,下面会打印四次监听到变化
        zkCli.setData("/myNode", "1".getBytes(), -1);
        TimeUnit.SECONDS.sleep(1);

        zkCli.setData("/myNode", "2".getBytes(), -1);
        TimeUnit.SECONDS.sleep(1);

        zkCli.setData("/myNode", "3".getBytes(), -1);
        TimeUnit.SECONDS.sleep(1);

        zkCli.delete("/myNode", -1);
        TimeUnit.SECONDS.sleep(2);
    }

    public void register(String path) {
        try {
            zkCli.exists(path, e -> {
                register(path);
            });
            System.out.println("监听到变化");
        } catch (KeeperException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

 

java操作ZooKeeper

原文:https://www.cnblogs.com/wkaca7114/p/java-zk.html

(0)
(0)
   
举报
评论 一句话评论(0
关于我们 - 联系我们 - 留言反馈 - 联系我们:wmxa8@hotmail.com
© 2014 bubuko.com 版权所有
打开技术之扣,分享程序人生!