一:分布式队列
?? 业界有很多消息队列,比如kafka,ActiveMQ等,本次我们将介绍基于zookeeper如何实现分布式队列,分布式队列一般有2种,一种是FIFO,一种是等待多个线程都返回了,在统一执行,有点像barrier。这里我们先介绍如何实现FIFO
二:zookeeper实现FIFO思路
??? 基于zookeeper的节点类型,创建连续的节点会在创建的节点后给节点名加上一个数字后缀,基于这个顺序,我们可以有如下的思路
?1:多个客户端同时在指定节点下创建一个连续的子节点
?2:调用getChildren()来获取所有的子节点,并确定自己创建的节点在子节点中的顺序
?3:如果自己是最小的节点,则消费,如果不是,则在比自己节点顺序大1位的节点上注册watcher监听
三:举例说明
package com.travelsky.pss.react.zookeeper.queue;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;
/**
* 实现的思路也非常简单,就是在特定的目录下创建 SEQUENTIAL类型的子目录 /queue_i,
* 这样就能保证所有成员加入队列时都是有编号的,出队列时通过 getChildren()方法可以返回当前所有的队列中的元素,
* 然后消费其中最小的一个,这样就能保证 FIFO
*
* @author tanjie
*
*/
public class Queue implements Watcher {
private static final String Addr = "ip:2181,ip:2182,ip:2183";
private static CountDownLatch latch = new CountDownLatch(10);
private String root;
private ZooKeeper zk = null;
private Integer mutex = null;
private String subNode = "/element";
public Queue(String root) {
this.root = root;
try {
// 连接zk服务器
zk = new ZooKeeper(Addr, 5000 * 10, this);
} catch (IOException e) {
e.printStackTrace();
}
mutex = new Integer(-1);
if (zk != null) {
try {
// 建立根目录节点
Stat s = zk.exists(root, false);
if (s == null) {
zk.create(root, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE,
CreateMode.PERSISTENT);
}
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
/**
* 当znode上事件触发,唤醒相应的等待线程
*
* @param event
*/
@Override
public void process(WatchedEvent event) {
synchronized (mutex) {
mutex.notify();
}
}
/**
* 生产着生产
* @param i
* @return
*/
boolean produce(int i) {
try {
zk.create(root + subNode + i, ("生产着生产的数据_" + i).getBytes(),
ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
} catch (KeeperException | InterruptedException e) {
e.printStackTrace();
return false;
}
System.out.println("生产数据Success:" + root + subNode + i);
latch.countDown();
return true;
}
/**
* 消费者源源不断消费
* @throws KeeperException
* @throws InterruptedException
*/
void consume() throws KeeperException, InterruptedException {
while (true) {
synchronized (mutex) {
List<String> list = zk.getChildren(root, true);
if (list.size() == 0) {
System.out.println("队列里面没有数据供消费");
mutex.wait();// 释放锁,让其他任务得以执行
} else {
System.out.println("目前队里的数据为:" + list);
Integer min = new Integer(list.get(0).substring(
subNode.length() - 1));
for (final String s : list) {
Integer tempValue = new Integer(s.substring(subNode
.length() - 1));
if (tempValue < min) {
min = tempValue;
}
}
System.out.println("消费者消费了: " + root + subNode + min);
zk.delete(root + subNode + min, 0);
}
}
}
}
public static void main(String args[]) throws InterruptedException {
Producer producer = new Producer(new Queue("/queue"));
// 生产者线程启动
producer.start();
latch.await();
// 消费者线程
Consumer consumer = new Consumer(new Queue("/queue"));
consumer.start();
}
}
class Producer extends Thread {
private Queue queue;
public Producer(Queue queue) {
this.queue = queue;
}
@Override
public void run() {
// 该生产者线程创建10个znode
for (int i = 0; i < 10; i++) {
try {
Thread.sleep(1000);
queue.produce(i);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
class Consumer extends Thread {
private Queue queue;
public Consumer(Queue queue) {
this.queue = queue;
}
// 消费者线程,不断的从zk服务器中读取znode,进行操作。
@Override
public void run() {
try {
queue.consume();
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
? 运行结果如下:
生产数据Success:/queue/element0 生产数据Success:/queue/element1 生产数据Success:/queue/element2 生产数据Success:/queue/element3 生产数据Success:/queue/element4 生产数据Success:/queue/element5 生产数据Success:/queue/element6 生产数据Success:/queue/element7 生产数据Success:/queue/element8 生产数据Success:/queue/element9 目前队里的数据为:[element8, element9, element6, element7, element4, element5, element2, element3, element0, element1] 消费者消费了: /queue/element0 目前队里的数据为:[element8, element9, element6, element7, element4, element5, element2, element3, element1] 消费者消费了: /queue/element1 目前队里的数据为:[element8, element9, element6, element7, element4, element5, element2, element3] 消费者消费了: /queue/element2 目前队里的数据为:[element8, element9, element6, element7, element4, element5, element3] 消费者消费了: /queue/element3 目前队里的数据为:[element8, element9, element6, element7, element4, element5] 消费者消费了: /queue/element4 目前队里的数据为:[element8, element9, element6, element7, element5] 消费者消费了: /queue/element5 目前队里的数据为:[element8, element9, element6, element7] 消费者消费了: /queue/element6 目前队里的数据为:[element8, element9, element7] 消费者消费了: /queue/element7 目前队里的数据为:[element8, element9] 消费者消费了: /queue/element8 目前队里的数据为:[element9] 消费者消费了: /queue/element9 队列里面没有数据供消费 队列里面没有数据供消费
?
原文:http://tanjie090508.iteye.com/blog/2288582