异步

异步通道

AsynchronousServerSocketChannel:TCP监听套接字:

// 由异步通道提供商创建异步通道,并将通道绑定到异步通道组
static AsynchronousServerSocketChannel open(AsynchronousChannelGroup group);

// 绑定本地地址,进入监听状态
AsynchronousServerSocketChannel bind(SocketAddress local);

// 异步接受连接
// 接受连接(或失败)后将调用完成处理器handler
// 传递给handler的参数是新连接的AsynchronousSocketChannel(绑定到相同通道组)
// 为了并发处理新连接,handler不由连接操作初始化线程调用
abstract <A> void accept(A attachment, CompletionHandler<AsynchronousSocketChannel,? super A> handler);

AsynchronousSocketChannel:TCP连接套接字:

// 由异步通道提供商创建异步通道,并将通道绑定到异步通道组
static AsynchronousSocketChannel open(AsynchronousChannelGroup group);

// 异步发起连接
// 建立连接(或失败)后将调用完成处理器handler
abstract <A> void connect(SocketAddress remote, 
    A attachment, CompletionHandler<Void,? super A> handler);

// 异步从通道读取字节序列到指定缓冲区
// 读操作完成 (或失败)后将调用完成处理器handler
// 传递给handler的参数是读取的字节数(-1表示达到流末尾)
// 可以指定timeout参数进行超时等待
abstract <A> void read(ByteBuffer dst, long timeout, TimeUnit unit, 
    A attachment, CompletionHandler<Integer,? super A> handler);

// 异步将指定缓冲区的字节序列写入通道
// 写操作完成(或失败)后将调用完成处理器handler
// 传递能handler的参数是写入的字节数
// 可以指定timeout参数进行超时等待
abstract <A> void write(ByteBuffer src, long timeout, TimeUnit unit,
    A attachment, CompletionHandler<Integer,? super A> handler);

继承树:

java.nio.channels.AsynchronousServerSocketChannel
    -> sun.nio.ch.AsynchronousServerSocketChannelImpl
        -> sun.nio.ch.UnixAsynchronousServerSocketChannelImpl

java.nio.channels.AsynchronousSocketChannel
    -> sun.nio.ch.AsynchronousSocketChannelImpl
        -> sun.nio.ch.UnixAsynchronousSocketChannelImpl

异步通道组AsynchronousChannelGroup

异步通道组封装了异步I/O完成的处理机制。

异步通道创建时要绑定到一个异步通道组(未指定则为系统默认)。

通道组使用Linux Epoll为组内所有通道进行异步IO,并将就绪IO操作分发到通道 。

通道完成IO操作后,将完成处理器提交给通道组内部的线程池进行异步执行。

继承树:

java.nio.channels.AsynchronousChannelGroup:管理异步通道提供商
    -> sun.nio.ch.AsynchronousChannelGroupImpl:管理线程池
        -> sun.nio.ch.Port:管理异步通道
            -> sun.nio.ch.EPollPort:epoll和就绪操作分发

异步通道提供商AsynchronousChannelProvider

异步通道的服务提供类。

Java虚拟机维护一个系统范围内默认的异步通道提供商实例,此实例维护系统默认异步通道组,还提供创建新的异步通道组实例的方法,所有创建的异步通道组都绑定此异步通道提供商。

继承树:

java.nio.channels.spi.AsynchronousChannelProvider
    -> sun.nio.ch.LinuxAsynchronousChannelProvider

加载系统默认异步通道提供商:

// AsynchronousChannelProvider
private static class ProviderHolder {
    static final AsynchronousChannelProvider provider = load();
    private static AsynchronousChannelProvider load() {
        AsynchronousChannelProvider p;
        // 先按系统属性java.nio.channels.spi.AsynchronousChannelProvider
        p = loadProviderFromProperty();
        if (p != null) return p;
        // 再从系统类加载器中获取第一个实现类
        p = loadProviderAsService();
        if (p != null) return p;
        // 最后加载系统默认实现
        return sun.nio.ch.DefaultAsynchronousChannelProvider.create();
    }
}

