使用Zookeeper解决微服务架构下分布式事务问题

准备工作

单机调试zookeeper集群的话,我们需要在虚拟机里虚拟出几台“微服务器“,做这一步操作之前需要在系统中预留出来8G以上磁盘空间,4G以上物理内存。

[if !supportLists]1. [endif]虚拟机

我们使用virtualbox

在官网下载最新版并安装

https://www.virtualbox.org/wiki/Downloads

[if !supportLists]2. [endif]操作系统

操作系统使用

CentOS-6.8-x86_64-minimal版

[if !supportLists]3. [endif]Zookeeper

下载Zookeeper 

http://zookeeper.apache.org

[if !supportLists]4. [endif]准备SSH连接工具

Xshell 或Winscp + Putty

https://winscp.net/eng/download.php

https://www.putty.org/

操作系统安装

[if !supportLists]1. [endif]加载光盘镜像

选择ISO文件

[if !supportLists]2. [endif]跳过磁盘测试

剩下的步骤按照提示下一步就可以了

Zookeeper集群安装部署

Zookeeper客户端命令

Zookeeper Java Api

使用zookeeper开发分布式锁

分布式锁算法

竞争式核心代码

此方式会在高并发场景下有缺陷

// 1.检查父节点

String path = "/locks"; //父节点名称

Stat stat = zooKeeper.exists(path, false);

  //如果 stat这个对象是空的意味着需要创建节点

if (stat == null){

//创建父节点

// createMode e临时节点 不支持 有子节点

zooKeeper.create(path, "".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);


System.out.println("DistributedLock -> getLock ->父节点创建成功");

}

String lock_path = path + "/" + lockItem; //子节点完整路径

// 2.创建 带有唯一标识的 子节点

int try_max = 20;  //允许重试次数

int try_count = 1; //重试计数

long try_time = 2000; //重试间隔

try{

//创建成功

zooKeeper.create(lock_path, "".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);

}catch (Exception e) {

/**创建失败 说明节点已存在

*接下来使用while循环尝试继续创建节点

*直到成功或次数超限

 */

while (try_count < try_max) {

try_count = try_count + 1;

Thread.sleep(try_time);

try{

zooKeeper.create(lock_path, "".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);

}catch (Exception e1) {

System.out.println("DistributedLock -> getLock子节点创建中 当前次数  :" +  try_count);

continue;//重试创建失败 继续

}

break; //创建节点成功,退出循环

}

}

System.out.println("DistributedLock -> getLock子节点创建: 成功 使用次数 :" +  try_count);

队列式


Curator

Curator是Netflix公司一个开源的zookeeper客户端,在原生API接口上进行了包装,解决了很多ZooKeeper客户端非常底层的细节开发。同时内部实现了诸如Session超时重连,Watcher反复注册等功能,实现了Fluent风格的API接口,是使用最广泛的zookeeper客户端之一。

Curator包含了几个包:

curator-framework:对zookeeper的底层api的一些封装

curator-client:提供一些客户端的操作,例如重试策略等

curator-recipes:封装了一些高级特性,如:Cache事件监听、选举、分布式锁、分布式计数器、分布式Barrier等

Maven依赖

使用curator的版本:2.12.0,对应Zookeeper的版本为:3.4.x


<dependency>

            <groupId>org.apache.curator</groupId>

            <artifactId>curator-framework</artifactId>

            <version>2.12.0</version>

        </dependency>

        <dependency>

            <groupId>org.apache.curator</groupId>

            <artifactId>curator-recipes</artifactId>

            <version>2.12.0</version>

        </dependency>

建立连接

retryPolicy 重试策略,内建有四种重试策略,也可以自行实现RetryPolicy接口

RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);

CuratorFramework client =

CuratorFrameworkFactory.newClient(

                connectionString,

                        5000,

                        3000,

                        retryPolicy);


数据节点操作

@Test

/**

*创建节点

 */

public void t2()throws Exception {

String string = client.create().forPath("/ct1");

System.out.println("string:" + string);

}

@Test

/**

*展示节点

 */

public void t3()throws Exception {

List<String> forPath = client.getChildren().forPath("/");

for(String node : forPath) {

System.out.println(node);

}

}

@Test

/**

*节点类型

 */

public void t4()throws Exception {

String string = client.create().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath("/ct1");

System.out.println("string:" + string);

t3();

}

@Test

/**

*节点数据

 */

public void t5()throws Exception {

String string = client.create().withMode(CreateMode.PERSISTENT).forPath("/ct2","aaxxa".getBytes());

System.out.println("string:" + string);

}

@Test

/**

*获取数据

 */

public void t6()throws Exception {

t5();

byte[] forPath = client.getData().forPath("/ct2");

System.out.println("string:" + new String(forPath));

}

@Test

/**

*创建子节点

 */

public void t7()throws Exception {

client.create().forPath("/ct666/xxoo","xxoo".getBytes());

}

@Test

/**

*创建子节点并自动创建父节点

 */

public void t8()throws Exception {

client.create().creatingParentsIfNeeded().forPath("/ct666/xxoo2","xxoo".getBytes());

}

@Test

