一:分布式barrier
? 分布式barrier一般出现在类似这样的场景,某个任务最终的执行需要基于很多并行计算的子结果。在JDK中也有类似的实现,可以参加我的另一篇博客,地址:http://tanjie090508.iteye.com/blog/2287426。但是在多个进程中,CyclicBarrier就不能用了,但是基于zookeeper是很好实现的。
二:zookeeper实现分布式屏障思路
?????? 某个node路径为"/queue_barrier",为该根节点赋值为某个默认值,假设为10,当根路径"/queue_barrier"下的子节点个数为10时,则所有子进程都完成了任务,主进程开始执行。
? 基于zookeeper的节点类型,创建临时连续的节点会在创建的节点后给节点名加上一个数字后缀,基于这个顺序,我们可以有如下的思路:
? 1:通过调用getData()来获取某个节点的值,假设为10。
? 2:调用getChildren()来获取所有的子节点,同时注册watcher监听。
? 3:统计子节点的个数。
? 4:将统计的个数和getData()获取的值比较,如果还不足10,就需要等待。
? 5:接收watcher通知。
大概的流程图如下:
三:举例说明
package com.travelsky.pss.react.zookeeper.queue;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.Watcher.Event.EventType;
import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;
/**
* @author tanjie
*/
public class BarrerQueue implements Watcher{
private static final String Addr = "ip:2181,ip:2182,ip:2183";
private String root = null;
private ZooKeeper zk = null;
private static String subNode = "/element";
private static CountDownLatch latch = new CountDownLatch(1);
private static final CountDownLatch countDownLatch = new CountDownLatch(10);
//构造函数初始化
public BarrerQueue(String root) {
this.root = root;
try {
// 连接zk服务器
zk = new ZooKeeper(Addr, 3000, this);
if (zk != null) {
// 建立根目录节点
Stat s = zk.exists(root, false);
if (s == null) {
zk.create(root, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE,
CreateMode.PERSISTENT);
zk.setData(root, "10".getBytes(), -1);
}
}
} catch (IOException e) {
e.printStackTrace();
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
void add(String path,CountDownLatch countDownLatch) {
try {
if(null != zk){
// 设置一个监控的标志,当大小为10时,所有子节点都已经创建完毕,进行主流程处理
zk.exists(root + "/start", true);
zk.create(path, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.EPHEMERAL_SEQUENTIAL);
List<String> list = zk.getChildren(root, false);
System.out.println("子节点的个数:" + list.size() + ",跟节点默认参考值:" + Integer.parseInt(new String(zk.getData(root,false, new Stat()))) );
if (list.size() < Integer.parseInt(new String(zk.getData(root,false, new Stat())))) {
countDownLatch.countDown();
} else {
if (null == zk.exists(root + "/start", false)) {
zk.create(root + "/start", new byte[0],Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
}
}
}
} catch (KeeperException | InterruptedException e) {
e.printStackTrace();
}
}
public static void main(String args[]) throws InterruptedException {
try {
final BarrerQueue queue2 = new BarrerQueue("/queue_barrier");
latch.await();
for (int i = 0; i < 10; i++) {
queue2.add("/queue_barrier" + subNode,countDownLatch);
}
} catch (Exception e) {
e.printStackTrace();
}
countDownLatch.await();
}
@Override
public void process(WatchedEvent event) {
if ((root + "/start").equals(event.getPath())&& event.getType() == EventType.NodeCreated) {
System.out.println(root + "/start" + "---" + "节点被传建了");
try {
List<String> list = zk.getChildren(root, false);
for (final String node : list) {
if(!"start".equals(node)){
System.out.println(node);
}
}
System.out.println("所以人到齐,开始吃饭");
countDownLatch.countDown();
} catch (KeeperException | InterruptedException e) {
e.printStackTrace();
}
}else if(event.getState() == KeeperState.SyncConnected){
latch.countDown();
}
}
}
? 让我们看zk服务器上指定节点下的数据,因为子节点是临时连续后,当session端口后,节点后自动消失,所以我们debug运行后,会看到下面的结果:
2182端口
2182端口
2183端口
?程序运行结果如下:
子节点的个数:1,跟节点默认参考值:10 子节点的个数:2,跟节点默认参考值:10 子节点的个数:3,跟节点默认参考值:10 子节点的个数:4,跟节点默认参考值:10 子节点的个数:5,跟节点默认参考值:10 子节点的个数:6,跟节点默认参考值:10 子节点的个数:7,跟节点默认参考值:10 子节点的个数:8,跟节点默认参考值:10 子节点的个数:9,跟节点默认参考值:10 子节点的个数:10,跟节点默认参考值:10 /queue_barrier/start---节点被传建了 element0000000004 element0000000005 element0000000006 element0000000007 element0000000008 element0000000000 element0000000009 element0000000001 element0000000002 element0000000003 所以人到齐,开始吃饭
原文:http://tanjie090508.iteye.com/blog/2288583