// DefaultAsynchronousChannelProvider
// 根据操作系统加载对应的实现类
public static AsynchronousChannelProvider create() {
    String osname = AccessController.doPrivileged(new GetPropertyAction("os.name"));
    if (osname.equals("SunOS"))
        return createProvider("sun.nio.ch.SolarisAsynchronousChannelProvider");
    if (osname.equals("Linux"))
        return createProvider("sun.nio.ch.LinuxAsynchronousChannelProvider");
    if (osname.contains("OS X"))
        return createProvider("sun.nio.ch.BsdAsynchronousChannelProvider");
    throw new InternalError("platform not recognized");
}

加载系统默认异步通道组:

// LinuxAsynchronousChannelProvider
public class LinuxAsynchronousChannelProvider extends AsynchronousChannelProvider {
    // 系统默认异步通道组
    private static volatile EPollPort defaultPort;

    private EPollPort defaultEventPort() throws IOException {
        if (defaultPort == null) {
            synchronized (LinuxAsynchronousChannelProvider.class) {
                if (defaultPort == null) {
                    // 创建异步通道组EPollPort,并启动
                    defaultPort = new EPollPort(this, ThreadPool.getDefault()).start();
                }
            }
        }
        return defaultPort;
    }
}

创建新的异步通道组:

// LinuxAsynchronousChannelProvider
public AsynchronousChannelGroup openAsynchronousChannelGroup(int nThreads, ThreadFactory factory) {
    return new EPollPort(this, ThreadPool.create(nThreads, factory)).start();
}
 
public AsynchronousChannelGroup openAsynchronousChannelGroup(ExecutorService executor, int initialSize) {
    return new EPollPort(this, ThreadPool.wrap(executor, initialSize)).start();
}

创建异步TCP监听套接字通道:

// AsynchronousServerSocketChannel
public static AsynchronousServerSocketChannel open(AsynchronousChannelGroup group) {
    // 由异步通道组绑定的异步通道提供商创建异步通道
    AsynchronousChannelProvider provider = (group == null) ?
        AsynchronousChannelProvider.provider() : group.provider();
    return provider.openAsynchronousServerSocketChannel(group);
}

// LinuxAsynchronousChannelProvider
public AsynchronousServerSocketChannel openAsynchronousServerSocketChannel(AsynchronousChannelGroup group) {
    return new UnixAsynchronousServerSocketChannelImpl(toPort(group));
}

创建异步TCP连接套接字通道:

// AsynchronousSocketChannel 
public static AsynchronousSocketChannel open(AsynchronousChannelGroup group) {
    // 由异步通道组绑定的异步通道提供商创建异步通道
    AsynchronousChannelProvider provider = (group == null) ?
        AsynchronousChannelProvider.provider() : group.provider();
    return provider.openAsynchronousSocketChannel(group);
}

// LinuxAsynchronousChannelProvider
public AsynchronousSocketChannel openAsynchronousSocketChannel(AsynchronousChannelGroup group) {
    return new UnixAsynchronousSocketChannelImpl(toPort(group));
}

异步通道组实现

通道管理

  1. 通道创建时注册到通道组

1.1 创建监听套接字

AsynchronousServerSocketChannelImpl(AsynchronousChannelGroupImpl group) {
    // 绑定异步通道提供商
    super(group.provider());
    // 创建监听套接字,设置文件描述符
    this.fd = Net.serverSocket(true);
}

UnixAsynchronousServerSocketChannelImpl(Port port) {
    super(port); // 见上
    try {
        // 设置为非阻塞模式
        IOUtil.configureBlocking(fd, false);
    } catch (IOException x) {
        nd.close(fd);
        throw x;
    }
    this.port = port;
    this.fdVal = IOUtil.fdVal(fd);
    // 将通道的文件描述符注册到通道组
    port.register(fdVal, this);
}

1.2 创建连接套接字

AsynchronousSocketChannelImpl(AsynchronousChannelGroupImpl group) {
    // 绑定异步通道提供商
    super(group.provider());
    // 创建连接套接字,设置文件描述符
    this.fd = Net.socket(true);
    // 设置初始状态
    this.state = ST_UNCONNECTED;
}

UnixAsynchronousSocketChannelImpl(Port port) {
    super(port); // 见上
    try {
        // 设置为非阻塞模式
        IOUtil.configureBlocking(fd, false);
    } catch (IOException x) {
        nd.close(fd);
        throw x;
    }
    this.port = port;
    this.fdVal = IOUtil.fdVal(fd);
    // 将通道的文件描述符注册到通道组
    port.register(fdVal, this);
}

