Zookeeper(四)-客户端-消息处理流程

概述

本节分析下客户端消息的处理,重点关注Watcher及DataCallback;本节以getData的同步和异步方法为例进行分析;


RPC方法流程.png

处理流程

客户端请求处理流程.png

1.示例代码

public class DemoTest implements Watcher, AsyncCallback.DataCallback {
    private static Stat stat = new Stat();

    public static void main(String[] args) throws KeeperException, InterruptedException, IOException {
        DemoTest demoTest = new DemoTest();
        ZooKeeper zooKeeper = new ZooKeeper("localhost:2181", 3000000, demoTest);
        // 同步
        byte[] bytes = zooKeeper.getData("/test1", demoTest, stat);
        // 异步
        zooKeeper.getData("/test1", demoTest, demoTest, "异步回调需要传递的数据");
    }

    @Override
    public void process(WatchedEvent event) {
        System.out.println("接收到watch通知:" + event);
    }

    @Override
    public void processResult(int rc, String path, Object ctx, byte[] data, Stat stat) {
        System.out.println("结果回调:" + path + "----" + ctx);
    }
}
  • 实现Watcher接口,重写process方法用于watch事件回调;
  • 实现AsyncCallback接口,重写processResult方法用于异步结果回调;

2.queuePacket构造packet入队outgoingQueue

Packet queuePacket(RequestHeader h, ReplyHeader r, Record request,
        Record response, AsyncCallback cb, String clientPath,
        String serverPath, Object ctx, WatchRegistration watchRegistration){
    Packet packet = null;
    synchronized (outgoingQueue) {
        packet = new Packet(h, r, request, response, watchRegistration);
        // AsyncCallback默认为空
        packet.cb = cb;
        packet.ctx = ctx;
        packet.clientPath = clientPath;
        packet.serverPath = serverPath;
        if (!state.isAlive() || closing) {
            conLossPacket(packet);
        } else {
            // If the client is asking to close the session then mark as closing
            if (h.getType() == OpCode.closeSession) {
                closing = true;
            }
            // 数据包入队 需要发送的队列
            outgoingQueue.add(packet);
        }
    }
    // selector.wakeup();
    sendThread.getClientCnxnSocket().wakeupCnxn();
    return packet;
}
  • new Packet构造Packet(包含Request/Response/Watcher等);
  • outgoingQueue.add(packet)入队outgoingQueue,SendThread.run中进行处理;
  • wakeupCnxn()唤醒多路复用器立即进行select;

3.同步处理流程阻塞

public ReplyHeader submitRequest(RequestHeader h, Record request,
        Record response, WatchRegistration watchRegistration)
        throws InterruptedException {
    ReplyHeader r = new ReplyHeader();
    Packet packet = queuePacket(h, r, request, response, null, null, null,
                null, watchRegistration);
    synchronized (packet) {
        // packet响应后置为true,否则 wait
        while (!packet.finished) {
            packet.wait();
        }
    }
    return r;
}
  • packet.wait()阻塞等待packet响应后进行notify,然后同步返回;

4.异步处理流程结束

  • 异步流程同步结束,等待服务端响应后回调processResult;

5.doTransport

  • doTransport先write请求包再read响应包,具体流程跟启动流程类似,参考上一节的分析;

6.readResponse处理响应

