Zookeeper的常用开源客户端有ZkClient 和 Curator,网上可以找到很多关于这两个客户端的资料。本文将要讲述的不是这两个客户端,而是一个更加轻量级的Zookeeper客户端:de-zookeeper
完整代码已经提交到github上:
https://github.com/huangyanxiong/de-framework/tree/master/de-zookeeper
下面将详细讲述de-zookeeper的代码细节和使用方法。
1、ZookeeperProperty.java
该类用于封装 Zookeeper 的连接属性,包括IP和端口、路径、超时时间三个属性,并提供带参构造方法和对应的 get/set 方法:
package com.dataeye.zookeeper;
/**
 * Mutable holder for the settings used to open a ZooKeeper connection:
 * the connection string, the zNode path and the session timeout.
 */
public class ZookeeperProperty {

    /** Host and port to connect to, for example {@code localhost:2181}. */
    private String zkConnnectionStr;

    /** Path of the zNode, for example {@code /data/work}. */
    private String zNodePath;

    /** Session timeout in milliseconds, for example {@code 60000}. */
    private int sessionTimeout;

    /**
     * Creates a property holder with all three settings populated.
     *
     * @param zkConnnectionStr host and port of the ZooKeeper server
     * @param zNodePath        path of the zNode
     * @param sessionTimeout   session timeout in milliseconds
     */
    public ZookeeperProperty(String zkConnnectionStr, String zNodePath, int sessionTimeout) {
        this.sessionTimeout = sessionTimeout;
        this.zNodePath = zNodePath;
        this.zkConnnectionStr = zkConnnectionStr;
    }

    // ---- accessors ----

    /** @return the host and port of the ZooKeeper server */
    public String getZkConnnectionStr() {
        return this.zkConnnectionStr;
    }

    /** @return the path of the zNode */
    public String getzNodePath() {
        return this.zNodePath;
    }

    /** @return the session timeout in milliseconds */
    public int getSessionTimeout() {
        return this.sessionTimeout;
    }

    // ---- mutators ----

    /** @param zkConnnectionStr the new host and port of the ZooKeeper server */
    public void setZkConnnectionStr(String zkConnnectionStr) {
        this.zkConnnectionStr = zkConnnectionStr;
    }

    /** @param zNodePath the new path of the zNode */
    public void setzNodePath(String zNodePath) {
        this.zNodePath = zNodePath;
    }

    /** @param sessionTimeout the new session timeout in milliseconds */
    public void setSessionTimeout(int sessionTimeout) {
        this.sessionTimeout = sessionTimeout;
    }
}
2、ZooKeeperListener.java
这是描述客户端行为的接口。这里Zookeeper监听三种行为:
connected:建立连接
nodeValueChange:节点的值发生改变
nodeChildChange:子节点发生变化
package com.dataeye.zookeeper;
import java.util.Map;
/**
 * Callbacks through which a ZooKeeper connector reports what it observed.
 */
public interface ZooKeeperListener {

    /**
     * Called when a connection has been established.
     */
    void connected();

    /**
     * Called when the value of the node has changed.
     *
     * @param newValue the new value of the node
     */
    void nodeValueChange(String newValue);

    /**
     * Called when the children of the node have changed.
     *
     * @param newChildrenValue the new children values
     */
    void nodeChildChange(Map<String, String> newChildrenValue);
}
3、NodeValueCodec.java
节点值的编码和解码接口,提供 decode / decodeAll 和 encode / encodeAll 方法
package com.dataeye.zookeeper;
import java.nio.charset.StandardCharsets;
import java.util.Set;
import static java.util.Objects.requireNonNull;
/**
 * Converts between the content of a zNode and values of type {@code T}.
 *
 * @param <T> the type of a single decoded entry
 */
public interface NodeValueCodec<T> {

    /** The default codec, {@link DefaultNodeValueCodec#INSTANCE}. */
    NodeValueCodec DEFAULT = DefaultNodeValueCodec.INSTANCE;

    /**
     * Decodes a zNode value string into a set of entries.
     *
     * @param zNodeValue the zNode value as a string
     * @return the decoded entries
     */
    Set<T> decodeAll(String zNodeValue);

