分布式专题|如何使用zookeeper实现分布式锁
在分布式中,避免不了使用分布式锁,在前面的专题中,我们已经说过使用Redis实现分布式锁,这里我将给大家演示如何使用zookeeper实现分布式锁。
首先,给大家介绍下实现的基本思路,这里默认大家已经掌握了分布式锁的基本概念了,如果还没有理解分布式锁是用来干嘛的,可以查阅相关文章了解下:
共享锁(读写锁):
两种锁都是基于序号节点的特性完成,zookeepr帮我们保证了创建节点的顺序一致性,先创建的,节点序号比后创建的节点序号要小,所以我们可以这么实现:
共享锁实现思路
-
如果是写锁:
- 创建一个序号节点 lock-w-00003
- 获取锁节点下的所有序号子节点
- 通过比较节点序号大小,lock-w-00003是最小的序号节点,则表明获取成功
- 否则如果lock-w-00003不是最小的序号节点,则对lock-w-00003前面的一个节点进行监听,并阻塞当前线程;如果前面一个节点变化了,再进行同样的判断,直至lock-w-00003是最小的子节点,即获取锁成功;
-
如果是读锁:
- 创建一个序号节点lock-r-0004
- 获取锁节点下的所有序号子节点
- 通过比较序号节点大小。如果在lock-r-00004前面都是读节点,那么就代表获取读锁成功,否则监听lock-r-00004前一个节点,并阻塞当前线程;如果节点发生变化,则进行同样的判断, 直到lock-r-00004前面都是读节点,获取lock-r-00004是最小的节点,才代表获取锁成功;
排它锁实现思路
- 创建序号节点,lock-0005
- 获取锁节点下的所有序号子节点
- 比较发现,lock-0005是最小的子节点,则代表获取排它锁成功;
- 否则监听lock-0005前面的节点变化,并阻塞当前线程
- 如果发现到节点变化,则继续执行2-4的操作,直至获取锁;
排它锁实现核心代码及注释
@Override
public void lock() {
if (tryLock()) { //如果获取锁成功,则直接返回
System.out.println("###成功获取锁###");
} else {
// 获取锁失败,监听前面的子节点,阻塞等待
waitLock();
// 前面子节点被删除,重新获取锁
lock();
}
}
@Override
public void unLock() {
if(zkClient!=null){
// 由子类调用,关闭会话,防止回话过多,导致拒绝链接
zkClient.close();
System.out.println("###释放所资源###");
}
}
public boolean tryLock() {
// 创建临时序号节点
if (null == currentPath || currentPath.length() <= 0) {
currentPath = zkClient.createEphemeralSequential(lockPath + "/", "lock");
}
// 获取所有子节点按照从小到大进行排序
List<String> children = zkClient.getChildren(lockPath);
Collections.sort(children);
// 判断第一个节点是否是自己,如果是自己,则获取锁成功,直接返回
if (currentPath.equals(lockPath + "/" + children.get(0))) {
System.out.println("成功获取锁:" + currentPath);
return true;
}
String key = currentPath.substring(lockPath.length() + 1);
int location = Collections.binarySearch(children, key);
// 定位前面一个节点的位置,进行监听
this.beforePath = lockPath + "/" + children.get(location - 1);
return false;
}
// 锁获取失败,会调用此方法
public void waitLock() {
IZkDataListener listener = new IZkDataListener() {
public void handleDataChange(String s, Object o) {
}
public void handleDataDeleted(String s) {
LATCH.countDown();
}
};
// 监听前面一个子节点变化
zkClient.subscribeDataChanges(this.beforePath, listener);
// 如果没有子节点,则直接返回,重新获取锁
if (!zkClient.exists(this.beforePath)) {
return;
}
try {
// 这里使用JUC中的CountDownLatch 进行阻塞等待
LATCH.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
// 被监听的节点发生变化,取消监听,节省资源;
zkClient.unsubscribeDataChanges(this.beforePath, listener);
}
// 子类实现的释放锁的方法
@Override
public void unLock() {
System.out.println("成功释放锁"+currentPath);
zkClient.delete(currentPath);
// 调用父类方法释放资源
super.unLock();
}
代码已经上传到码云 https://gitee.com/yangleliu/code_learning.git
读写锁实现核心代码及注释
读写锁和排它锁代码在实现方式上主要是在tryLock的方式上不同:
写锁
public boolean tryLock() {
if (null == currentPath || currentPath.length() <= 0) {
currentPath = zkClient.createEphemeralSequential(lockPath + "/write", "lock");
}
List<String> children = zkClient.getChildren(lockPath);
// 按照序号节点进行排序
Collections.sort(children, Comparator.comparingInt(o -> Integer.parseInt(o.replaceAll("[^0-9]*", ""))));
// System.out.println("当前孩子:"+children.toString());
// 如果当前节点是第一个,则代表获取写锁成功
if (currentPath.equals(lockPath + "/" + children.get(0))) {
System.out.println("成功获取锁:" + currentPath);
return true;
}
String key = currentPath.substring(lockPath.length() + 1);
int location = children.indexOf(key);
// 获取前面一个序号节点,并监听
this.beforePath = lockPath + "/" + children.get(location - 1);
System.out.println("获取写锁失败"+key);
System.out.println("当前孩子:"+children.toString());
return false;
}
读锁
public boolean tryLock() {
if (null == currentPath || currentPath.length() <= 0) {
currentPath = zkClient.createEphemeralSequential(lockPath + "/read", "lock");
System.out.println(currentPath);
}
List<String> children = zkClient.getChildren(lockPath);
// 按照节点进行排序
Collections.sort(children, Comparator.comparingInt(o -> Integer.parseInt(o.replaceAll("[^0-9]*", ""))));
// 如果当前节点处于第一个则获取锁成功
if (currentPath.equals(lockPath + "/" + children.get(0))) {
// System.out.println("成功获取锁:" + currentPath);
return true;
}
String key = currentPath.substring(lockPath.length() + 1);
int location = children.indexOf(key);
// 遍历当前节点位置前面的所有子节点
for (int i=0;i<location;i++){
System.out.println(children.get(i));
// 如果包含写锁,则直接返回获取锁失败,
if (children.get(i).contains("write")){
this.beforePath = lockPath + "/" + children.get(location - 1);
System.out.println("获取读锁失败:当前孩子"+children.toString());
return false;
}
}
// 如果前面没有写锁,则代表获取锁成功
return true;
}