本篇来介绍ZooKeeper的数据结构,也可以说是存储结构。
ZooKeeper的数据库对应的类是ZKDatabase:
protected DataTree dataTree;
protected ConcurrentHashMap<Long, Integer> sessionsWithTimeouts;
dataTree存储了具体的数据,sessionsWithTimeouts存储了全局session的id和超时值。
看一下DataTree:
private final NodeHashMap nodes;
private IWatchManager dataWatches;
private IWatchManager childWatches;
private final AtomicLong nodeDataSize = new AtomicLong(0);
private final Map<Long, HashSet<String>> ephemerals = new ConcurrentHashMap<Long, HashSet<String>>();
private final Set<String> containers = Collections.newSetFromMap(new ConcurrentHashMap<String, Boolean>());
private final Set<String> ttls = Collections.newSetFromMap(new ConcurrentHashMap<String, Boolean>());
private final ReferenceCountedACLCache aclCache = new ReferenceCountedACLCache();
- nodes存储了所有的节点,是数据实际存储的地方。
- dataWatches存储了对于节点本身数据的watcher。
- childWatches存储了对于节点的子节点变更的watcher。
- nodeDataSize所有节点的总大小。
- ephemerals,存储了session对应的临时节点。
- containers存储了所有container节点。
- ttls存储了所有设置了TTL(存活时间Time To Live)的节点。
- aclCache存储了有效的访问权限控制列表。
看一下NodeHashMapImpl:
private final ConcurrentHashMap<String, DataNode> nodes;
nodes存储了节点的路径和节点详情。
看一下DataNode:
private volatile long digest;
byte[] data;
Long acl;
public StatPersisted stat;
private Set<String> children = null;
- digest存储节点的hash值。
- data存储节点的数据。
- acl存储节点的访问权限控制列表的索引。
- stat存储节点的状态。
- children存储节点的子节点的路径。
看一下StatPersisted:
private long czxid;
private long mzxid;
private long ctime;
private long mtime;
private int version;
private int cversion;
private int aversion;
private long ephemeralOwner;
private long pzxid;
- czxid存储创建zxid号。
- mzxid存储更改zxid号。
- ctime存储创建时间戳。
- mtime存储修改时间戳。
- version修改版本号,每次修改数据会+1。
- cversion是childVersion,实际存储的创建过多少个子节点,删除不会递减。顺序节点就是通过这个值实现的。
- aversion是acl版本号,每次修改acl会+1。
- ephemeralOwner,存储的临时节点对应的sessionid。或者是0x8000000000000000代表这是一个container节点。最高位为ff的情况下,代表这是一个ttl节点,低位存储ttl值。
- pzxid,最后一次添加或者删除子节点的zxid。
我们来看一下创建节点的过程,首次看DataTree的createNode方法:
int lastSlash = path.lastIndexOf('/');
String parentName = path.substring(0, lastSlash);
String childName = path.substring(lastSlash + 1);
提取出父节点的全名和子节点的名字。
StatPersisted stat = createStat(zxid, time, ephemeralOwner);
为子节点创建StatPersisted。
DataNode parent = nodes.get(parentName);
if (parent == null) {
throw new KeeperException.NoNodeException();
}
获取父节点对应的DataNode,若不存在,抛出节点不存在的异常。
Set<String> children = parent.getChildren();
if (children.contains(childName)) {
throw new KeeperException.NodeExistsException();
}
判断节点是否已经存在,若已经存在,则抛出节点已存在异常。
DataNode child = new DataNode(data, longval, stat);
parent.addChild(childName);
创建节点。
nodeDataSize.addAndGet(getNodeSize(path, child.data));
nodes.put(path, child);
更新总大小并存储该节点。
if (ephemeralType == EphemeralType.CONTAINER) {
containers.add(path);
} else if (ephemeralType == EphemeralType.TTL) {
ttls.add(path);
} else if (ephemeralOwner != 0) {
HashSet<String> list = ephemerals.get(ephemeralOwner);
if (list == null) {
list = new HashSet<String>();
ephemerals.put(ephemeralOwner, list);
}
synchronized (list) {
list.add(path);
}
}
更新对应节点类型的集合。
if (parentName.startsWith(quotaZookeeper)) {
// now check if its the limit node
if (Quotas.limitNode.equals(childName)) {
// this is the limit node
// get the parent and add it to the trie
pTrie.addPath(parentName.substring(quotaZookeeper.length()));
}
if (Quotas.statNode.equals(childName)) {
updateQuotaForPath(parentName.substring(quotaZookeeper.length()));
}
}
判断创建的是否是quota节点(一类特殊节点,zookeeper_limits存储配额信息,zookeeper_stats存储节点的统计信息),pTrie是PathTrie的实例,用于存储和搜索最大匹配的前缀路径。如果是zookeeper_limits节点,则调用pTrie.addPath将路径加入追踪。如果是zookeeper_stats节点,则调用updateQuotaForPath更新统计信息,统计信息包括节点数(包括节点本身和后代节点,相当于以该节点为根,整个树的节点数。),数据总量(同节点数统计方式)。节点/foo/bar对应的配额限制节点是/zookeeper/quota/foo/bar/zookeeper_limits,对应的统计信息节点是/zookeeper/quota/foo/bar/zookeeper_stats。
String lastPrefix = getMaxPrefixWithQuota(path);
long bytes = data == null ? 0 : data.length;
if (lastPrefix != null) {
updateCountBytes(lastPrefix, bytes, 1);
}
getMaxPrefixWithQuota搜索最大匹配的前缀路径,如/foo/bar设置了quota,当前添加的节点为/foo/bar/baz,则返回的是/foo/bar。如果该节点的某个祖先节点被设置了quota,则更新该quota的信息。
dataWatches.triggerWatch(path, Event.EventType.NodeCreated);
childWatches.triggerWatch(parentName.equals("") ? "/" : parentName, Event.EventType.NodeChildrenChanged);
触发watch的事件。
我们顺便来看一下watch的触发过程,触发的方法是WatchManager的triggerWatch方法:
WatchedEvent e = new WatchedEvent(type, KeeperState.SyncConnected, path);
Set<Watcher> watchers = new HashSet<>();
PathParentIterator pathParentIterator = getPathParentIterator(path);
watchers存储需要触发的watcher,getPathParentIterator获取一个由子节点到父节点的迭代器,如/foo/ba/baz的迭代顺序是/foo/bar/baz、/foo/bar、/foo和/。
接下来是对pathParentIterator作迭代操作。
Set<Watcher> thisWatchers = watchTable.get(localPath);
if (thisWatchers == null || thisWatchers.isEmpty()) {
continue;
}
Iterator<Watcher> iterator = thisWatchers.iterator();
如果路径对应的watcher不为空,接下来迭代路径对应的watcher。
Watcher watcher = iterator.next();
WatcherMode watcherMode = watcherModeManager.getWatcherMode(watcher, localPath);
if (watcherMode.isRecursive()) {
if (type != EventType.NodeChildrenChanged) {
watchers.add(watcher);
}
} else if (!pathParentIterator.atParentPath()) {
watchers.add(watcher);
if (!watcherMode.isPersistent()) {
iterator.remove();
Set<String> paths = watch2Paths.get(watcher);
if (paths != null) {
paths.remove(localPath);
}
}
}
- 首先判断是不是一个递归的watcher,如果是且事件类型不是EventType.NodeChildrenChanged,则将watcher放入待触发的集合中。
- 判断是否是pathParentIterator的第一次迭代,也就是非递归watcher的情况下,路径完全匹配,则将watcher放入待触发的集合中。然后如果不是持久的(不会持久化,只是触发一次后不会删除)watcher,则移除该watcher。
for (Watcher w : watchers) {
if (supress != null && supress.contains(w)) {
continue;
}
w.process(e);
}
pathParentIterator迭代完成后,执行watcher的逻辑。
return new WatcherOrBitSet(watchers);
返回选中的watcher。