本片介绍两方面内容,一方面是命令行操作,另一方面是Java调用API。
ZooKeeper集群环境的搭建在Hadoop集群搭建里已经讲过了,这里不再赘述,本篇内容基于zk3.5.8。
这里补充一点,除了前面提到的leader和follower之外,zk还有一种角色叫observer。可以这么理解,follower是候选人,leader是当前的领导者,observer是普通人。leader只能从follower中选出,observer不参与选举但可以提供读服务。
observer的配置,只需要在zoo.cfg中增加server.1=server01:2888:3888:observer就行,可以在不影响写性能的前提下扩展读并发。
首先启动zk集群,进入命令行。
[hadoop@server01 ~]$ zkServer.sh start /bin/java ZooKeeper JMX enabled by default Using config: /usr/apache-zookeeper-3.5.8/bin/../conf/zoo.cfg Starting zookeeper ... STARTED [hadoop@server01 ~]$ zkServer.sh status /bin/java ZooKeeper JMX enabled by default Using config: /usr/apache-zookeeper-3.5.8/bin/../conf/zoo.cfg Client port found: 2181. Client address: localhost. Mode: follower [hadoop@server01 ~]$ zkCli.sh
官网说From the shell, type help
to get a listing of commands that can be executed from the client,我试着输入help命令,命令列表确实输出了,但是注意看最后一行。
[zk: server01:2181(CONNECTED) 0] help ZooKeeper -server host:port cmd args addauth scheme auth close config [-c] [-w] [-s] connect host:port create [-s] [-e] [-c] [-t ttl] path [data] [acl] delete [-v version] path deleteall path delquota [-n|-b] path get [-s] [-w] path getAcl [-s] path history listquota path ls [-s] [-w] [-R] path ls2 path [watch] printwatches on|off quit reconfig [-s] [-v version] [[-file path] | [-members serverID=host:port1:port2;port3[,...]*]] | [-add serverId=host:port1:port2;port3[,...]]* [-remove serverId[,...]*] redo cmdno removewatches path [-c|-d|-a] [-l] rmr path set [-s] [-v version] path data setAcl [-s] [-v version] [-R] path acl setquota -n|-b val path stat [-w] path sync path Command not found: Command not found help
最后一行提示根本就没有help这个命令。官网这不是误导人嘛。
先试试ls命令,我这个zk已经有hadoop和yarn两个集群用来选主了,所有有它们的目录。
[zk: server01:2181(CONNECTED) 1] ls / [hadoop-ha, yarn-leader-election, zookeeper]
创建一个目录,并给它赋值。再把值取出来。
[zk: server01:2181(CONNECTED) 2] create /game "this is a game directory" Created /game [zk: server01:2181(CONNECTED) 3] get /game this is a game directory [zk: server01:2181(CONNECTED) 4] ls / [game, hadoop-ha, yarn-leader-election, zookeeper]
现在有没有感性认识了?这个目录树的非叶子可以存储数据,是一个键值对。
[zk: server01:2181(CONNECTED) 7] create /game/pal "XianJianXiLie" Created /game/pal [zk: server01:2181(CONNECTED) 8] get /game/pal XianJianXiLie [zk: server01:2181(CONNECTED) 9] stat /game/pal cZxid = 0xd0000000a ctime = Wed Mar 10 22:16:50 CST 2021 mZxid = 0xd0000000a mtime = Wed Mar 10 22:16:50 CST 2021 pZxid = 0xd0000000a cversion = 0 dataVersion = 0 aclVersion = 0 ephemeralOwner = 0x0 dataLength = 13 numChildren = 0
把/game/pal的值修改成李逍遥,注意观察,mZxid和dataVersion 加1了,数据长度变了,其他属性没变化。
[zk: server01:2181(CONNECTED) 10] set /game/pal "LiXiaoYao" [zk: server01:2181(CONNECTED) 11] stat /game/pal cZxid = 0xd0000000a ctime = Wed Mar 10 22:16:50 CST 2021 mZxid = 0xd0000000b mtime = Wed Mar 10 22:18:08 CST 2021 pZxid = 0xd0000000a cversion = 0 dataVersion = 1 aclVersion = 0 ephemeralOwner = 0x0 dataLength = 9 numChildren = 0 [zk: server01:2181(CONNECTED) 12] stat /game cZxid = 0xd00000009 ctime = Wed Mar 10 22:09:04 CST 2021 mZxid = 0xd00000009 mtime = Wed Mar 10 22:09:04 CST 2021 pZxid = 0xd0000000a cversion = 1 dataVersion = 0 aclVersion = 0 ephemeralOwner = 0x0 dataLength = 24 numChildren = 1
现在把znode删掉,它提示不能删除非空目录。我们还有个deleteall命令,这个可以级联删除。
[zk: server01:2181(CONNECTED) 13] delete /game Node not empty: /game [zk: server01:2181(CONNECTED) 14] deleteall /game [zk: server01:2181(CONNECTED) 15] ls / [hadoop-ha, yarn-leader-election, zookeeper]
上面创建的是永久节点。还可以用-e参数创建临时节点,当客户端断开连接后,节点自动消失。用-s创建序列节点,zk自动在节点名后面补十位递增数字。
[zk: server01:2181(CONNECTED) 4] create -e /game/3DO Created /game/3DO [zk: server01:2181(CONNECTED) 5] ls /game [3DO] [zk: server01:2181(CONNECTED) 6] create -s /game/3DO Created /game/3DO0000000001 [zk: server01:2181(CONNECTED) 7] ls /game [3DO, 3DO0000000001]
上面的命令在server01的客户端上创建了一个临时节点/game/3DO和一个序列节点/game/3DO0000000001,从server02上再连一个客户端看看。
[zk: server02:2181(CONNECTED) 0] ls /game [3DO, 3DO0000000001]
server02上这两个目录也存在。现在把server01客户端断开连接。
[zk: server01:2181(CONNECTED) 8] quit WATCHER:: WatchedEvent state:Closed type:None path:null 2021-03-11 10:39:09,535 [myid:] - INFO [main:ZooKeeper@1422] - Session: 0x10000138b4a0001 closed 2021-03-11 10:39:09,537 [myid:] - INFO [main-EventThread:ClientCnxn$EventThread@524] - EventThread shut down for session: 0x10000138b4a0001
从server02上再看看这个目录下有什么。
[zk: server02:2181(CONNECTED) 1] ls /game [3DO0000000001]
可以看到临时节点/game/3DO消失了。
在pom.xml文件里增加配置
<dependency> <groupId>org.apache.zookeeper</groupId> <artifactId>zookeeper</artifactId> <version>3.5.8</version> </dependency>
先试试创建持久化节点。所有代码如下。
import org.apache.zookeeper.*; import org.junit.After; import org.junit.Before; import org.junit.Test; import java.io.IOException; import java.util.concurrent.CountDownLatch; public class ZooKeeperTest { ZooKeeper client = null; @Before public void init() throws InterruptedException, IOException { String url = "server01:2181,server02:2181,server03:2181"; CountDownLatch latch = new CountDownLatch(1); client = new ZooKeeper(url, 5000, (event) -> {if(event.getState() == Watcher.Event.KeeperState.SyncConnected) latch.countDown();}); latch.await(); } @Test public void createZnode() throws KeeperException, InterruptedException { //持久化节点 String znode1 = client.create("/game/ThreeKingdom","The story of many heros".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); } @After public void destroy() throws InterruptedException { if(client != null) client.close(); } }
在初始化方法里,创建了zk的客户端,但是这个客户端并不是new ZooKeeper( )返回之后就立马可用,必须等到连接创建完毕后才能用。所以用到了Java的CountDownLatch,在watch事件状态为连接成功之后主线程才被唤醒。
执行createZnode( )方法之后,控制台输出如下
Process finished with exit code 0
通过命令行看看目录创建成功没有
[zk: server02:2181(CONNECTED) 4] ls /game [3DO0000000001, ThreeKingdom] [zk: server02:2181(CONNECTED) 5] get -s /game/ThreeKingdom The story of many heros cZxid = 0x250000000d ctime = Thu Mar 11 11:51:04 CST 2021 mZxid = 0x250000000d mtime = Thu Mar 11 11:51:04 CST 2021 pZxid = 0x250000000d cversion = 0 dataVersion = 0 aclVersion = 0 ephemeralOwner = 0x0 dataLength = 23 numChildren = 0
@Test public void deleteZnode() throws KeeperException, InterruptedException { //删除节点,这里的version参数必须与zk服务器最新的版本号一直,否则会抛异常BadVersionException。 //版本号是为了控制分布式更新的并发问题。如果不关心版本号,可以设置为-1 client.delete("/game/ThreeKingdom", -1); }
删除之后再通过命令行看看,/game下已经没有三国的目录了。
[zk: server02:2181(CONNECTED) 8] ls /game
[3DO0000000001]
@Test public void setZnode() throws KeeperException, InterruptedException { // final Stat stat = client.setData("/game", "childhood memory".getBytes(), -1); System.out.println(stat.toString()); }
控制台输出结果:
158913789957,158913789974,1615430142891,1615444561053,1,5,0,0,16,1,158913789971
Process finished with exit code 0
来看看setData返回的Stat包含哪些字段信息
public String toString() { try { ByteArrayOutputStream s = new ByteArrayOutputStream(); CsvOutputArchive a_ = new CsvOutputArchive(s); a_.startRecord(this, ""); a_.writeLong(this.czxid, "czxid"); a_.writeLong(this.mzxid, "mzxid"); a_.writeLong(this.ctime, "ctime"); a_.writeLong(this.mtime, "mtime"); a_.writeInt(this.version, "version"); a_.writeInt(this.cversion, "cversion"); a_.writeInt(this.aversion, "aversion"); a_.writeLong(this.ephemeralOwner, "ephemeralOwner"); a_.writeInt(this.dataLength, "dataLength"); a_.writeInt(this.numChildren, "numChildren"); a_.writeLong(this.pzxid, "pzxid"); a_.endRecord(this, ""); return new String(s.toByteArray(), "UTF-8"); } catch (Throwable var3) { var3.printStackTrace(); return "ERROR"; } }
通过命令行获取/game的状态信息,与IDE控制台返回的一致
[zk: server02:2181(CONNECTED) 10] get -s /game childhood memory cZxid = 0x2500000005 ctime = Thu Mar 11 10:35:42 CST 2021 mZxid = 0x2500000016 mtime = Thu Mar 11 14:36:01 CST 2021 pZxid = 0x2500000013 cversion = 5 dataVersion = 1 aclVersion = 0 ephemeralOwner = 0x0 dataLength = 16 numChildren = 1
@Test public void getZnode() throws KeeperException, InterruptedException { // final byte[] data = client.getData("/game", false, new Stat()); System.out.println(new String(data)); }
控制台输出结果
childhood memory
Process finished with exit code 0
演示一下对一个已经存在的节点注册监听。先定义一个监听类,监听节点改变事件。
public class ZnodeChangeWatcher implements Watcher { @Override public void process(WatchedEvent watchedEvent) { if(Event.EventType.NodeDataChanged == watchedEvent.getType()){ System.out.println("znode:"+watchedEvent.getPath()+"发生变化"); } } }
再添加一个测试方法,监听/game/ThreeKingdom节点。
@Test public void watchZnode() throws KeeperException, InterruptedException { // Watcher watcher = new ZnodeChangeWatcher(); client.exists("/game/ThreeKingdom",watcher); Thread.sleep(Integer.MAX_VALUE); }
通过命令行修改这个节点的值,先看看原值是The story of many heros,修改为The Story of three heros。
[zk: localhost:2181(CONNECTED) 1] get -s /game/ThreeKingdom The story of many heros cZxid = 0xf00000005 ctime = Sat Mar 13 22:43:41 CST 2021 mZxid = 0xf00000005 mtime = Sat Mar 13 22:43:41 CST 2021 pZxid = 0xf00000005 cversion = 0 dataVersion = 0 aclVersion = 0 ephemeralOwner = 0x0 dataLength = 23 numChildren = 0 [zk: localhost:2181(CONNECTED) 2] set /game/ThreeKingdom "The Story of three heros"
切换到IDE控制台,看看输出结果。
znode:/game/ThreeKingdom发生变化
原文:https://www.cnblogs.com/burningblade/p/14489493.html