1.3 注册通道

// Port
final void register(int fd, PollableChannel ch) {
    // Map<Integer,PollableChannel> fdToChannel
    // Port维护一个Map,保存通道文件描述符到通道实例的映射
    fdToChannel.put(Integer.valueOf(fd), ch);
}
  1. 通道关闭时从通道组注销

2.1 关闭套接字

// UnixAsynchronousServerSocketChannelImpl
void implClose() throws IOException {
    // 从通道组注销
    port.unregister(fdVal);
    // 关闭文件描述符
    nd.close(fd);
    ...
}

// UnixAsynchronousSocketChannelImpl
void implClose() throws IOException {
    // 从通道组注销
    port.unregister(fdVal);
    // 关闭文件描述符
    nd.close(fd);
    ...
}

2.2 注销通道

// Port
final void unregister(int fd) {
    // 从映射中移除
    fdToChannel.remove(Integer.valueOf(fd));
    ...
}

注册监听操作

  1. accept
// UnixAsynchronousServerSocketChannelImpl
Future<AsynchronousSocketChannel> implAccept(Object att, 
    CompletionHandler<AsynchronousSocketChannel,Object> handler) {
    
    // 检查并设置标志位,以防止并发进行accept操作
    if (!accepting.compareAndSet(false, true))
        throw new AcceptPendingException();

    try {
        begin();
        
        // 初始化accept操作
        int n = accept0(this.fd, newfd, isaa);

        if (n == IOStatus.UNAVAILABLE) {  // 尚无接受连接
            PendingFuture<AsynchronousSocketChannel,Object> result = null;

            synchronized (updateLock) {
                if (handler == null) {
                    // 若无完成处理器,则生成Future,以供获取操作完成结果
                    this.acceptHandler = null;
                    result = new PendingFuture<AsynchronousSocketChannel,Object>(this);
                    this.acceptFuture = result;

                } else {
                   // 若有完成处理器,则设置处理器和附件对象,以供操作完成后调用
                    this.acceptHandler = handler;
                    this.acceptAttachment = att;
                }

                this.acceptPending = true; // accept等待状态
            }
            
            // 注册监听accept操作
            port.startPoll(fdVal, Port.POLLIN);
            return result;
        }
    } catch (Throwable x) {
        ...
    } finally {
        end();
    }
    ...
}
  1. connect
// UnixAsynchronousSocketChannelImpl
<A> Future<Void> implConnect(SocketAddress remote,
    A attachment, CompletionHandler<Void,? super A> handler) {
    // 连接地址
    InetSocketAddress isa = Net.checkAddress(remote);

    try {
        begin();
        
        // 初始化connect操作
        int n = Net.connect(fd, isa.getAddress(), isa.getPort());
        
        if (n == IOStatus.UNAVAILABLE) {  // 尚未建立连接
            PendingFuture<Void,A> result = null;

            synchronized (updateLock) {
                if (handler == null) {
                    // 若无完成处理器,则生成Future,以供获取操作完成结果
                    result = new PendingFuture<Void,A>(this, OpType.CONNECT);
                    this.connectFuture = (PendingFuture<Void,Object>)result;

                } else {
                    // 若有完成处理器,则设置处理器和附件对象,以供操作完成后调用
                    this.connectHandler = (CompletionHandler<Void,Object>)handler;
                    this.connectAttachment = attachment;
                }

                this.connectPending = true;  // connect等待状态
                // 注册监听操作
                updateEvents();
            }
            return result;
        }
        ...
    } catch (Throwable x) {
        ...
    } finally {
        end();
    }
    ...
}

// 注册监听操作(读、写、连接)
private void updateEvents() {
    int events = 0;
    if (readPending)
        events |= Port.POLLIN;
    if (connectPending || writePending)
        events |= Port.POLLOUT;
    if (events != 0)
        port.startPoll(fdVal, events);
}
  1. read
// AsynchronousSocketChannelImpl
public final <A> void read(ByteBuffer dst, long timeout, TimeUnit unit, 
    A attachment, CompletionHandler<Integer,? super A> handler) {
    read(false, dst, null, timeout, unit, attachment, handler);
}

