socket应用之多线程与NIO

by 等流星的牧羊人

在本次网络编程作业中,对HTTPServer一共采用了两种方案进行性能改进。

第一种是比较常规的多线程,第二种则是采用了NIO的多路复用模式。

多线程

在现有的HTTPServer中,一个很大的问题在于,它只有一个用户线程。当接受一个HTTPClient的请求,并进行处理的时候,由于响应设计I/O操作,需要一定响应时间。这期间,用户线程可以看作是阻塞的,无法响应新的Client提交过来的请求。

多线程可以解决这个问题,用户线程只要负责不断接受新来的请求,而对每个请求的处理,则是通过新启的子线程处理,这样就不会导致用户线程阻塞。从而服务器可以支持并发请求。

通过implements Runnable接口封装了一个响应请求的独立的线程类。

HTTPServer.java

package com.zaper.sea.river.socketwork;

import java.net.*;
/**
 * Created by Zaper Ocean on 2016/11/16.
 */
public class HTTPServer{
    public static void main(String args[]) {
        System.out.println(HTTPServer.class.getResource("me/a.json"));
        int port;
        ServerSocket serverSocket;

        try {
            port = Integer.parseInt(args[0]);
        }catch (Exception e) {
            System.out.println("port = 8080 (默认)");
            port = 8080; //默认端口为8080
        }

        try{
            serverSocket = new ServerSocket(port);
            System.out.println("服务器正在监听端口:" + serverSocket.getLocalPort());

            while(true) { //服务器在一个无限循环中不断接收来自客户的TCP连接请求
                try{
                    //等待客户的TCP连接请求
                    final Socket socket = serverSocket.accept();
                    System.out.println("建立了与客户的一个新的TCP连接,该客户的地址为:"+
                            socket.getInetAddress()+":" + socket.getPort());
                    //启动一个新线程响应客户请求
                    Thread myHTTPServer=new Thread(new HTTPServerRunnable(socket));
                    myHTTPServer.start();
                }catch(Exception e){e.printStackTrace();}
            } //#while
        }catch (Exception e) {e.printStackTrace();}
    }
}


HTTPServerRunnable.java

package com.zaper.sea.river.socketwork;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.Socket;

/**
 * Created by Zaper Ocean on 2016/11/17.
 */
public class HTTPServerRunnable implements Runnable {
    private Socket socket = null;

    public HTTPServerRunnable(Socket socket){this.socket = socket;}

