概述
本节以I/O多路复用模型NioEndpoint为例分析下连接器涉及的组件;
1. LimitLatch
LimitLatch是连接控制器,它负责控制最大连接数,NIO模式下默认是10000,达到这个阈值后,连接请求被拒绝,直到后续组件处理完一个连接后将连接数减1。
public class LimitLatch {
private class Sync extends AbstractQueuedSynchronizer {
@Override
protected int tryAcquireShared() {
long newCount = count.incrementAndGet();
if (newCount > limit) {
count.decrementAndGet();
return -1;
} else {
return 1;
}
}
@Override
protected boolean tryReleaseShared(int arg) {
count.decrementAndGet();
return true;
}
}
private final Sync sync;
private final AtomicLong count;
private volatile long limit;
//线程调用这个方法来获得接收新连接的许可,线程可能被阻塞
public void countUpOrAwait() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
//调用这个方法来释放一个连接许可,那么前面阻塞的线程可能被唤醒
public long countDown() {
sync.releaseShared(0);
long result = getCount();
return result;
}
}
- LimitLatch 内步定义了内部类 Sync,而 Sync 扩展了 AQS,AQS 就是一个骨架抽象类,它帮我们搭了个架子,用来控制线程的阻塞和唤醒。具体什么时候阻塞、什么时候唤醒由我们来决定;(AQS参考AQS源码分析)
- 用户线程通过调用 LimitLatch 的 countUpOrAwait 方法来拿到锁,如果暂时无法获取,这个线程会被阻塞到 AQS 的队列中。那 AQS 怎么知道是阻塞还是不阻塞用户线程呢?其实这是由 AQS 的使用者来决定的,也就是内部类 Sync 来决定的,因为 Sync 类重写了 AQS 的 tryAcquireShared() 方法。它的实现逻辑是如果当前连接数 count 小于 limit,线程能获取锁,返回 1,否则返回 -1;
- 如何用户线程被阻塞到了 AQS 的队列,那什么时候唤醒呢?同样是由 Sync 内部类决定,Sync 重写了 AQS 的 tryReleaseShared() 方法,其实就是当一个连接请求处理完了,这时又可以接收一个新连接了,这样前面阻塞的线程将会被唤醒;
2. Acceptor
Acceptor 跑在一个单独的线程里,它在一个死循环里调用 accept 方法来接收新连接,一旦有新的连接请求到来,accept 方法返回一个 Channel 对象,接着把 Channel 对象交给 Poller 去处理。
// Acceptor
socket = endpoint.serverSocketAccept();
// NioEndpoint
protected SocketChannel serverSocketAccept() throws Exception {
return serverSock.accept();
}
// NioEndpoint
public void register(final NioChannel socket, final NioSocketWrapper socketWrapper) {
socketWrapper.interestOps(SelectionKey.OP_READ);//this is what OP_REGISTER turns into.
PollerEvent r = null;
if (eventCache != null) {
r = eventCache.pop();
}
if (r == null) {
r = new PollerEvent(socket, OP_REGISTER);
} else {
r.reset(socket, OP_REGISTER);
}
addEvent(r);
}
- ServerSocketChannel 通过 accept() 接受新的连接,accept() 方法返回获得 SocketChannel 对象,然后将 SocketChannel 对象封装在一个 PollerEvent 对象中,并将 PollerEvent 对象压入 Poller 的 Queue 里,这是个典型的“生产者 - 消费者”模式,Acceptor 与 Poller 线程之间通过 Queue 通信。
3. Poller
Poller 的本质是一个 Selector,也跑在单独线程里。Poller 不断的通过内部的 Selector 对象向内核查询 Channel 的状态,一旦可读就生成任务类 SocketProcessor 交给 Executor 去处理。Poller 的另一个重要任务是循环遍历检查自己所管理的 SocketChannel 是否已经超时,如果有超时就关闭这个 SocketChannel。
// Poller.run
keyCount = selector.selectNow();
Iterator<SelectionKey> iterator = keyCount > 0 ? selector.selectedKeys().iterator() : null;
while (iterator != null && iterator.hasNext()) {
SelectionKey sk = iterator.next();
NioSocketWrapper socketWrapper = (NioSocketWrapper) sk.attachment();
if (socketWrapper == null) {
iterator.remove();
} else {
iterator.remove();
processKey(sk, socketWrapper);
}
}
// Poller.processSocket
processSocket(socketWrapper, SocketEvent.OPEN_READ, true)
// AbstractEndpoint.processSocket
SocketProcessorBase<S> sc = null;
if (processorCache != null) {
sc = processorCache.pop();
}
if (sc == null) {
sc = createSocketProcessor(socketWrapper, event);
} else {
sc.reset(socketWrapper, event);
}
Executor executor = getExecutor();
if (dispatch && executor != null) {
executor.execute(sc);
} else {
sc.run();
}
4. Executor
Executor 是 Tomcat 定制版的线程池,负责运行 SocketProcessor 任务类,SocketProcessor 的 run 方法会调用 Http11Processor 来读取和解析请求数据并通过容器来处理请求,最终会调用到我们的 Servlet。Http11Processor 是应用层协议的封装,它会调用容器获得响应,再把响应通过 Channel 写出。
5. SocketProcessor
Poller 会创建 SocketProcessor 任务类交给线程池处理,而 SocketProcessor 实现了 Runnable 接口,用来定义 Executor 中线程所执行的任务,主要就是调用 Http11Processor 组件来处理请求。
// SocketProcessor.doRun
state = getHandler().process(socketWrapper, event);
// AbstractProtocol.process
processor = getProtocol().createProcessor();
-> [Http11Processor processor = new Http11Processor(this, adapter);]
state = processor.process(wrapper, status);
这里注意:Http11Processor 并不是直接读取 Channel 的。这是因为 Tomcat 支持同步非阻塞 I/O 模型和异步 I/O 模型,在 Java API 中,相应的 Channel 类也是不一样的,比如有 AsynchronousSocketChannel 和 SocketChannel,为了对 Http11Processor 屏蔽这些差异,Tomcat 设计了一个包装类叫作 SocketWrapper,Http11Processor 只调用 SocketWrapper 的方法去读写数据。
6. Http11Processor
Http11Processor 读取 Channel 的数据来生成 org.apache.coyote.Request 对象,并通过默认适配器CoyoteAdapter调用到容器。
// Http11Processor.service
// 准备org.apache.coyote.Request
prepareRequest();
// 通过适配器调用到容器
getAdapter().service(request, response);
7. CoyoteAdapter
CoyoteAdapter主要负责将Tomcat org.apache.coyote.Request对象转成org.apache.catalina.connector.Request (implements HttpServletRequest
)对象,再调用容器的Service方法;
// CoyoteAdapter.service
public void service(org.apache.coyote.Request req, org.apache.coyote.Response res) throws Exception {
Request request = (Request) req.getNote(ADAPTER_NOTES);
Response response = (Response) res.getNote(ADAPTER_NOTES);
if (request == null) {
request = connector.createRequest();
request.setCoyoteRequest(req);
response = connector.createResponse();
response.setCoyoteResponse(res);
request.setResponse(response);
response.setRequest(request);
req.setNote(ADAPTER_NOTES, request);
res.setNote(ADAPTER_NOTES, response);
req.getParameters().setQueryStringCharset(connector.getURICharset());
}
if (connector.getXpoweredBy()) {
response.addHeader("X-Powered-By", POWERED_BY);
}
boolean async = false;
boolean postParseSuccess = false;
req.getRequestProcessor().setWorkerThreadName(THREAD_NAME.get());
try {
postParseSuccess = postParseRequest(req, request, res, response);
if (postParseSuccess) {
request.setAsyncSupported(
connector.getService().getContainer().getPipeline().isAsyncSupported());
connector.getService().getContainer().getPipeline().getFirst().invoke(
request, response);
}
}
}
小结
在 Tomcat 中,Endpoint 组件的主要工作就是处理 I/O,而 NioEndpoint 利用 Java NIO API 实现了多路复用 I/O 模型。其中关键的一点是,读写数据的线程自己不会阻塞在 I/O 等待上,而是把这个工作交给 Selector。