开源客户端,原生api的不足
连接的创建是异步的,需要开发人员自行编码实现等待
连接没有自动的超时重连机制
Zk本身没提供序列化机制,需要开发人员自行指定,从而实现数据的序列化和反序列化
Watcher注册一次只会生效一次,需要不断的重复注册
Watcher的使用方式不符合java本身的术语,如果采用监听器方式,更容易理解
不支持递归创建树形节点
开源客户端---ZkClient介绍
Github上一个开源的zk客户端,由datameer的工程师Stefan Groschupf和Peter Voss一起开发
– 解决session会话超时重连
– Watcher反复注册
– 简化开发api
– 还有.....
– https://github.com/sgroschupf/zkclient
开源客户端---Curator介绍
1. 使用CuratorFrameworkFactory工厂的两个静态方法创建客户端
a) static CuratorFramework newClient(String connectString, int sessionTimeoutMs, int connectionTimeoutMs,
RetryPolicy retryPolicy)
b) static CuratorFramework newClient(String connectString, RetryPolicy retryPolicy)
2. Start()方法启动
参数说明
connectString 分开的ip:port对
retryPolicy 重试策略,默认四种:Exponential BackoffRetry,RetryNTimes ,RetryOneTime,
RetryUntilElapsed
sessionTimeoutMs 会话超时时间,单位为毫秒,默认60000ms
connectionTimeoutMs 连接创建超时时间,单位为毫秒,默认是15000ms
重试策略
– 实现接口RetryPolicy可以自定重重试策略
? boolean allowRetry(int retryCount, long elapsedTimeMs, RetrySleeper sleeper)
retryCount 已经重试的次数,如果第一次重试 此值为0
elapsedTimeMs 重试花费的时间,单位为毫秒
sleeper 类似于Thread.sleep,用于sleep指定时间
返回值 如果还会继续重试,则返回true
四种默认重试策略
– ExponentialBackoffRetry
? ExponentialBackoffRetry(int baseSleepTimeMs, int maxRetries)
? ExponentialBackoffRetry(int baseSleepTimeMs, int maxRetries, int maxSleepMs)
? 当前应该sleep的时间: baseSleepTimeMs * Math.max(1, random.nextInt(1 << (retryCount + 1))),随着重试次数增加重试时间间隔变大,指数倍增长
参数说明
baseSleepTimeMs 初始sleep时间
maxRetries 最大重试次数
maxSleepMs 最大重试时间
返回值 如果还会继续重试,则返回true
默认重试策略
– RetryNTimes
? RetryNTimes(int n, int sleepMsBetweenRetries)
? 当前应该sleep的时间
参数说明
n 最大重试次数
sleepMsBetweenRetries 每次重试的间隔时间
– RetryOneTime
? 只重试一次
? RetryOneTime(int sleepMsBetweenRetry), sleepMsBetweenRetry为重试间隔的时间
默认重试策略
– RetryUntilElapsed
? RetryUntilElapsed(int maxElapsedTimeMs, int sleepMsBetweenRetries)
? 重试的时间超过最大时间后,就不再重试
参数说明
maxElapsedTimeMs 最大重试时间
sleepMsBetweenRetries 每次重试的间隔时间
Fluent风格的API
– 定义:一种面向对象的开发方式,目的是提高代码的可读性
– 实现方式:通过方法的级联或者方法链的方式实现
– 举例:
zkclient = CuratorFrameworkFactory.builder().connectString(connectString).sessionTimeoutMs(5000).retryPolicy(retryPolicy).namespace("tests").build();
创建节点
– 构建操作包装类(Builder): CreateBuilder create()---- CuratorFramework
– CreateBuilder
? creatingParentsIfNeeded() //递归创建父目录
? withMode(CreateMode mode)//设置节点属性,比如:CreateMode.PERSISTENT,如果是递归创建,创建模式
为临时节点,则只有叶子节点是临时节点,非叶子节点都为持久节点
? withACL(List aclList) //设置acl
? forPath(String path) //指定路劲
删除节点
– 构建操作包装类(Builder):DeleteBuilder delete() -----CuratorFramework
– DeleteBuilder
? withVersion(int version) //特定版本号
? guaranteed() //确保节点被删除
? forPath(String path) //指定路径
? deletingChildrenIfNeeded() //递归删除所有子节点
关于guaranteed:
Solves edge cases where an operation may succeed on the server but connection failure
occurs before a response can be successfully returned to the client
意思是:解决当某个删除操作在服务器端可能成功,但是此时客户端与服务器端的连接中断,而删除的响
应没有成功返回到客户端
底层的本质是重试
关于异步操作
– inBackground()
– inBackground(Object context)
– inBackground(BackgroundCallback callback)
– inBackground(BackgroundCallback callback, Object context)
– inBackground(BackgroundCallback callback, Executor executor)
– inBackground(BackgroundCallback callback, Object context, Executor executor)
从参数看跟zk的原生异步api相同,多了一个线程池,用于执行回调
读取数据
– 构建操作包装类(Builder): GetDataBuilder getData() -----CuratorFramework
– GetDataBuilder
? storingStatIn(org.apache.zookeeper.data.Stat stat) //把服务器端获取的状态数据存储到stat对象
? Byte[] forPath(String path)//节点路径
读取子节点
– 构建操作包装类(Builder): GetChildrenBuilder getChildren() -----CuratorFramework
– GetChildrenBuilder
? storingStatIn(org.apache.zookeeper.data.Stat stat) //把服务器端获取的状态数据存储到stat对象
? Byte[] forPath(String path)//节点路径
? usingWatcher(org.apache.zookeeper.Watcher watcher) //设置watcher,类似于zk本身的api,也只能使用一次
? usingWatcher(CuratorWatcher watcher) //设置watcher ,类似于zk本身的api,也只能使用一次
设置watcher
– NodeCache
? 监听数据节点的内容变更
? 监听节点的创建,即如果指定的节点不存在,则节点创建后,会触发这个监听
– PathChildrenCache
? 监听指定节点的子节点变化情况
? 包括:新增子节点 子节点数据变更 和子节点删除
NodeCache
– 构造函数
? NodeCache(CuratorFramework client, String path)
? NodeCache(CuratorFramework client, String path, boolean dataIsCompressed)
参数说明
client 客户端实例
path 数据节点路径
dataIsCompressed 是否进行数据压缩
– 回调接口
? public interface NodeCacheListener
void nodeChanged() //没有参数,怎么获取事件信息以及节点数据?
PathChildrenCache
client 客户端实例
path 数据节点路径
dataIsCompressed 是否进行数据压缩
cacheData 用于配置是否把节点内容缓存起来,如果配置为true,那么客户端在接
收到节点列表变更的同时,也能够获取到节点的数据内容,如果为false
则无法取到数据内容
threadFactory 通过这两个参数构造专门的线程池来处理事件通知
executorService
PathChildrenCache
– 监听接口
? 时间类型包括:新增子节点(CHILD_ADDED),子节点数据变更(CHILD_UPDATED),子节点删除(CHILD_REMOVED)
– PathChildrenCache.StartMode
? BUILD_INITIAL_CACHE //同步初始化客户端的cache,及创建cache后,就从服务器端拉入对应的数据
? NORMAL //异步初始化cache
? POST_INITIALIZED_EVENT //异步初始化,初始化完成触发事件PathChildrenCacheEvent.Type.INITIALIZED
zkclient举例
curator举例
原文:https://www.cnblogs.com/a-du/p/9888972.html