// UnixAsynchronousSocketChannelImpl
<V extends Number,A> Future<V> implRead(boolean isScatteringRead, ByteBuffer dst, ByteBuffer[] dsts, 
    long timeout, TimeUnit unit, 
    A attachment, CompletionHandler<V,? super A> handler) {
    
    int n = IOStatus.UNAVAILABLE;
    boolean pending = false;

    try {
        begin();
        
        if (n == IOStatus.UNAVAILABLE) {
            PendingFuture<V,A> result = null;
            
            synchronized (updateLock) {
                this.isScatteringRead = isScatteringRead;
                this.readBuffer = dst;
                this.readBuffers = dsts;
                
                if (handler == null) {
                    // 若无完成处理器,则生成Future,以供获取操作完成结果
                    this.readHandler = null;
                    result = new PendingFuture<V,A>(this, OpType.READ);
                    this.readFuture = (PendingFuture<Number,Object>)result;
                    this.readAttachment = null;

                } else {
                    // 若有完成处理器,则设置处理器和附件对象,以供操作完成后调用
                    this.readHandler = (CompletionHandler<Number,Object>)handler;
                    this.readAttachment = attachment;
                    this.readFuture = null;
                }
                
                if (timeout > 0L) {
                    // 若超时等待,则提交超时中断任务
                    this.readTimer = port.schedule(readTimeoutTask, timeout, unit);
                }

                this.readPending = true;  // read等待状态
                // 注册监听操作
                updateEvents();
            }
            pending = true;
            return result;
        }
    } catch (Throwable x) {
        ...
    } finally {
        if (!pending)
            enableReading();
        end();
    }
    ...
}

private Runnable readTimeoutTask = new Runnable() {
    public void run() {
        CompletionHandler<Number,Object> handler = null;
        Object att = null;
        PendingFuture<Number,Object> future = null;
        
        synchronized (updateLock) {
            if (!readPending)
                return;  // 如果read已完成则返回
            readPending = false;  // 不再read等待
            handler = readHandler;
            att = readAttachment;
            future = readFuture;
        }

        // 不再继续读取
        enableReading(true);
        
        // 设置结果或调用完成处理器
        Exception exc = new InterruptedByTimeoutException();
        if (handler == null) {
            future.setFailure(exc);
        } else {
            AsynchronousChannel ch = UnixAsynchronousSocketChannelImpl.this;
            Invoker.invokeIndirectly(ch, handler, att, null, exc);
        }
    }
};
  1. write
// AsynchronousSocketChannelImpl
public final <A> void write(ByteBuffer src, long timeout, TimeUnit unit,
    A attachment, CompletionHandler<Integer,? super A> handler) {
    write(false, src, null, timeout, unit, attachment, handler);
}

// UnixAsynchronousSocketChannelImpl
<V extends Number,A> Future<V> implWrite(boolean isGatheringWrite, ByteBuffer src, ByteBuffer[] srcs,
    long timeout, TimeUnit unit,
    A attachment, CompletionHandler<V,? super A> handler) {

    int n = IOStatus.UNAVAILABLE;
    boolean pending = false;

    try {
        begin();

        if (n == IOStatus.UNAVAILABLE) {
            PendingFuture<V,A> result = null;

            synchronized (updateLock) {
                this.isGatheringWrite = isGatheringWrite;
                this.writeBuffer = src;
                this.writeBuffers = srcs;

                if (handler == null) {
                    // 若无完成处理器,则生成Future,以供获取操作完成结果
                    this.writeHandler = null;
                    result = new PendingFuture<V,A>(this, OpType.WRITE);
                    this.writeFuture = (PendingFuture<Number,Object>)result;
                    this.writeAttachment = null;

                } else {
                    // 若有完成处理器,则设置处理器和附件对象,以供操作完成后调用
                    this.writeHandler = (CompletionHandler<Number,Object>)handler;
                    this.writeAttachment = attachment;
                    this.writeFuture = null;
                }

                if (timeout > 0L) {
                    // 若超时等待,则提交超时中断任务
                    this.writeTimer = port.schedule(writeTimeoutTask, timeout, unit);
                }

                this.writePending = true;  // write等待状态
                // 注册监听操作
                updateEvents();
            }
            pending = true;
            return result;
        }
    } catch (Throwable x) {
        ...
    } finally {
        if (!pending)
            enableWriting();
        end();
    }
    ...
}

