zookeeper中的节点可以持久化/有序的两个维度分为四种类型:
PERSIST:持久化无序(保存在磁盘中)
PERSIST_SEQUENTIAL:持久化有序递增
EPHEMERAL:非持久化的无序的,保存在内存中,当客户端关闭后消失。
EPHEMERAL_SEQUENTIAL:非持久有序递增,保存在内存中,当客户端关闭后消失
每个节点都可以注册Watch操作,用于监听节点的变化,有四种事件类型如下:
Created event: Enabled with a call to exists
Deleted event: Enabled with a call to exists, getData, and getChildren
Changed event: Enabled with a call to exists and getData
Child event: Enabled with a call to getChildren
Watch的基本特征是客户端先得到通知,然后才能得到数据,Watch被fire之后就立即取消了,不会再有Watch后续变化,想要监听只能重新注册;
使用原生Zookeeper创建节点和监听节点变化代码如下:
1. 引入依赖,pom.xml
<dependency> <groupId>org.apache.zookeeper</groupId> <artifactId>zookeeper</artifactId> <version>3.4.13</version> </dependency>
2. 客户端连接类
package com.wangx.kafka.zk; import org.apache.zookeeper.*; import java.io.IOException; public class ZkDemo { public static void main(String[] args) throws IOException, KeeperException, InterruptedException { //创建链接,并监听连接状态 ZooKeeper zooKeeper = new ZooKeeper("node1:2181", 5000, new Watcher() { public void process(WatchedEvent watchedEvent) { System.out.println("链接客户端"); System.out.println(watchedEvent.getState()); } }); //创建节点,/parent:节点路径, data.xx:数据,Ids:设置权限CreateNode.PERSISTENT:创建节点类型 String parent = zooKeeper.create("/parent","data".getBytes(),ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.PERSISTENT); //监听节点变化 zooKeeper.exists("/testRoot", new Watcher() { public void process(WatchedEvent watchedEvent) { System.out.println("state" + watchedEvent.getState()); } }); System.out.println(parent); Thread.sleep(10000000); } }
运行创建一个持久化的节点。
查看客户端可以看到:
parent节点创建成功。
删除parent节点,观察watche变化。
控制台打印:
表示监听了删除节点事件,此时再在客户端手动创建节点,观察变化
控制台并没有打印任何创建信息,说明没有监听到,这就是我们说的一旦watche被fire之后就会被关闭,此时改造一下代码:
package com.wangx.kafka.zk; import org.apache.zookeeper.*; import java.io.IOException; public class ZkDemo { public static void main(String[] args) throws IOException, KeeperException, InterruptedException { //创建链接,并监听连接状态 final ZooKeeper zooKeeper = new ZooKeeper("node1:2181", 5000, new Watcher() { public void process(WatchedEvent watchedEvent) { System.out.println("链接客户端"); System.out.println(watchedEvent.getState()); } }); //创建节点 String parent = zooKeeper.create("/parent","data".getBytes(),ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.PERSISTENT); //监听节点变化 zooKeeper.exists("/parent", new Watcher() { public void process(WatchedEvent watchedEvent) { System.out.println("state" + watchedEvent.getState()); try { //重新注册监听事件 zooKeeper.exists("/parent", this); } catch (KeeperException e) { } catch (InterruptedException e) { e.printStackTrace(); } } }); // System.out.println(newNode); Thread.sleep(10000000); } }
删除节点,再手动创建节点:
控制台打印如下:
这样创建节点的事件就又被重新注册并监听到了。
1. 抢注Leader节点——非公平模式
编码流程:
1. 创建Leader父节点,如/chroot,并将其设置为persist节点
2. 各客户端通过在/chroot下创建Leader节点,如/chroot/leader,来竞争Leader。该节点应被设置为ephemeral
3. 若某创建Leader节点成功,则该客户端成功竞选为Leader
4. 若创建Leader节点失败,则竞选Leader失败,在/chroot/leader节点上注册exist的watch,一旦该节点被删除则获得通知
5. Leader可通过删除Leader节点来放弃Leader
6. 如果Leader宕机,由于Leader节点被设置为ephemeral,Leader节点会自行删除。而其它节点由于在Leader节点上注册了watch,故可得到通知,参与下一轮竞选,从而保证总有客户端以Leader角色工作。
实现代码如下:
package com.wangx.kafka.zk; import org.apache.zookeeper.*; import java.io.IOException; public class ZkDemo { public static void main(String[] args) throws IOException, KeeperException, InterruptedException { //创建链接,并监听连接状态 final ZooKeeper zooKeeper = new ZooKeeper("node1:2181", 5000, new Watcher() { public void process(WatchedEvent watchedEvent) { System.out.println("链接客户端"); System.out.println(watchedEvent.getState()); } }); //创建节点 String parent = zooKeeper.create("/parent","data".getBytes(),ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.PERSISTENT); //监听节点变化 zooKeeper.exists("/parent", new Watcher() { public void process(WatchedEvent watchedEvent) { System.out.println("state" + watchedEvent.getState()); try { zooKeeper.exists("/parent", this); } catch (KeeperException e) { } catch (InterruptedException e) { e.printStackTrace(); } } }); String newNode1 = zooKeeper.create("/parent/node","data".getBytes(),ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.EPHEMERAL); String newNode2 = zooKeeper.create("/parent/node","data".getBytes(),ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.EPHEMERAL); String newNode3 = zooKeeper.create("/parent/node","data".getBytes(),ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.EPHEMERAL); // System.out.println(newNode); Thread.sleep(10000000); } }
当存在节点之后,会抛出异常,这样就会导致节点创建不成功,所以只有创建成功的node才能成为leader。使用watcher监听可以在节点被删除或宕机之后来抢占leader.
2. 先到先得,后者监视前者——公平模式
1. 创建Leader父节点,如/chroot,并将其设置为persist节点
2. 各客户端通过在/chroot下创建Leader节点,如/chroot/leader,来竞争Leader。该节点应被设置为ephemeral_sequential
3. 客户端通过getChildren方法获取/chroot/下所有子节点,如果其注册的节点的id在所有子节点中最小,则当前客户端竞选Leader成功
4. 否则,在前面一个节点上注册watch,一旦前者被删除,则它得到通知,返回step 3(并不能直接认为自己成为新Leader,因为可能前面的节点只是宕机了)
5. Leader节点可通过自行删除自己创建的节点以放弃Leader
代码实现如下:
package com.wangx.kafka.zk; import org.apache.zookeeper.*; import java.io.IOException; public class ZkDemo { public static void main(String[] args) throws IOException, KeeperException, InterruptedException { //创建链接,并监听连接状态 final ZooKeeper zooKeeper = new ZooKeeper("node1:2181", 5000, new Watcher() { public void process(WatchedEvent watchedEvent) { System.out.println("链接客户端"); System.out.println(watchedEvent.getState()); } }); //创建节点 String parent = zooKeeper.create("/parent","data".getBytes(),ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.PERSISTENT); //监听节点变化 zooKeeper.exists("/parent", new Watcher() { public void process(WatchedEvent watchedEvent) { System.out.println("state" + watchedEvent.getState()); try { zooKeeper.exists("/parent", this); } catch (KeeperException e) { } catch (InterruptedException e) { e.printStackTrace(); } } }); String newNode1 = zooKeeper.create("/parent/node","data".getBytes(),ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.EPHEMERAL_SEQUENTIAL); String newNode2 = zooKeeper.create("/parent/node","data".getBytes(),ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.EPHEMERAL_SEQUENTIAL); String newNode3 = zooKeeper.create("/parent/node","data".getBytes(),ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.EPHEMERAL_SEQUENTIAL); // System.out.println(newNode); Thread.sleep(10000000); } }
可以看到zk中的parent下多出了三个节点:
默认以node+十个十进制数命名节点名称,数据递增。
当id在所有子节点中最小,选举成为leader.
手下引入Curator依赖,pom.xml如下:
<dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-framework</artifactId> <version>3.2.1</version> </dependency> <dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-recipes</artifactId> <version>3.2.1</version> </dependency> <dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-client</artifactId> <version>3.2.1</version> </dependency>
1. Curator LeaderLatch特点及api的作用:
1. 竞选为Leader后,不可自行放弃领导权
2. 只能通过close方法放弃领导权
3. 强烈建议增加ConnectionStateListener,当连接SUSPENDED或者LOST时视为丢失领导权
4. 可通过await方法等待成功获取领导权,并可加入timeout
5. 可通过hasLeadership方法判断是否为Leader
6. 可通过getLeader方法获取当前Leader
7. 可通过getParticipants方法获取当前竞选Leader的参与方
简单实现:
package com.wangx.kafka.zk; import org.apache.curator.RetryPolicy; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.framework.recipes.leader.LeaderLatch; import org.apache.curator.framework.recipes.leader.LeaderLatchListener; import org.apache.curator.retry.ExponentialBackoffRetry; public class CuratorLeaderLatch { public static void main(String[] args) throws Exception { //设置重试策略,这里是沉睡一秒后开始重试,重试五次 RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000,5); //通过工厂类获取curatorFramework CuratorFramework curatorFramework = CuratorFrameworkFactory.newClient("node1:2181",retryPolicy); //leader节点创建 LeaderLatch leaderLatch = new LeaderLatch(curatorFramework,"/parent","node"); //监听leader节点 leaderLatch.addListener(new LeaderLatchListener() { //当前节点是leader时回调 public void isLeader() { System.out.println("I am a listener"); } //不再是leader时回调 public void notLeader() { System.out.println("I am not a listener"); } }); //启动 curatorFramework.start(); leaderLatch.start(); Thread.sleep(100000000); leaderLatch.close(); curatorFramework.close(); } }
2. Curator LeaderSelector特点及api的作用:
1. 竞选Leader成功后回调takeLeadership方法
2. 可在takeLeadership方法中实现业务逻辑
3. 一旦takeLeadership方法返回,即视为放弃领导权
4. 可通过autoRequeue方法循环获取领导权
5. 可通过hasLeadership方法判断是否为Leader
6. 可通过getLeader方法获取当前Leader
7. 可通过getParticipants方法获取当前竞选Leader的参与方
简单实现:
package com.wangx.kafka.zk; import org.apache.curator.RetryPolicy; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.framework.recipes.leader.*; import org.apache.curator.retry.ExponentialBackoffRetry; public class CuratorLeaderSelector { public static void main(String[] args) throws Exception { //设置重试策略,这里是沉睡一秒后开始重试,重试五次 RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000,5); //通过工厂类获取curatorFramework CuratorFramework curatorFramework = CuratorFrameworkFactory.newClient("node1:2181",retryPolicy); //leader节点创建,监听Leader状态,并在takeLeadership回调函数中做自己的业务逻辑 LeaderSelector leaderSelector = new LeaderSelector(curatorFramework,"/node", new LeaderSelectorListenerAdapter() { public void takeLeadership(CuratorFramework curatorFramework) throws Exception { Thread.sleep(1000); System.out.println("启动了 takeLeadership"); } }); leaderSelector.autoRequeue(); leaderSelector.start(); //启动 curatorFramework.start(); Thread.sleep(100000000); leaderSelector.close(); curatorFramework.close(); } }
这里的LeaderSelectorListenerAdapter实现了LeaderSelectorListener接口,源码如下:
// // Source code recreated from a .class file by IntelliJ IDEA // (powered by Fernflower decompiler) // package org.apache.curator.framework.recipes.leader; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.state.ConnectionState; public abstract class LeaderSelectorListenerAdapter implements LeaderSelectorListener { public LeaderSelectorListenerAdapter() { } //当连接失败时,会抛出异常,这样就会中断takeLeadership方法,防止业务逻辑错误操作 public void stateChanged(CuratorFramework client, ConnectionState newState) { if (client.getConnectionStateErrorPolicy().isErrorState(newState)) { throw new CancelLeadershipException(); } } }
Kafka学习笔记(4)----Kafka的Leader Election
原文:https://www.cnblogs.com/Eternally-dream/p/10023848.html