spring-boot基于zookeeper实现分布式锁

Curator一套zookeeper客户端框架,解决了很多Zookeeper客户端非常底层的细节开发工作,本文主要介绍使用curator框架来实现zookeeper的分布式锁实现方案。

使用curator来实现zookeeper分布式锁有多种方案,本文主要使用 InterProcessMutex 来实现全局共享锁。

代码已经上传至github:https://github.com/xsg1995/spring-boot-curator

引入依赖

pom.xml文件如下所示:

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.xsg</groupId>
    <artifactId>spring-boot-curator</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <packaging>jar</packaging>

    <name>spring-boot-curator</name>
    <url>http://maven.apache.org</url>
    
    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <java.version>1.8</java.version>
    </properties>

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.0.5.RELEASE</version>
    </parent>

    
    <dependencies>
        <!--spring-boot-starter-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>

        <!--spring-boot-starter-test-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>

        <!--spring-boot-configuration-processor-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-configuration-processor</artifactId>
            <optional>true</optional>
        </dependency>

        <!--curator-recipes-->
        <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-recipes</artifactId>
            <version>2.12.0</version>
        </dependency>

        <!--curator-framework-->
        <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-framework</artifactId>
            <version>2.12.0</version>
        </dependency>

        <!--curator-test 用来模拟 zookeeper 环境-->
        <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-test</artifactId>
            <version>2.12.0</version>
        </dependency>

    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
    
</project>

zookeeper的配置信息

在 application.yml 中配置 zookeeper 的配置:

zookeeper:
  #每次重试时间间隔,单位毫秒
  baseSleepTimeMs: 1000
  #重试次数
  maxRetries: 3
  #zookeeper服务连接id与端口
  connectString: 127.0.0.1:2181
  #会话超时时间,单位毫秒
  sessionTimeoutMs: 5000
  #连接创建超时时间,单位毫秒
  connection-timeout-ms: 5000

后台定义一个 ZookeeperProperties 类注入 zookeeper 的配置,实现如下:

/**
 * 注入 zookeeper 的配置信息
 */
@Component
@ConfigurationProperties(prefix = "zookeeper")
public class ZookeeperProperties {
    private int baseSleepTimeMs;
    private int maxRetries;
    private String connectString;
    private int sessionTimeoutMs;
    private int connectionTimeoutMs;
    ...
}

配置 CuratorFramework

CuratorFramework 类封装了对zookeeper底层的操作,配置如下:

/**
 * curator 配置
 */
@Configuration
public class ZookeeperConfig {

    /**
     * 获取 CuratorFramework
     * 使用 curator 操作 zookeeper
     * @return
     */
    @Bean
    public CuratorFramework curatorFramework(ZookeeperProperties zookeeperProperties) {
        //配置zookeeper连接的重试策略
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(zookeeperProperties.getBaseSleepTimeMs(), zookeeperProperties.getMaxRetries());
        
        //构建 CuratorFramework
        CuratorFramework curatorFramework =
                CuratorFrameworkFactory.builder()
                    .connectString(zookeeperProperties.getConnectString())
                    .sessionTimeoutMs(zookeeperProperties.getSessionTimeoutMs())
                    .connectionTimeoutMs(zookeeperProperties.getConnectionTimeoutMs())
                    .retryPolicy(retryPolicy)
                    .build();
        //连接 zookeeper
        curatorFramework.start();
        return curatorFramework;
    }

}

编写加锁、释放锁具体实现逻辑

使用模板模式,将加锁、释放锁的通用代码给抽取出来,通过接口回调方式回调具体的业务实现逻辑,实现如下:

/**
 * 可重入共享锁组件
 */
@Component
public class ShardReentrantLockComponent {

    @Autowired
    private CuratorFramework curatorFramework;