    /**
     * Decodes raw zNode bytes into a set of entries by reading the bytes as
     * UTF-8 text and delegating to {@link #decodeAll(String)}.
     *
     * @param zNodeValue the raw zNode bytes, must not be {@code null}
     * @return the decoded entries
     */
    default Set<T> decodeAll(byte[] zNodeValue) {
        final byte[] raw = requireNonNull(zNodeValue, "zNodeValue");
        final String text = new String(raw, StandardCharsets.UTF_8);
        return decodeAll(text);
    }

    /**
     * Decodes a zNode value string into a single entry.
     *
     * @param zNodeValue the zNode value as a string
     * @return the decoded entry
     */
    T decode(String zNodeValue);

    /**
     * Decodes raw zNode bytes into a single entry by reading the bytes as
     * UTF-8 text and delegating to {@link #decode(String)}.
     *
     * @param zNodeValue the raw zNode bytes, must not be {@code null}
     * @return the decoded entry
     */
    default T decode(byte[] zNodeValue) {
        final byte[] raw = requireNonNull(zNodeValue, "zNodeValue");
        final String text = new String(raw, StandardCharsets.UTF_8);
        return decode(text);
    }

    /**
     * Encodes a single entry into bytes.
     *
     * @param entry the entry to encode
     * @return the encoded bytes
     */
    byte[] encode(T entry);

    /**
     * Encodes several entries into bytes.
     *
     * @param entries the entries to encode
     * @return the encoded bytes
     */
    byte[] encodeAll(Iterable<T> entries);
}
4、DefaultNodeValueCodec.java
NodeValueCodec接口的实现类,具体实现了编码和解码的过程。
这里的编码过程比较简单,只是把Worker类的两个属性 id 和 biz 用冒号分隔,得到类似 id:biz 这样的字符串;多个 Worker 之间再用逗号分隔。解码过程则相反。
package com.dataeye.zookeeper;
import com.dataeye.crawler.thrift.Worker;
import java.nio.charset.StandardCharsets;
import java.util.HashSet;
import java.util.Set;
import java.util.regex.Pattern;
import static java.util.Objects.requireNonNull;
/**
 * {@link NodeValueCodec} for {@link Worker} values.
 *
 * <p>A single worker is written as {@code id:biz}; several workers are joined
 * with commas, for example {@code id1:biz1,id2:biz2}. All text is UTF-8.
 */
public final class DefaultNodeValueCodec implements NodeValueCodec<Worker> {
    public static final DefaultNodeValueCodec INSTANCE = new DefaultNodeValueCodec();
    private static final String segmentDelimiter = ",";
    private static final String fieldDelimiter = ":";
    // Segment separator with optional surrounding whitespace.
    private static final Pattern SEGMENT_DELIMITER = Pattern.compile("\\s*" + segmentDelimiter + "\\s*");

    /**
     * Decodes one {@code id:biz} segment into a {@link Worker}.
     *
     * @param segment the text of a single worker, must not be {@code null}
     * @return the decoded worker
     * @throws NullPointerException     if {@code segment} is {@code null}
     * @throws IllegalArgumentException if the segment does not contain both an id and a biz
     */
    @Override
    public Worker decode(String segment) {
        requireNonNull(segment, "segment");
        final String[] tokens = segment.split(fieldDelimiter);
        // String.split drops trailing empty tokens, so "id", "id:" and "" all yield fewer
        // than two tokens; report that explicitly instead of failing on tokens[1].
        if (tokens.length < 2) {
            throw new IllegalArgumentException(
                    "invalid worker segment (expected id" + fieldDelimiter + "biz): " + segment);
        }
        return new Worker(tokens[0], tokens[1]);
    }

    /**
     * Decodes a comma separated list of {@code id:biz} segments.
     *
     * @param valueString the text of the whole worker list
     * @return the decoded workers, never empty
     * @throws RuntimeException if any segment cannot be decoded or no worker is found
     */
    @Override
    public Set<Worker> decodeAll(String valueString) {
        Set<Worker> workers = new HashSet<>();
        try {
            for (String segment : SEGMENT_DELIMITER.split(valueString)) {
                workers.add(decode(segment));
            }
        } catch (Exception e) {
            throw new RuntimeException("invalid worker list: " + valueString, e);
        }
        if (workers.isEmpty()) {
            throw new RuntimeException("ZNode does not contain any workers.");
        }
        return workers;
    }

