异步I/O
连网是学习异步 I/O 的很好基础,而异步 I/O 对于在 Java 语言中执行任何输入/输出过程的人来说,无疑都是必须具备的知识。NIO 中的连网与 NIO 中的其他任何操作没有什么不同 ― 它依赖通道和缓冲区,而您通常使用 InputStream 和 OutputStream 来获得通道。
异步 I/O 是一种没有阻塞地读写数据的方法。通常在代码进行 read() 调用时,代码会阻塞直至有可供读取的数据。同样 write() 调用将会阻塞直至数据能够写入。
异步 I/O 调用不会阻塞。相反,将注册特定 I/O 事件 ― 可读的数据的到达、新的套接字连接,等等,而在发生这样的事件时,会收到系统通知。
异步 I/O 的一个优势在于,它允许同时根据大量的输入和输出执行 I/O。同步程序常常要求助于轮询,或者创建许许多多的线程以处理大量的连接。使用异步 I/O,可以监听任何数量的通道上的事件,不用轮询,也不用额外的线程。
Selectors
异步 I/O 中的核心对象名为 Selector。Selector 就是注册各种 I/O 事件的地方,而且当那些事件发生时,就是这个对象告诉我们所发生的事件。
所以,我们需要做的第一件事就是创建一个 Selector:
Selector selector = Selector.open();
然后,我们将对不同的通道对象调用 register()
方法,以便注册我们对这些对象中发生的 I/O 事件。register() 的第一个参数总是这个 Selector。
ServerSocketChannel
为了接收连接,我们需要一个 ServerSocketChannel。事实上,我们要监听的每一个端口都需要有一个 ServerSocketChannel 。对于每一个端口,我们打开一个 ServerSocketChannel,如下所示:
ServerSocketChannel ssc = ServerSocketChannel.open(); // 创建一个新的 ServerSocketChannel
ssc.configureBlocking( false ); // 将 ServerSocketChannel 设置为 非阻塞的
ServerSocket ss = ssc.socket();
InetSocketAddress address = new InetSocketAddress( ports[i] );
ss.bind( address ); // 将它绑定到给定的端口
我们必须对每一个要使用的套接字通道调用这个方法,否则异步 I/O 就不能工作。
SelectionKey
下一步是将新打开的 ServerSocketChannels 注册到 Selector上。为此我们使用 ServerSocketChannel.register() 方法,如下所示:
SelectionKey key = ssc.register( selector, SelectionKey.OP_ACCEPT );
register() 的第一个参数总是这个 Selector。第二个参数是 OP_ACCEPT,这里它指定我们想要监听 accept 事件,也就是在新的连接建立时所发生的事件。这是适用于 ServerSocketChannel 的唯一事件类型。
请注意对 register() 的调用的返回值。 SelectionKey 代表这个通道在此 Selector 上的这个注册。当某个 Selector 通知您某个传入事件时,它是通过提供对应于该事件的 SelectionKey 来进行的。SelectionKey 还可以用于取消通道的注册。
接下来 内部循环
现在已经注册了一些我们感兴趣的 I/O 事件,下面将进入主循环。使用 Selectors 的几乎每个程序都像下面这样使用内部循环:
int num = selector.select(); // 这个方法会阻塞,直到至少有一个已注册的事件发生
Set selectedKeys = selector.selectedKeys();
Iterator it = selectedKeys.iterator();
while (it.hasNext()) {
SelectionKey key = (SelectionKey)it.next();
// ... deal with I/O event ...
}
- 首先,我们调用 Selector 的 select() 方法。这个方法会阻塞,直到至少有一个已注册的事件发生。当一个或者更多的事件发生时, select() 方法将返回所发生的事件的数量。
- 接下来,我们调用 Selector 的 selectedKeys() 方法,它返回发生了事件的 SelectionKey 对象的集合 。
- 我们通过迭代 SelectionKeys 并依次处理每个 SelectionKey 来处理事件。对于每一个 SelectionKey,您必须确定发生的是什么 I/O 事件,以及这个事件影响哪些 I/O 对象。
监听新连接
程序执行到这里,我们仅注册了 ServerSocketChannel,并且仅注册它们“接收”事件。为确认这一点,我们对 SelectionKey 调用 readyOps() 方法,并检查发生了什么类型的事件:
if ((key.readyOps() & SelectionKey.OP_ACCEPT)
== SelectionKey.OP_ACCEPT) {
// Accept the new connection
// ...
}
可以肯定地说, readOps() 方法告诉我们该事件是新的连接。
接受新的连接
因为我们知道这个服务器套接字上有一个传入连接在等待,所以可以安全地接受它;也就是说,不用担心 accept() 操作会阻塞:
ServerSocketChannel ssc = (ServerSocketChannel)key.channel();
SocketChannel sc = ssc.accept();
下一步是将新连接的 SocketChannel 配置为非阻塞的。而且由于接受这个连接的目的是为了读取来自套接字的数据,所以我们还必须将 SocketChannel 注册到 Selector上,如下所示:
sc.configureBlocking( false );
SelectionKey newKey = sc.register( selector, SelectionKey.OP_READ );
注意我们使用 register() 的 OP_READ 参数,将 SocketChannel 注册用于 读取 而不是 接受 新连接。
删除处理过的 SelectionKey
在处理 SelectionKey 之后,我们几乎可以返回主循环了。但是我们必须首先将处理过的 SelectionKey 从选定的键集合中删除。如果我们没有删除处理过的键,那么它仍然会在主集合中以一个激活的键出现,这会导致我们尝试再次处理它。我们调用迭代器的 remove() 方法来删除处理过的 SelectionKey:
it.remove();
现在我们可以返回主循环并接受从一个套接字中传入的数据(或者一个传入的 I/O 事件)了。
传入的 I/O
当来自一个套接字的数据到达时,它会触发一个 I/O 事件。这会导致在主循环中调用 Selector.select(),并返回一个或者多个 I/O 事件。这一次, SelectionKey 将被标记为 OP_READ 事件,如下所示:
} else if ((key.readyOps() & SelectionKey.OP_READ)
== SelectionKey.OP_READ) {
// Read the data
SocketChannel sc = (SocketChannel)key.channel();
// ...
}
与以前一样,我们取得发生 I/O 事件的通道并处理它。
回到主循环
每次返回主循环,我们都要调用 select 的 Selector()方法,并取得一组 SelectionKey。每个键代表一个 I/O 事件。我们处理事件,从选定的键集中删除 SelectionKey,然后返回主循环的顶部。
这个程序有点过于简单,因为它的目的只是展示异步 I/O 所涉及的技术。在现实的应用程序中,您需要通过将通道从 Selector 中删除来处理关闭的通道。而且您可能要使用多个线程。这个程序可以仅使用一个线程,因为它只是一个演示,但是在现实场景中,创建一个线程池来负责 I/O 事件处理中的耗时部分会更有意义。
下面时我们的EchoServer
首先是Echo协议:
What is 'Echo Protocol' ?
Network Working Group J. Postel
Request for Comments: 862 ISI
May 1983
Echo Protocol
This RFC specifies a standard for the ARPA Internet community. Hosts on
the ARPA Internet that choose to implement an Echo Protocol are expected
to adopt and implement this standard.
A very useful debugging and measurement tool is an echo service. An
echo service simply sends back to the originating source any data it
receives.
TCP Based Echo Service
One echo service is defined as a connection based application on TCP.
A server listens for TCP connections on TCP port 7. Once a
connection is established any data received is sent back. This
continues until the calling user terminates the connection.
UDP Based Echo Service
Another echo service is defined as a datagram based application on
UDP. A server listens for UDP datagrams on UDP port 7. When a
datagram is received, the data from it is sent back in an answering
datagram.
Echo Server
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;
/**
* Created by dongsj on 16/11/25.
*
* Echo Server
*/
public class EchoServer {
final int[] ports = new int[]{8080, 8181, 8282};
private void start() {
try {
//
Selector selector = Selector.open();
//
for (int i=0; i< ports.length; i++) {
ServerSocketChannel ssc = ServerSocketChannel.open();
ssc.configureBlocking(false);
ServerSocket ss = ssc.socket();
InetSocketAddress address = new InetSocketAddress(ports[i]);
ss.bind(address);
SelectionKey selectionKey = ssc.register(selector, SelectionKey.OP_ACCEPT);
}
while (true) {
//
int num = selector.select();
Set<SelectionKey> set = selector.selectedKeys();
Iterator<SelectionKey> iterator = set.iterator();
while (iterator.hasNext()) {
SelectionKey key = iterator.next();
if ((key.readyOps() & SelectionKey.OP_ACCEPT) == SelectionKey.OP_ACCEPT) {
ServerSocketChannel ssc = (ServerSocketChannel) key.channel();
SocketChannel sc = ssc.accept();
sc.configureBlocking(false);
SelectionKey newKey = sc.register(selector, SelectionKey.OP_READ);
iterator.remove();
System.out.println(">>> ACCEPT");
} else if ((key.readyOps() & SelectionKey.OP_READ) == SelectionKey.OP_READ) {
System.out.println(">>> Start Read Data...");
//FileOutputStream fout = new FileOutputStream("/Users/dongsj/workspace/dsj/javaSpace/nettyDemo/src/test/resources/nio/writeshow.log", true);
//FileChannel fileChannel = fout.getChannel();
SocketChannel sc = (SocketChannel) key.channel();
ByteBuffer buffer = ByteBuffer.allocate(10);
int result = -1;
do {
result = sc.read(buffer);
buffer.flip();
// fileChannel.write(buffer);
sc.write(buffer);
buffer.clear();
} while (-1 == result);
System.out.println(">>> End Read Data");
}
}
}
} catch (IOException e) {
e.printStackTrace();
}
}
public static void main(String[] args) {
new EchoServer().start();
}
}
测试:
- 执行上面的main方法
这个服务监听了8080/8181/8282三个端口 - telnet localhost 8080/8181/8282
异步IO有没有让你想到哪个设计模式呢,- -!