4、Zookeeper的客户端实例

Zookeeper的常用开源客户端有ZkClient 和 Curator,网上可以找到很多关于这两个客户端的资料。本文将要讲述的不是这两个客户端,而是一个更加轻量级的Zookeeper客户端:de-zookeeper

完整代码已经提交到github上:

https://github.com/huangyanxiong/de-framework/tree/master/de-zookeeper

下面将详细讲述de-zookeeper的代码细节和使用方法。

1、ZookeeperProperty.java

该类用于封装 Zookeeper 的连接属性,包括IP和端口、路径、超时时间三个属性,并提供带参构造方法和对应的 get/set 方法:

package com.dataeye.zookeeper;

public class ZookeeperProperty {
  private String zkConnnectionStr;   // 连接的IP和端口,比如: localhost:2181
  private String zNodePath;   //  zk路径,比如:/data/work
  private int sessionTimeout;  //  超时时间,毫秒,比如  60000

  public ZookeeperProperty(String zkConnnectionStr, String zNodePath, int sessionTimeout) {
    this.zkConnnectionStr = zkConnnectionStr;
    this.zNodePath = zNodePath;
    this.sessionTimeout = sessionTimeout;
  }

  public String getZkConnnectionStr() {
    return zkConnnectionStr;
  }

  public void setZkConnnectionStr(String zkConnnectionStr) {
    this.zkConnnectionStr = zkConnnectionStr;
  }

  public String getzNodePath() {
    return zNodePath;
  }

  public void setzNodePath(String zNodePath) {
    this.zNodePath = zNodePath;
  }

  public int getSessionTimeout() {
    return sessionTimeout;
  }

  public void setSessionTimeout(int sessionTimeout) {
    this.sessionTimeout = sessionTimeout;
  }
}

2、ZooKeeperListener.java

这是描述客户端行为的接口。这里Zookeeper监听三种行为:

connected:建立连接
nodeValueChange:节点的值发生改变
nodeChildChange:子节点发生变化

package com.dataeye.zookeeper;

import java.util.Map;

public interface ZooKeeperListener {

  void nodeChildChange(Map<String, String> newChildrenValue);

  void nodeValueChange(String newValue);

  void connected();
}

3、NodeValueCodec.java

节点的编码和解码类,提供 decode 和 encode 接口

package com.dataeye.zookeeper;

import java.nio.charset.StandardCharsets;
import java.util.Set;

import static java.util.Objects.requireNonNull;

public interface NodeValueCodec<T> {

  NodeValueCodec DEFAULT = DefaultNodeValueCodec.INSTANCE;

  default Set<T> decodeAll(byte[] zNodeValue) {
    requireNonNull(zNodeValue, "zNodeValue");
    return decodeAll(new String(zNodeValue, StandardCharsets.UTF_8));
  }

  Set<T> decodeAll(String zNodeValue);

  default T decode(byte[] zNodeValue) {
    requireNonNull(zNodeValue, "zNodeValue");
    return decode(new String(zNodeValue, StandardCharsets.UTF_8));
  }

  T decode(String zNodeValue);

  byte[] encodeAll(Iterable<T> entries);

  byte[] encode(T entry);
}

4、DefaultNodeValueCodec.java

NodeValueCodec接口的实现类,具体实现了编码和解码的过程。

这里的编码过程比较简单,只是把Worker类的两个属性: id 和 biz使用冒号分割开来,返回类似于 id:biz 这样的字符串;解码过程则相反。

package com.dataeye.zookeeper;

import com.dataeye.crawler.thrift.Worker;

import java.nio.charset.StandardCharsets;
import java.util.HashSet;
import java.util.Set;
import java.util.regex.Pattern;

import static java.util.Objects.requireNonNull;

public final class DefaultNodeValueCodec implements NodeValueCodec<Worker> {
  public static final DefaultNodeValueCodec INSTANCE = new DefaultNodeValueCodec();
  private static final String segmentDelimiter = ",";
  private static final String fieldDelimiter = ":";
  private static final Pattern SEGMENT_DELIMITER = Pattern.compile("\\s*" + segmentDelimiter + "\\s*");

  @Override
  public Worker decode(String segment) {
    final String[] tokens = segment.split(fieldDelimiter);

    String workerId = tokens[0];
    String biz = tokens[1];
    final Worker worker = new Worker(workerId, biz);

    return worker;
  }

