Zookeeper特性
- 一致性:数据按照顺序分批入库
- 原子性:事务要么成功要么失败
- 单一视图:客户端连接任一zookeeper节点,数据都是一致的
- 可靠性:每次操作都会保存
- 实时性:客户端可以读取到最新数据
zoo.fig配置
- tickTime: 计算时间单元,毫秒
- initLimit: 用于集群,允许从节点到主节点的初始连接时间,是tickTime的倍数
- syncLimit: 心跳机制请求和应答时间长度
- dataDit: 数据存储目录
- dataLogDir: 日志目录
- clientPort: 服务器端口,默认2181
Zookeeper基本功能
- 树结构,类似linux结构
- ./zkCli.sh 启动客户端
- 节点选举,主节点挂了,从节点会接手工作,保证高可用
- 统一配置文件管理,即只需部署一台服务器,则可以把相同的配置文件同步更新到其他所有服务器
- 发布订阅,发布者将数据存在znode上,订阅者读取
- 分布式锁,不同进程会争夺资源
- 集群管理, 保证数据的强一致性
命令
- ls 查看子节点
- ls2 查看子节点+stat
- get stat 显示基本信息
- create /demo/demo demo (-s 有序 , -e 临时)
- set修改 set /demo newdemo 1(1是版本号,乐观锁)
- delete /demo 1(只会删除版本号为1的节点)
session原理
- 客户端与服务端之间的连接存在会话
- 每个会话都可以设置超时时间
- 心跳结束,session就会过期
- session过期,临时结点znode会被抛弃
watcher机制
- 针对每个节点的操作
- 当某个节点znode发生变化,会触发watcher
- 一次性(通过插件实现永久)
- 增删改都会触发
- 创建stat /demo watch 给demo节点加触发器get /demo watch ls /demo watch
- 修改子节点不会触发父节点的触发器
ACL权限控制
四种ACL模式 world,auth,digest,ip
- 针对节点设置相关的读写权限
- getAcl 获取某节点权限信息
- setAcl 设置某节点信息
- addauth 输入认证授权信息
- 通过三个元素构成权限列表 scheme:某种机制 id:允许访问的用户 permissions:权限组和字符串
- crdwa权限字符串,c创建,r获取,w写数据,d删除,a修改权限
- addauth digest user:pwd 先将用户密码添加到库里
- setAcl /demo auth:wxs:123:cdrwa 设置第一次有效setAcl /names/test
- digest:wxs:du6Uisv8vNTo9rDUPJ8ikL2Ro94=:cdra安全性高
- 通过ip设置权限 setAcl /names/ip ip:192.168.37.1:cdrwa 只有通过此ip才能访问
- super超级用户,需要修改配置文件
四字命令
- 查看节点信息 echo stat | nc localhost 2181
- 查看是否在线 echo ruok | nc localhost 2181
- 查看临时节点信息 echo dump | nc localhost 2181
- 查看服务器配置 echo conf | nc localhost 2181
- 查看服务器连接信息echo cons | nc localhost 2181
- 查看环境变量 echo envi | nc localhost 2181
- mntr 健康信息 wchs 触发器信息 wchc 查看session wchp 查看path
集群搭建
- 配置文件 myid 1/2/3 对应 server 1/2/3
- 环境变量,ip配置不同,端口号相同
server.1=192.168.48.128:2888:3888
server.2=192.168.48.129:2888:3888
server.3=192.168.48.130:2888:3888
Java客户端连接
/**
* @Title: ZKConnectDemo.java
* @Package com.imooc.zk.demo
* @Description: zookeeper 连接demo演示
*/
public class ZKConnect implements Watcher {
final static Logger log = LoggerFactory.getLogger(ZKConnect.class);
public static final String zkServerPath = "192.168.1.110:2181";
// public static final String zkServerPath = "192.168.1.111:2181,192.168.1.111:2182,192.168.1.111:2183";
public static final Integer timeout = 5000;
public static void main(String[] args) throws Exception {
/**
* 客户端和zk服务端链接是一个异步的过程
* 当连接成功后后,客户端会收的一个watch通知
*
* 参数:
* connectString:连接服务器的ip字符串,
* 比如: "192.168.1.1:2181,192.168.1.2:2181,192.168.1.3:2181"
* 可以是一个ip,也可以是多个ip,一个ip代表单机,多个ip代表集群
* 也可以在ip后加路径
* sessionTimeout:超时时间,心跳收不到了,那就超时
* watcher:通知事件,如果有对应的事件触发,则会收到一个通知;如果不需要,那就设置为null
* canBeReadOnly:可读,当这个物理机节点断开后,还是可以读到数据的,只是不能写,
* 此时数据被读取到的可能是旧数据,此处建议设置为false,不推荐使用
* sessionId:会话的id
* sessionPasswd:会话密码 当会话丢失后,可以依据 sessionId 和 sessionPasswd 重新获取会话
*/
ZooKeeper zk = new ZooKeeper(zkServerPath, timeout, new ZKConnect());
log.warn("客户端开始连接zookeeper服务器...");
log.warn("连接状态:{}", zk.getState());
new Thread().sleep(2000);
log.warn("连接状态:{}", zk.getState());
}
@Override
public void process(WatchedEvent event) {
log.warn("接受到watch通知:{}", event);
}
}
console:
2018-04-04 11:25:08,846 [main] [com.imooc.zk.demo.ZKConnect.main(ZKConnect.java:43)] - [WARN] 客户端开始连接zookeeper服务器...
2018-04-04 11:25:08,850 [main] [com.imooc.zk.demo.ZKConnect.main(ZKConnect.java:44)] - [WARN] 连接状态:CONNECTING
2018-04-04 11:25:08,873 [main-EventThread] [com.imooc.zk.demo.ZKConnect.process(ZKConnect.java:53)] - [WARN] 接受到watch通知:WatchedEvent state:SyncConnected type:None path:null
2018-04-04 11:25:10,851 [main] [com.imooc.zk.demo.ZKConnect.main(ZKConnect.java:48)] - [WARN] 连接状态:CONNECTED
会话重连
/**
*
* @Title: ZKConnectDemo.java
* @Description: zookeeper 恢复之前的会话连接demo演示
*/
public class ZKConnectSessionWatcher implements Watcher {
final static Logger log = LoggerFactory.getLogger(ZKConnectSessionWatcher.class);
public static final String zkServerPath = "192.168.1.110:2181";
public static final Integer timeout = 5000;
public static void main(String[] args) throws Exception {
ZooKeeper zk = new ZooKeeper(zkServerPath, timeout, new ZKConnectSessionWatcher());
// 获取sessionID和密码
long sessionId = zk.getSessionId();
byte[] sessionPassword = zk.getSessionPasswd();
log.warn("客户端开始连接zookeeper服务器...");
log.warn("连接状态:{}", zk.getState());
new Thread().sleep(1000);
log.warn("连接状态:{}", zk.getState());
new Thread().sleep(200);
// 开始会话重连
log.warn("开始会话重连...");
//多传入两个参数,SSID和密码
ZooKeeper zkSession = new ZooKeeper(zkServerPath,
timeout,
new ZKConnectSessionWatcher(),
sessionId,
sessionPassword);
log.warn("重新连接状态zkSession:{}", zkSession.getState());
new Thread().sleep(1000);
log.warn("重新连接状态zkSession:{}", zkSession.getState());
}
@Override
public void process(WatchedEvent event) {
log.warn("接受到watch通知:{}", event);
}
}
console:
2018-04-04 13:52:21,097 [main] [com.imooc.zk.demo.ZKConnectSessionWatcher.main(ZKConnectSessionWatcher.java:30)] - [WARN] 客户端开始连接zookeeper服务器...
2018-04-04 13:52:21,103 [main] [com.imooc.zk.demo.ZKConnectSessionWatcher.main(ZKConnectSessionWatcher.java:31)] - [WARN] 连接状态:CONNECTING
2018-04-04 13:52:21,141 [main-EventThread] [com.imooc.zk.demo.ZKConnectSessionWatcher.process(ZKConnectSessionWatcher.java:52)] - [WARN] 接受到watch通知:WatchedEvent state:SyncConnected type:None path:null
2018-04-04 13:52:22,103 [main] [com.imooc.zk.demo.ZKConnectSessionWatcher.main(ZKConnectSessionWatcher.java:33)] - [WARN] 连接状态:CONNECTED
2018-04-04 13:52:22,304 [main] [com.imooc.zk.demo.ZKConnectSessionWatcher.main(ZKConnectSessionWatcher.java:38)] - [WARN] 开始会话重连...
2018-04-04 13:52:22,306 [main] [com.imooc.zk.demo.ZKConnectSessionWatcher.main(ZKConnectSessionWatcher.java:45)] - [WARN] 重新连接状态zkSession:CONNECTING
2018-04-04 13:52:22,311 [main-EventThread] [com.imooc.zk.demo.ZKConnectSessionWatcher.process(ZKConnectSessionWatcher.java:52)] - [WARN] 接受到watch通知:WatchedEvent state:SyncConnected type:None path:null
2018-04-04 13:52:23,307 [main] [com.imooc.zk.demo.ZKConnectSessionWatcher.main(ZKConnectSessionWatcher.java:47)] - [WARN] 重新连接状态zkSession:CONNECTED
节点操作
- 创建节点
-
同步
//创建一个"/node"节点,数据是nodeData,权限是world:anyone:cdrwa,持久型节点
zk.create("/node", "nodeData".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); -
异步
String ctx = "{'create':'success'}";
//额外传入一个回调函数,实现StringCallback接口,ctx为回调对象
zk.create(path, data, acls, CreateMode.PERSISTENT, new CreateCallBack(), ctx);
- 修改节点
-
同步
/** * 参数: * path:节点路径 * data:数据 * version:数据状态 */ Stat status = zkServer.getZookeeper().setData("/testnode", "xyz".getBytes(), 0); System.out.println(status.getVersion());
- 删除节点
- 同步
zk.getZookeeper().delete("/test-delete-node", 0);
-
异步(推荐)
String ctx = "{'delete':'success'}"; zk.getZookeeper().delete("/test-delete-node", 0, new DeleteCallBack(), ctx); Thread.sleep(2000);
CountDownLatch
- 需要设定初始值
- 调用countDown()使初始值减1
- 调用await()阻塞线程,直到初始值减少到0
节点查询
-
获取节点数据
//创建一个节点"/demo",设定watch为true,传入一空的stat对象会自动填充属性
byte[] resultByte = zk.getZookeeper().getData("/demo",true,stat);
String result = new String(resultByte); 获取子节点数据
-
同步
List<String> strChildList = zkServer.getZookeeper().getChildren("/imooc", true); for (String s : strChildList) { System.out.println(s); }
- 判断节点状态
//不存在返回null,存在则获取信息
Stat stat = zkServer.getZookeeper().exists("/imooc-fake", true);
操作权限
-
自定义用户
//首先注册用户,类似addauth
zkServer.getZookeeper().addAuthInfo("digest", "user1:123456".getBytes());
zkServer.getZookeeper().addAuthInfo("digest", "user2:123456".getBytes());// 自定义用户认证访问
List<ACL> acls = new ArrayList<ACL>();//创建权限列表//用户1
// Id user1 = new Id("digest", AclUtils.getDigestUserPwd("user1:123456"));//用户2
Id user2 = new Id("digest", AclUtils.getDigestUserPwd("user2:123456"));
//acls.add(new ACL(Perms.ALL, user1));
acls.add(new ACL(Perms.READ, user2));
acls.add(new ACL(Perms.DELETE | Perms.CREATE, user2));
//可以创建
zkServer.createZKNode("/aclimooc/testdigest", "testdigest".getBytes(), acls); 自定义ip