java 网络通信 NlO(non-blocking i/o 或者 new i/o) channel

  • channel 是双向的,同时读取和写入,流是单向的

  • channel 和多路复用器结合之后,有多种状态位,方便多路复用器(轮询)去识别(连接状态,阻塞状态,可读状态,可写状态)

  • channel 分为俩大类,网络读写的SelectableChannel,文件操作的FileChannel

  • 网络读写的SocketChannel和ServerSocketChannel是SelectableChannel的子类

  • SocketChannel和ServerSocketChannel 依赖于多路复用器(Selector),Selector是NIO编程的基础,提供选择已经就绪的任务的能力。简单来说,Selector会不断地轮询注册在其上的Channel,如果某个Channel上面发生读或者写事件,这个Channel就处于就绪状态,会被Selector轮询出来,然后通过SelectionKey可以获取就绪Channel的集合,进行后续的I/O操作

  • 一个多路复用器可以负责成千上万个Channel通道,没有上限,这也是JDK使用了epoll代替了传统的select实现,获得连接句柄没有限制。这样意味着我们只要一个线程负责Selector的轮询,就可以接入成千上万个客户端,这是JDK NIO 的巨大进步。

  • Selector线程就类似一个管理者Master,管理成千上万个Channel,然后轮询哪个管道的数据已经准备好,通知CPU执行IO的读取或写入操作。

  • Selector模式:当IO事件(管道)注册到选择器后,Selector会分配给每个管道一个key值,相当于标签。Selector选择器是以轮询的方式进行查找注册所有的IO事件。当我们的IO事件(管道)准备就绪后,select就会识别,会通过key值找到相应的管道,进行相关的数据处理操作(从管道里读或写数据,写到我们的数据缓冲区Buffer去)

  • 每个管道都会对选择器进行注册到不同的事件状态,以便选择器查找:SelectionKey.OP_CONNECT SelectionKey.OP_ACCEPT SelectionKey.OP_READ SelectionKey.OP_WRITE

  • NIO、AIO学习历程

  • NIO入门

  • io与nio比较:

  1. io当客户端多时,会创建大量的处理线程。且每个线程都要占用栈空间和一些CPU时间
  2. io阻塞可能带来频繁的上下文切换,且大部分上下文切换可能是无意义的。
  3. NIO的本质是原始的tcp建立连接使用3次握手的操作,减少连接的开销
  4. IO是面向流(Stream)的,NIO是面向块(buffer)的
  5. Java NIO的selectors允许一条线程去监控多个channels的输入,你可以向一个selector上注册多个channel,然后调用selector的select()方法判断是否有新的连接进来或者已经在selector上注册时channel是否有数据进入。selector的机制让一个线程管理多个channel变得简单。(不再使用多线程处理连接)
  6. NIO允许你用一个单独的线程或几个线程管理很多个channels(网络的或者文件的),代价是程序的处理和处理IO相比更加复杂
ByteBuffer buffer = ByteBuffer.allocate(48);  

int bytesRead = inChannel.read(buffer);

ByteBuffer buffer = ByteBuffer.allocate(48);  
  
int bytesRead = inChannel.read(buffer);  
  
while(! bufferFull(bytesRead) ) {  
    bytesRead = inChannel.read(buffer);  
}  

