spring-boot基于zookeeper实现分布式锁

Curator一套zookeeper客户端框架,解决了很多Zookeeper客户端非常底层的细节开发工作,本文主要介绍使用curator框架来实现zookeeper的分布式锁实现方案。

使用curator来实现zookeeper分布式锁有多种方案,本文主要使用 InterProcessMutex 来实现全局共享锁。

代码已经上传至github:https://github.com/xsg1995/spring-boot-curator

引入依赖

pom.xml文件如下所示:

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.xsg</groupId>
    <artifactId>spring-boot-curator</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <packaging>jar</packaging>

    <name>spring-boot-curator</name>
    <url>http://maven.apache.org</url>
    
    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <java.version>1.8</java.version>
    </properties>

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.0.5.RELEASE</version>
    </parent>

    
    <dependencies>
        <!--spring-boot-starter-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>

        <!--spring-boot-starter-test-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>

        <!--spring-boot-configuration-processor-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-configuration-processor</artifactId>
            <optional>true</optional>
        </dependency>

        <!--curator-recipes-->
        <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-recipes</artifactId>
            <version>2.12.0</version>
        </dependency>

        <!--curator-framework-->
        <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-framework</artifactId>
            <version>2.12.0</version>
        </dependency>

        <!--curator-test 用来模拟 zookeeper 环境-->
        <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-test</artifactId>
            <version>2.12.0</version>
        </dependency>

    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
    
</project>

zookeeper的配置信息

在 application.yml 中配置 zookeeper 的配置:

zookeeper:
  #每次重试时间间隔,单位毫秒
  baseSleepTimeMs: 1000
  #重试次数
  maxRetries: 3
  #zookeeper服务连接id与端口
  connectString: 127.0.0.1:2181
  #会话超时时间,单位毫秒
  sessionTimeoutMs: 5000
  #连接创建超时时间,单位毫秒
  connection-timeout-ms: 5000

后台定义一个 ZookeeperProperties 类注入 zookeeper 的配置,实现如下:

/**
 * 注入 zookeeper 的配置信息
 */
@Component
@ConfigurationProperties(prefix = "zookeeper")
public class ZookeeperProperties {
    private int baseSleepTimeMs;
    private int maxRetries;
    private String connectString;
    private int sessionTimeoutMs;
    private int connectionTimeoutMs;
    ...
}

配置 CuratorFramework

CuratorFramework 类封装了对zookeeper底层的操作,配置如下:

/**
 * curator 配置
 */
@Configuration
public class ZookeeperConfig {

    /**
     * 获取 CuratorFramework
     * 使用 curator 操作 zookeeper
     * @return
     */
    @Bean
    public CuratorFramework curatorFramework(ZookeeperProperties zookeeperProperties) {
        //配置zookeeper连接的重试策略
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(zookeeperProperties.getBaseSleepTimeMs(), zookeeperProperties.getMaxRetries());
        
        //构建 CuratorFramework
        CuratorFramework curatorFramework =
                CuratorFrameworkFactory.builder()
                    .connectString(zookeeperProperties.getConnectString())
                    .sessionTimeoutMs(zookeeperProperties.getSessionTimeoutMs())
                    .connectionTimeoutMs(zookeeperProperties.getConnectionTimeoutMs())
                    .retryPolicy(retryPolicy)
                    .build();
        //连接 zookeeper
        curatorFramework.start();
        return curatorFramework;
    }

}

编写加锁、释放锁具体实现逻辑

使用模板模式,将加锁、释放锁的通用代码给抽取出来,通过接口回调方式回调具体的业务实现逻辑,实现如下:

/**
 * 可重入共享锁组件
 */
@Component
public class ShardReentrantLockComponent {

    @Autowired
    private CuratorFramework curatorFramework;