void readResponse(ByteBuffer incomingBuffer) throws IOException {
    ByteBufferInputStream bbis = new ByteBufferInputStream(incomingBuffer);
    BinaryInputArchive bbia = BinaryInputArchive.getArchive(bbis);
    ReplyHeader replyHdr = new ReplyHeader();
    // 反序列化响应头
    replyHdr.deserialize(bbia, "header");
    
    ......

    Packet packet;
    synchronized (pendingQueue) {
        if (pendingQueue.size() == 0) {
            throw new IOException("Nothing in the queue, but got " + replyHdr.getXid());
        }
        packet = pendingQueue.remove();
    }
    /*
     * Since requests are processed in order, we better get a response to the first request!
     * 由于请求是按顺序处理的,因此响应也要按顺序处理
     */
    try {
        // 比对响应xid 跟 原请求xid是否相等,保证顺序性
        if (packet.requestHeader.getXid() != replyHdr.getXid()) {
            packet.replyHeader.setErr(KeeperException.Code.CONNECTIONLOSS.intValue());
            throw new IOException("Xid out of order. Got Xid "
                    + replyHdr.getXid() + " with err " +
                    + replyHdr.getErr() +
                    " expected Xid "
                    + packet.requestHeader.getXid()
                    + " for a packet with details: "
                    + packet );
        }
        // 设置响应头
        packet.replyHeader.setXid(replyHdr.getXid());
        packet.replyHeader.setErr(replyHdr.getErr());
        packet.replyHeader.setZxid(replyHdr.getZxid());
        if (replyHdr.getZxid() > 0) {
            lastZxid = replyHdr.getZxid();
        }
        // 反序列化响应体
        if (packet.response != null && replyHdr.getErr() == 0) {
            packet.response.deserialize(bbia, "response");
        }
        if (LOG.isDebugEnabled()) {
            LOG.debug("Reading reply sessionid:0x" + Long.toHexString(sessionId) + ", packet:: " + packet);
        }
    } finally {
        // packet响应反序列化后,处理watch注册
        finishPacket(packet);
    }
}
  • replyHdr.deserialize反序列化ReplyHeader;
  • pendingQueue.remove()从pendingQueue中移除Packet;
  • packet.requestHeader.getXid() != replyHdr.getXid()比对响应xid 跟 原请求xid是否相等,不相等抛出异常,保证请求处理的顺序性;
  • packet.response.deserialize反序列化响应体;

7.finishPacket

private void finishPacket(Packet p) {
    // submitRequest时的wcb,watcher不为空时p.watchRegistration != null
    if (p.watchRegistration != null) {
        // 注册watcher
        p.watchRegistration.register(p.replyHeader.getErr());
    }

    // 异步回调AsyncCallback为空,需要同步返回
    if (p.cb == null) {
        synchronized (p) {
            // finished置为true,
            p.finished = true;
            // 唤醒submitRequest时线程
            p.notifyAll();
        }
    } else {
        // 异步返回,通过eventThread线程处理
        p.finished = true;
        // 加入阻塞队列waitingEvents (LinkedBlockingQueue)
        eventThread.queuePacket(p);
    }
}
  • p.watchRegistration.register注册watcher,分别放到ZKWatchManager的三个Map中,用于在服务端触发相应事件时回调自定义的process方法;
private final Map<String, Set<Watcher>> dataWatches = new HashMap<String, Set<Watcher>>();
private final Map<String, Set<Watcher>> existWatches = new HashMap<String, Set<Watcher>>();
private final Map<String, Set<Watcher>> childWatches = new HashMap<String, Set<Watcher>>();
  • p.notifyAll()p.cb == null即异步回调AsyncCallback为空,唤醒submitRequest时线程,同步返回;
  • eventThread.queuePacket(p)加入阻塞队列waitingEvents (LinkedBlockingQueue线程安全,不需要synchronized),触发EventThread.run方法,异步返回;

8.processEvent处理事件

private void processEvent(Object event) {
      try {
              Packet p = (Packet) event;
              int rc = 0;
              String clientPath = p.clientPath;
              if (p.replyHeader.getErr() != 0) {
                  rc = p.replyHeader.getErr();
              }
              
              ......
              
              else if (p.response instanceof GetDataResponse) {
                  DataCallback cb = (DataCallback) p.cb;
                  GetDataResponse rsp = (GetDataResponse) p.response;
                  if (rc == 0) {
                      cb.processResult(rc, clientPath, p.ctx, rsp.getData(), rsp.getStat());
                  } else {
                      cb.processResult(rc, clientPath, p.ctx, null, null);
                  }
              } 
          }
      } catch (Throwable t) {
          LOG.error("Caught unexpected throwable", t);
      }
}
  • p.response instanceof GetDataResponse判断当前事件时什么类型;
  • cb.processResult回调processResult,执行用户自定义逻辑;
    -------over-------
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 224,861评论 6 522
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 96,263评论 3 402
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 172,033评论 0 366
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 60,999评论 1 300
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 70,000评论 6 400
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 53,483评论 1 314
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 41,850评论 3 428
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 40,827评论 0 279
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 47,366评论 1 324
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 39,404评论 3 346
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 41,525评论 1 355
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 37,130评论 5 351
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 42,853评论 3 338
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 33,293评论 0 25
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 34,426评论 1 276
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 50,082评论 3 381
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 46,590评论 2 366

推荐阅读更多精彩内容