注意第二行从channel中读取数据到ByteBuffer,当这个方法返回你不知道是否你需要的所有数据都被读到buffer了,你所知道的一切就是有一些数据被读到了buffer中,但是你并不知道具体有多少数据,这使程序的处理变得稍微有些困难
想象一下,调用了read(buffer)方法后,只有半行数据被读进了buffer,例如:“Name: An”,你能现在就处理数据吗?当然不能。你需要等待直到至少一整行数据被读到buffer中,在这之前确保程序不要处理buffer中的数据
你如何知道buffer中是否有足够的数据可以被处理呢?你不知道,唯一的方法就是检查buffer中的数据。可能你会进行几次无效的检查(检查了几次数据都不够进行处理),这会令程序设计变得比较混乱复杂
bufferFull方法负责检查有多少数据被读到了buffer中,根据返回值是true还是false来判断数据是否够进行处理。bufferFull方法扫描buffer但不能改变buffer的内部状态

  1. NIO允许你用一个单独的线程或几个线程管理很多个channels(网络的或者文件的),代价是程序的处理和处理IO相比更加复杂,如果你需要同时管理成千上万的连接,但是每个连接只发送少量数据,例如一个聊天服务器,用NIO实现会更好一些,相似的,如果你需要保持很多个到其他电脑的连接,例如P2P网络,用一个单独的线程来管理所有出口连接是比较合适的,如果你只有少量的连接但是每个连接都占有很高的带宽,同时发送很多数据,传统的IO会更适合
  • NIO实现步骤
    • 打开多路复用器
    • 打开服务器端通道
    • 设置服务器通道为非阻塞模式
    • 服务端通道绑定端口和地址
    • 把服务器通道注册到多路复用器上,并且监听阻塞事件
    • 多路复用器轮询监听
    • 返回多路复用器已经选择的SelectionKey结果集
    • 结果集进行遍历,遍历时移除SelectionKey元素,防止重复处理
    • 对selectionkey进行isValid()有效性校验
    • 如果selectionkey是连接状态的,用socketchannel.finishConnect();
    • 如果selectionkey是阻塞状态的,用socketchannel.register(seletor, SelectionKey.OP_READ)可读状态
    • 如果selectionkey是可读状态的,进行读取
    • 如果selectionkey是可写状态的,OP_WRITE比较特殊,表示本地的写缓冲区可用,一般只有在一次写没有把数据写完的情况下需要注册OP_WRITE,写完后要及时关闭,否则每次循环都有可能被调用,因为写缓冲区在大多数情况下是始终可用的。
  • 双向通信示例
    服务器端
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.util.Iterator;

