Curator一套zookeeper客户端框架,解决了很多Zookeeper客户端非常底层的细节开发工作,本文主要介绍使用curator框架来实现zookeeper的分布式锁实现方案。
使用curator来实现zookeeper分布式锁有多种方案,本文主要使用 InterProcessMutex 来实现全局共享锁。
代码已经上传至github:https://github.com/xsg1995/spring-boot-curator
引入依赖
pom.xml文件如下所示:
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.xsg</groupId>
<artifactId>spring-boot-curator</artifactId>
<version>0.0.1-SNAPSHOT</version>
<packaging>jar</packaging>
<name>spring-boot-curator</name>
<url>http://maven.apache.org</url>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<java.version>1.8</java.version>
</properties>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.0.5.RELEASE</version>
</parent>
<dependencies>
<!--spring-boot-starter-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<!--spring-boot-starter-test-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<!--spring-boot-configuration-processor-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-configuration-processor</artifactId>
<optional>true</optional>
</dependency>
<!--curator-recipes-->
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>2.12.0</version>
</dependency>
<!--curator-framework-->
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
<version>2.12.0</version>
</dependency>
<!--curator-test 用来模拟 zookeeper 环境-->
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-test</artifactId>
<version>2.12.0</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
zookeeper的配置信息
在 application.yml 中配置 zookeeper 的配置:
zookeeper:
#每次重试时间间隔,单位毫秒
baseSleepTimeMs: 1000
#重试次数
maxRetries: 3
#zookeeper服务连接id与端口
connectString: 127.0.0.1:2181
#会话超时时间,单位毫秒
sessionTimeoutMs: 5000
#连接创建超时时间,单位毫秒
connection-timeout-ms: 5000
后台定义一个 ZookeeperProperties 类注入 zookeeper 的配置,实现如下:
/**
* 注入 zookeeper 的配置信息
*/
@Component
@ConfigurationProperties(prefix = "zookeeper")
public class ZookeeperProperties {
private int baseSleepTimeMs;
private int maxRetries;
private String connectString;
private int sessionTimeoutMs;
private int connectionTimeoutMs;
...
}
配置 CuratorFramework
CuratorFramework 类封装了对zookeeper底层的操作,配置如下:
/**
* curator 配置
*/
@Configuration
public class ZookeeperConfig {
/**
* 获取 CuratorFramework
* 使用 curator 操作 zookeeper
* @return
*/
@Bean
public CuratorFramework curatorFramework(ZookeeperProperties zookeeperProperties) {
//配置zookeeper连接的重试策略
RetryPolicy retryPolicy = new ExponentialBackoffRetry(zookeeperProperties.getBaseSleepTimeMs(), zookeeperProperties.getMaxRetries());
//构建 CuratorFramework
CuratorFramework curatorFramework =
CuratorFrameworkFactory.builder()
.connectString(zookeeperProperties.getConnectString())
.sessionTimeoutMs(zookeeperProperties.getSessionTimeoutMs())
.connectionTimeoutMs(zookeeperProperties.getConnectionTimeoutMs())
.retryPolicy(retryPolicy)
.build();
//连接 zookeeper
curatorFramework.start();
return curatorFramework;
}
}
编写加锁、释放锁具体实现逻辑
使用模板模式,将加锁、释放锁的通用代码给抽取出来,通过接口回调方式回调具体的业务实现逻辑,实现如下:
/**
* 可重入共享锁组件
*/
@Component
public class ShardReentrantLockComponent {
@Autowired
private CuratorFramework curatorFramework;
/**
* 该方法为模板方法,获得锁后回调 BaseLockHandler 中的 handler 方法
* @return
*/
public <T> T acquireLock(BaseLockHandler<T> baseLockHandler) {
//获取要加锁的路径
String path = baseLockHandler.getPath();
//获取超时时间
int timeOut = baseLockHandler.getTimeOut();
//时间单位
TimeUnit timeUnit = baseLockHandler.getTimeUnit();
//通过 InterProcessMutex 该类来获取可重入共性锁
InterProcessMutex lock = new InterProcessMutex(this.curatorFramework, path);
//用于标识是否获取了锁
boolean acquire = false;
try {
try {
//成功获得锁后返回 true
acquire = lock.acquire(timeOut, timeUnit);
} catch (Exception e) {
e.printStackTrace();
}
if(acquire) {
//获得锁后回调具体的业务逻辑
return baseLockHandler.handler();
} else {
//没有获得锁返回 null
return null;
}
} finally {
try {
if(acquire) {
//释放锁
lock.release();
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
BaseLockHandler 抽象类回调具体的业务逻辑,实现如下:
/**
* 业务逻辑抽象类
* @param <T>
*/
public abstract class BaseLockHandler<T> {
//获得锁的默认超时时间,默认为 200ms
private static final int DEFAULT_TIME_OUT = 200;
//加锁的资源路径
private String path;
public BaseLockHandler(String path) {
this.path = path;
}
/**
* 具体的业务实现逻辑,重写该方法
* @return
*/
public abstract T handler();
/**
* 返回加锁的路径
* @return
*/
public String getPath() {
return this.path;
}
/**
* 返回加锁的超时时间
* @return
*/
public int getTimeOut() {
return DEFAULT_TIME_OUT;
}
/**
* 时间单位
* @return
*/
public TimeUnit getTimeUnit() {
return TimeUnit.MILLISECONDS;
}
}
测试
编写测试用例类进行测试:
/**
* LockComponent的测试类
*/
@RunWith(SpringRunner.class)
@SpringBootTest
public class LockComponentTest {
/**
* 表示开启多个线程并行执行
*/
private static final int THREAD_COUNT = 100;
@Autowired
private ShardReentrantLockComponent lockComponent;
/**
* 用来模拟 zookeeper 服务
*/
private TestingServer server;
/**
* 用来实现具体逻辑,对该计数器加1
*/
private int count;
@Before
public void before() throws Exception {
//模拟一个zookeeper节点,端口号为 2181
server = new TestingServer(2181);
}
@After
public void after() {
if(server != null) {
//关闭资源
CloseableUtils.closeQuietly(server);
}
}
/**
* 不加锁实现多个线程同时对 count 执行 ++ 操作
* 会出现数据不一致现象
* @throws Exception
*/
@Test
public void noAcquireLockTest() throws Exception {
//初始化一个拥有 100 个线程的线程池
ExecutorService executorService = Executors.newFixedThreadPool(THREAD_COUNT);
//使用 CountDownLatch 实现线程的协调
CountDownLatch countDownLatch = new CountDownLatch(THREAD_COUNT);
for(int i = 0; i < THREAD_COUNT; i++) {
final int index = i;
//提交线程
executorService.submit(() -> {
//name 表示该线程的名称
String name = "client" + (index + 1);
//执行 count++
count++;
try {
//睡眠 50ms ,使测试结果更明显
Thread.sleep(50);
} catch (InterruptedException e) {
e.printStackTrace();
}
//打印各个线程执行结果
System.out.println(name + " 执行业务方法,对 count 执行 ++ 操作后 count 的值为 : " + count);
//调用countDown方法,表示该线程执行完毕
countDownLatch.countDown();
});
}
//使该方法阻塞住,不然看不到效果
countDownLatch.await();
}
/**
* 使用 zookeeper 加锁实现多个线程同时对 count 执行 ++ 操作
* @throws Exception
*/
@Test
public void acquireLockTest() throws Exception {
//要加锁节点的路径
String path = "/path/test";
//初始化一个拥有 100 个线程的线程池
ExecutorService executorService = Executors.newFixedThreadPool(THREAD_COUNT);
//使用 CountDownLatch 实现线程的协调
CountDownLatch countDownLatch = new CountDownLatch(THREAD_COUNT);
for(int i = 0; i < THREAD_COUNT; i++) {
final int index = i;
//提交线程
executorService.submit(() -> {
//name 表示该线程的名称
String name = "client" + (index + 1);
//result 获取执行完业务逻辑后返回值
String result = null;
while (result == null) {
//result 为 null 表示没有的锁,会继续执行while循环去竞争获取锁
result = lockComponent.acquireLock(new BaseLockHandler<String>(path) {
//执行具体的业务逻辑
@Override
public String handler() {
//执行 count++
count++;
try {
//睡眠 50ms ,使测试结果更明显
Thread.sleep(50);
} catch (InterruptedException e) {
e.printStackTrace();
}
//打印各个线程执行结果
System.out.println(name + " 执行业务方法,对count++ : " + count);
//执行成功后不要返回null,如果返回null,会继续执行while去竞争获取锁
return this.getPath();
}
});
}
//调用countDown方法,表示该线程执行完毕
countDownLatch.countDown();
});
}
//使该方法阻塞住,不然看不到效果
countDownLatch.await();
}
}
测试的业务逻辑是对 count 局部变量执行 ++ 操作,这里编写了两个方法:
- noAcquireLockTest 该方法实现是通过开启 100 个线程,以不加锁的方法,并行的对 count 执行 ++ 操作;
- acquireLockTest 该方法通过获取 zookeeper 的加锁机制,开启 100 个线程,并行的对 count 执行 ++ 操作;
不加锁的执行方式结果如下所示:
可以看到,数据已经出现不一致现象。
使用 zookeeper 加锁执行结果如下所示:
从右边计数器 count 的值可以看出,加锁操作是成功的。