分布式锁
方式一
原理
节点不可重名 + watch
- 争抢创建临时节点,未争抢到锁的实例添加 watch。
- 创建了临时节点,就执行相关方法,执行完成之后,删除临时节点,释放锁。
- 其他节点再次争抢创建临时节点。
使用临时节点的原因:防止实例在创建临时节点之后,因各种原因宕机,此时,临时节点也能被同时删除。假如是永久节点,节点不会被删除,锁也就不会被释放。
缺点
惊群效应
假如有 1000 个节点,此时,有节点争抢到锁,其他节点会进入等待。锁释放,999 个节点会醒来,争抢锁。适用于并发小的情况。
举例
package com.study.mike.zookeeper;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import org.I0Itec.zkclient.IZkDataListener;
import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.exception.ZkNodeExistsException;
public class ZKDistributeLock implements Lock {
private String lockPath;
private ZkClient client;
// 锁重入计数
private ThreadLocal<Integer> reentrantCount = new ThreadLocal<>();
public ZKDistributeLock(String lockPath) {
super();
this.lockPath = lockPath;
client = new ZkClient("localhost:2181");
client.setZkSerializer(new MyZkSerializer());
}
@Override
public boolean tryLock() { // 不会阻塞
if (this.reentrantCount.get() != null) {
int count = this.reentrantCount.get();
if (count > 0) {
this.reentrantCount.set(++count);
return true;
}
}
// 创建节点
try {
client.createEphemeral(lockPath);
this.reentrantCount.set(1);
} catch (ZkNodeExistsException e) {
return false;
}
return true;
}
@Override
public void unlock() {
// 重入的释放锁处理
if (this.reentrantCount.get() != null) {
int count = this.reentrantCount.get();
if (count > 1) {
this.reentrantCount.set(--count);
return;
} else {
this.reentrantCount.set(null);
}
}
client.delete(lockPath);
}
@Override
public void lock() { // 如果获取不到锁,阻塞等待
if (!tryLock()) {
// 没获得锁,阻塞自己
waitForLock();
// 再次尝试
lock();
}
}
private void waitForLock() {
CountDownLatch cdl = new CountDownLatch(1);
IZkDataListener listener = new IZkDataListener() {
@Override
public void handleDataDeleted(String dataPath) throws Exception {
System.out.println("----收到节点被删除了-------------");
cdl.countDown();
}
@Override
public void handleDataChange(String dataPath, Object data) throws Exception {
}
};
client.subscribeDataChanges(lockPath, listener);
// 阻塞自己
if (this.client.exists(lockPath)) {
try {
cdl.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
// 取消注册
client.unsubscribeDataChanges(lockPath, listener);
}
@Override
public void lockInterruptibly() throws InterruptedException {
// TODO Auto-generated method stub
}
@Override
public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
// TODO Auto-generated method stub
return false;
}
@Override
public Condition newCondition() {
// TODO Auto-generated method stub
return null;
}
public static void main(String[] args) {
// 并发数
int currency = 50;
// 循环屏障
CyclicBarrier cb = new CyclicBarrier(currency);
// 多线程模拟高并发
for (int i = 0; i < currency; i++) {
new Thread(new Runnable() {
public void run() {
System.out.println(Thread.currentThread().getName() + "---------我准备好---------------");
// 等待一起出发
try {
cb.await();
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
ZKDistributeLock lock = new ZKDistributeLock("/distLock11");
try {
lock.lock();
System.out.println(Thread.currentThread().getName() + " 获得锁!");
} finally {
lock.unlock();
}
}
}).start();
}
}
}
方式二
原理
取号 + 最小号获得锁 + watch
每个节点只需要关注它的前一个节点即可。
举例
package com.study.mike.zookeeper;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import org.I0Itec.zkclient.IZkDataListener;
import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.exception.ZkNodeExistsException;
public class ZKDistributeImproveLock implements Lock {
/*
* 利用临时顺序节点来实现分布式锁
* 获取锁:取排队号(创建自己的临时顺序节点),然后判断自己是否是最小号,如是,则获得锁;不是,则注册前一节点的watcher,阻塞等待
* 释放锁:删除自己创建的临时顺序节点
*/
private String lockPath;
private ZkClient client;
private ThreadLocal<String> currentPath = new ThreadLocal<>();
private ThreadLocal<String> beforePath = new ThreadLocal<>();
// 锁重入计数
private ThreadLocal<Integer> reentrantCount = new ThreadLocal<>();
public ZKDistributeImproveLock(String lockPath) {
super();
this.lockPath = lockPath;
client = new ZkClient("localhost:2181");
client.setZkSerializer(new MyZkSerializer());
if (!this.client.exists(lockPath)) {
try {
this.client.createPersistent(lockPath);
} catch (ZkNodeExistsException e) {
}
}
}
@Override
public boolean tryLock() {
if (this.reentrantCount.get() != null) {
int count = this.reentrantCount.get();
if (count > 0) {
this.reentrantCount.set(++count);
return true;
}
}
if (this.currentPath.get() == null) {
currentPath.set(this.client.createEphemeralSequential(lockPath + "/", "aaa"));
}
// 获得所有的子
List<String> children = this.client.getChildren(lockPath);
// 排序list
Collections.sort(children);
// 判断当前节点是否是最小的
if (currentPath.get().equals(lockPath + "/" + children.get(0))) {
this.reentrantCount.set(1);
return true;
} else {
// 取到前一个
// 得到字节的索引号
int curIndex = children.indexOf(currentPath.get().substring(lockPath.length() + 1));
beforePath.set(lockPath + "/" + children.get(curIndex - 1));
}
return false;
}
@Override
public void lock() {
if (!tryLock()) {
// 阻塞等待
waitForLock();
// 再次尝试加锁
lock();
}
}
private void waitForLock() {
CountDownLatch cdl = new CountDownLatch(1);
// 注册watcher
IZkDataListener listener = new IZkDataListener() {
@Override
public void handleDataDeleted(String dataPath) throws Exception {
System.out.println("-----监听到节点被删除");
cdl.countDown();
}
@Override
public void handleDataChange(String dataPath, Object data) throws Exception {
}
};
client.subscribeDataChanges(this.beforePath.get(), listener);
// 怎么让自己阻塞
if (this.client.exists(this.beforePath.get())) {
try {
cdl.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
// 醒来后,取消watcher
client.unsubscribeDataChanges(this.beforePath.get(), listener);
}
@Override
public void unlock() {
// 重入的释放锁处理
if (this.reentrantCount.get() != null) {
int count = this.reentrantCount.get();
if (count > 1) {
this.reentrantCount.set(--count);
return;
} else {
this.reentrantCount.set(null);
}
}
// 删除节点
this.client.delete(this.currentPath.get());
}
@Override
public void lockInterruptibly() throws InterruptedException {
// TODO Auto-generated method stub
}
@Override
public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
// TODO Auto-generated method stub
return false;
}
@Override
public Condition newCondition() {
// TODO Auto-generated method stub
return null;
}
public static void main(String[] args) {
// 并发数
int currency = 50;
// 循环屏障
CyclicBarrier cb = new CyclicBarrier(currency);
// 多线程模拟高并发
for (int i = 0; i < currency; i++) {
new Thread(new Runnable() {
public void run() {
System.out.println(Thread.currentThread().getName() + "---------我准备好---------------");
// 等待一起出发
try {
cb.await();
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
ZKDistributeImproveLock lock = new ZKDistributeImproveLock("/distLock");
try {
lock.lock();
System.out.println(Thread.currentThread().getName() + " 获得锁!");
} finally {
lock.unlock();
}
}
}).start();
}
}
}
PHP实现
<?php
namespace common\utils\zookeeper;
class ZkUtil
{
const debug = false;
private static $_zk = null; // zk client
private static $_node = null; // 当前节点信息
private static $_isNotifyed = null;
private static $_root; // 根目录
private static $_acl = [ // 授权
[
'perms' => \Zookeeper::PERM_ALL,
'scheme' => 'world',
'id' => 'anyone',
]
];
/**
* 初始化链接
* @param $conf
* @param $root
* @return \Zookeeper|null
* @throws \Exception
*/
public static function getInstance($conf, $root): ?\Zookeeper
{
if (self::$_zk !== null) return self::$_zk;
$client = new \Zookeeper($conf['client_uri']);
if (!$client) {
throw new \Exception('connect zookeeper error');
}
self::$_root = $root;
return self::$_zk = $client;
}
/**
* 获取锁
* @param $key
* @param $value
* @return false
*/
public static function tryClock($key, $value): bool
{
try {
self::createRoot($value);//构建根节点
self::createSub(self::$_root . $key, $value);//构建子节点
return self::getLock();//获取锁
} catch (\Exception $e) {
return false;
}
}
/**
* 释放锁
* @return bool
*/
public static function releaseLock(): bool
{
if (self::$_zk->delete(self::$_node)) {
return true;
} else {
return false;
}
}
/**
* 创建根节点
* @param $value
* @return bool
* @throws \Exception
*/
public static function createRoot($value): bool
{
// 判读根节点是否存在
if (!self::$_zk->exists(self::$_root)) {
// 创建根节点 创建成功返回节点名
$result = self::$_zk->create(self::$_root, $value, self::$_acl);
if (!$result) {
throw new \Exception('create ' . self::$_root . ' fail');
}
}
return true;
}
/**
* 创建子节点
* @param $path
* @param $value
* @return bool
* @throws \Exception
*/
public static function createSub($path, $value): bool
{
self::$_node = self::$_zk->create($path, $value, self::$_acl, \Zookeeper::EPHEMERAL | \Zookeeper::SEQUENCE);
if (!self::$_node) {
throw new \Exception('create -s -e ' . $path . ' fail');
}
if (self::debug){
echo 'create - node:' . self::$_node . PHP_EOL;
}
return true;
}
/**
* 获取锁
* @return bool
*/
public function getLock()
{
$beforeNode = self::checkNodeBefore();
// 如果得到锁返回
if ($beforeNode === true) {
return true;
} else {
self::$_isNotifyed = false;// 初始化状态
$result = self::$_zk->get($beforeNode, [ZkUtil::class, 'watcher']);
while (!$result) {
$res = self::checkNodeBefore();
if ($res === true) {
return true;
} else {
$result = self::$_zk->get($res, [ZkUtil::class, 'watcher']);
}
}
while (!self::$_isNotifyed) {
if(self::debug){
echo '.';
}
usleep(500000);//500ms
}
return true;
}
}
public static function watcher($type, $state, $key)
{
if(self::debug){
echo PHP_EOL . $key . ' notifyed ... ' . PHP_EOL;
}
self::$_isNotifyed = true;
self::getLock();
}
/**
* 检查当前节点是否可以得到锁,如果不可以获取它的上一个节点
* @return bool
*/
public function checkNodeBefore()
{
// 获取所有子节点
$nodes = self::$_zk->getChildren(self::$_root);
// 对节点进行排序
sort($nodes);
$root = self::$_root;
// 节点全路径拼接
array_walk($nodes, function (&$val) use ($root) {
$val = $root . '/' . $val;
});
// 判断是否在首位
if ($nodes[0] == self::$_node) {
if(self::debug){
echo 'get lock node ' . self::$_node . '------' . PHP_EOL;
}
return true;
} else {
// 找到当前节点的上一个节点
$index = array_search(self::$_node, $nodes);
$before = $nodes[$index - 1];
if(self::debug){
echo 'before node ' . $before . '.....' . PHP_EOL;
}
return $before;
}
}
}
//function zkLock($resourceId)
//{
// $conf['client_uri'] = '192.168.3.111:2181,192.168.3.112:2181,192.168.3.113:2181';
// $root = '/lock_' . $resourceId;
// $lockKey = '/lock_';
// $value = 'server_info_1';
// $client = ZkUtil::getInstance($conf, $root);
// $re = ZkUtil::tryClock($lockKey, $value);
// if ($re) {
// echo 'get lock success' . PHP_EOL;
// } else {
// echo 'get lock fail' . PHP_EOL;
// return false;
// }
// try {
// action();
// } catch (\Exception $e) {
// echo $e->getMessage() . PHP_EOL;
// } finally {
// $re = ZkUtil::releaseLock();
// if ($re) {
// echo 'release lock success' . PHP_EOL;
// } else {
// echo 'release lock fail' . PHP_EOL;
// }
// return;
// }
//}
//
//function action()
//{
// $n = rand(1, 20);
// switch ($n) {
// case 1:
// sleep(15);//模拟超时
// break;
// case 2:
// throw new \Exception('system throw message...');//模拟中断
// break;
// case 3:
// die('system crashed...');//模拟程序崩溃
// break;
// default:
// sleep(10);//正常处理
// break;
// }
//}
//
//zkLock(0);