一:Curator客户端
???? Curator是Netfix公司开源的基于Zookeeper的客户端框架,其封装了原生Zookeeper很多底层的操作,比如重试机制,watcher的反复注册等。
? maven项目引入curator很简单
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
<version>2.7.0</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>2.7.0</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-test</artifactId>
<version>2.7.0</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-x-discovery</artifactId>
<version>2.7.0</version>
</dependency>
二:Curator操作Zookeeper public static CuratorFramework newClient(String connectString, int sessionTimeoutMs, int connectionTimeoutMs, RetryPolicy retryPolicy)
{
return builder().
connectString(connectString).
sessionTimeoutMs(sessionTimeoutMs).
connectionTimeoutMs(connectionTimeoutMs).
retryPolicy(retryPolicy).
build();
}
??? 参数说明: public ExponentialBackoffRetry(int baseSleepTimeMs, int maxRetries, int maxSleepMs)
{
super(validateMaxRetries(maxRetries));
this.baseSleepTimeMs = baseSleepTimeMs;
this.maxSleepMs = maxSleepMs;
}
? 参数说明: @Override
public void processResult(CuratorFramework client,
CuratorEvent event) throws Exception {
}
??? 我们知道,zk的事件通知处理线程是EventThread,它是一个串行处理的线程,想象一下,如果某个事件处理比较耗时,势必会影响后面的事件处理,而Curator给我们提供的异步处理接口,在异步处理处理接口中,其中有如下的一个构造方法public T inBackground(BackgroundCallback callback, Executor executor);? 也就是在程序中我们可以传入一个Executor实例,这样的话,我们就可以把某些处理比较耗时的事件单独交给线程池来处理了,看下面的例子:
public class Curator_Node_BackGroud_Example {
static String path = "/zk-backGroud";
static String hosts = "ip:2181";
static CuratorFramework client;
static {
client = CuratorFrameworkFactory.newClient(hosts, 2000, 2000,
new ExponentialBackoffRetry(1000, 3));
}
public static void main(String[] args) {
try {
ExecutorService service = Executors.newFixedThreadPool(2);
final CountDownLatch latch = new CountDownLatch(2);
client.start();
client.create().creatingParentsIfNeeded()
.withMode(CreateMode.EPHEMERAL)
.inBackground(new BackgroundCallback() {
@Override
public void processResult(CuratorFramework client,
CuratorEvent event) throws Exception {
System.out.println("创建节点返回的状态码:"
+ event.getResultCode() + ",返回的类型:"
+ event.getType());
System.out.println("运行创建该节点的线程为:"
+ Thread.currentThread().getName());
latch.countDown();
}
}, service).forPath(path, "测试".getBytes());
Thread.sleep(2000);
// 为了比较,再次在相同的节点下创建
client.create().creatingParentsIfNeeded()
.withMode(CreateMode.EPHEMERAL)
.inBackground(new BackgroundCallback() {
@Override
public void processResult(CuratorFramework client,
CuratorEvent event) throws Exception {
System.out.println("创建节点返回的状态码:"
+ event.getResultCode() + ",返回的类型:"
+ event.getType());
System.out.println("运行创建该节点的线程为:"
+ Thread.currentThread().getName());
latch.countDown();
}
}).forPath(path, "测试2".getBytes());
latch.await();
service.shutdown();
} catch (Exception e) {
e.printStackTrace();
} finally {
client.close();
}
}
}??
?可能得运行结果如下:创建节点返回的状态码:0,返回的类型:CREATE 运行创建该节点的线程为:pool-3-thread-1 创建节点返回的状态码:-110,返回的类型:CREATE 运行创建该节点的线程为:main-EventThread? ?可以看到,异步接口返回的状态码0表示节点创建成功,当节点已经存在再去创建的时候,返回-110表示节点创建失败了。
?curator还封装了很多原生zookeeper的操作,比如分布式锁,队列,屏障,事件监听,后面我们都会介绍,请持续关注。
原文:http://tanjie090508.iteye.com/blog/2288255