private Runnable writeTimeoutTask = new Runnable() {
    public void run() {
        CompletionHandler<Number,Object> handler = null;
        Object att = null;
        PendingFuture<Number,Object> future = null;

        synchronized (updateLock) {
            if (!writePending)
                return;
            writePending = false;
            handler = writeHandler;
            att = writeAttachment;
            future = writeFuture;
        }

        // kill further writing before releasing waiters
        enableWriting(true);

        // invoke handler or set result
        Exception exc = new InterruptedByTimeoutException();
        if (handler != null) {
            Invoker.invokeIndirectly(UnixAsynchronousSocketChannelImpl.this, handler, att, null, exc);
        } else {
            future.setFailure(exc);
        }
    }
};
  1. 注册监听操作
// EPollPort
void startPoll(int fd, int events) {
    // 添加或修改监听描述符和监听操作
    // epfd:EPoll描述符
    // EPOLLONESHOT:设置一次通知状态
    int err = epollCtl(epfd, EPOLL_CTL_MOD, fd, (events | EPOLLONESHOT));
    if (err == ENOENT)
        err = epollCtl(epfd, EPOLL_CTL_ADD, fd, (events | EPOLLONESHOT));
    if (err != 0)
        throw new AssertionError();     // should not happen
}

EPoll

异步通道组创建时立即启动,循环进行epoll,并对就绪通道操作进行分发。

  1. 异步通道组创建:
EPollPort(AsynchronousChannelProvider provider, ThreadPool pool) {
    super(provider, pool);

    // 创建epoll描述符
    this.epfd = epollCreate();

    // create socket pair for wakeup mechanism
    int[] sv = new int[2];
    try {
        socketpair(sv);
        // register one end with epoll
        epollCtl(epfd, EPOLL_CTL_ADD, sv[0], POLLIN);
    } catch (IOException x) {
        close0(epfd);
        throw x;
    }
    this.sp = sv;

    // 为epoll检测到的就绪事件分配空间
    this.address = allocatePollArray(MAX_EPOLL_EVENTS);

    // epoll检测到的就绪事件被封装为Event类实例,并加入队列,等待分发处理
    this.queue = new ArrayBlockingQueue<Event>(MAX_EPOLL_EVENTS);
    
    // 往队列中加入一个标记Event,以启动epoll
    this.queue.offer(NEED_TO_POLL);
}

// 就绪通道和操作封装成Event类实例
static class Event {
    final PollableChannel channel;
    final int events;

    Event(PollableChannel channel, int events) {
        this.channel = channel;
        this.events = events;
    }

    PollableChannel channel()   { return channel; }
    int events()                { return events; }
}
  1. 循环epoll:
// EPollPort
EPollPort start() {
    startThreads(new EventHandlerTask());
    return this;
}

// 通道组启动执行的任务
private class EventHandlerTask implements Runnable {

    public void run() {
        Invoker.GroupAndInvokeCount myGroupAndInvokeCount =
            Invoker.getGroupAndInvokeCount();
        final boolean isPooledThread = (myGroupAndInvokeCount != null);
        boolean replaceMe = false;
        Event ev;
        try {
            for (;;) {
                // reset invoke count
                if (isPooledThread)
                    myGroupAndInvokeCount.resetInvokeCount();

                try {
                    replaceMe = false;
                    
                    // 从队列中取出Event进行分发
                    ev = queue.take();
                    
                    // 队列中无Event处理时,会添加NEED_TO_POLL标记Event,以再次启动epoll
                    
                    if (ev == NEED_TO_POLL) {
                        try {
                            ev = poll();
                        } catch (IOException x) {
                            x.printStackTrace();
                            return;
                        }
                    }
                } catch (InterruptedException x) {
                    continue;
                }

                // handle wakeup to execute task or shutdown
                if (ev == EXECUTE_TASK_OR_SHUTDOWN) {
                    Runnable task = pollTask();
                    if (task == null) {
                        // shutdown request
                        return;
                    }
                    // run task (may throw error/exception)
                    replaceMe = true;
                    task.run();
                    continue;
                }

                // 分发到Event对应的通道
                // 调用通道的onEvent()方法进行处理
                try {
                    ev.channel().onEvent(ev.events(), isPooledThread);
                } catch (Error x) {
                    replaceMe = true; throw x;
                } catch (RuntimeException x) {
                    replaceMe = true; throw x;
                }
            }
        } finally {
            // last handler to exit when shutdown releases resources
            int remaining = threadExit(this, replaceMe);
            if (remaining == 0 && isShutdown()) {
                implClose();
            }
        }
    }

