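Summary
During startup, every server creates a QuorumCnxManager, which handles the low-level network communication between servers during Leader election.
This section walks through that class, QuorumCnxManager.
Before reading the source in this section, it is worth reading the material listed under refer first; it is short and covers the basic concepts.
This section covers
Inner classes
SendWorker: the network IO sender; takes messages from the send queue and sends them to the machine with the corresponding sid
Message: defines the message structure, containing a sid and a ByteBuffer message body
RecvWorker: the network IO receiver
Listener: listens on electionPort and waits for connections from other machines
Attributes
recvQueue: the receive queue
queueSendMap: the send queue for each sid
Functions
Connection-related
Producing to and consuming from the send and receive queues
Other
Thoughts and summary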
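Inner classes
There are four inner classes:
SendWorker, Message, RecvWorker, and Listener
SendWorker
This class is the "sender". It extends ZooKeeperThread, and its thread keeps taking messages from the send queue and sending them to the machine with the corresponding sid
Attributes
Long sid;// sid of the target machine, not of the current machine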
Socket sock;
RecvWorker recvWorker;// the RecvWorker paired with this sid
volatile boolean running = true;
DataOutputStream dout;
Main methods
Constructor
SendWorker(Socket sock, Long sid) {
super("SendWorker:" + sid);
this.sid = sid;
this.sock = sock;
recvWorker = null;
try {
dout = new DataOutputStream(sock.getOutputStream());
} catch (IOException e) {
LOG.error("Unable to access socket output stream", e);
closeSocket(sock);
running = false;
}
LOG.debug("Address of remote peer: " + this.sid);
}
The run method
@Override
public void run() {
threadCnt.incrementAndGet();// thread count + 1
try {
/**
* If there is nothing in the queue to send, then we
* send the lastMessage to ensure that the last message
* was received by the peer. The message could be dropped
* in case self or the peer shutdown their connection
* (and exit the thread) prior to reading/processing
* the last message. Duplicate messages are handled correctly
* by the peer.
*
* If the send queue is non-empty, then we have a recent
* message than that stored in lastMessage. To avoid sending
* stale message, we should send the message in the send queue.
*/
ArrayBlockingQueue<ByteBuffer> bq = queueSendMap.get(sid);// look up the send queue for this sid
if (bq == null || isSendQueueEmpty(bq)) {
ByteBuffer b = lastMessageSent.get(sid);// nothing queued: resend the last message (the peer handles duplicates correctly)
if (b != null) {
LOG.debug("Attempting to send lastMessage to sid=" + sid);
send(b);// send
}
}
} catch (IOException e) {
LOG.error("Failed to send last message. Shutting down thread.", e);
this.finish();
}
try {
while (running && !shutdown && sock != null) {
ByteBuffer b = null;
try {
ArrayBlockingQueue<ByteBuffer> bq = queueSendMap
.get(sid);
if (bq != null) {
b = pollSendQueue(bq, 1000, TimeUnit.MILLISECONDS);// take a message from the send queue
} else {// no queue recorded in the map for this sid
LOG.error("No queue of incoming messages for " +
"server " + sid);
break;
}
if(b != null){
lastMessageSent.put(sid, b);// remember the last message sent
send(b);// send
}
} catch (InterruptedException e) {
LOG.warn("Interrupted while waiting for message on queue",
e);
}
}
} catch (Exception e) {
LOG.warn("Exception when using channel: for id " + sid + " my id = " +
self.getId() + " error = " + e);
}
this.finish();
LOG.warn("Send worker leaving thread");
}
}
The send method
synchronized void send(ByteBuffer b) throws IOException {
byte[] msgBytes = new byte[b.capacity()];
try {
b.position(0);
b.get(msgBytes);
} catch (BufferUnderflowException be) {
LOG.error("BufferUnderflowException ", be);
return;
}
dout.writeInt(b.capacity());
dout.write(b.array());
dout.flush();
}
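The fields involved are covered below. One point to note here:
When a SendWorker starts and finds that the send queue for its target server is empty, it takes the most recently sent message from lastMessageSent and sends it again. This covers the case where the receiver went down before receiving the message, or after receiving it but before processing it, so that the message was never handled properly. ZooKeeper also ensures that the receiver handles duplicate messages correctly.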
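The resend rule can be reduced to a small model. The following is a minimal sketch, not ZooKeeper code: `ResendSketch`, `enqueue`, `onStart`, `loopOnce` and the `wire` list are hypothetical names, and the socket is replaced by a list that records what would have been written. It only illustrates that the resend happens once at worker start, and only when nothing newer is queued.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ConcurrentHashMap;

// Illustrative stand-in for the two phases of SendWorker#run; not ZooKeeper code.
final class ResendSketch {
    final ConcurrentHashMap<Long, ArrayBlockingQueue<ByteBuffer>> queueSendMap =
            new ConcurrentHashMap<>();
    final ConcurrentHashMap<Long, ByteBuffer> lastMessageSent = new ConcurrentHashMap<>();
    // Hypothetical: records payloads instead of writing them to a socket.
    final List<String> wire = new ArrayList<>();

    // Hypothetical helper: queue a text payload for sid (capacity 1, like SEND_CAPACITY).
    void enqueue(long sid, String text) {
        queueSendMap.computeIfAbsent(sid, k -> new ArrayBlockingQueue<ByteBuffer>(1))
                .offer(ByteBuffer.wrap(text.getBytes(StandardCharsets.UTF_8)));
    }

    private void send(ByteBuffer b) {
        wire.add(new String(b.array(), StandardCharsets.UTF_8));
    }

    // Phase 1, run once when the worker starts: resend only if nothing newer is queued.
    void onStart(long sid) {
        ArrayBlockingQueue<ByteBuffer> bq = queueSendMap.get(sid);
        if (bq == null || bq.isEmpty()) {
            ByteBuffer last = lastMessageSent.get(sid);
            if (last != null) {
                send(last);
            }
        }
    }

    // Phase 2, one loop iteration: take a queued message, remember it, send it.
    void loopOnce(long sid) {
        ArrayBlockingQueue<ByteBuffer> bq = queueSendMap.get(sid);
        ByteBuffer b = (bq == null) ? null : bq.poll();
        if (b != null) {
            lastMessageSent.put(sid, b);
            send(b);
        }
    }
}
```

Calling `enqueue`, `loopOnce`, then `onStart` for the same sid records the payload twice; calling `onStart` while a newer payload is still queued records nothing extra.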
Message
This class defines the structure of the messages exchanged between servers. The source is as follows
static public class Message {
Message(ByteBuffer buffer, long sid) {
this.buffer = buffer;
this.sid = sid;
}
ByteBuffer buffer;
long sid;
}
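sid is the sid of the message's sender, and buffer is the message body
RecvWorker
This class is the "receiver". Like SendWorker, it extends ZooKeeperThread; its thread keeps reading data from network IO and putting it on the receive queue
Attributes
Long sid;// sid of the sending machine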
Socket sock;
volatile boolean running = true;
DataInputStream din;//input
final SendWorker sw;
Main methods
Constructor
RecvWorker(Socket sock, Long sid, SendWorker sw) {
super("RecvWorker:" + sid);
this.sid = sid;
this.sock = sock;
this.sw = sw;
try {
din = new DataInputStream(sock.getInputStream());
// OK to wait until socket disconnects while reading.
sock.setSoTimeout(0);
} catch (IOException e) {
LOG.error("Error while accessing socket for " + sid, e);
closeSocket(sock);
running = false;
}
}
The run method
@Override
public void run() {
threadCnt.incrementAndGet();
try {
while (running && !shutdown && sock != null) {
/**
* Reads the first int to determine the length of the
* message
*/
int length = din.readInt();// read the length
if (length <= 0 || length > PACKETMAXSIZE) {
throw new IOException(
"Received packet with invalid packet: "
+ length);
}
/**
* Allocates a new ByteBuffer to receive the message
*/
byte[] msgArray = new byte[length];
din.readFully(msgArray, 0, length);
ByteBuffer message = ByteBuffer.wrap(msgArray);// wrap the bytes in a ByteBuffer
addToRecvQueue(new Message(message.duplicate(), sid));// add to the receive queue
}
} catch (Exception e) {
LOG.warn("Connection broken for id " + sid + ", my id = " +
self.getId() + ", error = " , e);
} finally {
LOG.warn("Interrupting SendWorker");
sw.finish();
if (sock != null) {
closeSocket(sock);
}
}
}
}
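The wire format used by SendWorker#send and RecvWorker#run is a 4-byte length followed by that many payload bytes. The following is a minimal round-trip sketch of that framing over in-memory streams. `FramingSketch`, `writeFrame`, `readFrame` and `roundTrip` are hypothetical names, and the `PACKETMAXSIZE` value here is an assumption made for the sketch.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

final class FramingSketch {
    // Assumed limit, for this sketch only.
    static final int PACKETMAXSIZE = 1024 * 512;

    // Sender side: length prefix, then the payload.
    static void writeFrame(DataOutputStream dout, byte[] payload) throws IOException {
        dout.writeInt(payload.length);
        dout.write(payload);
        dout.flush();
    }

    // Receiver side: read the length, validate it, then read exactly that many bytes.
    static byte[] readFrame(DataInputStream din) throws IOException {
        int length = din.readInt();
        if (length <= 0 || length > PACKETMAXSIZE) {
            throw new IOException("invalid packet length: " + length);
        }
        byte[] msg = new byte[length];
        din.readFully(msg, 0, length);
        return msg;
    }

    // Hypothetical helper: frame a string, then parse it back from the same bytes.
    static String roundTrip(String text) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            writeFrame(new DataOutputStream(bytes), text.getBytes(StandardCharsets.UTF_8));
            DataInputStream din =
                    new DataInputStream(new ByteArrayInputStream(bytes.toByteArray()));
            return new String(readFrame(din), StandardCharsets.UTF_8);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Because the receiver rejects a length of 0, an empty payload does not survive the round trip in this sketch; that matches the `length <= 0` check in RecvWorker#run.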
Listener
This class also extends ZooKeeperThread. It listens on electionPort and keeps accepting incoming connections
The core of its run method is as follows
while (!shutdown) {
Socket client = ss.accept();
setSockOpts(client);
LOG.info("Received connection request "
+ client.getRemoteSocketAddress());
receiveConnection(client);// keep accepting connections
numRetries = 0;
}
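Inner class summary
SendWorker and RecvWorker depend on each other; the reason is given under "Thoughts" below
RecvWorker is easier to follow than SendWorker
Both have a sid attribute: when a machine connects to other machines, its RecvWorker and SendWorker instances are distinguished by sid
For example, if sid1 connects to the other (n-1) servers, there is one RecvWorker and one SendWorker per sid, (n-1) of each
Message wraps a message and contains a sid plus a ByteBuffer message body
Listener listens on the electionPort configured for this machine and keeps accepting incoming connections
Attributes
The important fields are summarized below
Attribute | Default | Notes |
---|---|---|
RECV_CAPACITY | 100 | Capacity of the receive queue |
SEND_CAPACITY | 1 | Capacity of each send queue; the reason is covered under "Thoughts" |
ConcurrentHashMap<Long, SendWorker> senderWorkerMap | | The SendWorker for each sid |
ConcurrentHashMap<Long, ArrayBlockingQueue<ByteBuffer>> queueSendMap | | Send queues, keyed by the sid of each machine |
ConcurrentHashMap<Long, ByteBuffer> lastMessageSent | | The last message sent to each sid |
ArrayBlockingQueue<Message> recvQueue | | The receive queue |
These are fairly self-explanatory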
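Functions
Functions for initiating and accepting connections
Initiating connections
connectOne: connect to the server with the given sid
synchronized void connectOne(long sid){// connect to the server with the given sid
if (senderWorkerMap.get(sid) == null){// no SendWorker recorded for this sid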
InetSocketAddress electionAddr;
if (self.quorumPeers.containsKey(sid)) {
electionAddr = self.quorumPeers.get(sid).electionAddr;// election address of that sid, taken from the configuration
} else {
LOG.warn("Invalid server id: " + sid);
return;
}
try {
if (LOG.isDebugEnabled()) {
LOG.debug("Opening channel to server " + sid);
}
Socket sock = new Socket();
setSockOpts(sock);
sock.connect(self.getView().get(sid).electionAddr, cnxTO);// connect the socket to the peer
if (LOG.isDebugEnabled()) {
LOG.debug("Connected to server " + sid);
}
initiateConnection(sock, sid);// initialize the connection
} catch (UnresolvedAddressException e) {
// Sun doesn't include the address that causes this
// exception to be thrown, also UAE cannot be wrapped cleanly
// so we log the exception in order to capture this critical
// detail.
LOG.warn("Cannot open channel to " + sid
+ " at election address " + electionAddr, e);
// Resolve hostname for this server in case the
// underlying ip address has changed.
if (self.getView().containsKey(sid)) {
self.getView().get(sid).recreateSocketAddresses();
}
throw e;
} catch (IOException e) {
LOG.warn("Cannot open channel to " + sid
+ " at election address " + electionAddr,
e);
// We can't really tell if the server is actually down or it failed
// to connect to the server because the underlying IP address
// changed. Resolve the hostname again just in case.
if (self.getView().containsKey(sid)) {
self.getView().get(sid).recreateSocketAddresses();
}
}
} else {
LOG.debug("There is a connection already for server " + sid);
}
}
If senderWorkerMap has no entry for the sid, there is currently no connection, so one is opened. The core step is the call to
initiateConnection
initiateConnection: initialize the connection
/**
* If this server has initiated the connection, then it gives up on the
* connection if it loses challenge. Otherwise, it keeps the connection.
*/
public boolean initiateConnection(Socket sock, Long sid) {// initialize the connection
DataOutputStream dout = null;
try {
// Sending id and challenge
dout = new DataOutputStream(sock.getOutputStream());
dout.writeLong(self.getId());// send our own sid
dout.flush();
} catch (IOException e) {
LOG.warn("Ignoring exception reading or writing challenge: ", e);
closeSocket(sock);
return false;
}
// If lost the challenge, then drop the new connection
if (sid > self.getId()) {// only the larger sid may connect to the smaller one; if our sid is smaller, close
LOG.info("Have smaller server identifier, so dropping the " +
"connection: (" + sid + ", " + self.getId() + ")");
closeSocket(sock);
// Otherwise proceed with the connection
} else {// our sid is larger: initialize SendWorker and RecvWorker
SendWorker sw = new SendWorker(sock, sid);
RecvWorker rw = new RecvWorker(sock, sid, sw);// rw keeps a reference to sw
sw.setRecv(rw);// sw keeps a reference to rw
SendWorker vsw = senderWorkerMap.get(sid);
if(vsw != null)
vsw.finish();// finish the existing SendWorker (vsw) for this sid
senderWorkerMap.put(sid, sw);// register the new SendWorker (sw)
if (!queueSendMap.containsKey(sid)) {
queueSendMap.put(sid, new ArrayBlockingQueue<ByteBuffer>(
SEND_CAPACITY));
}
sw.start();
rw.start();
return true;
}
return false;
}
Accepting connections
receiveConnection: accept an incoming connect
public void receiveConnection(Socket sock) {// accept a connect
Long sid = null;
try {
// Read server id
DataInputStream din = new DataInputStream(sock.getInputStream());
sid = din.readLong();
if (sid < 0) { // this is not a server id but a protocol version (see ZOOKEEPER-1633)
sid = din.readLong();
// next comes the #bytes in the remainder of the message
// note that 0 bytes is fine (old servers)
int num_remaining_bytes = din.readInt();
if (num_remaining_bytes < 0 || num_remaining_bytes > maxBuffer) {
LOG.error("Unreasonable buffer length: {}", num_remaining_bytes);
closeSocket(sock);
return;
}
byte[] b = new byte[num_remaining_bytes];
// remove the remainder of the message from din
int num_read = din.read(b);
if (num_read != num_remaining_bytes) {
LOG.error("Read only " + num_read + " bytes out of " + num_remaining_bytes + " sent by server " + sid);
}
}
if (sid == QuorumPeer.OBSERVER_ID) {
/*
* Choose identifier at random. We need a value to identify
* the connection.
*/
sid = observerCounter--;
LOG.info("Setting arbitrary identifier to observer: " + sid);
}
} catch (IOException e) {
closeSocket(sock);
LOG.warn("Exception reading or writing challenge: " + e.toString());
return;
}
//If wins the challenge, then close the new connection.
if (sid < self.getId()) {// our id is larger: close this connection (opened by the smaller sid toward the larger one) and connect to that sid ourselves
/*
* This replica might still believe that the connection to sid is
* up, so we have to shut down the workers before trying to open a
* new connection.
*/
SendWorker sw = senderWorkerMap.get(sid);
if (sw != null) {
sw.finish();
}
/*
* Now we start a new connection
*/
LOG.debug("Create new connection to server: " + sid);
closeSocket(sock);
connectOne(sid);
// Otherwise start worker threads to receive data.
} else {
SendWorker sw = new SendWorker(sock, sid);
RecvWorker rw = new RecvWorker(sock, sid, sw);
sw.setRecv(rw);
SendWorker vsw = senderWorkerMap.get(sid);
if(vsw != null)
vsw.finish();
senderWorkerMap.put(sid, sw);
if (!queueSendMap.containsKey(sid)) {
queueSendMap.put(sid, new ArrayBlockingQueue<ByteBuffer>(
SEND_CAPACITY));
}
sw.start();
rw.start();
return;
}
}
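There is a concept here: winning the challenge and losing the challenge
To guarantee that each pair of servers shares only one socket, ZooKeeper only lets the server with the larger SID keep a connection it initiated; otherwise the connection is dropped.
When initiating a connection, our own sid must be the larger one; in that case the SendWorker and RecvWorker are constructed and their threads started, otherwise the socket is closed
When accepting a connection, our own sid must be the smaller one; in that case the SendWorker and RecvWorker are constructed and their threads started, otherwise the socket is closed
This is also analyzed under "Thoughts"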
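The two sid comparisons can be written down as a pair of predicates. This is a minimal sketch that mirrors the `sid > self.getId()` test in initiateConnection and the `sid < self.getId()` test in receiveConnection; `ChallengeSketch`, `keepOutgoing`, `keepIncoming` and `survives` are hypothetical names, not part of ZooKeeper.

```java
final class ChallengeSketch {
    // Initiator side (cf. initiateConnection): the socket is dropped if the peer's sid is larger.
    static boolean keepOutgoing(long mySid, long peerSid) {
        return !(peerSid > mySid);
    }

    // Acceptor side (cf. receiveConnection): the socket is dropped if the peer's sid is smaller.
    static boolean keepIncoming(long mySid, long peerSid) {
        return !(peerSid < mySid);
    }

    // A connection attempt survives only if both ends decide to keep it.
    static boolean survives(long fromSid, long toSid) {
        return keepOutgoing(fromSid, toSid) && keepIncoming(toSid, fromSid);
    }
}
```

For two distinct sids both ends always reach the same decision, so `survives(3, 1)` is true and `survives(1, 3)` is false; in the second case the acceptor (sid 3) dials back, which is the `connectOne(sid)` call in receiveConnection.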
Functions for producing to and consuming from the send queue
Producing: adding to the send queue
private void addToSendQueue(ArrayBlockingQueue<ByteBuffer> queue,
ByteBuffer buffer) {// the send queue has capacity 1: if it is full, remove, then add
if (queue.remainingCapacity() == 0) {
try {
queue.remove();
} catch (NoSuchElementException ne) {
// element could be removed by poll()
LOG.debug("Trying to remove from an empty " +
"Queue. Ignoring exception " + ne);
}
}
try {
queue.add(buffer);
} catch (IllegalStateException ie) {
// This should never happen
LOG.error("Unable to insert an element in the queue " + ie);
}
}
Every send queue has capacity 1; to avoid sending stale data, the old entry is removed before the new one is added
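The "keep only the newest entry" behaviour can be tried in isolation. The following is a minimal sketch that assumes a single producer; `LatestOnlySketch`, `addLatest` and `demo` are hypothetical names, and it uses `poll()`/`offer()` rather than the `remove()`/`add()` pair in addToSendQueue so that no exception handling is needed.

```java
import java.util.concurrent.ArrayBlockingQueue;

final class LatestOnlySketch {
    // Same shape as addToSendQueue: evict when full, then insert. Single producer assumed.
    static <T> void addLatest(ArrayBlockingQueue<T> queue, T item) {
        if (queue.remainingCapacity() == 0) {
            // Unlike remove(), poll() returns null if a consumer emptied the queue first.
            queue.poll();
        }
        queue.offer(item);
    }

    // Hypothetical demo: queue two votes with no consumer running and report what is left.
    static String demo() {
        ArrayBlockingQueue<String> q = new ArrayBlockingQueue<>(1);
        addLatest(q, "vote-1");
        addLatest(q, "vote-2");
        return q.size() + ":" + q.peek();
    }
}
```

`demo()` returns `1:vote-2`: the first vote is discarded without ever being sent.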
Consuming from the send queue
private ByteBuffer pollSendQueue(ArrayBlockingQueue<ByteBuffer> queue,
long timeout, TimeUnit unit) throws InterruptedException {
return queue.poll(timeout, unit);
}
Functions for producing to and consuming from the receive queue
Producing to the receive queue
public void addToRecvQueue(Message msg) {
synchronized(recvQLock) {
if (recvQueue.remainingCapacity() == 0) {
try {
recvQueue.remove();
} catch (NoSuchElementException ne) {
// element could be removed by poll()
LOG.debug("Trying to remove from an empty " +
"recvQueue. Ignoring exception " + ne);
}
}
try {
recvQueue.add(msg);// add to the receive queue
} catch (IllegalStateException ie) {
// This should never happen
LOG.error("Unable to insert element in the recvQueue " + ie);
}
}
}
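It is not entirely clear to me why the oldest element is removed when the queue is full on insert; the queue capacity is 100
Consuming from the receive queue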
public Message pollRecvQueue(long timeout, TimeUnit unit)
throws InterruptedException {
return recvQueue.poll(timeout, unit);
}
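Other functions
toSend
Adds a message to the receive queue or to a send queue depending on the sid, so it indirectly drives production for both the send and recv sides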
public void toSend(Long sid, ByteBuffer b) {
/*
* If sending message to myself, then simply enqueue it (loopback).
*/
if (self.getId() == sid) {// sending to ourselves: add to the receive queue
b.position(0);
addToRecvQueue(new Message(b.duplicate(), sid));
/*
* Otherwise send to the corresponding thread to send.
*/
} else {
/*
* Start a new connection if doesn't have one already.
*/
if (!queueSendMap.containsKey(sid)) {// no send queue recorded for this sid yet
ArrayBlockingQueue<ByteBuffer> bq = new ArrayBlockingQueue<ByteBuffer>(
SEND_CAPACITY);// blocking queue with capacity 1
queueSendMap.put(sid, bq);
addToSendQueue(bq, b);
} else {
ArrayBlockingQueue<ByteBuffer> bq = queueSendMap.get(sid);
if(bq != null){
addToSendQueue(bq, b);
} else {
LOG.error("No queue for server " + sid);
}
}
connectOne(sid);// establish a connection to this sid
}
}
haveDelivered
Whether messages have been delivered
/**
* Check if all queues are empty, indicating that all messages have been delivered.
*/
boolean haveDelivered() {// returns true as soon as one queue is empty, which does not match the comment
for (ArrayBlockingQueue<ByteBuffer> queue : queueSendMap.values()) {
LOG.debug("Queue size: " + queue.size());
if (queue.size() == 0) {
return true;
}
}
return false;
}
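A complaint here: the code and the comment disagree; this is discussed under "Thoughts"
Connecting to every sid in queueSendMap
public void connectAll(){// connect to every sid that has messages to send
long sid;
for(Enumeration<Long> en = queueSendMap.keys();// every sid recorded in queueSendMap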
en.hasMoreElements();){
sid = en.nextElement();
connectOne(sid);
}
}
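Thoughts
The tricky part: there is only one connection between each pair of servers
Think of it as n servers that all need a connection to each other
Like n points joined by undirected edges, where [sidn,sid1] denotes that sidn has connected to sid1
Then [sid1,sidn] is unnecessary, so n*(n-1)/2 edges are enough
The rule in the code is that, under normal conditions, only the server with the larger sid connects to the server with the smaller sid
Where this shows up
When initiating a connection, if the peer's sid is larger than our own, we only send our own sid (a single long) and then close the socket
QuorumCnxManager#initiateConnection
When accepting a connection, if the peer's sid is smaller than our own, we close the socket and then connect to that sid ourselves
QuorumCnxManager#receiveConnection
In other words, when a server with a smaller sid sends a connection request to a server with a larger sid, it is only telling the peer
"Your sid is larger, so you connect to me"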
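The edge count can be checked by enumerating the surviving connections directly. This is a minimal sketch, assuming sids numbered 1..n; `PairSketch` and `connections` are hypothetical names. Each pair appears once, always dialed from the larger sid to the smaller one, which gives n*(n-1)/2 connections.

```java
import java.util.ArrayList;
import java.util.List;

final class PairSketch {
    // One entry "a->b" per pair of servers, always from the larger sid to the smaller one.
    static List<String> connections(int n) {
        List<String> edges = new ArrayList<>();
        for (long a = 1; a <= n; a++) {
            for (long b = 1; b < a; b++) {
                edges.add(a + "->" + b);
            }
        }
        return edges;
    }
}
```

For three servers this yields `2->1`, `3->1` and `3->2`; for five servers it yields ten connections.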
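Initialization of SendWorker and RecvWorker
How is it guaranteed that, for a connection such as [sidn,sid1], both sides initialize their two workers?
When initiating the connection, the side with the larger sid (sidn) initializes its two workers
When accepting the connection, the side with the smaller sid (sid1) initializes its two workers
The code is again in the two functions above
Why RecvWorker and SendWorker hold references to each other
In the code, this matters at finish time
SendWorker#finish calls the corresponding RecvWorker#finish
The finally block of RecvWorker#run in turn calls SendWorker#finish
Also, senderWorkerMap is of type final ConcurrentHashMap<Long, SendWorker>,
and there is no corresponding map for RecvWorker,
so the reason is: look up the SendWorker by sid, then conveniently call finish through it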
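The mutual references can be modelled without threads or sockets. The following is a rough sketch under stated assumptions: `FinishSketch`, `Sender`, `Receiver`, `connect` and `onConnectionBroken` are hypothetical names, and the bodies of the `finish` methods are guesses at the shape of the real ones (which are not shown in this article), reduced to flipping a `running` flag.

```java
import java.util.concurrent.ConcurrentHashMap;

final class FinishSketch {
    static final class Receiver {
        volatile boolean running = true;
        final Sender sender;

        Receiver(Sender sender) {
            this.sender = sender;
        }

        void finish() {
            running = false;
        }

        // Stand-in for the finally block of RecvWorker#run.
        void onConnectionBroken() {
            sender.finish();
        }
    }

    static final class Sender {
        volatile boolean running = true;
        Receiver receiver; // set after construction, like setRecv

        void finish() {
            if (!running) {
                return; // already finished, nothing to cascade
            }
            running = false;
            if (receiver != null) {
                receiver.finish();
            }
        }
    }

    // Only senders are indexed by sid; receivers are reached through their sender.
    final ConcurrentHashMap<Long, Sender> senderWorkerMap = new ConcurrentHashMap<>();

    Receiver connect(long sid) {
        Sender sw = new Sender();
        Receiver rw = new Receiver(sw);
        sw.receiver = rw;
        Sender old = senderWorkerMap.get(sid);
        if (old != null) {
            old.finish(); // stops the old sender and, through it, the old receiver
        }
        senderWorkerMap.put(sid, sw);
        return rw;
    }
}
```

Replacing the pair for a sid stops both old workers through the single map lookup, and a broken connection on the receive side stops the matching sender.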
Why the send queue has capacity 1, and why inserting into a full queue evicts the existing entry
Capacity 1: QuorumCnxManager#SEND_CAPACITY
Eviction: QuorumCnxManager#addToSendQueue
See the comment on SEND_CAPACITY
// Initialized to 1 to prevent sending
// stale notifications to peers
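Because this is Leader election voting, there is a special requirement: if a new vote is produced before the previous one has been sent, the old vote can simply be discarded and never needs to be sent
Call order for producing, consuming, and sending on the send queue
Produce
QuorumCnxManager#toSend
QuorumCnxManager#addToSendQueue
Consume
QuorumCnxManager.SendWorker#run
QuorumCnxManager#pollSendQueue
Send
QuorumCnxManager.SendWorker#run
SendWorker#send
The meaning of queueSendMap
final ConcurrentHashMap<Long, ArrayBlockingQueue<ByteBuffer>> queueSendMap;// send queues, keyed by the sid of each machine
To stress the point: the key is a sid, meaning that the current machine and the machine with that sid
have established contact, or that there is something to send to it
queueSendMap is only ever modified by put; there are no remove operations
Almost always in the form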
queueSendMap.put(sid, new ArrayBlockingQueue<ByteBuffer>(
SEND_CAPACITY));
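QuorumCnxManager#haveDelivered: comment and code disagree
The source was shown earlier
The code and the comment disagree here. addToSendQueue, the function that adds to the send queue, is only called from QuorumCnxManager#toSend
The later discussion of FastLeaderElection#lookForLeader will show that toSend is the entry point for establishing connections; that is, the caller adds an entry to the send queue for the sid in queueSendMap
In haveDelivered, if the queue for any one sid has length 0, the item in that queue has been consumed, i.e. sent out, hence the name "haveDelivered"; it is only the comment that is wrong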
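The gap between the Javadoc and the code comes down to "all queues empty" versus "any queue empty". This minimal sketch puts the two readings side by side; `DeliveredSketch`, `anyEmpty`, `allEmpty` and `queuesOfSizes` are hypothetical names used only for illustration.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;

final class DeliveredSketch {
    // What the haveDelivered code shown earlier does: true as soon as one queue is empty.
    static boolean anyEmpty(Collection<? extends Collection<?>> queues) {
        for (Collection<?> q : queues) {
            if (q.isEmpty()) {
                return true;
            }
        }
        return false;
    }

    // What its Javadoc describes: true only if every queue is empty.
    static boolean allEmpty(Collection<? extends Collection<?>> queues) {
        for (Collection<?> q : queues) {
            if (!q.isEmpty()) {
                return false;
            }
        }
        return true;
    }

    // Hypothetical helper: build one queue per requested size, filled with dummy entries.
    static List<ArrayBlockingQueue<Integer>> queuesOfSizes(int... sizes) {
        List<ArrayBlockingQueue<Integer>> out = new ArrayList<>();
        for (int size : sizes) {
            ArrayBlockingQueue<Integer> q = new ArrayBlockingQueue<>(Math.max(1, size));
            for (int i = 0; i < size; i++) {
                q.add(i);
            }
            out.add(q);
        }
        return out;
    }
}
```

With one empty queue and one non-empty queue, `anyEmpty` returns true while `allEmpty` returns false, which is exactly the case where the two readings diverge.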
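Open questions
When the SendWorker's send queue is empty, it resends the last message it sent
Supposedly this solves the problem of "the receiver going down before receiving the message or after receiving it"
So why not resend the second-to-last message, or the third-to-last?
QuorumCnxManager#addToRecvQueue removes the oldest element when the receive queue is full
Why? Does it not matter that messages get lost?
Summary
This class is the scheduler for network IO
SendWorker and RecvWorker run as two threads that continuously send and receive messages
Listener listens for incoming connections
Key attribute: queueSendMap records which sids we are in contact with and what the send queue for each one is
refer
http://www.cnblogs.com/leesf456/p/6107600.html Part 3, QuorumCnxManager: network I/O
"From Paxos to ZooKeeper" (《从Paxos到Zookeeper》), section 7.6.3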