NIO-异步IO

异步I/O

Github Demo

连网是学习异步 I/O 的很好基础,而异步 I/O 对于在 Java 语言中执行任何输入/输出过程的人来说,无疑都是必须具备的知识。NIO 中的连网与 NIO 中的其他任何操作没有什么不同 ― 它依赖通道和缓冲区,而您通常使用 InputStream 和 OutputStream 来获得通道。

异步 I/O 是一种没有阻塞地读写数据的方法。通常在代码进行 read() 调用时,代码会阻塞直至有可供读取的数据。同样 write() 调用将会阻塞直至数据能够写入。

异步 I/O 调用不会阻塞。相反,将注册特定 I/O 事件 ― 可读的数据的到达、新的套接字连接,等等,而在发生这样的事件时,会收到系统通知。

异步 I/O 的一个优势在于,它允许同时根据大量的输入和输出执行 I/O。同步程序常常要求助于轮询,或者创建许许多多的线程以处理大量的连接。使用异步 I/O,可以监听任何数量的通道上的事件,不用轮询,也不用额外的线程。


Selectors

异步 I/O 中的核心对象名为 Selector。Selector 就是注册各种 I/O 事件的地方,而且当那些事件发生时,就是这个对象告诉我们所发生的事件。
所以,我们需要做的第一件事就是创建一个 Selector:

Selector selector = Selector.open();

然后,我们将对不同的通道对象调用 register() 方法,以便注册我们对这些对象中发生的 I/O 事件。register() 的第一个参数总是这个 Selector。

ServerSocketChannel

为了接收连接,我们需要一个 ServerSocketChannel。事实上,我们要监听的每一个端口都需要有一个 ServerSocketChannel 。对于每一个端口,我们打开一个 ServerSocketChannel,如下所示:

ServerSocketChannel ssc = ServerSocketChannel.open();  // 创建一个新的 ServerSocketChannel
ssc.configureBlocking( false ); // 将 ServerSocketChannel 设置为 非阻塞的

ServerSocket ss = ssc.socket();
InetSocketAddress address = new InetSocketAddress( ports[i] );
ss.bind( address ); // 将它绑定到给定的端口

我们必须对每一个要使用的套接字通道调用这个方法,否则异步 I/O 就不能工作。

SelectionKey

下一步是将新打开的 ServerSocketChannels 注册到 Selector上。为此我们使用 ServerSocketChannel.register() 方法,如下所示:

SelectionKey key = ssc.register( selector, SelectionKey.OP_ACCEPT );

register() 的第一个参数总是这个 Selector。第二个参数是 OP_ACCEPT,这里它指定我们想要监听 accept 事件,也就是在新的连接建立时所发生的事件。这是适用于 ServerSocketChannel 的唯一事件类型。
请注意对 register() 的调用的返回值。 SelectionKey 代表这个通道在此 Selector 上的这个注册。当某个 Selector 通知您某个传入事件时,它是通过提供对应于该事件的 SelectionKey 来进行的。SelectionKey 还可以用于取消通道的注册。

接下来 内部循环

现在已经注册了一些我们感兴趣的 I/O 事件,下面将进入主循环。使用 Selectors 的几乎每个程序都像下面这样使用内部循环:

int num = selector.select(); // 这个方法会阻塞,直到至少有一个已注册的事件发生

Set selectedKeys = selector.selectedKeys();
Iterator it = selectedKeys.iterator();

while (it.hasNext()) {
     SelectionKey key = (SelectionKey)it.next();
     // ... deal with I/O event ...
}
  1. 首先,我们调用 Selector 的 select() 方法。这个方法会阻塞,直到至少有一个已注册的事件发生。当一个或者更多的事件发生时, select() 方法将返回所发生的事件的数量。
  2. 接下来,我们调用 Selector 的 selectedKeys() 方法,它返回发生了事件的 SelectionKey 对象的集合 。
  3. 我们通过迭代 SelectionKeys 并依次处理每个 SelectionKey 来处理事件。对于每一个 SelectionKey,您必须确定发生的是什么 I/O 事件,以及这个事件影响哪些 I/O 对象。

监听新连接

程序执行到这里,我们仅注册了 ServerSocketChannel,并且仅注册它们“接收”事件。为确认这一点,我们对 SelectionKey 调用 readyOps() 方法,并检查发生了什么类型的事件:

if ((key.readyOps() & SelectionKey.OP_ACCEPT)
     == SelectionKey.OP_ACCEPT) {

     // Accept the new connection
     // ...
}

可以肯定地说, readOps() 方法告诉我们该事件是新的连接。

接受新的连接

因为我们知道这个服务器套接字上有一个传入连接在等待,所以可以安全地接受它;也就是说,不用担心 accept() 操作会阻塞:

ServerSocketChannel ssc = (ServerSocketChannel)key.channel();
SocketChannel sc = ssc.accept();

下一步是将新连接的 SocketChannel 配置为非阻塞的。而且由于接受这个连接的目的是为了读取来自套接字的数据,所以我们还必须将 SocketChannel 注册到 Selector上,如下所示:

sc.configureBlocking( false );
SelectionKey newKey = sc.register( selector, SelectionKey.OP_READ );

注意我们使用 register() 的 OP_READ 参数,将 SocketChannel 注册用于 读取 而不是 接受 新连接。

删除处理过的 SelectionKey