    /**
     * 该方法为模板方法,获得锁后回调 BaseLockHandler 中的 handler 方法
     * @return
     */
    public <T> T acquireLock(BaseLockHandler<T> baseLockHandler) {
        //获取要加锁的路径
        String path = baseLockHandler.getPath();
        //获取超时时间
        int timeOut = baseLockHandler.getTimeOut();
        //时间单位
        TimeUnit timeUnit = baseLockHandler.getTimeUnit();
        //通过 InterProcessMutex 该类来获取可重入共性锁
        InterProcessMutex lock = new InterProcessMutex(this.curatorFramework, path);
        //用于标识是否获取了锁
        boolean acquire = false;
        try {
            try {
                //成功获得锁后返回 true
                acquire = lock.acquire(timeOut, timeUnit);
            } catch (Exception e) {
                e.printStackTrace();
            }
            if(acquire) {
                //获得锁后回调具体的业务逻辑
                return baseLockHandler.handler();
            } else {
                //没有获得锁返回 null
                return null;
            }
        } finally {
            try {
                if(acquire) {
                    //释放锁
                    lock.release();
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }

    }

}

BaseLockHandler 抽象类回调具体的业务逻辑,实现如下:

/**
 * 业务逻辑抽象类
 * @param <T>
 */
public abstract class BaseLockHandler<T> {
    //获得锁的默认超时时间,默认为 200ms
    private static final int DEFAULT_TIME_OUT = 200;
    //加锁的资源路径
    private String path;

    public BaseLockHandler(String path) {
        this.path = path;
    }

    /**
     * 具体的业务实现逻辑,重写该方法
     * @return
     */
    public abstract T handler();

    /**
     * 返回加锁的路径
     * @return
     */
    public String getPath() {
        return this.path;
    }

    /**
     * 返回加锁的超时时间
     * @return
     */
    public int getTimeOut() {
        return DEFAULT_TIME_OUT;
    }

    /**
     * 时间单位
     * @return
     */
    public TimeUnit getTimeUnit() {
        return TimeUnit.MILLISECONDS;
    }
}

测试

编写测试用例类进行测试:

/**
 * LockComponent的测试类
 */
@RunWith(SpringRunner.class)
@SpringBootTest
public class LockComponentTest {

    /**
     * 表示开启多个线程并行执行
     */
    private static final int THREAD_COUNT = 100;

    @Autowired
    private ShardReentrantLockComponent lockComponent;

    /**
     * 用来模拟 zookeeper 服务
     */
    private TestingServer server;

    /**
     * 用来实现具体逻辑,对该计数器加1
     */
    private int count;

    @Before
    public void before() throws Exception {
        //模拟一个zookeeper节点,端口号为 2181
       server = new TestingServer(2181);
    }

    @After
    public void after() {
        if(server != null) {
            //关闭资源
            CloseableUtils.closeQuietly(server);
        }
    }

    /**
     * 不加锁实现多个线程同时对 count 执行 ++ 操作
     * 会出现数据不一致现象
     * @throws Exception
     */
    @Test
    public void noAcquireLockTest() throws Exception {
        //初始化一个拥有 100 个线程的线程池
        ExecutorService executorService = Executors.newFixedThreadPool(THREAD_COUNT);
        //使用 CountDownLatch 实现线程的协调
        CountDownLatch countDownLatch = new CountDownLatch(THREAD_COUNT);
        for(int i = 0; i < THREAD_COUNT; i++) {
            final int index = i;
            //提交线程
            executorService.submit(() -> {
                //name 表示该线程的名称
                String name = "client" + (index + 1);
                //执行 count++
                count++;
                try {
                    //睡眠 50ms ,使测试结果更明显
                    Thread.sleep(50);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                //打印各个线程执行结果
                System.out.println(name + "    执行业务方法,对 count 执行 ++ 操作后 count 的值为 : " + count);
                //调用countDown方法,表示该线程执行完毕
                countDownLatch.countDown();
            });
        }
        //使该方法阻塞住,不然看不到效果
        countDownLatch.await();
    }

    /**
     * 使用 zookeeper 加锁实现多个线程同时对 count 执行 ++ 操作
     * @throws Exception
     */
    @Test
    public void acquireLockTest() throws Exception {
        //要加锁节点的路径
        String path = "/path/test";
        //初始化一个拥有 100 个线程的线程池
        ExecutorService executorService = Executors.newFixedThreadPool(THREAD_COUNT);
        //使用 CountDownLatch 实现线程的协调
        CountDownLatch countDownLatch = new CountDownLatch(THREAD_COUNT);

        for(int i = 0; i < THREAD_COUNT; i++) {
            final int index = i;
            //提交线程
            executorService.submit(() -> {
                //name 表示该线程的名称
                String name = "client" + (index + 1);
                //result 获取执行完业务逻辑后返回值
                String result = null;
                while (result == null) {
                    //result 为 null 表示没有的锁,会继续执行while循环去竞争获取锁
                    result = lockComponent.acquireLock(new BaseLockHandler<String>(path) {

                        //执行具体的业务逻辑
                        @Override
                        public String handler() {
                            //执行 count++
                            count++;
                            try {
                                //睡眠 50ms ,使测试结果更明显
                                Thread.sleep(50);
                            } catch (InterruptedException e) {
                                e.printStackTrace();
                            }
                            //打印各个线程执行结果
                            System.out.println(name + "    执行业务方法,对count++ : " + count);

                            //执行成功后不要返回null,如果返回null,会继续执行while去竞争获取锁
                            return this.getPath();
                        }
                    });
                }
                //调用countDown方法,表示该线程执行完毕
                countDownLatch.countDown();
            });
        }
        //使该方法阻塞住,不然看不到效果
        countDownLatch.await();
    }

}

测试的业务逻辑是对 count 局部变量执行 ++ 操作,这里编写了两个方法:

  • noAcquireLockTest 该方法实现是通过开启 100 个线程,以不加锁的方法,并行的对 count 执行 ++ 操作;
  • acquireLockTest 该方法通过获取 zookeeper 的加锁机制,开启 100 个线程,并行的对 count 执行 ++ 操作;

不加锁的执行方式结果如下所示:


不加锁的执行方式结果

可以看到,数据已经出现不一致现象。

使用 zookeeper 加锁执行结果如下所示:


使用 zookeeper 加锁执行结果

从右边计数器 count 的值可以看出,加锁操作是成功的。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 214,776评论 6 496
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,527评论 3 389
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 160,361评论 0 350
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,430评论 1 288
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,511评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,544评论 1 293
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,561评论 3 414
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,315评论 0 270
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,763评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,070评论 2 330
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,235评论 1 343
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,911评论 5 338
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,554评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,173评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,424评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,106评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,103评论 2 352

推荐阅读更多精彩内容