    /**
     * Encodes several workers as comma separated {@code id:biz} segments.
     *
     * @param workers the workers to encode, must not be {@code null}
     * @return the UTF-8 bytes of the joined segments; empty when there are no workers
     */
    @Override
    public byte[] encodeAll(Iterable<Worker> workers) {
        requireNonNull(workers, "workers");
        StringBuilder nodeValue = new StringBuilder();
        for (Worker worker : workers) {
            // Put the delimiter between segments rather than trimming a trailing one,
            // so the result does not depend on the delimiter being one character long.
            if (nodeValue.length() > 0) {
                nodeValue.append(segmentDelimiter);
            }
            nodeValue.append(worker.getId()).append(fieldDelimiter).append(worker.getBiz());
        }
        return nodeValue.toString().getBytes(StandardCharsets.UTF_8);
    }

    /**
     * Encodes one worker as {@code id:biz}.
     *
     * @param worker the worker to encode, must not be {@code null}
     * @return the UTF-8 bytes of the segment
     */
    @Override
    public byte[] encode(Worker worker) {
        requireNonNull(worker, "worker");
        return (worker.getId() + fieldDelimiter + worker.getBiz()).getBytes(StandardCharsets.UTF_8);
    }
}
5、zk.thrift
该客户端使用thrift生成序列化对象:
namespace java com.dataeye.crawler.thrift
struct Worker {
//identifier of the worker
1: required string id,
//business (biz) the worker belongs to
2: required string biz
}
6、ZooKeeperConnector.java
该类负责ZooKeeper的连接,节点操作,节点监听等功能,是该客户端的核心类
package com.dataeye.zookeeper;
import com.google.common.annotations.VisibleForTesting;
import org.apache.zookeeper.AsyncCallback.StatCallback;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException.Code;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import static java.util.Objects.requireNonNull;
import static org.apache.zookeeper.KeeperException.Code.get;
public final class ZooKeeperConnector {
// Logger for this connector.
private static final Logger logger = LoggerFactory.getLogger(ZooKeeperConnector.class);
// Connection string handed to the ZooKeeper client in connect().
private final String zkConnectionStr;
// Path of the node whose value and children are reported to the listener.
private final String zNodePath;
// Session timeout handed to the ZooKeeper client in connect().
private final int sessionTimeout;
// Receiver of the connected / value-change / child-change notifications.
private final ZooKeeperListener listener;
// Current ZooKeeper handle; created in connect() and set to null when the session expires.
private ZooKeeper zooKeeper;
// When non-null, observed connection states are put on this queue.
// NOTE(review): nothing in this listing assigns it; presumably a test hook - confirm.
private BlockingQueue<KeeperState> stateQueue;
// Counted down by the watcher on SyncConnected; connect() waits on it.
private CountDownLatch latch;
// Value of the flag passed to the most recent close(boolean); reset to false by connect().
private boolean activeClose;
// Last node value delivered to the listener, used to skip unchanged values.
private String prevNodeValue;
// Last child name -> value map delivered to the listener, used to skip unchanged maps.
private Map<String, String> prevChildValue;
/**
 * Stores the connection settings and the listener. No connection is opened here;
 * that happens in {@link #connect()}.
 *
 * @param zkConnectionStr connection string of the ZooKeeper server, must not be {@code null}
 * @param zNodePath       path of the node to observe, must not be {@code null}
 * @param sessionTimeout  session timeout passed to the ZooKeeper client
 * @param listener        receiver of the notifications, must not be {@code null}
 * @throws NullPointerException if any of the reference arguments is {@code null}
 */
public ZooKeeperConnector(String zkConnectionStr, String zNodePath, int sessionTimeout,
                          ZooKeeperListener listener) {
    // Validate first, in declaration order, then assign.
    requireNonNull(zkConnectionStr, "zkConnectionStr");
    requireNonNull(zNodePath, "zNodePath");
    requireNonNull(listener, "listener");

    this.zkConnectionStr = zkConnectionStr;
    this.zNodePath = zNodePath;
    this.sessionTimeout = sessionTimeout;
    this.listener = listener;
}
/**
 * Opens a new ZooKeeper client and blocks until the watcher has counted down the
 * connection latch, then notifies the listener that the connection is up and reports
 * the current node value and children.
 *
 * @throws ZooKeeperException if the client cannot be created, the wait is interrupted
 *                            (the thread's interrupt flag is restored in that case), or
 *                            the initial notification fails
 */
public void connect() {
    try {
        activeClose = false;
        // The latch must exist before the client is created: the watcher counts it down.
        latch = new CountDownLatch(1);
        zooKeeper = new ZooKeeper(zkConnectionStr, sessionTimeout, new ZkWatcher());
        latch.await();
        notifyConnected();
        notifyChange();
        if (stateQueue != null) {
            // Put a marker state so that a consumer of the queue can tell the
            // post-connect notifications above have completed.
            stateQueue.put(KeeperState.Disconnected);
        }
    } catch (InterruptedException e) {
        // Keep the interruption visible to the caller's thread instead of swallowing it.
        Thread.currentThread().interrupt();
        throw new ZooKeeperException(
                "failed to connect to ZooKeeper: " + zkConnectionStr + " (" + e + ')', e);
    } catch (Exception e) {
        throw new ZooKeeperException(
                "failed to connect to ZooKeeper: " + zkConnectionStr + " (" + e + ')', e);
    }
}
/**
 * Utility method to create a child node under the configured node path. The configured
 * node itself is created as a persistent node first when it does not exist yet.
 *
 * @param nodePath   name of the child node, appended to the configured path after a '/'
 * @param value      node value
 * @param createMode create mode of the child, e.g. {@code CreateMode.EPHEMERAL} for a
 *                   temporary node or {@code CreateMode.PERSISTENT} for a persistent one
 * @throws ZooKeeperException if the child already exists or any ZooKeeper operation fails;
 *                            when the thread is interrupted its interrupt flag is restored
 */
public void createChild(String nodePath, byte[] value, CreateMode createMode) {
    final String childPath = zNodePath + '/' + nodePath;
    try {
        // First check the parent node and create it when missing.
        // NOTE(review): only the last path element is created here; per the ZooKeeper API a
        // create() under a missing ancestor fails with NoNode - confirm whether multi-level
        // paths are expected to pre-exist.
        if (zooKeeper.exists(zNodePath, false) == null) {
            zooKeeper.create(zNodePath, zNodePath.getBytes(StandardCharsets.UTF_8),
                             Ids.OPEN_ACL_UNSAFE,
                             CreateMode.PERSISTENT);
        }
        // The 'true' flag also leaves a watch on the child path.
        if (zooKeeper.exists(childPath, true) != null) {
            throw new ZooKeeperException("failed to create ZooKeeper Node:" + childPath +
                                         ",because the path exist already.");
        }
        zooKeeper.create(childPath, value, Ids.OPEN_ACL_UNSAFE, createMode);
    } catch (ZooKeeperException e) {
        // Already carries the precise reason; do not bury it under the generic message below.
        throw e;
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        throw new ZooKeeperException(
                "failed to create ZooKeeper Node: " + zkConnectionStr + " (" + e + ')', e);
    } catch (Exception e) {
        throw new ZooKeeperException(
                "failed to create ZooKeeper Node: " + zkConnectionStr + " (" + e + ')', e);
    }
}
/**
 * Closes the underlying ZooKeeper connection, if there is one. Failures while closing
 * are logged and not propagated.
 *
 * @param active {@code true} if the connection is closed actively by the user,
 *               {@code false} if it is closed by the program itself because the
 *               underlying connection expired
 */
public void close(boolean active) {
    activeClose = active;
    // The handle is null before connect() and after a session expiry; nothing to close then.
    final ZooKeeper handle = zooKeeper;
    if (handle == null) {
        return;
    }
    try {
        handle.close();
    } catch (InterruptedException e) {
        // Keep the interruption visible to the calling thread.
        Thread.currentThread().interrupt();
        logger.warn("Interrupted while closing the ZooKeeper connection", e);
    } catch (Exception e) {
        logger.warn("Failed to close the ZooKeeper connection", e);
    }
}
/**
 * Notify listener that ZooKeeper connection has been established.
 * Simply forwards to {@code listener.connected()}.
 */
private void notifyConnected() {
listener.connected();
}
/**
 * Reads the current value and children of the configured node and tells the listener
 * about whatever differs from the values reported last time.
 *
 * <p>Does nothing when the connector was actively closed or when the node does not exist.
 * The node value is reported before the children.
 *
 * @throws ZooKeeperException if reading from ZooKeeper or notifying the listener fails
 */
private void notifyChange() {
    // Ignore events that arrive after the user actively closed the connector.
    if (activeClose) {
        return;
    }
    try {
        // exists(...) and getChildren(...) are called with the watch flag set.
        if (zooKeeper.exists(zNodePath, true) == null) {
            return;
        }
        final List<String> childNames = zooKeeper.getChildren(zNodePath, true);
        final byte[] rawValue = zooKeeper.getData(zNodePath, false, null);

        // Node value: report only when it differs from the last reported one.
        if (rawValue != null) {
            final String currentValue = new String(rawValue, StandardCharsets.UTF_8);
            if (!currentValue.equals(prevNodeValue)) {
                listener.nodeValueChange(currentValue);
                prevNodeValue = currentValue;
            }
        }

        if (childNames == null) {
            return;
        }

        // Children: collect name -> value, then report when the map differs from the last one.
        final Map<String, String> currentChildren = new HashMap<>();
        for (String childName : childNames) {
            try {
                // NOTE(review): getData may return null for a node stored without data, which
                // would make new String(...) throw - confirm such children cannot occur.
                currentChildren.put(childName,
                                    new String(zooKeeper.getData(zNodePath + '/' + childName,
                                                                 false, null),
                                               StandardCharsets.UTF_8));
            } catch (Exception e) {
                throw new ZooKeeperException(e);
            }
        }
        if (!currentChildren.equals(prevChildValue)) {
            listener.nodeChildChange(currentChildren);
            prevChildValue = currentChildren;
        }
    } catch (Exception ex) {
        throw new ZooKeeperException("Failed to notify ZooKeeper listener", ex);
    }
}
/**
 * A ZooKeeper watch. Handles connection state events (counting down the connection latch,
 * rebuilding the client after a session expiry) and node events (re-checking the node and
 * forwarding changes to the listener through {@code notifyChange()}).
 */
final class ZkWatcher implements Watcher, StatCallback {
    /**
     * Handles a watched event.
     *
     * @param event the event delivered by the ZooKeeper client
     */
    @Override
    public void process(WatchedEvent event) {
        if (stateQueue != null) {
            enqueueState(event.getState());
        }
        String path = event.getPath();
        if (event.getType() == Event.EventType.None) {
            // Connection state has been changed.
            switch (event.getState()) {
                case Disconnected:
                    // Nothing to do here.
                    break;
                case SyncConnected:
                    // Release connect(), which is waiting on the latch.
                    latch.countDown();
                    break;
                case Expired: {
                    // The session expired, so the ZooKeeper client has to be reconstructed.
                    // First, clean the original handle. Pass the current flag through so that
                    // an earlier active close by the user is not reset to "passive" here,
                    // which would make the reconnect check below always succeed.
                    final boolean closedByUser = activeClose;
                    close(closedByUser);
                    zooKeeper = null;
                    try {
                        if (!activeClose) {
                            connect();
                        }
                    } catch (ZooKeeperException e) {
                        logger.warn("Failed to attempt to recover a ZooKeeper connection", e);
                    }
                    break;
                }
                default:
                    // Other states are not handled.
                    break;
            }
        } else {
            if (path != null && path.startsWith(zNodePath)) {
                // Something has changed on the node, let's find out.
                // The result is delivered to processResult(...) below.
                try {
                    zooKeeper.exists(path, true, this, null);
                } catch (Exception e) {
                    logger.warn("Failed to check ZooKeeper node: {}", path, e);
                }
            }
        }
    }

