1、选择器基础
1.1、选择器、可选择通道、选择键类
选择器(Selector):
选择器类管理着一个被注册的通道集合的信息和它们的就绪状态。通道是和选择器一起被注册的,并且使用选择器来更新通道的就绪状态。当这么做的时候,可以选择将被激发的线程挂起,直到有就绪的的通道。
可选择通道(SelectableChannel):
这个抽象类提供了实现通道的可选择性所需要的公共方法。它是所有支持就绪检查的通道类的父类。FileChannel对象不是可选择的,因为它们没有继承SelectableChannel。所有socket通道都是可选择的,包括从管道(Pipe)对象的中获得的通道。SelectableChannel可以被注册到Selector对象上,同时可以指定对那个选择器而言,那种操作是感兴趣的。一个通道可以被注册到多个选择器上,但对每个选择器而言只能被注册一次。
选择键(SelectionKey):
选择键封装了特定的通道与特定的选择器的注册关系。选择键对象被SelectableChannel.register( ) 返回并提供一个表示这种注册关系的标记。选择键包含了两个比特集(以整数的形式进行编码),指示了该注册关系所关心的通道操作,以及通道已经准备好的操作。
public abstract class SelectableChannel extends AbstractChannel implements Channel {
// This is a partial API listing
public abstract SelectionKey register (Selector sel, int ops) throws ClosedChannelException;
public abstract SelectionKey register (Selector sel, int ops, Object att) throws ClosedChannelException;
public abstract boolean isRegistered( );
public abstract int validOps( );
public abstract void configureBlocking (boolean block) throws IOException;
public abstract boolean isBlocking( );
public abstract Object blockingLock( );
}
就绪选择相关类的关系:
调用可选择通道的register( )方法会将它注册到一个选择器上。如果您试图注册一个处于阻塞状态的通道,register( )将抛出未检查的IllegalBlockingModeException异常。此外,通道一旦被注册,就不能回到阻塞状态。试图这么做的话,将在调用configureBlocking( )方法时将抛出IllegalBlockingModeException异常。并且,理所当然地,试图注册一个已经关闭的SelectableChannel实例的话,也将抛出ClosedChannelException异常,就像方法原型指示的那样。
selector的API:
public abstract class Selector {
public static Selector open( ) throws IOException
public abstract boolean isOpen( );
public abstract void close( ) throws IOException;
public abstract SelectionProvider provider( );
public abstract int select( ) throws IOException;
public abstract int select (long timeout) throws IOException;
public abstract int selectNow( ) throws IOException;
public abstract void wakeup( );
public abstract Set keys( );
public abstract Set selectedKeys( );
}
尽管SelectableChannel类上定义了register( )方法,还是应该将通道注册到选择器上,而不是另一种方式。选择器维护了一个需要监控的通道的集合。一个给定的通道可以被注册到多于一个的选择器上,而且不需要知道它被注册了那个Selector对象上。将register( )放在SelectableChannel上而不是Selector上,这种做法看起来有点随意。它将返回一个封装了两个对象的关系的选择键对象。重要的是要记住选择器对象控制了被注册到它之上的通道的选择过程。
public abstract class SelectionKey {
public static final int OP_READ
public static final int OP_WRITE
public static final int OP_CONNECT
public static final int OP_ACCEPT
public abstract SelectableChannel channel( );
public abstract Selector selector( );
public abstract void cancel( );
public abstract boolean isValid( );
public abstract int interestOps( );
public abstract void interestOps (int ops);
public abstract int readyOps( );
public final boolean isReadable( )
public final boolean isWritable( )
public final boolean isConnectable( )
public final boolean isAcceptable( )
public final Object attach (Object ob)
public final Object attachment( )
}
对于键的interest(感兴趣的操作)集合和ready(已经准备好的操作)集合的解释是和特定的通道相关的。每个通道的实现,将定义它自己的选择键类。在register( )方法中构造它并将它传递给所提供的选择器对象。
1.2、建立选择器
为了建立监控三个Socket通道的选择器,您需要做像这样的事情:
Selector selector = Selector.open( );
channel1.register (selector, SelectionKey.OP_READ);
channel2.register (selector, SelectionKey.OP_WRITE);
channel3.register (selector, SelectionKey.OP_READ | SelectionKey.OP_WRITE);
// Wait up to 10 seconds for a channel to become ready
readyCount = selector.select (10000);
这些代码创建了一个新的选择器,然后将这三个(已经存在的)socket通道注册到选择器上,而且感兴趣的操作各不相同。select( )方法在将线程置于睡眠状态,直到这些刚兴趣的事情中的操作中的一个发生或者10秒钟的时间过去。
Selector对象是通过调用静态工厂方法open( )来实例化的。选择器不是像通道或流(stream)那样的基本I/O对象:数据从来没有通过它们进行传递。类方法open( )向SPI发出请求,通过默认的SelectorProvider对象获取一个新的实例。通过调用一个自定义的SelectorProvider对象的openSelector( )方法来创建一个Selector实例也是可行的。您可以通过调用provider( )方法来决定由哪个SelectorProvider对象来创建给定的Selector实例。大多数情况下,您不需要关心SPI;只需要调用open( )方法来创建新的Selector对象。
继续关于将Select作为I/O对象进行处理的话题的探讨:当您不再使用它时,需要调用close( )方法来释放它可能占用的资源并将所有相关的选择键设置为无效。一旦一个选择器被关闭,试图调用它的大多数方法都将导致ClosedSelectorException。注意ClosedSelectorException是一个非检查(运行时的)错误。您可以通过isOpen( )方法来测试一个选择器是否处于被打开的状态。
并非所有的操作都在所有的可选择通道上被支持。例如,SocketChannel不支持accept。试图注册不支持的操作将导致IllegalArgumentException。您可以通过调用validOps( )方法来获取特定的通道所支持的操作集合。
选择器包含了注册到它们之上的通道的集合。在任意给定的时间里,对于一个给定的选择器和一个给定的通道而言,只有一种注册关系是有效的。但是,将一个通道注册到多于一个的选择器上允许的。这么做的话,在更新interest集合为指定的值的同时,将返回与之前相同的选择键。实际上,后续的注册都只是简单地将与之前的注册关系相关的键进行更新。
任何一个通道和选择器的注册关系都被封装在一个SelectionKey对象中。keyFor( )方法将返回与该通道和指定的选择器相关的键。如果通道被注册到指定的选择器上,那么相关的键将被返回。如果它们之间没有注册关系,那么将返回null。
2、 使用选择键
SelectionKey类的API:
public abstract class SelectionKey {
public static final int OP_READ
public static final int OP_WRITE
public static final int OP_CONNECT
public static final int OP_ACCEPT
public abstract SelectableChannel channel( );
public abstract Selector selector( );
public abstract void cancel( );
public abstract boolean isValid( );
public abstract int interestOps( );
public abstract void interestOps (int ops);
public abstract int readyOps( );
public final boolean isReadable( )
public final boolean isWritable( )
public final boolean isConnectable( )
public final boolean isAcceptable( )
public final Object attach (Object ob)
public final Object attachment( )
}
一个键表示了一个特定的通道对象和一个特定的选择器对象之间的注册关系。您可以看到前两个方法中反映了这种关系。channel( )方法返回与该键相关的SelectableChannel对象,而selector( )则返回相关的Selector对象。
当应该终结这种关系的时候,可以调用SelectionKey对象的cancel( )方法。可以通过调用isValid( )方法来检查它是否仍然表示一种有效的关系。当键被取消时,它将被放在相关的选择器的已取消的键的集合里。注册不会立即被取消,但键会立即失效。当再次调用select( )方法时(或者一个正在进行的select()调用结束时),已取消的键的集合中的被取消的键将被清理掉,并且相应的注销也将完成。通道会被注销,而新的SelectionKey将被返回。
当通道关闭时,所有相关的键会自动取消(记住,一个通道可以被注册到多个选择器上)。当选择器关闭时,所有被注册到该选择器的通道都将被注销,并且相关的键将立即被无效化(取消)。一旦键被无效化,调用它的与选择相关的方法就将抛出CancelledKeyException。
一个SelectionKey对象包含两个以整数形式进行编码的比特掩码:一个用于指示那些通道/选择器组合体所关心的操作(instrest集合),另一个表示通道准备好要执行的操作(ready集合)。当前的interest集合可以通过调用键对象的interestOps( )方法来获取。最初,这应该是通道被注册时传进来的值。这个interset集合永远不会被选择器改变,但您可以通过调用interestOps( )方法并传入一个新的比特掩码参数来改变它。interest集合也可以通过将通道注册到选择器上来改变(实际上使用一种迂回的方式调用interestOps( ))。当相关的Selector上的select( )操作正在进行时改变键的interest集合,不会影响那个正在进行的选择操作。所有更改将会在select( )的下一个调用中体现出来。
可以通过调用键的readyOps( )方法来获取相关的通道的已经就绪的操作。ready集合是interest集合的子集,并且表示了interest集合中从上次调用select( )以来已经就绪的那些操作。
SelectionKey类定义了四个便于使用的布尔方法来为您测试这些比特值:isReadable( ),isWritable( ),isConnectable( ), 和isAcceptable( )。每一个方法都与使用特定掩码来测试readyOps( )方法的结果的效果相同。
需要注意的是,通过相关的选择键的readyOps( )方法返回的就绪状态指示只是一个提示,不是保证。底层的通道在任何时候都会不断改变。其他线程可能在通道上执行操作并影响它的就绪状态。同时,操作系统的特点也总是需要考虑的。
public abstract class SelectionKey {
// This is a partial API listing
public final Object attach (Object ob)
public final Object attachment( )
}
这两个方法允许您在键上放置一个“附件”,并在后面获取它。这是一种允许您将任意对象与键关联的便捷的方法。这个对象可以引用任何对您而言有意义的对象,例如业务对象、会话句柄、其他通道等等。这将允许您遍历与选择器相关的键,使用附加在上面的对象句柄作为引用来获取相关的上下文。
attach( )方法将在键对象中保存所提供的对象的引用。SelectionKey类除了保存它之外,不会将它用于任何其他用途。任何一个之前保存在键中的附件引用都会被替换。可以使用null值来清除附件。可以通过调用attachment( )方法来获取与键关联的附件句柄。
3、使用选择器
3.1、选择过程
每一个Selector对象维护三个键的集合:
public abstract class Selector {
// This is a partial API listing
public abstract Set keys( );
public abstract Set selectedKeys( );
public abstract int select( ) throws IOException;
public abstract int select (long timeout) throws IOException;
public abstract int selectNow( ) throws IOException;
public abstract void wakeup( );
}
已注册的键的集合(Registered key set):
与选择器关联的已经注册的键的集合。并不是所有注册过的键都仍然有效。这个集合通过keys( )方法返回,并且可能是空的。这个已注册的键的集合不是可以直接修改的;试图这么做的话将引java.lang.UnsupportedOperationException。
已选择的键的集合(Selected key set):
这个集合的每个成员都是相关的通道被选择器(在前一个选择操作中)判断为已经准备好的,并且包含于键的interest集合中的操作。这个集合通过selectedKeys( )方法返回(并有可能是空的)。
不要将已选择的键的集合与ready集合弄混了。这是一个键的集合,每个键都关联一个已经准备好至少一种操作的通道。每个键都有一个内嵌的ready集合,指示了所关联的通道已经准备好的操作。
键可以直接从这个集合中移除,但不能添加。试图向已选择的键的集合中添加元素将抛出java.lang.UnsupportedOperationException。
已取消的键的集合(Cancelled key set)
已注册的键的集合的子集,这个集合包含了cancel( )方法被调用过的键(这个键已经被无效化),但它们还没有被注销。这个集合是选择器对象的私有成员,因而无法直接访问。
Selector类的核心是选择过程。这个名词您已经在之前看过多次了——现在应该解释一下了。基本上来说,选择器是对select( )、poll( )等本地调用(native call)或者类似的操作系统特定的系统调用的一个包装。但是Selector所作的不仅仅是简单地向本地代码传送参数。它对每个选择操作应用了特定的过程。对这个过程的理解是合理地管理键和它们所表示的状态信息的基础。
选择操作是当三种形式的select( )中的任意一种被调用时,由选择器执行的。不管是哪一种形式的调用,下面步骤将被执行:
(1)已取消的键的集合将会被检查。如果它是非空的,每个已取消的键的集合中的键将从另外两个集合中移除,并且相关的通道将被注销。这个步骤结束后,已取消的键的集合将是空的。
(2)已注册的键的集合中的键的interest集合将被检查。在这个步骤中的检查执行过后,对interest集合的改动不会影响剩余的检查过程。
一旦就绪条件被定下来,底层操作系统将会进行查询,以确定每个通道所关心的操作的真实就绪状态。依赖于特定的select( )方法调用,如果没有通道已经准备好,线程可能会在这时阻塞,通常会有一个超时值。
直到系统调用完成为止,这个过程可能会使得调用线程睡眠一段时间,然后当前每个通道的就绪状态将确定下来。对于那些还没准备好的通道将不会执行任何的操作。对于那些操作系统指示至少已经准备好interest集合中的一种操作的通道,将执行以下两种操作中的一种:
(a)如果通道的键还没有处于已选择的键的集合中,那么键的ready集合将被清空,然后表示操作系统发现的当前通道已经准备好的操作的比特掩码将被设置。
(b)否则,也就是键在已选择的键的集合中。键的ready集合将被表示操作系统发现的当前已经准备好的操作的比特掩码更新。所有之前的已经不再是就绪状态的操作不会被清除。事实上,所有的比特位都不会被清理。由操作系统决定的ready集合是与之前的ready集合按位分离的,一旦键被放置于选择器的已选择的键的集合中,它的ready集合将是累积的。比特位只会被设置,不会被清理。
(3)步骤2可能会花费很长时间,特别是所激发的线程处于休眠状态时。与该选择器相关的键可能会同时被取消。当步骤2结束时,步骤1将重新执行,以完成任意一个在选择进行的过程中,键已经被取消的通道的注销。
(4)select操作返回的值是ready集合在步骤2中被修改的键的数量,而不是已选择的键的集合中的通道的总数。返回值不是已准备好的通道的总数,而是从上一个select( )调用之后进入就绪状态的通道的数量。之前的调用中就绪的,并且在本次调用中仍然就绪的通道不会被计入,而那些在前一次调用中已经就绪但已经不再处于就绪状态的通道也不会被计入。这些通道可能仍然在已选择的键的集合中,但不会被计入返回值中。返回值可能是0。
使用内部的已取消的键的集合来延迟注销,是一种防止线程在取消键时阻塞,并防止与正在进行的选择操作冲突的优化。注销通道是一个潜在的代价很高的操作,这可能需要重新分配资源(请记住,键是与通道相关的,并且可能与它们相关的通道对象之间有复杂的交互)。清理已取消的键,并在选择操作之前和之后立即注销通道,可以消除它们可能正好在选择的过程中执行的潜在棘手问题。这是另一个兼顾健壮性的折中方案。
3.2、停止选择过程
Selector的API中的最后一个方法,wakeup( ),提供了使线程从被阻塞的select( )方法中优雅地退出的能力:
public abstract class Selector {
// This is a partial API listing
public abstract void wakeup( );
}
有三种方式可以唤醒在select( )方法中睡眠的线程:
调用wakeup( ):
调用Selector对象的wakeup( )方法将使得选择器上的第一个还没有返回的选择操作立即返回。如果当前没有在进行中的选择,那么下一次对select( )方法的一种形式的调用将立即返回。后续的选择操作将正常进行。在选择操作之间多次调用wakeup( )方法与调用它一次没有什么不同。
调用close( ):
如果选择器的close( )方法被调用,那么任何一个在选择操作中阻塞的线程都将被唤醒,就像wakeup( )方法被调用了一样。与选择器相关的通道将被注销,而键将被取消。
调用interrupt( ):
如果睡眠中的线程的interrupt( )方法被调用,它的返回状态将被设置。如果被唤醒的线程之后将试图在通道上执行I/O操作,通道将立即关闭,然后线程将捕捉到一个异常。这是由于在第三章中已经探讨过的通道的中断语义。使用wakeup( )方法将会优雅地将一个在select( )方法中睡眠的线程唤醒。如果您想让一个睡眠的线程在直接中断之后继续执行,需要执行一些步骤来清理中断状态。
Selector对象将捕捉InterruptedException异常并调用wakeup( )方法。请注意这些方法中的任意一个都不会关闭任何一个相关的通道。中断一个选择器与中断一个通道是不一样的(参见3.3节)。选择器不会改变任意一个相关的通道,它只会检查它们的状态。当一个在select( )方法中睡眠的线程中断时,对于通道的状态而言,是不会产生歧义的。
3.3、管理选择键
合理地使用选择器的秘诀是理解选择器维护的选择键集合所扮演的角色。(参见4.3.1小节,特别是选择过程的第二步。)最重要的部分是当键已经不再在已选择的键的集合中时将会发生什么。当通道上的至少一个感兴趣的操作就绪时,键的ready集合就会被清空,并且当前已经就绪的操作将会被添加到ready集合中。该键之后将被添加到已选择的键的集合中。
清理一个SelectKey的ready集合的方式是将这个键从已选择的键的集合中移除。选择键的就绪状态只有在选择器对象在选择操作过程中才会修改。处理思想是只有在已选择的键的集合中的键才被认为是包含了合法的就绪信息的。这些信息将在键中长久地存在,直到键从已选择的键的集合中移除,以通知选择器您已经看到并对它进行了处理。如果下一次通道的一些感兴趣的操作发生时,键将被重新设置以反映当时通道的状态并再次被添加到已选择的键的集合中。
这种框架提供了很多灵活性。通常的做法是在选择器上调用一次select操作(这将更新已选择的键的集合),然后遍历selectKeys( )方法返回的键的集合。在按顺序进行检查每个键的过程中,相关的通道也根据键的就绪集合进行处理。然后键将从已选择的键的集合中被移除(通过在Iterator对象上调用remove( )方法),然后检查下一个键。完成后,通过再次调用select( )方法重复这个循环。
代码示例:
public class NioEchoServer {
private static final int SELECTOR_TIMEOUT = 1000;
public void start(int port){
try {
//打开服务socket
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
//打开selector
Selector selector = Selector.open();
//服务监听port端口,配置为非阻塞模式
serverSocketChannel.socket().setReuseAddress(true);
serverSocketChannel.socket().bind(new InetSocketAddress(port));
serverSocketChannel.configureBlocking(false);
//将channel注册到selector中,并监听accept事件
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
long cnt = 1;
while (true){
if(selector.select(SELECTOR_TIMEOUT) == 0){
System.out.println("wait " + (cnt ++) + "s");
continue;
}
Iterator<SelectionKey> keyIterator = selector.selectedKeys().iterator();
while (keyIterator.hasNext()){
SelectionKey key = keyIterator.next();
keyIterator.remove();
try {
if(key.isAcceptable()){
SocketChannel client = ((ServerSocketChannel)key.channel()).accept();
client.configureBlocking(false);
ByteBuffer buffer = ByteBuffer.allocate(1024);
client.register(selector, SelectionKey.OP_READ, buffer);
System.out.println("connect accept!!");
}
if(key.isReadable()){
SocketChannel client = (SocketChannel)key.channel();
ByteBuffer buffer = (ByteBuffer)key.attachment();
buffer.compact();
int count = client.read(buffer);
if(count <= -1){
client.close();
}else if(count > 0){
key.interestOps(SelectionKey.OP_READ | SelectionKey.OP_WRITE);
System.out.println("read count=" + count);
System.out.println("read data= " + new String(buffer.array()));
}
}
if(key.isValid() && key.isWritable()){
ByteBuffer buffer = (ByteBuffer)key.attachment();
buffer.flip();
((SocketChannel)key.channel()).write(buffer);
if(!buffer.hasRemaining()){
key.interestOps(SelectionKey.OP_READ);
}
buffer.compact();
}
}catch (Exception e){
((SocketChannel)key.channel()).close();
e.printStackTrace();
}
}
}
}catch (Exception e){
e.printStackTrace();
}
}
public static void main(String[] args){
try {
new NioEchoServer().start(9999);
}catch (Exception e){
e.printStackTrace();
}
}
}
3.4、并发性
选择器对象是线程安全的,但它们包含的键集合不是。通过keys( )和selectKeys( )返回的键的集合是Selector对象内部的私有的Set对象集合的直接引用。这些集合可能在任意时间被改变。已注册的键的集合是只读的。如果您试图修改它,那么您得到的奖品将是一个java.lang.UnsupportedOperationException,但是当您在观察它们的时候,它们可能发生了改变的话,您仍然会遇到麻烦。Iterator对象是快速失败的(fail-fast):如果底层的Set被改变了,它们将会抛出java.util.ConcurrentModificationException,因此如果您期望在多个线程间共享选择器和/或键,请对此做好准备。您可以直接修改选择键,但请注意您这么做时可能会彻底破坏另一个线程的Iterator。
如果在多个线程并发地访问一个选择器的键的集合的时候存在任何问题,您可以采取一些步骤来合理地同步访问。在执行选择操作时,选择器在Selector对象上进行同步,然后是已注册的键的集合,最后是已选择的键的集合,按照这样的顺序。已取消的键的集合也在选择过程的的第1步和第3步之间保持同步
在多线程的场景中,如果您需要对任何一个键的集合进行更改,不管是直接更改还是其他操作带来的副作用,您都需要首先以相同的顺序,在同一对象上进行同步。锁的过程是非常重要的。如果竞争的线程没有以相同的顺序请求锁,就将会有死锁的潜在隐患。如果您可以确保否其他线程不会同时访问选择器,那么就不必要进行同步了。
Selector类的close( )方法与slect( )方法的同步方式是一样的,因此也有一直阻塞的可能性。在选择过程还在进行的过程中,所有对close( )的调用都会被阻塞,直到选择过程结束,或者执行选择的线程进入睡眠。在后面的情况下,执行选择的线程将会在执行关闭的线程获得锁是立即被唤醒,并关闭选择器。
4、异步关闭功能
关闭通道的过程不应该是一个耗时的操作。NIO的设计者们特别想要阻止这样的可能性:一个线程在关闭一个处于选择操作中的通道时,被阻塞于无限期的等待。当一个通道关闭时,它相关的键也就都被取消了。这并不会影响正在进行的select( ),但这意味着在您调用select( )之前仍然是有效的键,在返回时可能会变为无效。您总是可以使用由选择器的selectKeys( )方法返回的已选择的键的集合:请不要自己维护键的集合。
如果您试图使用一个已经失效的键,大多数方法将抛出CancelledKeyException。但是,您可以安全地从从已取消的键中获取通道的句柄。如果通道已经关闭时,仍然试图使用它的话,在大多数情况下将引发ClosedChannelException。
5、选择过程的可扩展性
如果您想要将更多的线程来为通道提供服务,请抵抗住使用多个选择器的欲望。在大量通道上执行就绪选择并不会有很大的开销,大多数工作是由底层操作系统完成的。管理多个选择器并随机地将通道分派给它们当中的一个并不是这个问题的合理的解决方案。这只会形成这个场景的一个更小的版本。
一个更好的策略是对所有的可选择通道使用一个选择器,并将对就绪通道的服务委托给其他线程。您只用一个线程监控通道的就绪状态并使用一个协调好的工作线程池来处理共接收到的数据。根据部署的条件,线程池的大小是可以调整的(或者它自己进行动态的调整)。对可选择通道的管理仍然是简单的,而简单的就是好的。
某些通道要求比其他通道更高的响应速度,可以通过使用两个选择器来解决:一个为命令连接服务,另一个为普通连接服务。但这种场景也可以使用与第一个场景十分相似的办法来解决。与将所有准备好的通道放到同一个线程池的做法不同,通道可以根据功能由不同的工作线程来处理。它们可能可以是日志线程池,命令/控制线程池,状态请求线程池,等等。
相关阅读:
Reactor和Preactor模型 【//www.greatytc.com/p/b4de9b85c79d】
Channel【//www.greatytc.com/p/eb9d23113dfa】
Buffer【//www.greatytc.com/p/49d20a7547f6】
IO相关基本概念【//www.greatytc.com/p/177021e33428】