    private Event poll() throws IOException {
        try {
            for (;;) {
                // 检测就绪通道和操作
                int n = epollWait(epfd, address, MAX_EPOLL_EVENTS);
                /*
                * 'n' events have been read. Here we map them to their
                * corresponding channel in batch and queue n-1 so that
                * they can be handled by other handler threads. The last
                * event is handled by this thread (and so is not queued).
                */
                fdToChannelLock.readLock().lock();
                try {
                    while (n-- > 0) {
                        // 就绪事件地址
                        long eventAddress = getEvent(address, n);
                        // 就绪通道描述符
                        int fd = getDescriptor(eventAddress);
                        
                        // wakeup
                        if (fd == sp[0]) {
                            if (wakeupCount.decrementAndGet() == 0) {
                                // no more wakeups so drain pipe
                                drain1(sp[0]);
                            }

                            // queue special event if there are more events to handle.
                            if (n > 0) {
                                queue.offer(EXECUTE_TASK_OR_SHUTDOWN);
                                continue;
                            }
                            return EXECUTE_TASK_OR_SHUTDOWN;
                        }

                        // 将通道描述符映射到通道
                        PollableChannel channel = fdToChannel.get(fd);
                        if (channel != null) {
                            // 通道就绪操作
                            int events = getEvents(eventAddress);
                            // 将就绪通道和操作封装为Event实例
                            Event ev = new Event(channel, events);
                            
                            // 将最后一个Event返回处理,其它Event排队
                            if (n > 0) {
                                queue.offer(ev);
                            } else {
                                return ev;
                            }
                        }
                    }
                } finally {
                    fdToChannelLock.readLock().unlock();
                }
            }
        } finally {
            // 每次epoll完成,都需要向队列中加入NEED_TO_POLL标记Event,以便能够再次epoll
            queue.offer(NEED_TO_POLL);
        }
    }

}
  1. 处理就绪操作
  • 监听套接字就绪(accept):
// UnixAsynchronousServerSocketChannelImpl
public void onEvent(int events, boolean mayInvokeDirect) {
    synchronized (updateLock) {
        if (!acceptPending)
            return;  // 可能已经强制异步关闭
        acceptPending = false; // 不再是accept等待状态
    }

    // newfd:连接描述符
    // isaa:连接地址
    FileDescriptor newfd = new FileDescriptor();
    InetSocketAddress[] isaa = new InetSocketAddress[1];

    Throwable exc = null;
    try {
        begin();

        // 进行连接
        int n = accept0(this.fd, newfd, isaa);

        // 仍无连接:设置accept等待状态,再次注册accept监听
        if (n == IOStatus.UNAVAILABLE) {
            synchronized (updateLock) {
                acceptPending = true;
            }
            port.startPoll(fdVal, Port.POLLIN);
            return;
        }

    } catch (Throwable x) {
        if (x instanceof ClosedChannelException)
            x = new AsynchronousCloseException();
        exc = x;
    } finally {
        end();
    }

    AsynchronousSocketChannel child = null;
    if (exc == null) {
        try {
            // 连接成功,创建连接套接字通道
            child = finishAccept(newfd, isaa[0], acceptAcc);
        } catch (Throwable x) {
            if (!(x instanceof IOException) && !(x instanceof SecurityException))
                x = new IOException(x);
            exc = x;
        }
    }

    // 取出accept完成处理器和Future
    CompletionHandler<AsynchronousSocketChannel,Object> handler = acceptHandler;
    Object att = acceptAttachment;
    PendingFuture<AsynchronousSocketChannel,Object> future = acceptFuture;

    // 设置标记位,以便能够再次accept
    enableAccept();

    if (handler == null) {
        // 设置结果:连接套接字
        future.setResult(child, exc);
        // 如果已经异步取消accept,则要关闭连接套接字
        if (child != null && future.isCancelled()) {
            try {
                child.close();
            } catch (IOException ignore) { }
        }
    } else {
        // 调用完成处理器
        Invoker.invoke(this, handler, att, child, exc);
    }
}
  • 异步调用完成处理器:
static <V,A> void invokeIndirectly(AsynchronousChannel channel,
                                final CompletionHandler<V,? super A> handler, 
                                final A attachment,
                                final V result, 
                                final Throwable exc) {
    try {
        // 将完成处理器调用封装成任务,提交给线程池异步执行
        ((Groupable)channel).group().executeOnPooledThread(new Runnable() {
            public void run() {
                invokeUnchecked(handler, attachment, result, exc);
            }
        });
    } catch (RejectedExecutionException ree) {
        throw new ShutdownChannelGroupException();
    }
}

static <V,A> void invokeUnchecked(CompletionHandler<V,? super A> handler,
                                A attachment, 
                                V value, 
                                Throwable exc) {
    // 若无异常,调用处理器的completed方法
    // 若有异常,调用处理器的failed方法
    if (exc == null) {
        handler.completed(value, attachment);
    } else {
        handler.failed(exc, attachment);
    }
    // 清除线程中断
    Thread.interrupted();
}
  • 连接套接字就绪(connect, read, write)
// UnixAsynchronousSocketChannelImpl
public void onEvent(int events, boolean mayInvokeDirect) {
    // 判断就绪操作
    boolean readable = (events & Port.POLLIN) > 0;  // read
    boolean writable = (events & Port.POLLOUT) > 0;  // connect/write
    if ((events & (Port.POLLERR | Port.POLLHUP)) > 0) {
        readable = true;
        writable = true;
    }
    finish(mayInvokeDirect, readable, writable);
}

private void finish(boolean mayInvokeDirect, boolean readable, boolean writable) {
    boolean finishRead = false;
    boolean finishWrite = false;
    boolean finishConnect = false;

    // 判断就绪操作,设置标记
    synchronized (updateLock) {
        if (readable && this.readPending) {
            this.readPending = false;
            finishRead = true;
        }
        if (writable) {
            if (this.writePending) {
                this.writePending = false;
                finishWrite = true;
            } else if (this.connectPending) {
                this.connectPending = false;
                finishConnect = true;
            }
        }
    }

    // complete the I/O operation. Special case for when channel is
    // ready for both reading and writing. In that case, submit task to
    // complete write if write operation has a completion handler.
    if (finishRead) {
        if (finishWrite)
            finishWrite(false);
        finishRead(mayInvokeDirect);
        return;
    }
    if (finishWrite) {
        finishWrite(mayInvokeDirect);
    }
    if (finishConnect) {
        finishConnect(mayInvokeDirect);
    }
}

// 完成read操作,并异步调用完成处理器
private void finishRead(boolean mayInvokeDirect) {
    int n = -1;
    Throwable exc = null;

    // copy fields as we can't access them after reading is re-enabled.
    boolean scattering = isScatteringRead;
    CompletionHandler<Number,Object> handler = readHandler;
    Object att = readAttachment;
    PendingFuture<Number,Object> future = readFuture;
    Future<?> timeout = readTimer;

    try {
        begin();

        // 完成read操作:将读取内容写入缓冲区
        if (scattering) {
            n = (int)IOUtil.read(fd, readBuffers, nd);
        } else {
            n = IOUtil.read(fd, readBuffer, -1, nd);
        }

        // 读操作实际未就绪:重置read等待状态,返回
        if (n == IOStatus.UNAVAILABLE) {
            synchronized (updateLock) {
                readPending = true;
            }
            return;
        }

        // allow objects to be GC'ed.
        this.readBuffer = null;
        this.readBuffers = null;
        this.readAttachment = null;

        // allow another read to be initiated
        enableReading();

    } catch (Throwable x) {
        enableReading();
        if (x instanceof ClosedChannelException)
            x = new AsynchronousCloseException();
        exc = x;
    } finally {
        // restart poll in case of concurrent write
        if (!(exc instanceof AsynchronousCloseException))
            lockAndUpdateEvents();
        end();
    }

    // cancel the associated timer
    if (timeout != null)
        timeout.cancel(false);

    // create result
    Number result = (exc != null) ? null : (scattering) ? 
        (Number)Long.valueOf(n) : (Number)Integer.valueOf(n);
    
    // 无完成处理器,则设置结果;有则异步调用完成处理器
    if (handler == null) {
        future.setResult(result, exc);
    } else {
        if (mayInvokeDirect) {
            Invoker.invokeUnchecked(handler, att, result, exc);
        } else {
            Invoker.invokeIndirectly(this, handler, att, result, exc);
        }
    }
}