    /**
     * 该方法为模板方法,获得锁后回调 BaseLockHandler 中的 handler 方法
     * @return
     */
    public <T> T acquireLock(BaseLockHandler<T> baseLockHandler) {
        //获取要加锁的路径
        String path = baseLockHandler.getPath();
        //获取超时时间
        int timeOut = baseLockHandler.getTimeOut();
        //时间单位
        TimeUnit timeUnit = baseLockHandler.getTimeUnit();
        //通过 InterProcessMutex 该类来获取可重入共性锁
        InterProcessMutex lock = new InterProcessMutex(this.curatorFramework, path);
        //用于标识是否获取了锁
        boolean acquire = false;
        try {
            try {
                //成功获得锁后返回 true
                acquire = lock.acquire(timeOut, timeUnit);
            } catch (Exception e) {
                e.printStackTrace();
            }
            if(acquire) {
                //获得锁后回调具体的业务逻辑
                return baseLockHandler.handler();
            } else {
                //没有获得锁返回 null
                return null;
            }
        } finally {
            try {
                if(acquire) {
                    //释放锁
                    lock.release();
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }

    }

}

BaseLockHandler 抽象类回调具体的业务逻辑,实现如下:

/**
 * 业务逻辑抽象类
 * @param <T>
 */
public abstract class BaseLockHandler<T> {
    //获得锁的默认超时时间,默认为 200ms
    private static final int DEFAULT_TIME_OUT = 200;
    //加锁的资源路径
    private String path;

    public BaseLockHandler(String path) {
        this.path = path;
    }

    /**
     * 具体的业务实现逻辑,重写该方法
     * @return
     */
    public abstract T handler();

    /**
     * 返回加锁的路径
     * @return
     */
    public String getPath() {
        return this.path;
    }

    /**
     * 返回加锁的超时时间
     * @return
     */
    public int getTimeOut() {
        return DEFAULT_TIME_OUT;
    }

    /**
     * 时间单位
     * @return
     */
    public TimeUnit getTimeUnit() {
        return TimeUnit.MILLISECONDS;
    }
}

测试

编写测试用例类进行测试:

/**
 * LockComponent的测试类
 */
@RunWith(SpringRunner.class)
@SpringBootTest
public class LockComponentTest {

    /**
     * 表示开启多个线程并行执行
     */
    private static final int THREAD_COUNT = 100;

    @Autowired
    private ShardReentrantLockComponent lockComponent;

    /**
     * 用来模拟 zookeeper 服务
     */
    private TestingServer server;

    /**
     * 用来实现具体逻辑,对该计数器加1
     */
    private int count;

    @Before
    public void before() throws Exception {
        //模拟一个zookeeper节点,端口号为 2181
       server = new TestingServer(2181);
    }

    @After
    public void after() {
        if(server != null) {
            //关闭资源
            CloseableUtils.closeQuietly(server);
        }
    }

    /**
     * 不加锁实现多个线程同时对 count 执行 ++ 操作
     * 会出现数据不一致现象
     * @throws Exception
     */
    @Test
    public void noAcquireLockTest() throws Exception {
        //初始化一个拥有 100 个线程的线程池
        ExecutorService executorService = Executors.newFixedThreadPool(THREAD_COUNT);
        //使用 CountDownLatch 实现线程的协调
        CountDownLatch countDownLatch = new CountDownLatch(THREAD_COUNT);
        for(int i = 0; i < THREAD_COUNT; i++) {
            final int index = i;
            //提交线程
            executorService.submit(() -> {
                //name 表示该线程的名称
                String name = "client" + (index + 1);
                //执行 count++
                count++;
                try {
                    //睡眠 50ms ,使测试结果更明显
                    Thread.sleep(50);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                //打印各个线程执行结果
                System.out.println(name + "    执行业务方法,对 count 执行 ++ 操作后 count 的值为 : " + count);
                //调用countDown方法,表示该线程执行完毕
                countDownLatch.countDown();
            });
        }
        //使该方法阻塞住,不然看不到效果
        countDownLatch.await();
    }

    /**
     * 使用 zookeeper 加锁实现多个线程同时对 count 执行 ++ 操作
     * @throws Exception
     */
    @Test
    public void acquireLockTest() throws Exception {
        //要加锁节点的路径
        String path = "/path/test";
        //初始化一个拥有 100 个线程的线程池
        ExecutorService executorService = Executors.newFixedThreadPool(THREAD_COUNT);
        //使用 CountDownLatch 实现线程的协调
        CountDownLatch countDownLatch = new CountDownLatch(THREAD_COUNT);

        for(int i = 0; i < THREAD_COUNT; i++) {
            final int index = i;
            //提交线程
            executorService.submit(() -> {
                //name 表示该线程的名称
                String name = "client" + (index + 1);
                //result 获取执行完业务逻辑后返回值
                String result = null;
                while (result == null) {
                    //result 为 null 表示没有的锁,会继续执行while循环去竞争获取锁
                    result = lockComponent.acquireLock(new BaseLockHandler<String>(path) {

                        //执行具体的业务逻辑
                        @Override
                        public String handler() {
                            //执行 count++
                            count++;
                            try {
                                //睡眠 50ms ,使测试结果更明显
                                Thread.sleep(50);
                            } catch (InterruptedException e) {
                                e.printStackTrace();
                            }
                            //打印各个线程执行结果
                            System.out.println(name + "    执行业务方法,对count++ : " + count);

                            //执行成功后不要返回null,如果返回null,会继续执行while去竞争获取锁
                            return this.getPath();
                        }
                    });
                }
                //调用countDown方法,表示该线程执行完毕
                countDownLatch.countDown();
            });
        }
        //使该方法阻塞住,不然看不到效果
        countDownLatch.await();
    }

}

测试的业务逻辑是对 count 局部变量执行 ++ 操作,这里编写了两个方法:

  • noAcquireLockTest 该方法实现是通过开启 100 个线程,以不加锁的方法,并行的对 count 执行 ++ 操作;
  • acquireLockTest 该方法通过获取 zookeeper 的加锁机制,开启 100 个线程,并行的对 count 执行 ++ 操作;

不加锁的执行方式结果如下所示:


不加锁的执行方式结果

可以看到,数据已经出现不一致现象。

使用 zookeeper 加锁执行结果如下所示:


使用 zookeeper 加锁执行结果

从右边计数器 count 的值可以看出,加锁操作是成功的。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容