public class Server implements Runnable{
    //1 多路复用器(管理所有的通道)
    private Selector seletor;
    //2 建立缓冲区
    private ByteBuffer readBuf = ByteBuffer.allocate(1024);
    //3 
    private ByteBuffer writeBuf = ByteBuffer.allocate(1024);
    public Server(int port){
        try {
            //1 打开路复用器
            this.seletor = Selector.open();
            //2 打开服务器通道
            ServerSocketChannel ssc = ServerSocketChannel.open();
            //3 设置服务器通道为非阻塞模式
            ssc.configureBlocking(false);
            //4 绑定地址
            ssc.bind(new InetSocketAddress(port));
            //5 把服务器通道注册到多路复用器上,并且监听阻塞事件
            ssc.register(this.seletor, SelectionKey.OP_ACCEPT);
            
            System.out.println("Server start, port :" + port);
            
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    @Override
    public void run() {
        while(true){
            try {
                //1 必须要让多路复用器开始监听
                this.seletor.select();
                //2 返回多路复用器已经选择的结果集
                Iterator<SelectionKey> keys = this.seletor.selectedKeys().iterator();
                //3 进行遍历
                while(keys.hasNext()){
                    //4 获取一个选择的元素
                    SelectionKey key = keys.next();
                    //5 直接从容器中移除就可以了
                    keys.remove();
                    //6 如果是有效的
                    if(key.isValid()){
                        //7 如果为阻塞状态
                        if(key.isAcceptable()){
                            this.accept(key);
                        }
                        //8 如果为可读状态
                        if(key.isReadable()){
                            this.read(key);
                        }
                        //9 写数据
                        if(key.isWritable()){
                        }
                    }
                    
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
    


    private void read(SelectionKey key) {
        try {
            //1 清空缓冲区旧的数据
            this.readBuf.clear();
            //2 获取之前注册的socket通道对象
            SocketChannel sc = (SocketChannel) key.channel();
            //3 读取数据
            int count = sc.read(this.readBuf);
            //4 如果没有数据
            if(count == -1){
                key.channel().close();
                key.cancel();
                return;
            }
            //5 有数据则进行读取 读取之前需要进行复位方法(把position 和limit进行复位)
            this.readBuf.flip();
            //6 根据缓冲区的数据长度创建相应大小的byte数组,接收缓冲区的数据
            byte[] bytes = new byte[this.readBuf.remaining()];
            //7 接收缓冲区数据
            this.readBuf.get(bytes);
            //8 打印结果
            String body = new String(bytes).trim();
            System.out.println("收到客户端 : " + body);
            
            // 9..可以写回给客户端数据
            readBuf.flip();
            sc.write(readBuf);
            sc.register(this.seletor, SelectionKey.OP_READ);
            
        } catch (IOException e) {
            e.printStackTrace();
        }
        
    }

    private void accept(SelectionKey key) {
        try {
            //1 获取服务通道
            ServerSocketChannel ssc =  (ServerSocketChannel) key.channel();
            //2 执行阻塞方法
            SocketChannel sc = ssc.accept();
            //3 设置阻塞模式
            sc.configureBlocking(false);
            //4 注册到多路复用器上,并设置读取标识
            sc.register(this.seletor, SelectionKey.OP_READ);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
    
    public static void main(String[] args) {
        
        new Thread(new Server(8765)).start();;
    }
    
    
}

客户端:

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Iterator;

public class Client {

    //需要一个Selector
    private Selector selector;
    private ByteBuffer readBuf = ByteBuffer.allocate(1024);

    public Client(){
        try {
            // 获得一个Socket通道
            SocketChannel channel = SocketChannel.open();
            // 设置通道为非阻塞
            channel.configureBlocking(false);
            // 获得一个通道管理器
            this.selector = Selector.open();

            // 客户端连接服务器,其实方法执行并没有实现连接,需要在listen()方法中调
            //用channel.finishConnect();才能完成连接
            channel.connect(new InetSocketAddress("127.0.0.1", 8765));
            //将通道管理器和该通道绑定,并为该通道注册SelectionKey.OP_CONNECT事件。
            channel.register(selector, SelectionKey.OP_CONNECT);
        } catch (IOException e) {
            e.printStackTrace();
        }

    }
    public void listen() throws IOException {
        // 轮询访问selector
        while (true) {
            selector.select();
            // 获得selector中选中的项的迭代器
            Iterator ite = this.selector.selectedKeys().iterator();
            while (ite.hasNext()) {
                SelectionKey key = (SelectionKey) ite.next();
                // 删除已选的key,以防重复处理
                ite.remove();
                // 连接事件发生
                if (key.isConnectable()) {
                    SocketChannel channel = (SocketChannel) key
                            .channel();
                    // 如果正在连接,则完成连接
                    if(channel.isConnectionPending()){
                        channel.finishConnect();

                    }
                    // 设置成非阻塞
                    channel.configureBlocking(false);

                    //在这里可以给服务端发送信息哦
                    channel.write(ByteBuffer.wrap(new String("向服务端发送了一条信息").getBytes()));
                    //在和服务端连接成功之后,为了可以接收到服务端的信息,需要给通道设置读的权限。
                    channel.register(this.selector, SelectionKey.OP_READ);

                    // 获得了可读的事件
                } else if (key.isReadable()) {
                    read(key);
                }

            }

        }
    }
    private void read(SelectionKey key) {
        try {
            //1 清空缓冲区旧的数据
            this.readBuf.clear();
            //2 获取之前注册的socket通道对象
            SocketChannel sc = (SocketChannel) key.channel();
            //3 读取数据
            int count = sc.read(this.readBuf);
            //4 如果没有数据
            if(count == -1){
                key.channel().close();
                key.cancel();
                return;
            }
            //5 有数据则进行读取 读取之前需要进行复位方法(把position 和limit进行复位)
            this.readBuf.flip();
            //6 根据缓冲区的数据长度创建相应大小的byte数组,接收缓冲区的数据
            byte[] bytes = new byte[this.readBuf.remaining()];
            //7 接收缓冲区数据
            this.readBuf.get(bytes);
            //8 打印结果
            String body = new String(bytes).trim();
            System.out.println("收到Server : " + body);

            // 9..可以写回给客户端数据
//          readBuf.flip();
//          sc.write(readBuf);

        } catch (IOException e) {
            e.printStackTrace();
        }

    }
    public static void main(String[] args) {


        try {
            new Client().listen();
        } catch (IOException e) {
            e.printStackTrace();
        }

    }
    
}

运行之后并不停止,服务器和客户端还在轮询

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 212,454评论 6 493
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,553评论 3 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 157,921评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,648评论 1 284
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,770评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,950评论 1 291
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,090评论 3 410
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,817评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,275评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,592评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,724评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,409评论 4 333
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,052评论 3 316
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,815评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,043评论 1 266
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,503评论 2 361
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,627评论 2 350

推荐阅读更多精彩内容