Tomcat-NioEndpoint

概述

本节以I/O多路复用模型NioEndpoint为例分析下连接器涉及的组件;


NioEndpoint.png

1. LimitLatch

LimitLatch是连接控制器,它负责控制最大连接数,NIO模式下默认是10000,达到这个阈值后,连接请求被拒绝,直到后续组件处理完一个连接后将连接数减1。


public class LimitLatch {
    private class Sync extends AbstractQueuedSynchronizer {
     
        @Override
        protected int tryAcquireShared() {
            long newCount = count.incrementAndGet();
            if (newCount > limit) {
                count.decrementAndGet();
                return -1;
            } else {
                return 1;
            }
        }

        @Override
        protected boolean tryReleaseShared(int arg) {
            count.decrementAndGet();
            return true;
        }
    }

    private final Sync sync;
    private final AtomicLong count;
    private volatile long limit;
    
    //线程调用这个方法来获得接收新连接的许可,线程可能被阻塞
    public void countUpOrAwait() throws InterruptedException {
      sync.acquireSharedInterruptibly(1);
    }

    //调用这个方法来释放一个连接许可,那么前面阻塞的线程可能被唤醒
    public long countDown() {
      sync.releaseShared(0);
      long result = getCount();
      return result;
   }
}
  • LimitLatch 内步定义了内部类 Sync,而 Sync 扩展了 AQS,AQS 就是一个骨架抽象类,它帮我们搭了个架子,用来控制线程的阻塞和唤醒。具体什么时候阻塞、什么时候唤醒由我们来决定;(AQS参考AQS源码分析)
  • 用户线程通过调用 LimitLatch 的 countUpOrAwait 方法来拿到锁,如果暂时无法获取,这个线程会被阻塞到 AQS 的队列中。那 AQS 怎么知道是阻塞还是不阻塞用户线程呢?其实这是由 AQS 的使用者来决定的,也就是内部类 Sync 来决定的,因为 Sync 类重写了 AQS 的 tryAcquireShared() 方法。它的实现逻辑是如果当前连接数 count 小于 limit,线程能获取锁,返回 1,否则返回 -1;
  • 如何用户线程被阻塞到了 AQS 的队列,那什么时候唤醒呢?同样是由 Sync 内部类决定,Sync 重写了 AQS 的 tryReleaseShared() 方法,其实就是当一个连接请求处理完了,这时又可以接收一个新连接了,这样前面阻塞的线程将会被唤醒;

2. Acceptor

Acceptor 跑在一个单独的线程里,它在一个死循环里调用 accept 方法来接收新连接,一旦有新的连接请求到来,accept 方法返回一个 Channel 对象,接着把 Channel 对象交给 Poller 去处理。

// Acceptor
socket = endpoint.serverSocketAccept();
// NioEndpoint
protected SocketChannel serverSocketAccept() throws Exception {
    return serverSock.accept();
}

// NioEndpoint
public void register(final NioChannel socket, final NioSocketWrapper socketWrapper) {
    socketWrapper.interestOps(SelectionKey.OP_READ);//this is what OP_REGISTER turns into.
    PollerEvent r = null;
    if (eventCache != null) {
        r = eventCache.pop();
    }
    if (r == null) {
        r = new PollerEvent(socket, OP_REGISTER);
    } else {
        r.reset(socket, OP_REGISTER);
    }
    addEvent(r);
}
  • ServerSocketChannel 通过 accept() 接受新的连接,accept() 方法返回获得 SocketChannel 对象,然后将 SocketChannel 对象封装在一个 PollerEvent 对象中,并将 PollerEvent 对象压入 Poller 的 Queue 里,这是个典型的“生产者 - 消费者”模式,Acceptor 与 Poller 线程之间通过 Queue 通信。

3. Poller

Poller 的本质是一个 Selector,也跑在单独线程里。Poller 不断的通过内部的 Selector 对象向内核查询 Channel 的状态,一旦可读就生成任务类 SocketProcessor 交给 Executor 去处理。Poller 的另一个重要任务是循环遍历检查自己所管理的 SocketChannel 是否已经超时,如果有超时就关闭这个 SocketChannel。