    /**
     * Receives the result of the asynchronous {@code exists} call issued from
     * {@link #process(WatchedEvent)} or from a retry in this method.
     *
     * @param responseCodeInt the result code of the call
     * @param path            the path the call was issued for
     * @param ctx             the context object passed to the call (always {@code null} here)
     * @param stat            the stat of the node
     * @throws ZooKeeperException if re-issuing the call on a retry fails
     */
    @Override
    public void processResult(int responseCodeInt, String path, Object ctx, Stat stat) {
        Code responseCode = get(responseCodeInt);
        switch (responseCode) {
            case OK:
                break;
            case NONODE:
                break;
            case SESSIONEXPIRED:
                // Ignore this and let the zNode Watcher process it first.
            case NOAUTH:
                // It's possible that this happens during runtime. We ignore this and wait
                // for the ACL configuration to return to normal.
                return;
            default:
                // Any other code: issue the same check again.
                try {
                    zooKeeper.exists(path, true, this, null);
                } catch (Exception ex) {
                    throw new ZooKeeperException("Failed to process ZooKeeper callback event", ex);
                }
                return;
        }
        if (!activeClose) {
            notifyChange();
            // Enqueue an end flag so that a consumer of the queue can tell this callback
            // has finished.
            if (stateQueue != null) {
                enqueueState(KeeperState.Disconnected);
            }
        }
    }

