Zookeeper
一、Zookeeper介绍
Zookeeper本身是Haddoop生态圈中的一个组件,Zookeeper强大的功能在Java分布式架构中也被频繁的使用。
Zookeeper的作用:
- 注册中心(类似Eureka的作用,但是Eureka是属于SpringCloud的组件)
- 统一的配置管理(类似Config)
- 集群管理
- 实现分布式锁、分布式任务
- 队列的管理
Zookeeper就是一个文件系统+监听通知机制
二、Zookeeper安装
使用docker安装Zookeeper
Step1:创建一个文件夹
cd /opt
mkdir docker_zk
cd docker_zk/
vi docker-compose.yml
Step2:编写docker-compose.yml文件
version: "3.1"
services:
zk: # 服务的名称
image: daocloud.io/daocloud/zookeeper:latest
restart: always
container_name: zk # 容器的名称
ports:
- 2181:2181 # 宿主机端口:容器内端口
Step3:启动这个容器
docker -compose up -d # 启动
docker ps # 查看容器id
docker exec -it 5c bash # 通过容器id(比如5c)进入到Zookeeper容器内部
cd /bin
./zkCli.sh # 表示用客户端去连接Zookeeper服务
三、Zookeeper架构
3.1 Zookeeper的架构图
- 每一个节点都叫做znode
- 每一个znode中都可以存储数据
- 同一个父节点的znode的名称是不允许重复的
3.2 znode类型
-
持久节点
永久的保存在Zookeeper中
-
持久有序节点(可以实现队列功能)
永久的保存在Zookeeper中,Zookeeper自动会给节点添加一个有序的序号。
(比如创建时是 /xx 创建后查看节点名称时是 /xx000001)
-
临时节点
当存储的客户端和Zookeeper断开连接时,这个临时节点自动删除
-
临时有序节点(主要用于实现分布式锁和分布式任务)
当存储的客户端和Zookeeper断开连接时,这个临时节点自动删除,Zookeeper自动会给节点添加一个有序的序号
3.3 Zookeeper的监听通知机制
客户端可以去监听Zookeeper中的znode节点,当znode改变时(增删改)就会通知监听当前znode的客户端
利用这个监听通知机制可以实现类似Eureka注册中心、集群管理、Config的功能。把配置放在znode中,znode中的数据改变了就会通知到客户端,客户端就可以根据通知做出一些响应。
四、Zookeeper的常用命令
-
查询
# 查询当前节点下的所有子节点 ls 节点名称 # 例如 ls /
# 查询当前节点下的数据 get 节点名称 # 例如 get /lkf
-
创建节点
creat [-s] [-e] znode节点名称 节点的数据 # -s:表示创建的是有序节点 # -e:表示创建的是临时节点 # 节点名称相关: /lkf 就是默认创建在根节点下, /lkf/xx 就是在/lkf这个节点下创建/xx子节点 # 例如 create /lkf aaaaa
-
修改节点数据
# 会直接覆盖 set 节点名称 新数据 # 例如 set /lkf bbbbbb
-
删除节点
# 删除没有子节点的节点 delete 节点名称
# 删除当前节点和它的所有子节点 rmr 节点名称
五、Zookeeper集群
5.1 Zookeeper集群架构图
- Zookeeper集群是有主从之分的
- Zookeeper集群中的主节点称为leader,从节点称为follower
- Zookeeper集群中必须有leader节点,否则Zookeeper集群无法正常工作
- leader执行读写操作,follower只执行读操作
- Zookeeper集群在没有leader时会触发投票机制,选取出leader
5.2 Zookeeper集群中节点的角色
-
Leader
主节点
-
Follower(默认的从节点)
从节点,参与选举全新的Leader
-
Observer
从节点,不参与投票
-
Looking
正在找Leader节点(一般看不到),如果找到之后就会变成Follower或Observer
5.3 Zookeeper集群的投票策略
Zookeeper的特性:
- 每一个Zookeeper服务都会被分配一全局唯一的myid(myid是一个数字,在创建Zookeeper时在docker-compose.yml文件中指定)。
- Zookeeper在执行写数据时,每一个节点都有一个自己的FIFO队列,保证写每一个数据的时候,顺序是不会乱的。Zookeeper还会给每一个数据分配一个全局唯一的zxid,数据越新,则zxid越大。
投票策略:
- 选举出zxid最大的节点作为Leader(因为zxid最大的数据最新)
- 在zxid相同的节点中,选举出一个myid最大的节点作为Leader
5.4 搭建Zookeeper集群
用docker-compose搭建三个Zookeeper节点
version: "3.1"
services:
zk1:
image: zookeeper
restart: always
container_name: zk1
ports:
- 2181:2181
environment:
ZOO_MY_ID: 1 # 指定当前Zookeeper节点的myid
## 格式:
# server.myid=服务名称:Zookeeper节点间通讯的端口:投票时用的端口;容器内的端口
# 有几个节点就写几个
ZOO_SERVERS: server.1=zk1:2888:3888;2821 server.2=zk2:2888:3888;2821 server.3=zk3:2888:3888;2821
zk2:
image: zookeeper
restart: always
container_name: zk2
ports:
- 2182:2181
environment:
ZOO_MY_ID: 2
ZOO_SERVERS: server.1=zk1:2888:3888;2821 server.2=zk2:2888:3888;2821 server.3=zk3:2888:3888;2821
zk3:
image: zookeeper
restart: always
container_name: zk3
ports:
- 2183:2181
environment:
ZOO_MY_ID: 3
ZOO_SERVERS: server.1=zk1:2888:3888;2821 server.2=zk2:2888:3888;2821 server.3=zk3:2888:3888;2821
docker -compose up -d # 启动
docker ps # 查看容器id
docker exec -it 5c bash # 通过容器id(比如5c)进入到Zookeeper容器内部
cd /bin # 进入到容器内的/bin目录
./zkServer.sh status # 查看当前Zookeeper节点的角色
注意:
- 当前的3个Zookeeper节点是同时启动的,所以选举出zk3作为leader,但是如果把zk3关闭就会选举出zk3作为leader,然后再把zk3打开zk3不会变成leader,因为投票选举leader的操作是在没有leader的情况下才会进行。
- Zookeeper集群至少要有2个节点,只有1个节点的Zookeeper集群会报错不能运行。
六、Java操作Zookeeper
6.1 Java连接Zookeeper
Step1:创建Maven工程
Step2:导入依赖
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.6.0</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>4.0.1</version>
</dependency>
Step3:编写连接Zookeeper的工具类
public class ZkUtil {
public static CuratorFramework getCf() {
RetryPolicy retryPolicy = new ExponentialBackoffRetry(3000, 2); // 指定重试策略(当前是每3000ms重试1次, 一共重试2次)
CuratorFramework cf = CuratorFrameworkFactory.builder()
// 指定要连接的所有Zookeeper节点
.connectString("192.168.199.109:2821, 192.168.199.109:2822, 192.168.199.109:2823")
.retryPolicy(retryPolicy)
.build();
cf.start(); // 开启后才能连接
return cf;
}
}
Step4:测试(不报错就表示连接成功)
public class TestZk {
@Test
public void testConnect() {
CuratorFramework cf = ZkUtil.getCf();
}
}
6.2 Java操作znode节点
-
查询
public class Demo { CuratorFramework cf = ZkUtil.getCf(); /** * 查询节点的所有子节点 **/ @Test public void getChildren() throws Exception{ List<String> znodes = cf.getChildren().forPath("/"); for (String znode: znodes) { System.out.println(znode); } } /** * 查询节点的数据 **/ @Test public void getData() throws Exception{ byte[] bytes = cf.getData().forPath("/lkf"); System.out.println(new String(bytes, "UTF-8")); } }
-
添加
public class Demo { CuratorFramework cf = ZkUtil.getCf(); /** * 添加节点和数据 **/ @Test public void creat() throws Exception{ // PRESISTENT: 持久节点 PRESISTENT_SEQUENTINAL: 持久有序节点 // EPHEMERAL: 临时节点 EPHEMERAL_SEQUENTINAL: 临时有序节点 // 指定节点名称和数据(数据是字节数组) cf.creat().withMode(CreatMode.PRESISTENT).forPath("/lkf2", "uuuu".getBytes()); } }
-
修改
public class Demo { CuratorFramework cf = ZkUtil.getCf(); /** * 修改节点的数据 **/ @Test public void update() throws Exception{ // 指定节点名称和修改后的数据(字节数组) cf.setData().forPath("/lkf2", "ooooo".getBytes()); } }
-
删除
public class Demo { CuratorFramework cf = ZkUtil.getCf(); /** * 如果节点有子节点就删除其下的所有子节点和节点 * 如果节点没有节点就直接删除节点 **/ @Test public void delete() throws Exception{ // 指定节点名称和修改后的数据(字节数组) cf.delete().deletingChildrenIfNeeded().forPath("/lkf2"); } }
-
查看znode的状态
public class Demo { CuratorFramework cf = ZkUtil.getCf(); /** * 查看znode的状态 * 返回的Stat里的各个状态属性是long类型, 想转换为时间显示的话用dateFormat **/ @Test public void stat() throws Exception{ Stat stat = cf.checkExists().forPath("/lkf"); System.out.println(stat); } }
6.3 Java实现Zookeeper的监听通知机制
public class Demo {
CuratorFramework cf = ZkUtil.getCf();
@Test
public void listen() throws Exception {
// 1. 创建NodeCache对象, 指定连接的Zookeeper(或Zookeeper集群)和要监听的节点
NodeCache nodeCache = new NodeCache(cf, "/lkf");
// 2. 添加一个监听器
nodeCache.getListenable().addListener(new NodeCacheLisener() {
@Override
public void nodeChanged() throws Exception {
String path = nodeCache.getCurrentData().getPath();
byte[] data = nodeCache.getCurrentData().getData();
Stat stat = nodeCache.getCurrentData().getStat();
System.out.println("监听的节点是:" + path);
System.out.println("节点现在的数据是:" + new String(data, "UTF-8"));
System.out.println("节点的状态是:" + stat);
}
});
System.out.println("开始监听 ...");
System.in.read(); // 防止程序停止监听器失效
}
}
测试: 到容器中操作一下节点,查看监听的打印结果是否正确。