  @Override
  public Set<Worker> decodeAll(String valueString) {
    Set<Worker> workers = new HashSet<>();
    try {
      for (String segment : SEGMENT_DELIMITER.split(valueString)) {
        workers.add(decode(segment));
      }
    } catch (Exception e) {
      throw new RuntimeException("invalid worker list: " + valueString, e);
    }
    if (workers.isEmpty()) {
      throw new RuntimeException("ZNode does not contain any workers.");
    }
    return workers;
  }

  @Override
  public byte[] encodeAll(Iterable<Worker> workers) {
    requireNonNull(workers, "workers");
    StringBuilder nodeValue = new StringBuilder();
    workers.forEach(worker -> nodeValue.append(worker.getId()).append(fieldDelimiter).append(
            worker.getBiz()).append(segmentDelimiter));
    //delete the last unused segment delimiter
    if (nodeValue.length() > 0) {
      nodeValue.deleteCharAt(nodeValue.length() - 1);
    }
    return nodeValue.toString().getBytes(StandardCharsets.UTF_8);
  }

  @Override
  public byte[] encode(Worker worker) {
    return (worker.getId() + fieldDelimiter + worker.getBiz()).getBytes(StandardCharsets.UTF_8);
  }
}

5、zk.thrift

该客户端使用thrift生成序列化对象:

namespace java com.dataeye.crawler.thrift

struct Worker {
    //worker的标识
    1: required string id,
    //worker所属的业务
    2: required string biz
}

6、ZooKeeperConnector.java

该类负责ZooKeeper的连接,节点操作,节点监听等功能,是该客户端的核心类

package com.dataeye.zookeeper;

import com.google.common.annotations.VisibleForTesting;
import org.apache.zookeeper.AsyncCallback.StatCallback;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException.Code;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;

import static java.util.Objects.requireNonNull;
import static org.apache.zookeeper.KeeperException.Code.get;

public final class ZooKeeperConnector {
  private static final Logger logger = LoggerFactory.getLogger(ZooKeeperConnector.class);
  private final String zkConnectionStr;
  private final String zNodePath;
  private final int sessionTimeout;
  private final ZooKeeperListener listener;
  private ZooKeeper zooKeeper;
  private BlockingQueue<KeeperState> stateQueue;
  private CountDownLatch latch;
  private boolean activeClose;
  private String prevNodeValue;
  private Map<String, String> prevChildValue;

  public ZooKeeperConnector(String zkConnectionStr, String zNodePath, int sessionTimeout,
                            ZooKeeperListener listener
  ) {
    this.zkConnectionStr = requireNonNull(zkConnectionStr, "zkConnectionStr");
    this.zNodePath = requireNonNull(zNodePath, "zNodePath");
    this.sessionTimeout = sessionTimeout;
    this.listener = requireNonNull(listener, "listener");
  }

  /**
   * Do connect.
   */
  public void connect() {
    try {
      activeClose = false;
      latch = new CountDownLatch(1);
      zooKeeper = new ZooKeeper(zkConnectionStr, sessionTimeout, new ZkWatcher());
      latch.await();
      notifyConnected();
      notifyChange();
      if (stateQueue != null) {
        //put a fake stat to ensure method postConnected finished completely
        //(so that it won't throw exception under ZooKeeper connection recovery test)
        stateQueue.put(KeeperState.Disconnected);
      }
    } catch (Exception e) {
      throw new ZooKeeperException(
              "failed to connect to ZooKeeper:  " + zkConnectionStr + " (" + e + ')', e);
    }
  }

  /**
   * Utility method to create a node.
   * @param nodePath node name
   * @param value    node value
   */
  public void createChild(String nodePath, byte[] value, CreateMode createMode) {
    // CreateMode.EPHEMERAL 临时节点
    // CreateMode.PERSISTENT 持久化节点
    try {
      //first check the parent node
      if (zooKeeper.exists(zNodePath, false) == null) {
        //parent node not exist, create it
        zooKeeper.create(zNodePath, zNodePath.getBytes(StandardCharsets.UTF_8),
                Ids.OPEN_ACL_UNSAFE,
                CreateMode.PERSISTENT);
      }
      if (zooKeeper.exists(zNodePath + '/' + nodePath, true) == null) {
        zooKeeper.create(zNodePath + '/' + nodePath, value, Ids.OPEN_ACL_UNSAFE, createMode);
      } else {
        throw new ZooKeeperException("failed to create ZooKeeper Node:" + zNodePath + '/' + nodePath +
                ",because the path exist already.");
      }
    } catch (Exception e) {
      throw new ZooKeeperException(
              "failed to create ZooKeeper Node:  " + zkConnectionStr + " (" + e + ')', e);
    }
  }