    public void run() {
        try {
            /*读取HTTP请求信息*/
            InputStream socketIn= null; //获得输入流
            socketIn = this.socket.getInputStream();
            Thread.sleep(500);  //睡眠500毫秒,等待HTTP请求
            int size=socketIn.available();
            byte[] requestBuffer=new byte[size];
            socketIn.read(requestBuffer);
            String request=new String(requestBuffer);
            System.out.println(request); //打印HTTP请求数据

            /*解析HTTP请求*/
            //获得HTTP请求的第一行
            String firstLineOfRequest=request.substring(0,request.indexOf("\r\n"));
            //解析HTTP请求的第一行
            String[] parts=firstLineOfRequest.split(" ");
            String uri=parts[1]; //获得HTTP请求中的uri

            /*决定HTTP响应正文的类型*/
            String contentType;
            if(uri.indexOf("html")!=-1 || uri.indexOf("htm")!=-1)
                contentType="text/html";
            else if(uri.indexOf("jpg")!=-1 || uri.indexOf("jpeg")!=-1)
                contentType="image/jpeg";
            else if(uri.indexOf("gif")!=-1)
                contentType="image/gif";
            else
                contentType="application/octet-stream";


            /*创建HTTP响应结果 */
            //HTTP响应的第一行
            String responseFirstLine="HTTP/1.1 200 OK\r\n";
            //HTTP响应头
            String responseHeader="Content-Type:"+contentType+"\r\n\r\n";
            //获得读取响应正文数据的输入流
            System.out.println(uri);
            System.out.println(HTTPServer.class.getResource("/root/"+uri));
            InputStream in=HTTPServer.class.getResourceAsStream("/root/"+uri);

            /*发送HTTP响应结果 */
            OutputStream socketOut=socket.getOutputStream(); //获得输出流
            //发送HTTP响应的第一行
            socketOut.write(responseFirstLine.getBytes());
            //发送HTTP响应的头
            socketOut.write(responseHeader.getBytes());
            //发送HTTP响应的正文
            int len=0;
            byte[] buffer=new byte[128];
            while((len=in.read(buffer))!=-1)
                socketOut.write(buffer,0,len);

            Thread.sleep(1000);  //睡眠1秒,等待客户接收HTTP响应结果
            socket.close(); //关闭TCP连接
        } catch (IOException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

运行结果

HTTPServer运行
HTTPClient运行

NIO

除了第一种常规的MultiThread方案,还采取了NIO方案。

在我们的HTTPServer中,需要进行的IO操作是非常多的。读写文件与读写socket都是,而众所周知,IO操作是最耗时间的操作。那么如何减少这部分时间呢?JDK在1.4之后为我们提供了一个新思路,NIO。

对于一个network IO (这里我们以read举例),它会涉及到两个系统对象,一个是调用这个IO的process (or thread),另一个就是kernel。当一个read操作发生时,它会经历两个阶段:

  1. 等待数据准备
  2. 将数据从内核拷贝到进程中

接着介绍一下IO的几种模型,最权威的总结来自Stevens的UNP(Unix Network Progamming),有以下五种:

  • blocking IO
  • nonblocking IO
  • IO multiplexing
  • signal driven IO
  • asynchronous IO

而无论原始的HTTPServer还是多线程方案都属于第一种BIO

NIO

当用户进程调用了recvfrom这个系统调用,kernel就开始了IO的第一个阶段:准备数据。对于network io来说,很多时候数据在一开始还没有到达(比如,还没有收到一个完整的UDP包),这个时候kernel就要等待足够的数据到来。而在用户进程这边,整个进程会被阻塞。当kernel一直等到数据准备好了,它就会将数据从kernel中拷贝到用户内存,然后kernel返回结果,用户进程才解除block的状态,重新运行起来。
所以,blocking IO的特点就是在IO执行的两个阶段都被block了。

所以,很明显BIO对性能的影响很明显。

回到第一种方案,我们之所以使用多线程,主要原因在于socket.accept()、socket.read()、socket.write()三个主要函数都是同步阻塞的,当一个连接在处理I/O的时候,系统是阻塞的,如果是单线程的话必然就挂死在那里;但CPU是被释放出来的,开启多线程,就可以让CPU去处理更多的事情。其实这也是所有使用多线程的本质:

  • 利用多核。

  • 当I/O阻塞系统,但CPU空闲的时候,可以利用多线程使用CPU资源。

多线程模型
多线程模型

不过,这个多线程模型存在致命缺陷,那就是严重依赖于线程。而线程是很"贵"的资源,主要表现在:

  • 线程的创建和销毁成本很高,在Linux这样的操作系统中,线程本质上就是一个进程。创建和销毁都是重量级的系统函数。

  • 线程本身占用较大内存,像Java的线程栈,一般至少分配512K~1M的空间,如果系统中的线程数过千,恐怕整个JVM的内存都会被吃掉一半。

  • 线程的切换成本是很高的。操作系统发生线程切换的时候,需要保留线程的上下文,然后执行系统调用。如果线程数过高,可能执行线程切换的时间甚至会大于线程执行的时间,这时候带来的表现往往是系统load偏高、CPU sy使用率特别高(超过20%以上),导致系统几乎陷入不可用的状态。

所以,当面对十万甚至百万级连接的时候,传统的BIO模型是无能为力的。随着移动端应用的兴起和各种网络游戏的盛行,百万级长连接日趋普遍,此时,必然需要一种更高效的I/O处理模型。

当连接数变多,就到了NIO发挥用处的时候。NIO其实就是 IO multiplexing 的一种实践,本质上也就是大家比较熟悉的select,epoll。

IO multiplexing

在NIO中,单个process就可以同时处理多个网络连接的IO。它的基本原理就是select/epoll这个function会不断的轮询所负责的所有socket,当某个socket有数据到达了,就通知用户进程。

当用户进程调用了select,那么整个进程会被block,而同时,kernel会“监视”所有select负责的socket,当任何一个socket中的数据准备好了,select就会返回。这个时候用户进程再调用read操作,将数据从kernel拷贝到用户进程。
这个图和blocking IO的图其实并没有太大的不同,事实上,还更差一些。因为这里需要使用两个system call (select 和 recvfrom),而blocking IO只调用了一个system call (recvfrom)。但是,用select的优势在于它可以同时处理多个connection。

所以,如果处理的连接数不是很高的话,使用select/epoll的web server不一定比使用multi-threading + blocking IO的web server性能更好,可能延迟还更大。 select/epoll的优势并不是对于单个连接能处理得更快,而是在于能处理更多的连接。

NIO模型

BIO模型之所以需要多线程,是因为在进行I/O操作的时候,一是没有办法知道到底能不能写、能不能读,只能"傻等",即使通过各种估算,算出来操作系统没有能力进行读写,也没法在socket.read()和socket.write()函数中返回,这两个函数无法进行有效的中断。所以除了多开线程另起炉灶,没有好的办法利用CPU。

NIO的读写函数可以立刻返回,这就给了我们不开线程利用CPU的最好机会:如果一个连接不能读写(socket.read()返回0或者socket.write()返回0),我们可以把这件事记下来,记录的方式通常是在Selector上注册标记位,然后切换到其它就绪的连接(channel)继续进行读写。

下面具体看下如何利用事件模型单线程处理所有I/O请求:

NIO的主要事件有几个:读就绪、写就绪、有新连接到来。

我们首先需要注册当这几个事件到来的时候所对应的处理器。然后在合适的时机告诉事件选择器:我对这个事件感兴趣。对于写操作,就是写不出去的时候对写事件感兴趣;对于读操作,就是完成连接和系统没有办法承载新读入的数据的时;对于accept,一般是服务器刚启动的时候;而对于connect,一般是connect失败需要重连或者直接异步调用connect的时候。

其次,用一个死循环选择就绪的事件,会执行系统调用,还会阻塞的等待新事件的到来。新事件到来的时候,会在selector上注册标记位,标示可读、可写或者有连接到来。

注意,select是阻塞的,无论是通过操作系统的通知(epoll)还是不停的轮询(select,poll),这个函数是阻塞的。所以我们可以放心大胆地在一个while(true)里面调用这个函数而不用担心CPU空转。

了解完NIO的multiplexing原理,然后还有几个概念。

Buffer ,是一个对象, 它包含一些要写入或者刚读出的数据.最常用的缓冲区类型是 ByteBuffer。常用状态变量包括 position,limit和capacity
Channel ,是一个对象,可以通过它读取和写入数据。拿 NIO 与原来的 I/O 做个比较,通道就像是流。
所有数据都通过 Buffer 对象来处理。我们永远不会将字节直接写入通道中,相反,是将数据写入包含一个或者多个字节的缓冲区。同样,也不会直接从通道中读取字节,而是将数据从通道读入缓冲区,再从缓冲区获取这个字节。

最后就是代码了。

NioHTTPServer

package com.zaper.sea.river.socketwork.niosocket;

import java.io.FileInputStream;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.nio.charset.Charset;
import java.util.Iterator;
import java.util.logging.Logger;

/**
 * Created by Zaper Ocean on 2016/11/19.
 */
public class NioHTTPServer {
    private static final Logger log = Logger.getLogger(NioHTTPServer.class.getName());
    private Selector selector;

    public NioHTTPServer bindInet(String ip,int port) throws IOException {
        ServerSocketChannel serverChannel=ServerSocketChannel.open();
        /**
         *  与Selector一起使用时,Channel必须处于非阻塞模式下。
         */
        serverChannel.configureBlocking(false);
        serverChannel.socket().bind(new InetSocketAddress(ip,port));

        /**Opens a selector.
         *
         */
        selector=Selector.open();

        /**
         * Operation-set bit for socket-accept operations.
         *
         * <p> Suppose that a selection key's interest set contains
         * <tt>OP_ACCEPT</tt> at the start of a <a
         * href="Selector.html#selop">selection operation</a>.  If the selector
         * detects that the corresponding server-socket channel is ready to accept
         * another connection, or has an error pending, then it will add
         * <tt>OP_ACCEPT</tt> to the key's ready set and add the key to its
         * selected-key set.  </p>
         *
         * 通过Selector监听Channel时对连接感兴趣
         */
        serverChannel.register(selector,SelectionKey.OP_ACCEPT);
        return this;
    }

    public void polling() throws IOException {
        log.info("Nio HTTP Server stated polling :");
        while (true){
            if(selector.select()>0){
                Iterator<SelectionKey> it = selector.selectedKeys().iterator();
                while (it.hasNext()){
                    SelectionKey sk=it.next();
                    if(sk.isAcceptable()){
                        log.info("Nio HTTP Server: SelectionKey is acceptable.");

                        ServerSocketChannel serverSocketChannel = (ServerSocketChannel)sk.channel();

                        /**
                         * Accepts a connection made to this channel's socket.
                         *
                         * <p> If this channel is in non-blocking mode then this method will
                         * immediately return <tt>null</tt> if there are no pending connections.
                         * Otherwise it will block indefinitely until a new connection is available
                         * or an I/O error occurs.
                         *
                         * 获得客户端连接通道
                         */
                        SocketChannel socketChannel = serverSocketChannel.accept();

                        log.info("Nio HTTP Server: accept client socket " + socketChannel);
                        socketChannel.configureBlocking(false);
                        socketChannel.register(sk.selector(), SelectionKey.OP_READ);
                    }
                    else if(sk.isReadable()){
                        log.info("Nio HTTP Server: SelectionKey is readable.");

                        ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
                        SocketChannel socketChannel = (SocketChannel)sk.channel();

                        /**SocketChannel.read()将数据从SocketChannel读到Buffer中。
                         * read()方法返回的int值表示读了多少字节进Buffer里。
                         * 如果返回的是-1,表示已经读到了流的末尾(连接关闭了)。
                         */
                        while(true) {
                            int readBytes = socketChannel.read(byteBuffer);
                            if(readBytes>0) {
                                log.info("Nio HTTP Server: readBytes = " + readBytes);
                                String request=new String(byteBuffer.array(), 0, readBytes);
                                log.info("Nio HTTP Server: data = \n" + request);
                                byteBuffer.flip();
                                socketChannel.write(getResponseBuffer(request));
                                //socketChannel.write(byteBuffer);
                                sk.interestOps(SelectionKey.OP_READ | SelectionKey.OP_WRITE);
                                break;
                            }
                        }
                        socketChannel.close();
                    }
                    else if(sk.isWritable()){
                        log.info("Nio HTTP Server: SelectionKey is writable.");

                        //获取与该信道关联的缓冲区,里面有之前读取到的数据
                        ByteBuffer buf = (ByteBuffer) sk.attachment();
                        //重置缓冲区,准备将数据写入信道
                        buf.flip();
                        SocketChannel clntChan = (SocketChannel) sk.channel();
                        //将数据写入到信道中
                        clntChan.write(buf);
                        if (!buf.hasRemaining()){
                            //如果缓冲区中的数据已经全部写入了信道,则将该信道感兴趣的操作设置为可读
                            sk.interestOps(SelectionKey.OP_READ);
                        }
                        //为读入更多的数据腾出空间
                        buf.compact();
                    }
                    /**
                     * 注意每次迭代末尾的keyIterator.remove()调用。
                     */
                    it.remove();

                }
            }
        }
    }

    public ByteBuffer getResponseBuffer(String request) throws IOException {
        //清空ByteBuffer
//        byteBuffer.clear();

        /*解析HTTP请求*/
        //获得HTTP请求的第一行
        String firstLineOfRequest=request.substring(0,request.indexOf("\r\n"));
        //解析HTTP请求的第一行
        String[] parts=firstLineOfRequest.split(" ");
        String uri=parts[1]; //获得HTTP请求中的uri

        /*决定HTTP响应正文的类型*/
        String contentType;
        if(uri.indexOf("html")!=-1 || uri.indexOf("htm")!=-1)
            contentType="text/html";
        else if(uri.indexOf("jpg")!=-1 || uri.indexOf("jpeg")!=-1)
            contentType="image/jpeg";
        else if(uri.indexOf("gif")!=-1)
            contentType="image/gif";
        else
            contentType="application/octet-stream";

        /*创建HTTP响应结果 */
        //HTTP响应的第一行
        String responseFirstLine="HTTP/1.1 200 OK\r\n";
        //HTTP响应头
        String responseHeader="Content-Type:"+contentType+"\r\n\r\n";
        //获得读取响应正文数据的输入流
        System.out.println(uri);
        System.out.println(NioHTTPServer.class.getResource("/root/"+uri));

        String path=NioHTTPServer.class.getResource("/root/"+uri).getPath();
        FileInputStream fis = new FileInputStream(path);
        Charset charset = Charset.forName("GBK");// 创建GBK字符集
        // 得到文件通道
        FileChannel fc = fis.getChannel();
        // 分配与文件尺寸等大的缓冲区
        ByteBuffer bf = ByteBuffer.allocate((int) fc.size());

        try {
            // 整个文件内容全读入缓冲区,即是内存映射文件
            fc.read(bf);
            System.out.println(bf.position());
            // 把缓冲中当前位置回复为零
            bf.rewind();
            System.out.println(bf.position());
            // 输出缓冲区中的内容
            System.out.println(charset.decode(bf));
        } catch (IOException e) {
            e.printStackTrace();
        }
        bf.flip();
        return bf;
    }

    public static void main(String[] args) throws IOException {
        new NioHTTPServer().bindInet("localhost", 8080).polling();
    }
}

运行结果

NioHTTPServer运行
HTTPClient运行
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 214,658评论 6 496
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,482评论 3 389
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 160,213评论 0 350
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,395评论 1 288
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,487评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,523评论 1 293
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,525评论 3 414
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,300评论 0 270
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,753评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,048评论 2 330
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,223评论 1 343
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,905评论 5 338
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,541评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,168评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,417评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,094评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,088评论 2 352

推荐阅读更多精彩内容