Zookeeper 是?个分布式协调服务的开源框架。 主要?来解决分布式集群中应?系统的?致性问题, 例如怎样避免同时操作同?数据造成脏读的问题。分布式系统中数据存在?致性的问题!!
Leader
Follower
此外,针对访问量?较?的 zookeeper 集群, 还可新增观察者??。
Observer
ZK也是Master/slave架构,但是与之前不同的是zk集群中的Leader不是指定?来,?是通过选举产?。
Zookeeper 节点类型可以分为三?类:
在开发中在创建节点的时候通过组合可以?成以下四种节点类型:持久节点、持久顺序节点、临时节 点、临时顺序节点。不同类型的节点则会有不同的?命周期
持久节点:是Zookeeper中最常?的?种节点类型,所谓持久节点,就是指节点被创建后会?直存在服 务器,直到删除操作主动清除
持久顺序节点:就是有顺序的持久节点,节点特性和持久节点是?样的,只是额外特性表现在顺序上。 顺序特性实质是在创建节点的时候,会在节点名后?加上?个数字后缀,来表示其顺序。
临时节点:就是会被?动清理掉的节点,它的?命周期和客户端会话绑在?起,客户端会话结束,节点 会被删除掉。与持久性节点不同的是,临时节点不能创建?节点。
临时顺序节点:就是有顺序的临时节点,和持久顺序节点相同,在其创建的时候会在名字后?加上数字 后缀
在ZooKeeper中,事务是指能够改变ZooKeeper服务器状态的操作,我们也称之为事务操作或更新 操作,?般包括数据节点创建与删除、数据节点内容更新等操作。对于每?个事务请求,ZooKeeper都 会为其分配?个全局唯?的事务ID,? ZXID 来表示,通常是?个 64 位的数字。每?个 ZXID 对应?次 更新操作,从这些ZXID中可以间接地识别出ZooKeeper处理这些更新操作请求的全局顺序 zk中的事务指的是对zk服务器状态改变的操作(create,update data,更新字节点);zk对这些事务操作都 会编号,这个编号是?增?的被称为ZXID。
#使?bin/zkCli.sh 连接到zk集群 [zk: localhost:2181(CONNECTED) 2] get /zookeeper cZxid = 0x0 ctime = Wed Dec 31 19:00:00 EST 1969 mZxid = 0x0 mtime = Wed Dec 31 19:00:00 EST 1969 pZxid = 0x0 cversion = -1 dataVersion = 0 aclVersion = 0 ephemeralOwner = 0x0 dataLength = 0 numChildren = 1
含义:
cZxid 就是 Create ZXID,表示节点被创建时的事务ID。 ctime 就是 Create Time,表示节点创建时间。 mZxid 就是 Modified ZXID,表示节点最后?次被修改时的事务ID。 mtime 就是 Modified Time,表示节点最后?次被修改的时间。 pZxid 表示该节点的?节点列表最后?次被修改时的事务 ID。只有?节点列表变更才会更新 pZxid, ?节点内容变更不会更新。 cversion 表示?节点的版本号。 dataVersion 表示内容版本号。 aclVersion 标识acl版本 ephemeralOwner 表示创建该临时节点时的会话 sessionID,如果是持久性节点那么值为 0 dataLength 表示数据?度。 numChildren 表示直系?节点数。
在 ZooKeeper 中,引?了 Watcher 机制来实现这种分布式的通知功能。ZooKeeper 允许客户端向服务 端注册?个 Watcher 监听,当服务端的?些指定事件触发了这个 Watcher,那么Zk就会向指定客户端 发送?个事件通知来实现分布式的通知功能。
Zookeeper的Watcher机制主要包括客户端线程、客户端WatcherManager、Zookeeper服务器三部 分。
具体?作流程为:
注: 客户端负责watch的注册 和回调,zk服务器负责处理watch。
创建顺序节点:create -s /zk-test 123 创建临时节点:create -e /zk-temp 123 创建永久节点:create /zk-permanent 123 读取节点:ls path 其中,path表示的是指定数据节点的节点路径 获取内容:get path 更新节点:set path data 删除节点: delete path
<dependency> <groupId>org.apache.zookeeper</groupId> <artifactId>zookeeper</artifactId> <version>3.4.14</version> </dependency> <dependency> <groupId>com.101tec</groupId> <artifactId>zkclient</artifactId> <version>0.2</version> </dependency>
创建会话:
package com.hust.grid.leesf.zkclient.examples; import java.io.IOException; import org.I0Itec.zkclient.ZkClient; public class CreateSession { /* 创建?个zkClient实例来进?连接 */ public static void main(String[] args) { ZkClient zkClient = new ZkClient("127.0.0.1:2181"); System.out.println("ZooKeeper session created."); } }
创建节点:
package com.hust.grid.leesf.zkclient.examples; import org.I0Itec.zkclient.ZkClient; public class Create_Node_Sample { public static void main(String[] args) { ZkClient zkClient = new ZkClient("127.0.0.1:2181"); System.out.println("ZooKeeper session established."); //createParents的值设置为true,可以递归创建节点 zkClient.createPersistent("/lg-zkClient/lg-c1",true); System.out.println("success create znode."); } }
删除节点:
package com.hust.grid.leesf.zkclient.examples; import org.I0Itec.zkclient.ZkClient; public class Del_Data_Sample { public static void main(String[] args) throws Exception { String path = "/lg-zkClient/lg-c1"; ZkClient zkClient = new ZkClient("127.0.0.1:2181", 5000); zkClient.deleteRecursive(path); System.out.println("success delete znode."); } }
监听节点变化:
import org.I0Itec.zkclient.IZkChildListener; import org.I0Itec.zkclient.ZkClient; import org.apache.zookeeper.client.ZooKeeperSaslClient; import java.util.List; /* 演示zkClient如何使?监听器 */ public class Get_Child_Change { public static void main(String[] args) throws InterruptedException { //获取到zkClient final ZkClient zkClient = new ZkClient("linux121:2181"); //zkClient对指定?录进?监听(不存在?录:/lg-client),指定收到通知之后的逻辑 //对/lag-client注册了监听器,监听器是?直监听 zkClient.subscribeChildChanges("/lg-client", new IZkChildListener() { //该?法是接收到通知之后的执?逻辑定义 public void handleChildChange(String path, List<String> childs) throws Exception { //打印节点信息 System.out.println(path + " childs changes ,current childs " + childs); } }); //使?zkClient创建节点,删除节点,验证监听器是否运? zkClient.createPersistent("/lg-client"); Thread.sleep(1000); //只是为了?便观察结果数据 zkClient.createPersistent("/lg-client/c1"); Thread.sleep(1000); zkClient.delete("/lg-client/c1"); Thread.sleep(1000); zkClient.delete("/lg-client"); Thread.sleep(Integer.MAX_VALUE); /* 1 监听器可以对不存在的?录进?监听 2 监听?录下?节点发?改变,可以接收到通知,携带数据有?节点列表 3 监听?录创建和删除本身也会被监听到 */ } }
执行结果:
/lg-zkClient ‘s child changed, currentChilds:[] /lg-zkClient ‘s child changed, currentChilds:[c1] /lg-zkClient ‘s child changed, currentChilds:[] /lg-zkClient ‘s child changed, currentChilds:null
注:
监听节点数据变化:
import org.I0Itec.zkclient.IZkDataListener; import org.I0Itec.zkclient.ZkClient; //使?监听器监听节点数据的变化 public class Get_Data_Change { public static void main(String[] args) throws InterruptedException { // 获取zkClient对象 final ZkClient zkClient = new ZkClient("linux121:2181"); //设置?定义的序列化类型,否则会报错!! zkClient.setZkSerializer(new ZkStrSerializer()); //判断节点是否存在,不存在创建节点并赋值 final boolean exists = zkClient.exists("/lg-client1"); if (!exists) { zkClient.createEphemeral("/lg-client1", "123"); } //注册监听器,节点数据改变的类型,接收通知后的处理逻辑定义 zkClient.subscribeDataChanges("/lg-client1", new IZkDataListener() { public void handleDataChange(String path, Object data) throws Exception { //定义接收通知之后的处理逻辑 System.out.println(path + " data is changed ,new data " + data); } //数据删除--》节点删除 public void handleDataDeleted(String path) throws Exception { System.out.println(path + " is deleted!!"); } }); //更新节点的数据,删除节点,验证监听器是否正常运? final Object o = zkClient.readData("/lg-client1"); System.out.println(o); zkClient.writeData("/lg-client1", "new data"); Thread.sleep(1000); //删除节点 zkClient.delete("/lg-client1"); Thread.sleep(Integer.MAX_VALUE); } }
zk 自定义字符串序列化:
import org.I0Itec.zkclient.exception.ZkMarshallingError; import org.I0Itec.zkclient.serialize.ZkSerializer; public class ZkStrSerializer implements ZkSerializer { //序列化,数据--》byte[] public byte[] serialize(Object o) throws ZkMarshallingError { return String.valueOf(o).getBytes(); } //反序列化,byte[]--->数据 public Object deserialize(byte[] bytes) throws ZkMarshallingError { return new String(bytes); } }
详细步骤:
集群首次启动:
半数前选myid 最大的机器。
非首次启动:
优先选择zxid值?的节点称为Leader!!
ZAB 协议是为分布式协调服务 Zookeeper 专?设计的?种?持崩溃恢复和原??播协议
具体流程:
总结: 第一步发送提议,如果提议获得半数以上机器的ack,然后发送commit给follower,同时自己commit。
Leader宕机后,被选举的新Leader需要解决的问题:
选举算法的关键点:保证选举出的新Leader拥有集群中所有节点最?编号(ZXID)的事务!!
总结:leader崩溃,新的leader必须拥有最大的事务id,这样才能保证数据最新。
原文:https://www.cnblogs.com/wanghzh/p/15218178.html