异步通道
AsynchronousServerSocketChannel:TCP监听套接字:
// 由异步通道提供商创建异步通道,并将通道绑定到异步通道组
static AsynchronousServerSocketChannel open(AsynchronousChannelGroup group);
// 绑定本地地址,进入监听状态
AsynchronousServerSocketChannel bind(SocketAddress local);
// 异步接受连接
// 接受连接(或失败)后将调用完成处理器handler
// 传递给handler的参数是新连接的AsynchronousSocketChannel(绑定到相同通道组)
// 为了并发处理新连接,handler不由连接操作初始化线程调用
abstract <A> void accept(A attachment, CompletionHandler<AsynchronousSocketChannel,? super A> handler);
AsynchronousSocketChannel:TCP连接套接字:
// 由异步通道提供商创建异步通道,并将通道绑定到异步通道组
static AsynchronousSocketChannel open(AsynchronousChannelGroup group);
// 异步发起连接
// 建立连接(或失败)后将调用完成处理器handler
abstract <A> void connect(SocketAddress remote,
A attachment, CompletionHandler<Void,? super A> handler);
// 异步从通道读取字节序列到指定缓冲区
// 读操作完成 (或失败)后将调用完成处理器handler
// 传递给handler的参数是读取的字节数(-1表示达到流末尾)
// 可以指定timeout参数进行超时等待
abstract <A> void read(ByteBuffer dst, long timeout, TimeUnit unit,
A attachment, CompletionHandler<Integer,? super A> handler);
// 异步将指定缓冲区的字节序列写入通道
// 写操作完成(或失败)后将调用完成处理器handler
// 传递能handler的参数是写入的字节数
// 可以指定timeout参数进行超时等待
abstract <A> void write(ByteBuffer src, long timeout, TimeUnit unit,
A attachment, CompletionHandler<Integer,? super A> handler);
继承树:
java.nio.channels.AsynchronousServerSocketChannel
-> sun.nio.ch.AsynchronousServerSocketChannelImpl
-> sun.nio.ch.UnixAsynchronousServerSocketChannelImpl
java.nio.channels.AsynchronousSocketChannel
-> sun.nio.ch.AsynchronousSocketChannelImpl
-> sun.nio.ch.UnixAsynchronousSocketChannelImpl
异步通道组AsynchronousChannelGroup
异步通道组封装了异步I/O完成的处理机制。
异步通道创建时要绑定到一个异步通道组(未指定则为系统默认)。
通道组使用Linux Epoll为组内所有通道进行异步IO,并将就绪IO操作分发到通道 。
通道完成IO操作后,将完成处理器提交给通道组内部的线程池进行异步执行。
继承树:
java.nio.channels.AsynchronousChannelGroup:管理异步通道提供商
-> sun.nio.ch.AsynchronousChannelGroupImpl:管理线程池
-> sun.nio.ch.Port:管理异步通道
-> sun.nio.ch.EPollPort:epoll和就绪操作分发
异步通道提供商AsynchronousChannelProvider
异步通道的服务提供类。
Java虚拟机维护一个系统范围内默认的异步通道提供商实例,此实例维护系统默认异步通道组,还提供创建新的异步通道组实例的方法,所有创建的异步通道组都绑定此异步通道提供商。
继承树:
java.nio.channels.spi.AsynchronousChannelProvider
-> sun.nio.ch.LinuxAsynchronousChannelProvider
加载系统默认异步通道提供商:
// AsynchronousChannelProvider
private static class ProviderHolder {
static final AsynchronousChannelProvider provider = load();
private static AsynchronousChannelProvider load() {
AsynchronousChannelProvider p;
// 先按系统属性java.nio.channels.spi.AsynchronousChannelProvider
p = loadProviderFromProperty();
if (p != null) return p;
// 再从系统类加载器中获取第一个实现类
p = loadProviderAsService();
if (p != null) return p;
// 最后加载系统默认实现
return sun.nio.ch.DefaultAsynchronousChannelProvider.create();
}
}
// DefaultAsynchronousChannelProvider
// 根据操作系统加载对应的实现类
public static AsynchronousChannelProvider create() {
String osname = AccessController.doPrivileged(new GetPropertyAction("os.name"));
if (osname.equals("SunOS"))
return createProvider("sun.nio.ch.SolarisAsynchronousChannelProvider");
if (osname.equals("Linux"))
return createProvider("sun.nio.ch.LinuxAsynchronousChannelProvider");
if (osname.contains("OS X"))
return createProvider("sun.nio.ch.BsdAsynchronousChannelProvider");
throw new InternalError("platform not recognized");
}
加载系统默认异步通道组:
// LinuxAsynchronousChannelProvider
public class LinuxAsynchronousChannelProvider extends AsynchronousChannelProvider {
// 系统默认异步通道组
private static volatile EPollPort defaultPort;
private EPollPort defaultEventPort() throws IOException {
if (defaultPort == null) {
synchronized (LinuxAsynchronousChannelProvider.class) {
if (defaultPort == null) {
// 创建异步通道组EPollPort,并启动
defaultPort = new EPollPort(this, ThreadPool.getDefault()).start();
}
}
}
return defaultPort;
}
}
创建新的异步通道组:
// LinuxAsynchronousChannelProvider
public AsynchronousChannelGroup openAsynchronousChannelGroup(int nThreads, ThreadFactory factory) {
return new EPollPort(this, ThreadPool.create(nThreads, factory)).start();
}
public AsynchronousChannelGroup openAsynchronousChannelGroup(ExecutorService executor, int initialSize) {
return new EPollPort(this, ThreadPool.wrap(executor, initialSize)).start();
}
创建异步TCP监听套接字通道:
// AsynchronousServerSocketChannel
public static AsynchronousServerSocketChannel open(AsynchronousChannelGroup group) {
// 由异步通道组绑定的异步通道提供商创建异步通道
AsynchronousChannelProvider provider = (group == null) ?
AsynchronousChannelProvider.provider() : group.provider();
return provider.openAsynchronousServerSocketChannel(group);
}
// LinuxAsynchronousChannelProvider
public AsynchronousServerSocketChannel openAsynchronousServerSocketChannel(AsynchronousChannelGroup group) {
return new UnixAsynchronousServerSocketChannelImpl(toPort(group));
}
创建异步TCP连接套接字通道:
// AsynchronousSocketChannel
public static AsynchronousSocketChannel open(AsynchronousChannelGroup group) {
// 由异步通道组绑定的异步通道提供商创建异步通道
AsynchronousChannelProvider provider = (group == null) ?
AsynchronousChannelProvider.provider() : group.provider();
return provider.openAsynchronousSocketChannel(group);
}
// LinuxAsynchronousChannelProvider
public AsynchronousSocketChannel openAsynchronousSocketChannel(AsynchronousChannelGroup group) {
return new UnixAsynchronousSocketChannelImpl(toPort(group));
}
异步通道组实现
通道管理
- 通道创建时注册到通道组
1.1 创建监听套接字
AsynchronousServerSocketChannelImpl(AsynchronousChannelGroupImpl group) {
// 绑定异步通道提供商
super(group.provider());
// 创建监听套接字,设置文件描述符
this.fd = Net.serverSocket(true);
}
UnixAsynchronousServerSocketChannelImpl(Port port) {
super(port); // 见上
try {
// 设置为非阻塞模式
IOUtil.configureBlocking(fd, false);
} catch (IOException x) {
nd.close(fd);
throw x;
}
this.port = port;
this.fdVal = IOUtil.fdVal(fd);
// 将通道的文件描述符注册到通道组
port.register(fdVal, this);
}
1.2 创建连接套接字
AsynchronousSocketChannelImpl(AsynchronousChannelGroupImpl group) {
// 绑定异步通道提供商
super(group.provider());
// 创建连接套接字,设置文件描述符
this.fd = Net.socket(true);
// 设置初始状态
this.state = ST_UNCONNECTED;
}
UnixAsynchronousSocketChannelImpl(Port port) {
super(port); // 见上
try {
// 设置为非阻塞模式
IOUtil.configureBlocking(fd, false);
} catch (IOException x) {
nd.close(fd);
throw x;
}
this.port = port;
this.fdVal = IOUtil.fdVal(fd);
// 将通道的文件描述符注册到通道组
port.register(fdVal, this);
}
1.3 注册通道
// Port
final void register(int fd, PollableChannel ch) {
// Map<Integer,PollableChannel> fdToChannel
// Port维护一个Map,保存通道文件描述符到通道实例的映射
fdToChannel.put(Integer.valueOf(fd), ch);
}
- 通道关闭时从通道组注销
2.1 关闭套接字
// UnixAsynchronousServerSocketChannelImpl
void implClose() throws IOException {
// 从通道组注销
port.unregister(fdVal);
// 关闭文件描述符
nd.close(fd);
...
}
// UnixAsynchronousSocketChannelImpl
void implClose() throws IOException {
// 从通道组注销
port.unregister(fdVal);
// 关闭文件描述符
nd.close(fd);
...
}
2.2 注销通道
// Port
final void unregister(int fd) {
// 从映射中移除
fdToChannel.remove(Integer.valueOf(fd));
...
}
注册监听操作
- accept
// UnixAsynchronousServerSocketChannelImpl
Future<AsynchronousSocketChannel> implAccept(Object att,
CompletionHandler<AsynchronousSocketChannel,Object> handler) {
// 检查并设置标志位,以防止并发进行accept操作
if (!accepting.compareAndSet(false, true))
throw new AcceptPendingException();
try {
begin();
// 初始化accept操作
int n = accept0(this.fd, newfd, isaa);
if (n == IOStatus.UNAVAILABLE) { // 尚无接受连接
PendingFuture<AsynchronousSocketChannel,Object> result = null;
synchronized (updateLock) {
if (handler == null) {
// 若无完成处理器,则生成Future,以供获取操作完成结果
this.acceptHandler = null;
result = new PendingFuture<AsynchronousSocketChannel,Object>(this);
this.acceptFuture = result;
} else {
// 若有完成处理器,则设置处理器和附件对象,以供操作完成后调用
this.acceptHandler = handler;
this.acceptAttachment = att;
}
this.acceptPending = true; // accept等待状态
}
// 注册监听accept操作
port.startPoll(fdVal, Port.POLLIN);
return result;
}
} catch (Throwable x) {
...
} finally {
end();
}
...
}
- connect
// UnixAsynchronousSocketChannelImpl
<A> Future<Void> implConnect(SocketAddress remote,
A attachment, CompletionHandler<Void,? super A> handler) {
// 连接地址
InetSocketAddress isa = Net.checkAddress(remote);
try {
begin();
// 初始化connect操作
int n = Net.connect(fd, isa.getAddress(), isa.getPort());
if (n == IOStatus.UNAVAILABLE) { // 尚未建立连接
PendingFuture<Void,A> result = null;
synchronized (updateLock) {
if (handler == null) {
// 若无完成处理器,则生成Future,以供获取操作完成结果
result = new PendingFuture<Void,A>(this, OpType.CONNECT);
this.connectFuture = (PendingFuture<Void,Object>)result;
} else {
// 若有完成处理器,则设置处理器和附件对象,以供操作完成后调用
this.connectHandler = (CompletionHandler<Void,Object>)handler;
this.connectAttachment = attachment;
}
this.connectPending = true; // connect等待状态
// 注册监听操作
updateEvents();
}
return result;
}
...
} catch (Throwable x) {
...
} finally {
end();
}
...
}
// 注册监听操作(读、写、连接)
private void updateEvents() {
int events = 0;
if (readPending)
events |= Port.POLLIN;
if (connectPending || writePending)
events |= Port.POLLOUT;
if (events != 0)
port.startPoll(fdVal, events);
}
- read
// AsynchronousSocketChannelImpl
public final <A> void read(ByteBuffer dst, long timeout, TimeUnit unit,
A attachment, CompletionHandler<Integer,? super A> handler) {
read(false, dst, null, timeout, unit, attachment, handler);
}
// UnixAsynchronousSocketChannelImpl
<V extends Number,A> Future<V> implRead(boolean isScatteringRead, ByteBuffer dst, ByteBuffer[] dsts,
long timeout, TimeUnit unit,
A attachment, CompletionHandler<V,? super A> handler) {
int n = IOStatus.UNAVAILABLE;
boolean pending = false;
try {
begin();
if (n == IOStatus.UNAVAILABLE) {
PendingFuture<V,A> result = null;
synchronized (updateLock) {
this.isScatteringRead = isScatteringRead;
this.readBuffer = dst;
this.readBuffers = dsts;
if (handler == null) {
// 若无完成处理器,则生成Future,以供获取操作完成结果
this.readHandler = null;
result = new PendingFuture<V,A>(this, OpType.READ);
this.readFuture = (PendingFuture<Number,Object>)result;
this.readAttachment = null;
} else {
// 若有完成处理器,则设置处理器和附件对象,以供操作完成后调用
this.readHandler = (CompletionHandler<Number,Object>)handler;
this.readAttachment = attachment;
this.readFuture = null;
}
if (timeout > 0L) {
// 若超时等待,则提交超时中断任务
this.readTimer = port.schedule(readTimeoutTask, timeout, unit);
}
this.readPending = true; // read等待状态
// 注册监听操作
updateEvents();
}
pending = true;
return result;
}
} catch (Throwable x) {
...
} finally {
if (!pending)
enableReading();
end();
}
...
}
private Runnable readTimeoutTask = new Runnable() {
public void run() {
CompletionHandler<Number,Object> handler = null;
Object att = null;
PendingFuture<Number,Object> future = null;
synchronized (updateLock) {
if (!readPending)
return; // 如果read已完成则返回
readPending = false; // 不再read等待
handler = readHandler;
att = readAttachment;
future = readFuture;
}
// 不再继续读取
enableReading(true);
// 设置结果或调用完成处理器
Exception exc = new InterruptedByTimeoutException();
if (handler == null) {
future.setFailure(exc);
} else {
AsynchronousChannel ch = UnixAsynchronousSocketChannelImpl.this;
Invoker.invokeIndirectly(ch, handler, att, null, exc);
}
}
};
- write
// AsynchronousSocketChannelImpl
public final <A> void write(ByteBuffer src, long timeout, TimeUnit unit,
A attachment, CompletionHandler<Integer,? super A> handler) {
write(false, src, null, timeout, unit, attachment, handler);
}
// UnixAsynchronousSocketChannelImpl
<V extends Number,A> Future<V> implWrite(boolean isGatheringWrite, ByteBuffer src, ByteBuffer[] srcs,
long timeout, TimeUnit unit,
A attachment, CompletionHandler<V,? super A> handler) {
int n = IOStatus.UNAVAILABLE;
boolean pending = false;
try {
begin();
if (n == IOStatus.UNAVAILABLE) {
PendingFuture<V,A> result = null;
synchronized (updateLock) {
this.isGatheringWrite = isGatheringWrite;
this.writeBuffer = src;
this.writeBuffers = srcs;
if (handler == null) {
// 若无完成处理器,则生成Future,以供获取操作完成结果
this.writeHandler = null;
result = new PendingFuture<V,A>(this, OpType.WRITE);
this.writeFuture = (PendingFuture<Number,Object>)result;
this.writeAttachment = null;
} else {
// 若有完成处理器,则设置处理器和附件对象,以供操作完成后调用
this.writeHandler = (CompletionHandler<Number,Object>)handler;
this.writeAttachment = attachment;
this.writeFuture = null;
}
if (timeout > 0L) {
// 若超时等待,则提交超时中断任务
this.writeTimer = port.schedule(writeTimeoutTask, timeout, unit);
}
this.writePending = true; // write等待状态
// 注册监听操作
updateEvents();
}
pending = true;
return result;
}
} catch (Throwable x) {
...
} finally {
if (!pending)
enableWriting();
end();
}
...
}
private Runnable writeTimeoutTask = new Runnable() {
public void run() {
CompletionHandler<Number,Object> handler = null;
Object att = null;
PendingFuture<Number,Object> future = null;
synchronized (updateLock) {
if (!writePending)
return;
writePending = false;
handler = writeHandler;
att = writeAttachment;
future = writeFuture;
}
// kill further writing before releasing waiters
enableWriting(true);
// invoke handler or set result
Exception exc = new InterruptedByTimeoutException();
if (handler != null) {
Invoker.invokeIndirectly(UnixAsynchronousSocketChannelImpl.this, handler, att, null, exc);
} else {
future.setFailure(exc);
}
}
};
- 注册监听操作
// EPollPort
void startPoll(int fd, int events) {
// 添加或修改监听描述符和监听操作
// epfd:EPoll描述符
// EPOLLONESHOT:设置一次通知状态
int err = epollCtl(epfd, EPOLL_CTL_MOD, fd, (events | EPOLLONESHOT));
if (err == ENOENT)
err = epollCtl(epfd, EPOLL_CTL_ADD, fd, (events | EPOLLONESHOT));
if (err != 0)
throw new AssertionError(); // should not happen
}
EPoll
异步通道组创建时立即启动,循环进行epoll,并对就绪通道操作进行分发。
- 异步通道组创建:
EPollPort(AsynchronousChannelProvider provider, ThreadPool pool) {
super(provider, pool);
// 创建epoll描述符
this.epfd = epollCreate();
// create socket pair for wakeup mechanism
int[] sv = new int[2];
try {
socketpair(sv);
// register one end with epoll
epollCtl(epfd, EPOLL_CTL_ADD, sv[0], POLLIN);
} catch (IOException x) {
close0(epfd);
throw x;
}
this.sp = sv;
// 为epoll检测到的就绪事件分配空间
this.address = allocatePollArray(MAX_EPOLL_EVENTS);
// epoll检测到的就绪事件被封装为Event类实例,并加入队列,等待分发处理
this.queue = new ArrayBlockingQueue<Event>(MAX_EPOLL_EVENTS);
// 往队列中加入一个标记Event,以启动epoll
this.queue.offer(NEED_TO_POLL);
}
// 就绪通道和操作封装成Event类实例
static class Event {
final PollableChannel channel;
final int events;
Event(PollableChannel channel, int events) {
this.channel = channel;
this.events = events;
}
PollableChannel channel() { return channel; }
int events() { return events; }
}
- 循环epoll:
// EPollPort
EPollPort start() {
startThreads(new EventHandlerTask());
return this;
}
// 通道组启动执行的任务
private class EventHandlerTask implements Runnable {
public void run() {
Invoker.GroupAndInvokeCount myGroupAndInvokeCount =
Invoker.getGroupAndInvokeCount();
final boolean isPooledThread = (myGroupAndInvokeCount != null);
boolean replaceMe = false;
Event ev;
try {
for (;;) {
// reset invoke count
if (isPooledThread)
myGroupAndInvokeCount.resetInvokeCount();
try {
replaceMe = false;
// 从队列中取出Event进行分发
ev = queue.take();
// 队列中无Event处理时,会添加NEED_TO_POLL标记Event,以再次启动epoll
if (ev == NEED_TO_POLL) {
try {
ev = poll();
} catch (IOException x) {
x.printStackTrace();
return;
}
}
} catch (InterruptedException x) {
continue;
}
// handle wakeup to execute task or shutdown
if (ev == EXECUTE_TASK_OR_SHUTDOWN) {
Runnable task = pollTask();
if (task == null) {
// shutdown request
return;
}
// run task (may throw error/exception)
replaceMe = true;
task.run();
continue;
}
// 分发到Event对应的通道
// 调用通道的onEvent()方法进行处理
try {
ev.channel().onEvent(ev.events(), isPooledThread);
} catch (Error x) {
replaceMe = true; throw x;
} catch (RuntimeException x) {
replaceMe = true; throw x;
}
}
} finally {
// last handler to exit when shutdown releases resources
int remaining = threadExit(this, replaceMe);
if (remaining == 0 && isShutdown()) {
implClose();
}
}
}
private Event poll() throws IOException {
try {
for (;;) {
// 检测就绪通道和操作
int n = epollWait(epfd, address, MAX_EPOLL_EVENTS);
/*
* 'n' events have been read. Here we map them to their
* corresponding channel in batch and queue n-1 so that
* they can be handled by other handler threads. The last
* event is handled by this thread (and so is not queued).
*/
fdToChannelLock.readLock().lock();
try {
while (n-- > 0) {
// 就绪事件地址
long eventAddress = getEvent(address, n);
// 就绪通道描述符
int fd = getDescriptor(eventAddress);
// wakeup
if (fd == sp[0]) {
if (wakeupCount.decrementAndGet() == 0) {
// no more wakeups so drain pipe
drain1(sp[0]);
}
// queue special event if there are more events to handle.
if (n > 0) {
queue.offer(EXECUTE_TASK_OR_SHUTDOWN);
continue;
}
return EXECUTE_TASK_OR_SHUTDOWN;
}
// 将通道描述符映射到通道
PollableChannel channel = fdToChannel.get(fd);
if (channel != null) {
// 通道就绪操作
int events = getEvents(eventAddress);
// 将就绪通道和操作封装为Event实例
Event ev = new Event(channel, events);
// 将最后一个Event返回处理,其它Event排队
if (n > 0) {
queue.offer(ev);
} else {
return ev;
}
}
}
} finally {
fdToChannelLock.readLock().unlock();
}
}
} finally {
// 每次epoll完成,都需要向队列中加入NEED_TO_POLL标记Event,以便能够再次epoll
queue.offer(NEED_TO_POLL);
}
}
}
- 处理就绪操作
- 监听套接字就绪(accept):
// UnixAsynchronousServerSocketChannelImpl
public void onEvent(int events, boolean mayInvokeDirect) {
synchronized (updateLock) {
if (!acceptPending)
return; // 可能已经强制异步关闭
acceptPending = false; // 不再是accept等待状态
}
// newfd:连接描述符
// isaa:连接地址
FileDescriptor newfd = new FileDescriptor();
InetSocketAddress[] isaa = new InetSocketAddress[1];
Throwable exc = null;
try {
begin();
// 进行连接
int n = accept0(this.fd, newfd, isaa);
// 仍无连接:设置accept等待状态,再次注册accept监听
if (n == IOStatus.UNAVAILABLE) {
synchronized (updateLock) {
acceptPending = true;
}
port.startPoll(fdVal, Port.POLLIN);
return;
}
} catch (Throwable x) {
if (x instanceof ClosedChannelException)
x = new AsynchronousCloseException();
exc = x;
} finally {
end();
}
AsynchronousSocketChannel child = null;
if (exc == null) {
try {
// 连接成功,创建连接套接字通道
child = finishAccept(newfd, isaa[0], acceptAcc);
} catch (Throwable x) {
if (!(x instanceof IOException) && !(x instanceof SecurityException))
x = new IOException(x);
exc = x;
}
}
// 取出accept完成处理器和Future
CompletionHandler<AsynchronousSocketChannel,Object> handler = acceptHandler;
Object att = acceptAttachment;
PendingFuture<AsynchronousSocketChannel,Object> future = acceptFuture;
// 设置标记位,以便能够再次accept
enableAccept();
if (handler == null) {
// 设置结果:连接套接字
future.setResult(child, exc);
// 如果已经异步取消accept,则要关闭连接套接字
if (child != null && future.isCancelled()) {
try {
child.close();
} catch (IOException ignore) { }
}
} else {
// 调用完成处理器
Invoker.invoke(this, handler, att, child, exc);
}
}
- 异步调用完成处理器:
static <V,A> void invokeIndirectly(AsynchronousChannel channel,
final CompletionHandler<V,? super A> handler,
final A attachment,
final V result,
final Throwable exc) {
try {
// 将完成处理器调用封装成任务,提交给线程池异步执行
((Groupable)channel).group().executeOnPooledThread(new Runnable() {
public void run() {
invokeUnchecked(handler, attachment, result, exc);
}
});
} catch (RejectedExecutionException ree) {
throw new ShutdownChannelGroupException();
}
}
static <V,A> void invokeUnchecked(CompletionHandler<V,? super A> handler,
A attachment,
V value,
Throwable exc) {
// 若无异常,调用处理器的completed方法
// 若有异常,调用处理器的failed方法
if (exc == null) {
handler.completed(value, attachment);
} else {
handler.failed(exc, attachment);
}
// 清除线程中断
Thread.interrupted();
}
- 连接套接字就绪(connect, read, write)
// UnixAsynchronousSocketChannelImpl
public void onEvent(int events, boolean mayInvokeDirect) {
// 判断就绪操作
boolean readable = (events & Port.POLLIN) > 0; // read
boolean writable = (events & Port.POLLOUT) > 0; // connect/write
if ((events & (Port.POLLERR | Port.POLLHUP)) > 0) {
readable = true;
writable = true;
}
finish(mayInvokeDirect, readable, writable);
}
private void finish(boolean mayInvokeDirect, boolean readable, boolean writable) {
boolean finishRead = false;
boolean finishWrite = false;
boolean finishConnect = false;
// 判断就绪操作,设置标记
synchronized (updateLock) {
if (readable && this.readPending) {
this.readPending = false;
finishRead = true;
}
if (writable) {
if (this.writePending) {
this.writePending = false;
finishWrite = true;
} else if (this.connectPending) {
this.connectPending = false;
finishConnect = true;
}
}
}
// complete the I/O operation. Special case for when channel is
// ready for both reading and writing. In that case, submit task to
// complete write if write operation has a completion handler.
if (finishRead) {
if (finishWrite)
finishWrite(false);
finishRead(mayInvokeDirect);
return;
}
if (finishWrite) {
finishWrite(mayInvokeDirect);
}
if (finishConnect) {
finishConnect(mayInvokeDirect);
}
}
// 完成read操作,并异步调用完成处理器
private void finishRead(boolean mayInvokeDirect) {
int n = -1;
Throwable exc = null;
// copy fields as we can't access them after reading is re-enabled.
boolean scattering = isScatteringRead;
CompletionHandler<Number,Object> handler = readHandler;
Object att = readAttachment;
PendingFuture<Number,Object> future = readFuture;
Future<?> timeout = readTimer;
try {
begin();
// 完成read操作:将读取内容写入缓冲区
if (scattering) {
n = (int)IOUtil.read(fd, readBuffers, nd);
} else {
n = IOUtil.read(fd, readBuffer, -1, nd);
}
// 读操作实际未就绪:重置read等待状态,返回
if (n == IOStatus.UNAVAILABLE) {
synchronized (updateLock) {
readPending = true;
}
return;
}
// allow objects to be GC'ed.
this.readBuffer = null;
this.readBuffers = null;
this.readAttachment = null;
// allow another read to be initiated
enableReading();
} catch (Throwable x) {
enableReading();
if (x instanceof ClosedChannelException)
x = new AsynchronousCloseException();
exc = x;
} finally {
// restart poll in case of concurrent write
if (!(exc instanceof AsynchronousCloseException))
lockAndUpdateEvents();
end();
}
// cancel the associated timer
if (timeout != null)
timeout.cancel(false);
// create result
Number result = (exc != null) ? null : (scattering) ?
(Number)Long.valueOf(n) : (Number)Integer.valueOf(n);
// 无完成处理器,则设置结果;有则异步调用完成处理器
if (handler == null) {
future.setResult(result, exc);
} else {
if (mayInvokeDirect) {
Invoker.invokeUnchecked(handler, att, result, exc);
} else {
Invoker.invokeIndirectly(this, handler, att, result, exc);
}
}
}
// 完成write操作,并异步调用完成处理器
private void finishWrite(boolean mayInvokeDirect) {
int n = -1;
Throwable exc = null;
// copy fields as we can't access them after reading is re-enabled.
boolean gathering = this.isGatheringWrite;
CompletionHandler<Number,Object> handler = this.writeHandler;
Object att = this.writeAttachment;
PendingFuture<Number,Object> future = this.writeFuture;
Future<?> timer = this.writeTimer;
try {
begin();
if (gathering) {
n = (int)IOUtil.write(fd, writeBuffers, nd);
} else {
n = IOUtil.write(fd, writeBuffer, -1, nd);
}
if (n == IOStatus.UNAVAILABLE) {
// spurious wakeup, is this possible?
synchronized (updateLock) {
writePending = true;
}
return;
}
// allow objects to be GC'ed.
this.writeBuffer = null;
this.writeBuffers = null;
this.writeAttachment = null;
// allow another write to be initiated
enableWriting();
} catch (Throwable x) {
enableWriting();
if (x instanceof ClosedChannelException)
x = new AsynchronousCloseException();
exc = x;
} finally {
// restart poll in case of concurrent write
if (!(exc instanceof AsynchronousCloseException))
lockAndUpdateEvents();
end();
}
// cancel the associated timer
if (timer != null)
timer.cancel(false);
// create result
Number result = (exc != null) ? null : (gathering) ?
(Number)Long.valueOf(n) : (Number)Integer.valueOf(n);
// invoke handler or set result
if (handler == null) {
future.setResult(result, exc);
} else {
if (mayInvokeDirect) {
Invoker.invokeUnchecked(handler, att, result, exc);
} else {
Invoker.invokeIndirectly(this, handler, att, result, exc);
}
}
}
// 完成connect操作,并异步调用完成处理器
private void finishConnect(boolean mayInvokeDirect) {
Throwable e = null;
try {
begin();
checkConnect(fdVal);
setConnected();
} catch (Throwable x) {
if (x instanceof ClosedChannelException)
x = new AsynchronousCloseException();
e = x;
} finally {
end();
}
if (e != null) {
// close channel if connection cannot be established
try {
close();
} catch (Throwable suppressed) {
e.addSuppressed(suppressed);
}
}
// invoke handler and set result
CompletionHandler<Void,Object> handler = connectHandler;
Object att = connectAttachment;
PendingFuture<Void,Object> future = connectFuture;
if (handler == null) {
future.setResult(null, e);
} else {
if (mayInvokeDirect) {
Invoker.invokeUnchecked(handler, att, null, e);
} else {
Invoker.invokeIndirectly(this, handler, att, null, e);
}
}
}
线程池
AsynchronousChannelGroupImpl维护一个内部线程池
abstract class AsynchronousChannelGroupImpl
extends AsynchronousChannelGroup implements Executor {
// 线程池
private final ThreadPool pool;
// 配合固定线程池时使用
private final Queue<Runnable> taskQueue;
// 通道完成IO操作后,将完成处理器包装成任务提交给线程池异步执行
final void executeOnPooledThread(Runnable task) {
// 固定线程池:任务排队,等待执行
// 其它线程池:直接提交任务执行
if (isFixedThreadPool()) {
executeOnHandlerTask(task);
} else {
pool.executor().execute(bindToGroup(task));
}
}
}