  /**
   * Closes the underlying Zookeeper connection.
   * @param active if it is closed by user actively ? or passively by program because of underlying
   *               connection expires
   */
  public void close(boolean active) {
    try {
      activeClose = active;
      zooKeeper.close();
    } catch (Exception e) {
      e.printStackTrace();
    }
  }

  /**
   * Notify listener that ZooKeeper connection has been established.
   */
  private void notifyConnected() {
    listener.connected();
  }

  /**
   * Notify listener that a node value or node children has changed.
   */
  private void notifyChange() {
    //forget it if event was triggered by user's actively closing EndpointGroup
    if (activeClose) {
      return;
    }
    List<String> children;
    byte[] newValueBytes;
    try {
      if (zooKeeper.exists(zNodePath, true) == null) {
        return;
      }
      children = zooKeeper.getChildren(zNodePath, true);
      newValueBytes = zooKeeper.getData(zNodePath, false, null);
      if (newValueBytes != null) {
        String newValue = new String(newValueBytes, StandardCharsets.UTF_8);
        if (prevNodeValue == null || !prevNodeValue.equals(newValue)) {
          listener.nodeValueChange(newValue);
          prevNodeValue = newValue;
        }
      }
      //check children status
      if (children != null) {
        Map<String, String> newChildValue = new HashMap<>();
        children.forEach(child -> {
          try {
            newChildValue.put(child,
                    new String(zooKeeper.getData(zNodePath + '/' + child,
                            false, null), StandardCharsets.UTF_8));
          } catch (Exception e) {
            throw new ZooKeeperException(e);
          }
        });
        if (prevChildValue == null || !prevChildValue.equals(newChildValue)) {
          listener.nodeChildChange(newChildValue);
          prevChildValue = newChildValue;
        }
      }
    } catch (Exception ex) {
      throw new ZooKeeperException("Failed to notify ZooKeeper listener", ex);
    }
  }

  /**
   * A ZooKeeper watch.
   */
  final class ZkWatcher implements Watcher, StatCallback {
    @Override
    public void process(WatchedEvent event) {
      if (stateQueue != null) {
        enqueueState(event.getState());
      }
      String path = event.getPath();
      if (event.getType() == Event.EventType.None) {
        // Connection state has been changed. Keep retrying until the connection is recovered.
        switch (event.getState()) {
          case Disconnected:
            break;
          case SyncConnected:
            // We are here because of one of the following:
            // - initial connection,
            // - reconnection due to session timeout or
            // - reconnection due to session expiration
            // Once connected, reset the retry delay.
            latch.countDown();
            break;
          case Expired:
            // Session expired usually happens when a client reconnected to the server after
            // long time network partition, exceeding the configured
            // session timeout. We need to reconstruct the ZooKeeper client.
            // First, clean the original handle.
            close(false);
            zooKeeper = null;
            try {
              if (!activeClose) {
                connect();
              }
            } catch (ZooKeeperException e) {
              logger.warn("Failed to attempt to recover a ZooKeeper connection", e);
            }
            break;
        }
      } else {
        if (path != null && path.startsWith(zNodePath)) {
          // Something has changed on the node, let's find out.
          try {
            zooKeeper.exists(path, true, this, null);
          } catch (Exception e) {
            e.printStackTrace();
          }
        }
      }
    }