// 完成write操作,并异步调用完成处理器
private void finishWrite(boolean mayInvokeDirect) {
    int n = -1;
    Throwable exc = null;

    // copy fields as we can't access them after reading is re-enabled.
    boolean gathering = this.isGatheringWrite;
    CompletionHandler<Number,Object> handler = this.writeHandler;
    Object att = this.writeAttachment;
    PendingFuture<Number,Object> future = this.writeFuture;
    Future<?> timer = this.writeTimer;

    try {
        begin();

        if (gathering) {
            n = (int)IOUtil.write(fd, writeBuffers, nd);
        } else {
            n = IOUtil.write(fd, writeBuffer, -1, nd);
        }
        if (n == IOStatus.UNAVAILABLE) {
            // spurious wakeup, is this possible?
            synchronized (updateLock) {
                writePending = true;
            }
            return;
        }

        // allow objects to be GC'ed.
        this.writeBuffer = null;
        this.writeBuffers = null;
        this.writeAttachment = null;

        // allow another write to be initiated
        enableWriting();

    } catch (Throwable x) {
        enableWriting();
        if (x instanceof ClosedChannelException)
            x = new AsynchronousCloseException();
        exc = x;
    } finally {
        // restart poll in case of concurrent write
        if (!(exc instanceof AsynchronousCloseException))
            lockAndUpdateEvents();
        end();
    }

    // cancel the associated timer
    if (timer != null)
        timer.cancel(false);

    // create result
    Number result = (exc != null) ? null : (gathering) ?
        (Number)Long.valueOf(n) : (Number)Integer.valueOf(n);

    // invoke handler or set result
    if (handler == null) {
        future.setResult(result, exc);
    } else {
        if (mayInvokeDirect) {
            Invoker.invokeUnchecked(handler, att, result, exc);
        } else {
            Invoker.invokeIndirectly(this, handler, att, result, exc);
        }
    }
}

// 完成connect操作,并异步调用完成处理器
private void finishConnect(boolean mayInvokeDirect) {
    Throwable e = null;
    try {
        begin();
        checkConnect(fdVal);
        setConnected();
    } catch (Throwable x) {
        if (x instanceof ClosedChannelException)
            x = new AsynchronousCloseException();
        e = x;
    } finally {
        end();
    }
    if (e != null) {
        // close channel if connection cannot be established
        try {
            close();
        } catch (Throwable suppressed) {
            e.addSuppressed(suppressed);
        }
    }

    // invoke handler and set result
    CompletionHandler<Void,Object> handler = connectHandler;
    Object att = connectAttachment;
    PendingFuture<Void,Object> future = connectFuture;
    if (handler == null) {
        future.setResult(null, e);
    } else {
        if (mayInvokeDirect) {
            Invoker.invokeUnchecked(handler, att, null, e);
        } else {
            Invoker.invokeIndirectly(this, handler, att, null, e);
        }
    }
}

线程池

AsynchronousChannelGroupImpl维护一个内部线程池

abstract class AsynchronousChannelGroupImpl 
    extends AsynchronousChannelGroup implements Executor {

    // 线程池
    private final ThreadPool pool;
    // 配合固定线程池时使用
    private final Queue<Runnable> taskQueue;
    
    // 通道完成IO操作后,将完成处理器包装成任务提交给线程池异步执行
    final void executeOnPooledThread(Runnable task) {
        // 固定线程池:任务排队,等待执行
        // 其它线程池:直接提交任务执行
        if (isFixedThreadPool()) {
            executeOnHandlerTask(task);
        } else {
            pool.executor().execute(bindToGroup(task));
        }
    }
}
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 215,539评论 6 497
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,911评论 3 391
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 161,337评论 0 351
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,723评论 1 290
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,795评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,762评论 1 294
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,742评论 3 416
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,508评论 0 271
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,954评论 1 308
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,247评论 2 331
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,404评论 1 345
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,104评论 5 340
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,736评论 3 324
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,352评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,557评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,371评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,292评论 2 352

推荐阅读更多精彩内容