在之前的文章中介绍过分布式锁的特点和利用Redis实现简单的分布式锁。但是分布式锁的实现还有很多其他方式,但是万变不离其宗,始终遵循一个特点:同一时刻只能有一个操作获取。这篇文章主要介绍如何基于zookeeper实现分布式锁。
关于分布式锁的相关特性,这里不再赘述,请参考分布式锁。
这里回顾下分布式锁的特点:
zookeeper中有一种临时顺序节点,它具有以下特征:
利用以上的特点可以满足分布式锁实现的基本要求:
因为顺序性,可以让最小顺序号的应用获取到锁,从而满足分布式锁的每次只能一个占用锁,因为只有它一个获取到,所以可以实现重复进入,只要设置标识即可。锁的释放,即删除应用在zookeeper上注册的节点,因为每个节点只被自己注册拥有,所以只有自己才能删除,这样就满足只有占用者才可以解锁
zookeeper的序号分配是原子的,分配后即不会再改变,让最小序号者获取锁,所以获取锁是原子的
因为注册的是临时节点,在会话期间内有效,所以不会产生死锁
zookeeper注册节点的性能能满足几千,而且支持集群,能够满足大部分情况下的性能
需要获取分布式锁的应用都向zookeeper的/lock/{resouce}目录下注册sequence-前缀的节点,序号最小者获取到操作资源的权限:
Note:
这里的resource需要依据竞争的具体资源确定,如竞争账户则可以使用账户号作为resource。
从图中可以看出,clientA的顺序号最小,由它获取到锁,操作资源。
算法步骤:
流程图:
因为最小的节点只被获取到锁的client持有,所以该锁不可能被其他client释放。同时释放锁只需要将临时顺序节点删除,也是原子性操作。
/**
* 基于Zookeeper实现分布式锁
*
* @author huaijin
*/
public class DistributedLockBaseZookeeper implements DistributedLock {
private static final Logger log = LoggerFactory.getLogger(DistributedLockBaseZookeeper.class);
/**
* 利用空串作为各个节点存储的数据
*/
private static final String EMPTY_DATA = "";
/**
* 分布式锁的根目录
*/
private static final String LOCK_ROOT = "/lock";
/**
* zookeeper目录分隔符
*/
private static final String PATH_SEPARATOR = "/";
/**
* 临时顺序节点前缀
*/
private static final String LOCK_NODE_PREFIX = "sequence-";
/**
* 利用Lock和Condition实现等待通知
*/
private Lock waitNotifierLock = new ReentrantLock();
private Condition waitNotifier = waitNotifierLock.newCondition();
/**
* 操作zookeeper的client
*/
private ZkClient zkClient;
/**
* 分布式资源的路径
*/
private String resourcePath;
/**
* 锁节点完整前缀
*/
private String lockNodePrefix;
/**
* 当前注册的临时顺序节点路径
*/
private String currentLockNodePath;
public DistributedLockBaseZookeeper(String resource, ZkClient zkClient) {
Objects.requireNonNull(zkClient, "zkClient must not be null!");
if (resource == null || resource.isEmpty()) {
throw new IllegalArgumentException("resource must not be null!");
}
this.zkClient = zkClient;
this.resourcePath = LOCK_ROOT + PATH_SEPARATOR + resource;
this.lockNodePrefix = resourcePath + PATH_SEPARATOR + LOCK_NODE_PREFIX;
// 创建分布式锁根目录
if (!this.zkClient.exists(LOCK_ROOT)) {
try {
this.zkClient.create(LOCK_ROOT, EMPTY_DATA, CreateMode.PERSISTENT);
} catch (ZkNodeExistsException e) {
// ignore, logging
log.warn("The root path for lock already exists.");
}
}
// 创建资源目录
if (!this.zkClient.exists(resourcePath)) {
try {
this.zkClient.create(resourcePath, EMPTY_DATA, CreateMode.PERSISTENT);
} catch (ZkNodeExistsException e) {
// ignore, logging
log.warn("The resource path for [" + resourcePath + "] already exists.");
}
}
}
@Override
public void lock() throws DistributedLockException {
if (!acquireLock()) {
// 如果获取锁不成功,则等待
waitNotifierLock.lock();
try {
waitNotifier.await();
} catch (Exception e) {
throw new DistributedLockException("Interrupt when waiting notification.");
} finally {
waitNotifierLock.unlock();
}
}
}
@Override
public void unlock() {
// 删除自身节点,释放锁
zkClient.delete(currentLockNodePath);
}
private boolean acquireLock() throws DistributedLockException {
// 如果当前未注册临时顺序节点,则注册
if (this.currentLockNodePath == null) {
this.currentLockNodePath = zkClient.create(lockNodePrefix, EMPTY_DATA, CreateMode.EPHEMERAL_SEQUENTIAL);
}
// 获取顺序号
long lockNodeSeq = fetchSeqFromNodePath(currentLockNodePath);
// 获取所有子节点
List<String> childNodePaths = zkClient.getChildren(resourcePath);
if (childNodePaths == null || childNodePaths.isEmpty()) {
throw new DistributedLockException("Not exists child nodes.");
}
// 从所有子节点中获取最小子节点的顺序号
long minSeq = 1000000L;
int minIndex = -1;
for (int i = 0; i < childNodePaths.size(); i++) {
long nodeSeq = fetchSeqFromNodePath(resourcePath + childNodePaths.get(i));
if (nodeSeq < minSeq) {
minSeq = nodeSeq;
minIndex = i;
}
}
// 比较自身顺序号与最小序号
if (lockNodeSeq > minSeq) {
// 如果存在更小序号,则监控最小序号的子节点
String minLockNodePath = childNodePaths.get(minIndex);
zkClient.subscribeDataChanges(resourcePath + PATH_SEPARATOR + minLockNodePath,
new ListenerForLockRelease());
return false;
}
// 成功获取锁,返回
return true;
}
private long fetchSeqFromNodePath(String nodePath) {
String seq = nodePath.substring(lockNodePrefix.length());
return Long.valueOf(seq);
}
private class ListenerForLockRelease implements IZkDataListener {
@Override
public void handleDataChange(String dataPath, Object data) throws Exception {
}
@Override
public void handleDataDeleted(String dataPath) throws Exception {
// 如果成功获取锁,则通知,让主线程返回
if (acquireLock()) {
waitNotifierLock.lock();
try {
waitNotifier.signal();
} finally {
waitNotifierLock.unlock();
}
}
}
}
}
原文:https://www.cnblogs.com/lxyit/p/10387123.html