/**

*创建子节点并自动创建父节点

 */

public void t9()throws Exception {

client.create().creatingParentContainersIfNeeded().forPath("/ct666/xxoo2","xxoo".getBytes());

}

@Test

/**

*创建子节点并自动创建父节点

 */

public void t10()throws Exception {

List<String> forPath = client.getChildren().forPath("/ct666");

for(String node : forPath) {

System.out.println(node);

}

}

@Test

/**

*删除节点

 */

public void t11()throws Exception {

client.delete().forPath("/ct666/xxoo");

t10();

}

@Test

/**

*删除节点,有子节点不能删除

 */

public void t12()throws Exception {

client.delete().forPath("/ct666");

t10();

}

@Test

/**

*删除节点,有子节点 一起删

 */

public void t13()throws Exception {

client.delete().deletingChildrenIfNeeded().forPath("/ct666");

t10();

}

@Test

/**

*检查节点是否存在

 */

public void t14()throws Exception {

Stat forPath = client.checkExists().forPath("/ct666");

if(null == forPath) {

System.out.println("不存在");

}else {

client.delete().deletingChildrenIfNeeded().forPath("/ct666");

t10();

}

}

@Test

/**

*事务

* 1.检查节点是否存在

* 2.创建节点

* 3.设置数据

* 4.提交事务

 */

public void t15()throws Exception {

Collection<CuratorTransactionResult> commit = client.inTransaction()

.check().forPath("/")

.and()

.create().forPath("/order/order2")

.and()

.setData().forPath("/order/order1","xxx".getBytes())

.and()

.commit();

for (CuratorTransactionResult curatorTransactionResult : commit) {

System.out.println("curatorTransactionResult:" + curatorTransactionResult.getForPath() + curatorTransactionResult.getResultStat());

}


}


事件监听 Cache

Zookeeper原生支持通过注册Watcher来进行事件监听,但是开发者需要反复注册(Watcher只能单次注册单次使用)。

Cache是Curator中对事件监听的包装,可以看作是对事件监听的本地缓存视图,能够自动为开发者处理反复注册监听。

Curator提供了三种Watcher(Cache)来监听结点的变化。

Path Cache

Path Cache用来监控一个ZNode的子节点. 当一个子节点增加, 更新,删除时, Path Cache会改变它的状态, 会包含最新的子节点, 子节点的数据和状态,而状态的更变将通过PathChildrenCacheListener通知。

@Test

/**

PathChildrenCache监听子节点

 */

public void t16()throws Exception {



PathChildrenCache pathChildrenCache = new PathChildrenCache(client, "/", true );

pathChildrenCache.start();

PathChildrenCacheListener pathChildrenCacheListener = new PathChildrenCacheListener() {

@Override

public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {

System.out.println("事件类型:" + event.getType());

if(null != event.getData())

                System.out.println("节点数据:" + event.getData().getPath() + " = " + new String(event.getData().getData()));

}

};

pathChildrenCache.getListenable().addListener(pathChildrenCacheListener);

Thread.sleep(Long.MAX_VALUE);

}

NodeCache

Node Cache只是监听某一个特定的节点是否存在和数据变化

@Test

/**

NodeCache监听单个节点

 */

public void t17()throws Exception {

NodeCache nodeCache = new NodeCache(client, "/order");

nodeCache.start();

NodeCacheListener nodeCacheListener = new NodeCacheListener() {

@Override

public void nodeChanged() throws Exception {

ChildData currentData = nodeCache.getCurrentData();

if(null == currentData ) {

System.out.println("节点不存在");

}else {

System.out.println("currentData:" + new String(currentData.getData()));

}

}

};

nodeCache.getListenable().addListener(nodeCacheListener);

Thread.sleep(Long.MAX_VALUE);


Tree Cache

@Test

/**

TreeCache监听节点下的所有事件

 */

public void t18()throws Exception {

TreeCache treeCache = new TreeCache(client, "/order");

treeCache.start();



TreeCacheListener treeCacheListener = new TreeCacheListener() {

@Override

public void childEvent(CuratorFramework client, TreeCacheEvent event) throws Exception {

System.out.println("事件类型:" + event.getType());

if(null != event.getData())

                System.out.println("节点数据:" + event.getData().getPath() + " = " + new String(event.getData().getData()));

}

};

treeCache.getListenable().addListener(treeCacheListener);

Thread.sleep(Long.MAX_VALUE);

}


分布式锁

可重入共享锁—Shared Reentrant Lock

不可重入共享锁—Shared Lock

可重入读写锁—Shared Reentrant Read Write Lock

多共享锁对象—Multi Shared Lock

分布式计数器

分布式队列

分布式屏障—Barrier

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 206,602评论 6 481
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 88,442评论 2 382
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 152,878评论 0 344
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 55,306评论 1 279
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 64,330评论 5 373
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,071评论 1 285
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,382评论 3 400
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,006评论 0 259
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 43,512评论 1 300
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,965评论 2 325
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,094评论 1 333
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,732评论 4 323
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,283评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,286评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,512评论 1 262
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,536评论 2 354
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,828评论 2 345

推荐阅读更多精彩内容