Zookeeper客户端Curator使用详解(一)

转:http://throwable.coding.me/2018/12/16/zookeeper-curator-usage

前提

因为最近项目需要使用Zookeeper这个中间件,提前了解一下它的客户端Curator的使用。

简介

Curator是Netflix公司开源的一套zookeeper客户端框架,解决了很多Zookeeper客户端非常底层的细节开发工作,包括连接重连、反复注册Watcher和NodeExistsException异常等等。Patrixck Hunt(Zookeeper)以一句“Guava is to Java that Curator to Zookeeper”给Curator予高度评价。

引子和趣闻:
Zookeeper名字的由来是比较有趣的,下面的片段摘抄自《从PAXOS到ZOOKEEPER分布式一致性原理与实践》一书:
Zookeeper最早起源于雅虎的研究院的一个研究小组。在当时,研究人员发现,在雅虎内部很多大型的系统需要依赖一个类似的系统进行分布式协调,但是这些系统往往存在分布式单点问题。所以雅虎的开发人员就试图开发一个通用的无单点问题的分布式协调框架。在立项初期,考虑到很多项目都是用动物的名字来命名的(例如著名的Pig项目),雅虎的工程师希望给这个项目也取一个动物的名字。时任研究院的首席科学家Raghu Ramakrishnan开玩笑说:再这样下去,我们这儿就变成动物园了。此话一出,大家纷纷表示就叫动物园管理员吧——因为各个以动物命名的分布式组件放在一起,雅虎的整个分布式系统看上去就像一个大型的动物园了,而Zookeeper正好用来进行分布式环境的协调——于是,Zookeeper的名字由此诞生了。

Curator无疑是Zookeeper客户端中的瑞士军刀,它译作”馆长”或者’’管理者’’,不知道是不是开发小组有意而为之,笔者猜测有可能这样命名的原因是说明Curator就是Zookeeper的馆长(脑洞有点大:Curator就是动物园的园长)。
Curator包含了几个包:

  • curator-framework:对zookeeper的底层api的一些封装。
  • curator-client:提供一些客户端的操作,例如重试策略等。
  • curator-recipes:封装了一些高级特性,如:Cache事件监听、选举、分布式锁、分布式计数器、分布式Barrier等。

Maven依赖(使用curator的版本:2.12.0,对应Zookeeper的版本为:3.4.x,如果跨版本会有兼容性问题,很有可能导致节点操作失败):

<dependency>
 <groupId>org.apache.curator</groupId>
 <artifactId>curator-framework</artifactId>
 <version>2.12.0</version>
</dependency>
<dependency>
 <groupId>org.apache.curator</groupId>
 <artifactId>curator-recipes</artifactId>
 <version>2.12.0</version>
</dependency>

Curator的基本Api

创建会话

1.使用静态工程方法创建客户端

一个例子如下:

RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
CuratorFramework client =
CuratorFrameworkFactory.newClient(
 connectionInfo,
 5000,
 3000,
 retryPolicy);

newClient静态工厂方法包含四个主要参数:

参数名 说明
connectionString 服务器列表,格式host1:port1,host2:port2,…
retryPolicy 重试策略,内建有四种重试策略,也可以自行实现RetryPolicy接口
sessionTimeoutMs 会话超时时间,单位毫秒,默认60000ms
connectionTimeoutMs 连接创建超时时间,单位毫秒,默认60000ms

2.使用Fluent风格的Api创建会话

核心参数变为流式设置,一个列子如下:

RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
CuratorFramework client =
CuratorFrameworkFactory.builder()
 .connectString(connectionInfo)
 .sessionTimeoutMs(5000)
 .connectionTimeoutMs(5000)
 .retryPolicy(retryPolicy)
 .build();

3.创建包含隔离命名空间的会话

为了实现不同的Zookeeper业务之间的隔离,需要为每个业务分配一个独立的命名空间(NameSpace),即指定一个Zookeeper的根路径(官方术语:为Zookeeper添加“Chroot”特性)。例如(下面的例子)当客户端指定了独立命名空间为“/base”,那么该客户端对Zookeeper上的数据节点的操作都是基于该目录进行的。通过设置Chroot可以将客户端应用与Zookeeper服务端的一课子树相对应,在多个应用共用一个Zookeeper集群的场景下,这对于实现不同应用之间的相互隔离十分有意义。

RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
 CuratorFramework client =
 CuratorFrameworkFactory.builder()
 .connectString(connectionInfo)
 .sessionTimeoutMs(5000)
 .connectionTimeoutMs(5000)
 .retryPolicy(retryPolicy)
 .namespace("base")
 .build();

启动客户端

当创建会话成功,得到client的实例然后可以直接调用其start( )方法:

client.start();

数据节点操作

创建数据节点

Zookeeper的节点创建模式

  • PERSISTENT:持久化
  • PERSISTENT_SEQUENTIAL:持久化并且带序列号
  • EPHEMERAL:临时
  • EPHEMERAL_SEQUENTIAL:临时并且带序列号

创建一个节点,初始内容为空

client.create().forPath("path");

注意:如果没有设置节点属性,节点创建模式默认为持久化节点,内容默认为空

创建一个节点,附带初始化内容

client.create().forPath("path","init".getBytes());

创建一个节点,指定创建模式(临时节点),内容为空

client.create().withMode(CreateMode.EPHEMERAL).forPath("path");

创建一个节点,指定创建模式(临时节点),附带初始化内容

client.create().withMode(CreateMode.EPHEMERAL).forPath("path","init".getBytes())

创建一个节点,指定创建模式(临时节点),附带初始化内容,并且自动递归创建父节点

client.create()
 .creatingParentContainersIfNeeded()
 .withMode(CreateMode.EPHEMERAL)
 .forPath("path","init".getBytes());

这个creatingParentContainersIfNeeded()接口非常有用,因为一般情况开发人员在创建一个子节点必须判断它的父节点是否存在,如果不存在直接创建会抛出NoNodeException,使用creatingParentContainersIfNeeded()之后Curator能够自动递归创建所有所需的父节点。

删除数据节点

删除一个节点

client.delete().forPath("path");

注意,此方法只能删除叶子节点,否则会抛出异常。

删除一个节点,并且递归删除其所有的子节点

client.delete().deletingChildrenIfNeeded().forPath("path");

删除一个节点,强制指定版本进行删除

client.delete().withVersion(10086).forPath("path");

删除一个节点,强制保证删除

client.delete().guaranteed().forPath("path");

guaranteed()接口是一个保障措施,只要客户端会话有效,那么Curator会在后台持续进行删除操作,直到删除节点成功。

注意:上面的多个流式接口是可以自由组合的,例如:

client.delete().guaranteed().deletingChildrenIfNeeded().withVersion(10086).forPath("path");

读取数据节点数据

读取一个节点的数据内容

client.getData().forPath("path");

注意,此方法返的返回值是byte[ ];

读取一个节点的数据内容,同时获取到该节点的stat

Stat stat = new Stat();
client.getData().storingStatIn(stat).forPath("path");

更新数据节点数据

更新一个节点的数据内容

client.setData().forPath("path","data".getBytes());

注意:该接口会返回一个Stat实例

更新一个节点的数据内容,强制指定版本进行更新

client.setData().withVersion(10086).forPath("path","data".getBytes());

检查节点是否存在

client.checkExists().forPath("path");

注意:该方法返回一个Stat实例,用于检查ZNode是否存在的操作. 可以调用额外的方法(监控或者后台处理)并在最后调用forPath()指定要操作的ZNode

获取某个节点的所有子节点路径

client.getChildren().forPath("path");

注意:该方法的返回值为List<string style="box-sizing: border-box;">,获得ZNode的子节点Path列表。 可以调用额外的方法(监控、后台处理或者获取状态watch, background or get stat) 并在最后调用forPath()指定要操作的父ZNode</string>

事务

CuratorFramework的实例包含inTransaction( )接口方法,调用此方法开启一个ZooKeeper事务. 可以复合create, setData, check, and/or delete 等操作然后调用commit()作为一个原子操作提交。一个例子如下:

client.inTransaction().check().forPath("path")
 .and()
 .create().withMode(CreateMode.EPHEMERAL).forPath("path","data".getBytes())
 .and()
 .setData().withVersion(10086).forPath("path","data2".getBytes())
 .and()
 .commit();

异步接口

上面提到的创建、删除、更新、读取等方法都是同步的,Curator提供异步接口,引入了BackgroundCallback接口用于处理异步接口调用之后服务端返回的结果信息。BackgroundCallback接口中一个重要的回调值为CuratorEvent,里面包含事件类型、响应吗和节点的详细信息。