// Poller.run
keyCount = selector.selectNow();
Iterator<SelectionKey> iterator = keyCount > 0 ? selector.selectedKeys().iterator() : null;
while (iterator != null && iterator.hasNext()) {
    SelectionKey sk = iterator.next();
    NioSocketWrapper socketWrapper = (NioSocketWrapper) sk.attachment();
    if (socketWrapper == null) {
        iterator.remove();
    } else {
        iterator.remove();
        processKey(sk, socketWrapper);
    }
}

// Poller.processSocket
processSocket(socketWrapper, SocketEvent.OPEN_READ, true)

// AbstractEndpoint.processSocket
SocketProcessorBase<S> sc = null;
if (processorCache != null) {
    sc = processorCache.pop();
}
if (sc == null) {
    sc = createSocketProcessor(socketWrapper, event);
} else {
    sc.reset(socketWrapper, event);
}
Executor executor = getExecutor();
if (dispatch && executor != null) {
    executor.execute(sc);
} else {
    sc.run();
}

4. Executor

Executor 是 Tomcat 定制版的线程池,负责运行 SocketProcessor 任务类,SocketProcessor 的 run 方法会调用 Http11Processor 来读取和解析请求数据并通过容器来处理请求,最终会调用到我们的 Servlet。Http11Processor 是应用层协议的封装,它会调用容器获得响应,再把响应通过 Channel 写出。

5. SocketProcessor

Poller 会创建 SocketProcessor 任务类交给线程池处理,而 SocketProcessor 实现了 Runnable 接口,用来定义 Executor 中线程所执行的任务,主要就是调用 Http11Processor 组件来处理请求。

// SocketProcessor.doRun
state = getHandler().process(socketWrapper, event);

// AbstractProtocol.process
processor = getProtocol().createProcessor();
-> [Http11Processor processor = new Http11Processor(this, adapter);]
state = processor.process(wrapper, status);

这里注意:Http11Processor 并不是直接读取 Channel 的。这是因为 Tomcat 支持同步非阻塞 I/O 模型和异步 I/O 模型,在 Java API 中,相应的 Channel 类也是不一样的,比如有 AsynchronousSocketChannel 和 SocketChannel,为了对 Http11Processor 屏蔽这些差异,Tomcat 设计了一个包装类叫作 SocketWrapper,Http11Processor 只调用 SocketWrapper 的方法去读写数据。

6. Http11Processor

Http11Processor 读取 Channel 的数据来生成 org.apache.coyote.Request 对象,并通过默认适配器CoyoteAdapter调用到容器。

// Http11Processor.service
// 准备org.apache.coyote.Request
prepareRequest();
// 通过适配器调用到容器
getAdapter().service(request, response);

7. CoyoteAdapter

CoyoteAdapter主要负责将Tomcat org.apache.coyote.Request对象转成org.apache.catalina.connector.Request (implements HttpServletRequest)对象,再调用容器的Service方法;

// CoyoteAdapter.service
public void service(org.apache.coyote.Request req, org.apache.coyote.Response res) throws Exception {
    Request request = (Request) req.getNote(ADAPTER_NOTES);
    Response response = (Response) res.getNote(ADAPTER_NOTES);
    if (request == null) {
        request = connector.createRequest();
        request.setCoyoteRequest(req);
        response = connector.createResponse();
        response.setCoyoteResponse(res);
        request.setResponse(response);
        response.setRequest(request);
        req.setNote(ADAPTER_NOTES, request);
        res.setNote(ADAPTER_NOTES, response);
        req.getParameters().setQueryStringCharset(connector.getURICharset());
    }

    if (connector.getXpoweredBy()) {
        response.addHeader("X-Powered-By", POWERED_BY);
    }
    boolean async = false;
    boolean postParseSuccess = false;
    req.getRequestProcessor().setWorkerThreadName(THREAD_NAME.get());

    try {
        postParseSuccess = postParseRequest(req, request, res, response);
        if (postParseSuccess) {
            request.setAsyncSupported(
                    connector.getService().getContainer().getPipeline().isAsyncSupported());
            connector.getService().getContainer().getPipeline().getFirst().invoke(
                    request, response);
        }
    } 
}

小结

在 Tomcat 中,Endpoint 组件的主要工作就是处理 I/O,而 NioEndpoint 利用 Java NIO API 实现了多路复用 I/O 模型。其中关键的一点是,读写数据的线程自己不会阻塞在 I/O 等待上,而是把这个工作交给 Selector。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容