在处理 SelectionKey 之后,我们几乎可以返回主循环了。但是我们必须首先将处理过的 SelectionKey 从选定的键集合中删除。如果我们没有删除处理过的键,那么它仍然会在主集合中以一个激活的键出现,这会导致我们尝试再次处理它。我们调用迭代器的 remove() 方法来删除处理过的 SelectionKey:

it.remove();

现在我们可以返回主循环并接受从一个套接字中传入的数据(或者一个传入的 I/O 事件)了。

传入的 I/O

当来自一个套接字的数据到达时,它会触发一个 I/O 事件。这会导致在主循环中调用 Selector.select(),并返回一个或者多个 I/O 事件。这一次, SelectionKey 将被标记为 OP_READ 事件,如下所示:

} else if ((key.readyOps() & SelectionKey.OP_READ)
     == SelectionKey.OP_READ) {
     // Read the data
     SocketChannel sc = (SocketChannel)key.channel();
     // ...
}

与以前一样,我们取得发生 I/O 事件的通道并处理它。

回到主循环

每次返回主循环,我们都要调用 select 的 Selector()方法,并取得一组 SelectionKey。每个键代表一个 I/O 事件。我们处理事件,从选定的键集中删除 SelectionKey,然后返回主循环的顶部。

这个程序有点过于简单,因为它的目的只是展示异步 I/O 所涉及的技术。在现实的应用程序中,您需要通过将通道从 Selector 中删除来处理关闭的通道。而且您可能要使用多个线程。这个程序可以仅使用一个线程,因为它只是一个演示,但是在现实场景中,创建一个线程池来负责 I/O 事件处理中的耗时部分会更有意义。


下面时我们的EchoServer

首先是Echo协议:

What is 'Echo Protocol' ?

Network Working Group                                          J. Postel
Request for Comments: 862                                            ISI
                                                                May 1983



                             Echo Protocol




This RFC specifies a standard for the ARPA Internet community.  Hosts on
the ARPA Internet that choose to implement an Echo Protocol are expected
to adopt and implement this standard.

A very useful debugging and measurement tool is an echo service.  An
echo service simply sends back to the originating source any data it
receives.

TCP Based Echo Service

   One echo service is defined as a connection based application on TCP.
   A server listens for TCP connections on TCP port 7.  Once a
   connection is established any data received is sent back.  This
   continues until the calling user terminates the connection.

UDP Based Echo Service

   Another echo service is defined as a datagram based application on
   UDP.  A server listens for UDP datagrams on UDP port 7.  When a
   datagram is received, the data from it is sent back in an answering
   datagram.

Echo Server

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;

/**
 * Created by dongsj on 16/11/25.
 *
 * Echo Server
 */
public class EchoServer {
    final int[] ports = new int[]{8080, 8181, 8282};


    private void start() {
        try {
            //
            Selector selector = Selector.open();

            //
            for (int i=0; i< ports.length; i++) {
                ServerSocketChannel ssc = ServerSocketChannel.open();
                ssc.configureBlocking(false);

                ServerSocket ss = ssc.socket();
                InetSocketAddress address = new InetSocketAddress(ports[i]);
                ss.bind(address);

                SelectionKey selectionKey = ssc.register(selector, SelectionKey.OP_ACCEPT);
            }

            while (true) {
                //
                int num = selector.select();

                Set<SelectionKey> set = selector.selectedKeys();
                Iterator<SelectionKey> iterator = set.iterator();
                while (iterator.hasNext()) {
                    SelectionKey key = iterator.next();

                    if ((key.readyOps() & SelectionKey.OP_ACCEPT) == SelectionKey.OP_ACCEPT) {
                        ServerSocketChannel ssc = (ServerSocketChannel) key.channel();
                        SocketChannel sc = ssc.accept();

                        sc.configureBlocking(false);
                        SelectionKey newKey = sc.register(selector, SelectionKey.OP_READ);

                        iterator.remove();

                        System.out.println(">>> ACCEPT");
                    } else if ((key.readyOps() & SelectionKey.OP_READ) == SelectionKey.OP_READ) {
                        System.out.println(">>> Start Read Data...");

                        //FileOutputStream fout = new FileOutputStream("/Users/dongsj/workspace/dsj/javaSpace/nettyDemo/src/test/resources/nio/writeshow.log", true);
                        //FileChannel fileChannel = fout.getChannel();

                        SocketChannel sc = (SocketChannel) key.channel();
                        ByteBuffer buffer = ByteBuffer.allocate(10);
                        int result = -1;
                        do {
                            result = sc.read(buffer);
                            buffer.flip();

                            // fileChannel.write(buffer);
                            sc.write(buffer);

                            buffer.clear();
                        } while (-1 == result);



                        System.out.println(">>> End Read Data");
                    }
                }
            }

        } catch (IOException e) {
            e.printStackTrace();
        }
    }


    public static void main(String[] args) {
        new EchoServer().start();
    }
}

测试:

  • 执行上面的main方法
    这个服务监听了8080/8181/8282三个端口
  • telnet localhost 8080/8181/8282
结果
结果

异步IO有没有让你想到哪个设计模式呢,- -!

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 212,454评论 6 493
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,553评论 3 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 157,921评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,648评论 1 284
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,770评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,950评论 1 291
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,090评论 3 410
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,817评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,275评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,592评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,724评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,409评论 4 333
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,052评论 3 316
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,815评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,043评论 1 266
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,503评论 2 361
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,627评论 2 350

推荐阅读更多精彩内容