CuratorEventType

事件类型 对应CuratorFramework实例的方法
CREATE #create()
DELETE #delete()
EXISTS #checkExists()
GET_DATA #getData()
SET_DATA #setData()
CHILDREN #getChildren()
SYNC #sync(String,Object)
GET_ACL #getACL()
SET_ACL #setACL()
WATCHED #Watcher(Watcher)
CLOSING #close()

响应码(#getResultCode())

响应码 意义
0 OK,即调用成功
-4 ConnectionLoss,即客户端与服务端断开连接
-110 NodeExists,即节点已经存在
-112 SessionExpired,即会话过期

一个异步创建节点的例子如下:

Executor executor = Executors.newFixedThreadPool(2);
client.create()
 .creatingParentsIfNeeded()
 .withMode(CreateMode.EPHEMERAL)
 .inBackground((curatorFramework, curatorEvent) -> {      System.out.println(String.format("eventType:%s,resultCode:%s",curatorEvent.getType(),curatorEvent.getResultCode()));
 },executor)
 .forPath("path");

注意:如果#inBackground()方法不指定executor,那么会默认使用Curator的EventThread去进行异步处理。

Curator食谱(高级特性)

提醒:首先你必须添加curator-recipes依赖,下文仅仅对recipes一些特性的使用进行解释和举例,不打算进行源码级别的探讨

<dependency>
 <groupId>org.apache.curator</groupId>
 <artifactId>curator-recipes</artifactId>
 <version>2.12.0</version>
</dependency>

重要提醒:强烈推荐使用ConnectionStateListener监控连接的状态,当连接状态为LOST,curator-recipes下的所有Api将会失效或者过期,尽管后面所有的例子都没有使用到ConnectionStateListener。

Path Cache

Path Cache用来监控一个ZNode的子节点. 当一个子节点增加, 更新,删除时, Path Cache会改变它的状态, 会包含最新的子节点, 子节点的数据和状态,而状态的更变将通过PathChildrenCacheListener通知。

实际使用时会涉及到四个类:

  • PathChildrenCache
  • PathChildrenCacheEvent
  • PathChildrenCacheListener
  • ChildData

通过下面的构造函数创建Path Cache:

public PathChildrenCache(CuratorFramework client, String path, boolean cacheData)

想使用cache,必须调用它的start方法,使用完后调用close方法。 可以设置StartMode来实现启动的模式,

StartMode有下面几种:

  1. NORMAL:正常初始化。
  2. BUILD_INITIAL_CACHE:在调用start()之前会调用rebuild()
  3. POST_INITIALIZED_EVENT: 当Cache初始化数据后发送一个PathChildrenCacheEvent.Type#INITIALIZED事件

public void addListener(PathChildrenCacheListener listener)可以增加listener监听缓存的变化。

getCurrentData()方法返回一个List<ChildData>对象,可以遍历所有的子节点。

设置/更新、移除其实是使用client (CuratorFramework)来操作, 不通过PathChildrenCache操作:

public class PathCacheDemo {

    private static final String PATH = "/example/pathCache";

    public static void main(String[] args) throws Exception {
        TestingServer server = new TestingServer();
        CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
        client.start();
        PathChildrenCache cache = new PathChildrenCache(client, PATH, true);
        cache.start();
        PathChildrenCacheListener cacheListener = (client1, event) -> {
            System.out.println("事件类型:" + event.getType());
            if (null != event.getData()) {
                System.out.println("节点数据:" + event.getData().getPath() + " = " + new String(event.getData().getData()));
            }
        };
        cache.getListenable().addListener(cacheListener);
        client.create().creatingParentsIfNeeded().forPath("/example/pathCache/test01", "01".getBytes());
        Thread.sleep(10);
        client.create().creatingParentsIfNeeded().forPath("/example/pathCache/test02", "02".getBytes());
        Thread.sleep(10);
        client.setData().forPath("/example/pathCache/test01", "01_V2".getBytes());
        Thread.sleep(10);
        for (ChildData data : cache.getCurrentData()) {
            System.out.println("getCurrentData:" + data.getPath() + " = " + new String(data.getData()));
        }
        client.delete().forPath("/example/pathCache/test01");
        Thread.sleep(10);
        client.delete().forPath("/example/pathCache/test02");
        Thread.sleep(1000 * 5);
        cache.close();
        client.close();
        System.out.println("OK!");
    }
}

注意:如果new PathChildrenCache(client, PATH, true)中的参数cacheData值设置为false,则示例中的event.getData().getData()、data.getData()将返回null,cache将不会缓存节点数据。

注意:示例中的Thread.sleep(10)可以注释掉,但是注释后事件监听的触发次数会不全,这可能与PathCache的实现原理有关,不能太过频繁的触发事件!

Node Cache

Node Cache与Path Cache类似,Node Cache只是监听某一个特定的节点。它涉及到下面的三个类:

  • NodeCache - Node Cache实现类
  • NodeCacheListener - 节点监听器
  • ChildData - 节点数据

注意:使用cache,依然要调用它的start()方法,使用完后调用close()方法。

getCurrentData()将得到节点当前的状态,通过它的状态可以得到当前的值。

public class NodeCacheDemo {

    private static final String PATH = "/example/cache";

    public static void main(String[] args) throws Exception {
        TestingServer server = new TestingServer();
        CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
        client.start();
        client.create().creatingParentsIfNeeded().forPath(PATH);
        final NodeCache cache = new NodeCache(client, PATH);
        NodeCacheListener listener = () -> {
            ChildData data = cache.getCurrentData();
            if (null != data) {
                System.out.println("节点数据:" + new String(cache.getCurrentData().getData()));
            } else {
                System.out.println("节点被删除!");
            }
        };
        cache.getListenable().addListener(listener);
        cache.start();
        client.setData().forPath(PATH, "01".getBytes());
        Thread.sleep(100);
        client.setData().forPath(PATH, "02".getBytes());
        Thread.sleep(100);
        client.delete().deletingChildrenIfNeeded().forPath(PATH);
        Thread.sleep(1000 * 2);
        cache.close();
        client.close();
        System.out.println("OK!");
    }
}

注意:示例中的Thread.sleep(10)可以注释,但是注释后事件监听的触发次数会不全,这可能与NodeCache的实现原理有关,不能太过频繁的触发事件!

注意:NodeCache只能监听一个节点的状态变化。

Tree Cache

Tree Cache可以监控整个树上的所有节点,类似于PathCache和NodeCache的组合,主要涉及到下面四个类:

  • TreeCache - Tree Cache实现类
  • TreeCacheListener - 监听器类
  • TreeCacheEvent - 触发的事件类
  • ChildData - 节点数据
public class TreeCacheDemo {

    private static final String PATH = "/example/cache";

    public static void main(String[] args) throws Exception {
        TestingServer server = new TestingServer();
        CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
        client.start();
        client.create().creatingParentsIfNeeded().forPath(PATH);
        TreeCache cache = new TreeCache(client, PATH);
        TreeCacheListener listener = (client1, event) ->
                System.out.println("事件类型:" + event.getType() +
                        " | 路径:" + (null != event.getData() ? event.getData().getPath() : null));
        cache.getListenable().addListener(listener);
        cache.start();
        client.setData().forPath(PATH, "01".getBytes());
        Thread.sleep(100);
        client.setData().forPath(PATH, "02".getBytes());
        Thread.sleep(100);
        client.delete().deletingChildrenIfNeeded().forPath(PATH);
        Thread.sleep(1000 * 2);
        cache.close();
        client.close();
        System.out.println("OK!");
    }
}

注意:在此示例中没有使用Thread.sleep(10),但是事件触发次数也是正常的。

注意:TreeCache在初始化(调用start()方法)的时候会回调TreeCacheListener实例一个事TreeCacheEvent,而回调的TreeCacheEvent对象的Type为INITIALIZED,ChildData为null,此时event.getData().getPath()很有可能导致空指针异常,这里应该主动处理并避免这种情况。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 198,932评论 5 466
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 83,554评论 2 375
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 145,894评论 0 328
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 53,442评论 1 268
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 62,347评论 5 359
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 47,899评论 1 275
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,325评论 3 390
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 35,980评论 0 254
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,196评论 1 294
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,163评论 2 317
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,085评论 1 328
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 32,826评论 3 316
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,389评论 3 302
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,501评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 30,753评论 1 255
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,171评论 2 344
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 41,616评论 2 339