    @Override
    public void processResult(int responseCodeInt, String path, Object ctx, Stat stat) {
      Code responseCode = get(responseCodeInt);
      switch (responseCode) {
        case OK:
          break;
        case NONODE:
          break;
        case SESSIONEXPIRED:
          // Ignore this and let the zNode Watcher process it first.
        case NOAUTH:
          // It's possible that this happens during runtime. We ignore this and wait for the ACL
          // configuration returns to normal. If it happens when the ZooKeeper client is initially
          // constructed, the constructor will throw an exception.
          return;
        default:
          // Retry on recoverable errors. Fatal errors go to the process() method above.
          try {
            zooKeeper.exists(path, true, this, null);
          } catch (Exception ex) {
            throw new ZooKeeperException("Failed to process ZooKeeper callback event", ex);
          }
          return;
      }
      if (!activeClose) {
        notifyChange();
        //enqueue an end flag to force the main thread to wait until this callback finished  before exit
        if (stateQueue != null) {
          enqueueState(KeeperState.Disconnected);
        }
      }
    }

    /**
     * Enqueue the state.
     * @param state ZooKeeper state
     */
    private void enqueueState(KeeperState state) {
      if (stateQueue == null) {
        return;
      }
      try {
        stateQueue.put(state);
      } catch (InterruptedException e) {
        e.printStackTrace();
      }
    }
  }

7、ZKClient.java

Zookeeper客户端

package com.dataeye.zookeeper.client;

import com.dataeye.crawler.thrift.Worker;
import com.dataeye.zookeeper.DefaultNodeValueCodec;
import com.dataeye.zookeeper.ZooKeeperConnector;
import com.dataeye.zookeeper.ZooKeeperListener;
import com.dataeye.zookeeper.ZookeeperProperty;

import java.util.*;
import java.util.stream.Collectors;


public class ZKClient {

    public static void main(String[] args) {
        String zkConn = "localhost:2181";

        DefaultNodeValueCodec nodeValueCodec = DefaultNodeValueCodec.INSTANCE;

        ZookeeperProperty zookeeperProperty = new ZookeeperProperty(zkConn, "/de-spider/works", 60000);

        ZooKeeperConnector zooKeeperConnector = new ZooKeeperConnector(
                zookeeperProperty.getZkConnnectionStr(),
                zookeeperProperty.getzNodePath(),
                zookeeperProperty.getSessionTimeout(),
                new ZooKeeperListener() {
                    @Override
                    public void nodeChildChange(Map<String, String> newChildrenValue) {
                        List<Worker> newWorkers = newChildrenValue.values().stream().map(nodeValueCodec::decode).filter(Objects::nonNull).collect(Collectors.toList());
                        Map<String, Set<Worker>> newBizWorkers = new HashMap<>();

                        if (newWorkers != null) {
                            for (Worker worker : newWorkers) {
                                String biz = worker.biz;

                                if (!newBizWorkers.containsKey(biz)) {
                                    newBizWorkers.put(biz, new HashSet<>());
                                }
                                newBizWorkers.get(biz).add(worker);
                            }
                        }
                    }

                    @Override
                    public void nodeValueChange(String newValue) {

                    }

                    @Override
                    public void connected() {

                    }
                });
        zooKeeperConnector.connect();
    }
}

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 211,376评论 6 491
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,126评论 2 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 156,966评论 0 347
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,432评论 1 283
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,519评论 6 385
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,792评论 1 290
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,933评论 3 406
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,701评论 0 266
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,143评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,488评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,626评论 1 340
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,292评论 4 329
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,896评论 3 313
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,742评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,977评论 1 265
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,324评论 2 360
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,494评论 2 348

推荐阅读更多精彩内容

  • 转自:http://blog.csdn.net/kesonyk/article/details/50924489 ...
    晴天哥_王志阅读 24,787评论 2 38
  • Spring Cloud为开发人员提供了快速构建分布式系统中一些常见模式的工具(例如配置管理,服务发现,断路器,智...
    卡卡罗2017阅读 134,629评论 18 139
  • 一、ZooKeeper的背景 1.1 认识ZooKeeper ZooKeeper---译名为“动物园管理员”。动物...
    algernoon阅读 9,058评论 1 106
  • 出来社会后,绝大多数有效时间都给了工作, 所以能够安安静静地提笔写字,敲文成章的时候,并不多, 还好,写文,看书这...
    收刀入鞘阅读 174评论 0 0
  • 此刻 羞怯的原子依附着散落的尘埃 指针代替衰老 在冗长的梦境里 熟练的偷盗者略显公平 挂在空气中的雨 在半个世纪以...
    陆丘禾阅读 272评论 0 5