基础
Zookeeper = 文件系统 + 通知机制
Apach Hbase和 Apache solr 以及 Dubbo等项目都采用了Zookeeper
Zookeeper是一个分布式的、高性能的,开源的分布式系统的协调服务,是Google的Chubby一个开源的实现,是Hadoop 和 Hbase 的重要组件,是一个为分布式应用提供数据一致性服务的软件。具有严格顺序访问控制能力的分布式协调存储服务
是一个基于观察者模式设计的分布式服务管理框架,负责存储和管理大家都关心的数据,然后接收观察者Watch的注册,一旦这些数据的状态发生变化,Zookeeper负责通知已经在Zookeeper上注册的那些观察者做出相应的反应,从而实现集群中类似Master/Slave管理模式
Zookeeper应用场景:
Zookeeper提供了一套分布式集群管理机制,基于层次性的目录树的数据结构(文件系统),并对树中的结点进行有效管理,从而设计出各种分布式数据管理模型,作为分布式系统的沟通调度桥梁。
Zookeeper Service看作是班主任,client看作学生,watch看作是微信,config Data看作是通知信息,一处更新处处更新
ls -ltr :该linux命令工作中很常用,作用是按创建顺序显示当前目录下的文件和目录
5大参数:
./zkServer.sh start
,启动zookeeper服务./zkCli.sh
开启客户端echo ruok | nc 127.0.0.1 2181
:查看zookeeper的服务器端是否准备就绪echo stat | nc 127.0.0.1 2181
:查看zookeeper服务器端状态echo envi | nc 127.0.0.1 2181
:查看zookeeper服务器端环境Zookeeper数据模型结构与Unix文件系统很类似,整体看作一棵树,自身维护一套存储数据结构,每个节点是一个ZNode
每一个znode默认能存储1MB数据,每个ZNode都可以通过其路径唯一标识
create /node1/node2 val :报错,不支持多级创建节点
Znode = path + nodeValue + Stat 。即 set /path nodeValue,自带Stat,存放该节点的一些信息
znode中的存在类型:持久,临时。但细分要有四种
create -s /myNode v2 : -s 表示节点自动从myNode后面添加序列号,确保该节点不重复
create -e /myNode v2 : -e 代表临时节点,重启将失效,默认-p(可不写)表示持久化。-s和-e可同时用。
一个节点对应一个应用,节点存储的数据就是应用所需要的配置信息
客户端注册监听它关心的目录节点,当目录节点发生变化(数据改变,被删除、子目录节点增加删除)时,zookeeper会通知客户端。
Session :通过Java建立Zookeeper会话,此时处于CONNECTING状态,当客户端连接Zookeeper服务成功后进入CONNECTED状态 ,结束状态CLOSED
Watch(观察者)
watch事件理解
依赖
<!-- 注册中心使用的是zookeeper,引入操作zookeeper的客户端 -->
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
<version>2.12.0</version>
</dependency>
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>RELEASE</version>
</dependency>
WatchOne 一次性触发watch,数据监控
public class WatchOne {
private static final Logger logger = LoggerFactory.getLogger(WatchOne.class);
private final static String CONNECTSTRING = "192.168.137.133:2181";
private final static int SESSION_TIMEOUT = 50 * 1000;
private final static String PATH = "/atguigu";
private ZooKeeper zk = null;
public ZooKeeper startZK() throws Exception {
return new ZooKeeper(CONNECTSTRING, SESSION_TIMEOUT, new Watcher() {
@Override
public void process(WatchedEvent watchedEvent) {
}
});
}
public void stopZK() throws Exception {
if(zk != null) zk.close();
}
public void createZnode(String nodePath, String nodeValue) throws Exception {
// OPEN_ACL_UNSAFE:类似关闭防火墙
// CreateMode.PERSISTENT: 创建的节点由Java控制是持久的
zk.create(nodePath, nodeValue.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
}
public String getZnode(String nodePath) throws Exception {
String result = null;
byte[] byteArray = zk.getData(PATH, new Watcher() { // 一次性触发
@Override
public void process(WatchedEvent event) {
try {
trigerValue(PATH);
} catch (Exception e) {
e.printStackTrace();
}
}
}, new Stat());
result = new String(byteArray); // 转为字符串
return result;
}
public String trigerValue(String nodePath) throws Exception {
String result = null;
byte[] byteArray = zk.getData(PATH, false, new Stat());
result = new String(byteArray); // 转为字符串
return result;
}
public static void main(String[] args) throws Exception {
WatchOne watchOne = new WatchOne();
watchOne.setZk(watchOne.startZK());
if(watchOne.getZk().exists(PATH, false) == null){ // 没有该节点才能创建,不然会报错,节点是不能覆盖的
watchOne.createZnode( PATH, "AAA");
String retValue = watchOne.getZnode(PATH);
logger.info("retValue="+retValue);
Thread.sleep(Long.MAX_VALUE);
}else{
logger.info("I have no znode");
}
}
public ZooKeeper getZk() {
return zk;
}
public void setZk(ZooKeeper zk) {
this.zk = zk;
}
}
public class WatchMore {
private static final Logger logger = LoggerFactory.getLogger(WatchMore.class);
private final static String CONNECTSTRING = "192.168.137.133:2181";
private final static int SESSION_TIMEOUT = 50 * 1000;
private final static String PATH = "/atguigu";
private ZooKeeper zk = null;
private String oldValue = null;
public ZooKeeper startZK() throws Exception {
return new ZooKeeper(CONNECTSTRING, SESSION_TIMEOUT, new Watcher() {
@Override
public void process(WatchedEvent watchedEvent) {
}
});
}
public void stopZK() throws Exception {
if(zk != null) zk.close();
}
public void createZnode(String nodePath, String nodeValue) throws Exception {
// OPEN_ACL_UNSAFE:类似关闭防火墙
// CreateMode.PERSISTENT: 创建的节点由Java控制是持久的
zk.create(nodePath, nodeValue.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
}
public String getZnode(String nodePath) throws Exception {
String result = null;
byte[] byteArray = zk.getData(PATH, new Watcher() { // 一次性触发
@Override
public void process(WatchedEvent event) {
try {
trigerValue(PATH);
} catch (Exception e) {
e.printStackTrace();
}
}
}, new Stat());
result = new String(byteArray); // 转为字符串
oldValue = result; // 一开始初始值
return result;
}
// 每次所观察的节点改变就触发
public boolean trigerValue(String nodePath) throws Exception {
String result = null;
byte[] byteArray = zk.getData(PATH, new Watcher() {
@Override
public void process(WatchedEvent watchedEvent) {
try {
trigerValue(nodePath);
} catch (Exception e) {
e.printStackTrace();
}
}
}, new Stat());
result = new String(byteArray); // 转为字符串
String newValue = result;
if(oldValue.equals(newValue)){
logger.info("*********no changes");
return false;
}else{
logger.info("************oldValue:"+oldValue+"\t newValue:"+newValue);
oldValue = newValue;
return true;
}
}
// 监控节点 /atguigu下的数据
public static void main(String[] args) throws Exception {
WatchMore watchOne = new WatchMore();
watchOne.setZk(watchOne.startZK());
if(watchOne.getZk().exists(PATH, false) == null){ // 没有该节点才能创建,不然会报错,节点是不能覆盖的
watchOne.createZnode( PATH, "AAA");
String retValue = watchOne.getZnode(PATH);
logger.info("retValue="+retValue);
Thread.sleep(Long.MAX_VALUE);
}else{
logger.info("I have no znode");
}
}
public ZooKeeper getZk() {
return zk;
}
public void setZk(ZooKeeper zk) {
this.zk = zk;
}
public String getOldValue() {
return oldValue;
}
public void setOldValue(String oldValue) {
this.oldValue = oldValue;
}
}
package com.xuecheng.manage_course;
import org.apache.zookeeper.*;
import org.apache.zookeeper.Watcher.Event.EventType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.List;
public class WatchChild {
private static final Logger logger = LoggerFactory.getLogger(WatchChild.class);
private final static String CONNECTSTRING = "192.168.137.133:2181";
private final static int SESSION_TIMEOUT = 50 * 1000;
private final static String PATH = "/atguigu";
private ZooKeeper zk = null;
public ZooKeeper startZK() throws Exception {
return new ZooKeeper(CONNECTSTRING, SESSION_TIMEOUT, new Watcher() { // 开启监控
@Override
public void process(WatchedEvent event) {
// 子节点没改变
if(event.getType() == EventType.NodeChildrenChanged && event.getPath().equals(PATH)){
showChildNode(PATH); // 子节点每次改变都会执行该方法
}else{
showChildNode(PATH); // 一开始会注册父亲节点并打印初始子节点
}
}
});
}
public void showChildNode(String nodePath) {
List<String> list = null;
try {
// 获取该节点所有子节点
list = zk.getChildren(PATH, true); // true表示该节点下全部监控,自带连续监控
logger.info("***********"+list);
} catch (KeeperException | InterruptedException e) {
e.printStackTrace();
}
}
// 监控子节点
public static void main(String[] args) throws Exception {
WatchChild watchChild = new WatchChild();
watchChild.setZk(watchChild.startZK());
Thread.sleep(Long.MAX_VALUE);
}
public ZooKeeper getZk() {
return zk;
}
public void setZk(ZooKeeper zk) {
this.zk = zk;
}
}
分为服务器端和客户端,客户端之连接到某个服务器上,客户端使用并维护一个TCP连接,第一次连接会建立会话,当这个客户端连接到另外服务器时,这个会话会被新的服务器重新建立
配置项书写格式:server.N = YYY:A:B,其中,N为服务器编号,YYY表示服务器IP地址,A为LF(主从机)通信端口,即该服务器与集群中的leader交换的信息的端口。B为选举端口,即选举新leader时服务器间相互通信的端口(当leader挂掉后,其余服务器会相互通信并选举出新的leader)
真集群中每个服务器A端口和B端口都是一样的。
伪集群中每个服务器A端口和B端口都是不一样的。但IP一样
伪集群做法:
? 此时使用zk01和zk02作为服务器,zk03连接server作为客户端:./zkCli.sh -server 127.0.0.1:2193
。
? 此时使用zk03改变节点数据,zk01和zk02便可查到刚改变的数据,完成伪集群
原文:https://www.cnblogs.com/JayV/p/13311324.html