    /**
     * Enqueue the state. Does nothing when there is no state queue.
     *
     * @param state ZooKeeper state
     */
    private void enqueueState(KeeperState state) {
        if (stateQueue == null) {
            return;
        }
        try {
            stateQueue.put(state);
        } catch (InterruptedException e) {
            // Keep the interruption visible to the calling thread instead of swallowing it.
            Thread.currentThread().interrupt();
            logger.warn("Interrupted while enqueuing ZooKeeper state: {}", state, e);
        }
    }
}
7、ZKClient.java
Zookeeper客户端
package com.dataeye.zookeeper.client;
import com.dataeye.crawler.thrift.Worker;
import com.dataeye.zookeeper.DefaultNodeValueCodec;
import com.dataeye.zookeeper.ZooKeeperConnector;
import com.dataeye.zookeeper.ZooKeeperListener;
import com.dataeye.zookeeper.ZookeeperProperty;
import java.util.*;
import java.util.stream.Collectors;
/**
 * Example program: connects to a local ZooKeeper server and, whenever the children of
 * {@code /de-spider/works} change, decodes them into workers grouped by their biz.
 */
public class ZKClient {

    /**
     * Entry point of the example.
     *
     * @param args command line arguments, not used
     */
    public static void main(String[] args) {
        final String zkConn = "localhost:2181";
        final DefaultNodeValueCodec nodeValueCodec = DefaultNodeValueCodec.INSTANCE;
        final ZookeeperProperty zookeeperProperty =
                new ZookeeperProperty(zkConn, "/de-spider/works", 60000);

        final ZooKeeperListener listener = new ZooKeeperListener() {
            @Override
            public void nodeChildChange(Map<String, String> newChildrenValue) {
                // Group the decoded workers by biz. The grouping is only built here;
                // this example does not use it any further.
                final Map<String, Set<Worker>> newBizWorkers = new HashMap<>();
                for (String childValue : newChildrenValue.values()) {
                    final Worker worker = nodeValueCodec.decode(childValue);
                    if (worker == null) {
                        continue;
                    }
                    newBizWorkers.computeIfAbsent(worker.biz, biz -> new HashSet<>()).add(worker);
                }
            }

            @Override
            public void nodeValueChange(String newValue) {
                // Not needed by this example.
            }

            @Override
            public void connected() {
                // Not needed by this example.
            }
        };

        final ZooKeeperConnector zooKeeperConnector = new ZooKeeperConnector(
                zookeeperProperty.getZkConnnectionStr(),
                zookeeperProperty.getzNodePath(),
                zookeeperProperty.getSessionTimeout(),
                listener);
        zooKeeperConnector.connect();
    }
}