1 I/O 基础
1.1 基本概念
同步与异步:
- 同步是一种可靠的有序运行机制,当我们进行同步操作时,后续的任务是等待当前调用返回,才会进行下一步;
- 而异步则相反,其他任务不需要等待当前调用返回,通常依靠事件、回调等机制来实现任务间次序关系。
阻塞与非阻塞:
- 在进行阻塞操作时,当前线程会处于阻塞状态,无法从事其他任务,只有当条件就绪才能继续,比如 ServerSocket 新连接建立完毕,或数据读取、写入操作完成;
- 而非阻塞则是不管 IO 操作是否结束,直接返回,相应操作在后台继续处理。
1.2 Linux网络 I/O 模型简介
1.2.1 阻塞 I/O
阻塞I/O模型是常见的I/O模型,在读写数据时客户端会发生阻塞。阻塞I/O模型的工作流程为:在用户线程发出I/O请求之后,内核会检查数据是否就绪,此时用户线程一直阻塞等待内存数据就绪;在内存数据就绪后,内核将数据复制到用户线程中,并返回I/O执行结果到用户线程,此时用户线程将解除阻塞状态并开始处理数据。典型的阻塞I/O模型的例子为data = socket.read(),如果内核数据没有就绪,Socket线程就会一直阻塞在read()中等待内核数据就绪
1.2.2 非阻塞 I/O
非阻塞I/O模型指用户线程在发起一个I/O操作后,无须阻塞便可以马上得到内核返回的一个结果。如果内核返回的结果为false,则表示内核数据还没准备好,需要稍后再发起I/O操作。一旦内核中的数据准备好了,并且再次收到用户线程的请求,内核就会立刻将数据复制到用户线程中并将复制的结果通知用户线程。
在非阻塞I/O模型中,用户线程需要不断询问内核数据是否就绪,在内存数据还未就绪时,用户线程可以处理其他任务,在内核数据就绪后可立即获取数据并进行相应的操作。典型的非阻塞I/O模型一般如下:
while(true){
data = socket.read();
if(data == true){
//内核数据就绪,获取并处理内核数据
doSomething();
break;
}else{
//内核数据未就绪,用户线程处理其他任务
}
}
1.2.3 多路复用 I/O
多路复用I/O模型只需一个线程就可以管理多个Socket(阻塞I/O模型和非阻塞 1/O模型需要为每个Socket都建立一个单独的线程处理该Socket上的数据),并且在真正有Socket读写事件时才会使用操作系统的I/O资源,大大节约了系统资源。
非阻塞I/O模型在每个用户线程中都进行Socket状态检查,而在多路复用I/O模型中是在系统内核中进行Socket状态检查的,这也是多路复用I/O模型比非阻塞I/O模型效率高的原因。
多路复用I/O模型通过在一个Selector线程上以轮询方式检测在多个Socket上是否有事件到达,并逐个进行事件处理和响应。因此,对于多路复用I/O模型来说,在事件响应体(消息体)很大时,Selector线程就会成为性能瓶颈,导致后续的事件迟迟得不到处理,影响下一轮的事件轮询。在实际应用中,在多路复用方法体内一般不建议做复杂逻辑运算,只做数据的接收和转发,将具体的业务操作转发给后面的业务线程处理。
1.2.4 信号驱动 I/O
在信号驱动I/O模型中,在用户线程发起一个I/O请求操作时,系统会为该请求对应的Socket注册一个信号函数,然后用户线程可以继续执行其他业务逻辑;在内核数据就绪时,系统会发送一个信号到用户线程,用户线程在接收到该信号后,会在信号函数中调用对应的I/O读写操作完成实际的I/O请求操作。
1.2.5 异步 I/0
在异步I/O模型中,用户线程会发起一个asynchronous read操作到内核,内核在接收到synchronous read请求后会立刻返回一个状态,来说明请求是否成功发起,在此过程中用户线程不会发生任何阻塞。接着,内核会等待数据准备完成并将数据复制到用户线程中,在数据复制完成后内核会发送一个信号到用户线程,通知用户线程asynchronous读操作已完成。在异步I/O模型中,用户线程不需要关心整个I/O操作是如何进行的,只需发起一个请求,在接收到内核返回的成功或失败信号时说明I/O操作已经完成,直接使用数据即可。
在信号驱动模型中,用户线程接收到信号便表示数据已经就绪,需要用户线程调用I/O函数进行实际的I/O读写操作,将数据读取到用户线程;而在异步I/O模型中,用户线程接收到信号便表示I/O操作已经完成(数据已经被复制到用户线程),用户可以开始使用该数据了。
2 Java的 I/O演进
在 JDK1.4 推出 Java NIO之前,Java的 I/O 类库都非常原始,很多 UNIX 网络编程中的概念或者接口在 I/O类库中都没有体现,例如Pipe、Channel、Buffer等。基于Java 的所有Socket 通信都采用了同步阻塞模式,在性能和可靠性方面却存在着巨大的瓶颈,因此在很长的一段事件里,大型的应用服务器都采用 C 或者 C++ 语言开发,因为它们可以直接使用操作系统提供的异步 I/O 能力。
2002年发布JDK1.4时,新增了java.nio包,提供了很多进行异步 I/O 开发的 API 类库,主要的接口如下:
- 进行异步 I/O 操作的缓冲区 ByteBuffer 等;
- 进行异步 I/O 操作的管道 Pipe;
- 进行各种 I/O 操作的 Channel,包括 ServerSocketChannel 和 SocketChannel;
- 多种字符集的编码和解码能力;
- 实现多路复用的 selector ;
- 基于 Perl 实现的正则表达式类库;
- 文件通道 FileChannel 。
新的 NIO 类库的提供,极大地促进了基于 Java 地异步非阻塞编程地发展和应用,但是依然有不完善地地方,特别是对文件系统地处理能力仍显不足,主要问题如下:
- 没有统一地文件属性;
- API 能力比较弱,例如目录地级联创建和递归遍历,往往需要自己实现;
- 底层存储系统的一些高级 API 无法使用;
- 所有文件操作都是同步阻塞调用,不支持异步文件读写操作。
2011年 JDK1.7 正式发布,它的一个比较大的亮点就是将原来的 NIO 类库进行了升级,被称为 NIO2.0,主要提供了如下三个方面的升级:
- 提供能够批量获取文件属性的 API ,还提供了标准文件系统的 SPI ;
- 提供 AIO 功能,支持基于文件的异步 I/O 操作和针对网络 Socket 的异步操作;
- 完成通道功能,包括对配置和多数据报的支持等。
3 Java 网络编程
3.1 传统的 BIO 编程
采用 BIO 通信模型的服务端,通常由一个独立的 Acceptor 线程负责监听客户端的连接,它接收到客户端连接请求之后,为每一个客户端创建一个新的线程进行链路处理,处理完成之后,通过输出流返回应答给客户端,线程销毁,是典型的一请求一应答通信模型。如图所示:
该模型最大的问题就是缺乏弹性伸缩能力,当客户端并发访问量增加后,服务端的线程个数和客户端并发访问数呈1:1的正比关系,由于线程是 Java虚拟机非常宝贵的系统资源,线程数膨胀之后,系统性能将急剧下降,随着并发访问量的继续增大,系统会发生线程堆栈溢出、创建新线程失败等问题,并最终导致进程宕机或者僵死,不能对外提供服务。
下面展示采用 BIO 通信模型的代码实现
服务端监听8080端口,如果没有客户端接入,main 方法阻塞在 serverSocket.accept() 这一行,直到有客户端发起连接。
public class BioServer {
private static final int PORT = 8080;
public static void main(String[] args) throws IOException {
ServerSocket serverSocket = null;
try {
serverSocket = new ServerSocket(PORT);
System.out.println("The Time Server starts on port:"+PORT);
Socket accept = null;
while (true){
accept = serverSocket.accept();
new Thread(new TimeServerHandler(accept)).start();
}
} catch (IOException e) {
e.printStackTrace();
} finally {
if( serverSocket != null){
System.out.println("The Time Server is closed");
serverSocket.close();
}
}
}
}
当有客户端发起连接请求之后,新建一个线程,并传入 TimeServerHandler 对象,TimeServerHandler 是一个 Runnable,实现了处理客户端请求的逻辑,在run 方法里面循环读取输入流中的数据,并在 finally 代码块中释放各种资源:
public class TimeServerHandler implements Runnable{
private Socket socket;
public TimeServerHandler(Socket socket){
this.socket = socket;
}
public void run() {
BufferedReader in = null;
PrintWriter out = null;
try {
in = new BufferedReader(new InputStreamReader(this.socket.getInputStream()));
out = new PrintWriter(this.socket.getOutputStream(),true);
String body = null;
while( (body = in.readLine()) != null && body.length() != 0){
System.out.println("The Time Server receive msg:"+body);
out.println(new Date().toString());
}
}catch (Exception e){
e.printStackTrace();
} finally {
if(in != null){
try {
in.close();
} catch (IOException e) {
e.printStackTrace();
}
}
if(out != null){
try {
out.close();
} catch (Exception e){
e.printStackTrace();
}
}
if(this.socket != null){
try {
socket.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
}
客户端通过 socket 创建,向服务器发送"I am client",随后读取服务端响应,关闭连接,释放资源。
public class BioClient {
private static final int PORT = 8080;
public static void main(String[] args) {
Socket socket = null;
BufferedReader in = null;
PrintWriter out = null;
try {
socket = new Socket("127.0.0.1",PORT);
in = new BufferedReader(new InputStreamReader(socket.getInputStream()));
out = new PrintWriter(socket.getOutputStream(),true);
out.println("I am client");
String response = in.readLine();
System.out.println("当前服务器时间是:"+response);
}catch (Exception e){
}finally {
if(in != null){
try {
in.close();
} catch (IOException e) {
e.printStackTrace();
}
}
if(out != null){
try {
out.close();
} catch (Exception e){
e.printStackTrace();
}
}
if(socket != null){
try {
socket.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
}
Server 运行结果:
The Time Server starts on port:8080
The Time Server receive msg:I am client
Client 运行结果:
当前服务器时间是:Sun Jan 23 16:41:05 CST 2022
3.2 伪异步 I/O 编程
3.2.1 伪异步 I/O实现
采用线程池和任务队列可以实现一种叫做伪异步的 I/O 通信框架,它的模型如图所示:
当有新的客户端接入时,将客户端的 Socket 封装为一个 Task,投递到后端的线程池中进行处理,JDK的线程池维护一个消息队列的大小和最大线程数,因此,它的资源占用是可控的,无论多少个客户端并发访问,都不会导致资源的耗尽和宕机。
伪异步服务端代码:
public class TimeServer {
private static final int PORT = 8080;
public static void main(String[] args) throws IOException {
ServerSocket serverSocket = null;
try {
serverSocket = new ServerSocket(PORT);
System.out.println("The Time Server starts on port:"+PORT);
Socket accept;
TimeServerHandlerExecutePool executePool = new TimeServerHandlerExecutePool(8, 1000);
while (true){
accept = serverSocket.accept();
executePool.execute(new TimeServerHandler(accept));
}
} catch (IOException e) {
e.printStackTrace();
} finally {
if( serverSocket != null){
System.out.println("The Time Server is closed");
serverSocket.close();
}
}
}
}
与传统 BIO 的区别就是,首先创建一个处理客户端请求的线程池,接收到任务之后,调用线程池的 execute方法执行。
public class TimeServerHandlerExecutePool {
private ExecutorService executorService;
public TimeServerHandlerExecutePool(int maxPoolSize, int queueSize){
executorService = new ThreadPoolExecutor(Runtime.getRuntime().availableProcessors(),
maxPoolSize,120L, TimeUnit.SECONDS,new ArrayBlockingQueue<Runnable>(queueSize));
}
public void execute(Runnable task){
executorService.execute(task);
}
}
3.2.2 伪异步 I/O 弊端分析
1 InputStream读取数据
下面是InputStream中read方法部分注释:
/**
* Reads some number of bytes from the input stream and stores them into
* the buffer array <code>b</code>. The number of bytes actually read is
* returned as an integer. This method blocks until input data is
* available, end of file is detected, or an exception is thrown.
*......
* @return the total number of bytes read into the buffer, or
* <code>-1</code> if there is no more data because the end of
* the stream has been reached.
* @exception IOException If the first byte cannot be read for any reason
* other than the end of the file, if the input stream has been closed, or
* if some other I/O error occurs.
* @exception NullPointerException if <code>b</code> is <code>null</code>.
* @see java.io.InputStream#read(byte[], int, int)
*/
public int read(byte b[]) throws IOException {
return read(b, 0, b.length);
}
当对 Socket 的输入流进行读取操作的时候,它会一直阻塞下去,直到发生下面三件事情:
- 有数据可读
- 可用数据已经读取完毕
- 发生空指针或 I/O 异常
这意味着当对方发送请求或者应答消息比较缓慢,或者网络传输较慢时,读取输入流一方的通信线程将会被长时间阻塞,其他接入消息只能在消息队列中排队。
2 OutputStream 写出数据
下面是 OutputStream 中 write 方法:
/**
* Writes <code>b.length</code> bytes from the specified byte array
* to this output stream. The general contract for <code>write(b)</code>
* is that it should have exactly the same effect as the call
* <code>write(b, 0, b.length)</code>.
*
* @param b the data.
* @exception IOException if an I/O error occurs.
* @see java.io.OutputStream#write(byte[], int, int)
*/
public void write(byte b[]) throws IOException {
write(b, 0, b.length);
}
当调用OutputStream的write方法写出数据的时候,将会被阻塞,直到所有要发送的字节全部写出完毕,或者发生异常。
通过对输入和输出流的API进行分析,我们了解到读和写操作都是同步阻塞的,阻塞的时间取决于对方 I/O 线程的处理速度和网络传输速度。伪异步 I/O实际上仅仅是对之前 I/O 模型的一个简单优化,无法从根本上解决同步 I/O 导致的通信线程阻塞问题。下面就简单分析下阻塞引其的级联故障:
- 服务端某操作处理缓慢,返回应答消息耗费 60 s,平时只需要 10 ms。
- 采用伪异步 I/O 的线程将被同步阻塞60 s。
- 假如所有的可用线程都被该操作阻塞,后续所有消息都在队列中排队。
- 线程池将根据淘汰策略拒绝或抛弃相应任务。
- 客户端发生大量连接超时。
3.3 NIO 编程
在介绍 NIO 编程之前,需要先澄清 NIO 到底是什么:有人称之为 New I/O ,原因在于它相对于之前的 I/O 类库是新增的;由于之前老的 I/O 类库是阻塞 I/O,新的类库目标是让 Java 支持非阻塞 I/O ,所以更多人喜欢称之为非阻塞 I/O(Non-block I/O)。
3.3.1 NIO 类库简介
-
缓冲区 Buffer
Buffer 是一个对象,它包含一些要写入和写出的数据。在面向流的 I/O 中,可以将数据直接写入或者将数据直接读到 Stream 中。在 NIO 库中,所有数据都是用缓冲区处理的。在读取数据时,它是直接读到缓冲区中;写入数据时,写入到缓冲区中。
缓冲区实质上是一个数组,通常它是一个字节数组(ByteBuffer),也可以使用其他种类的数组。但是它不仅仅是一个数组,还提供了维护读写位置以及对数据的结构化访问等信息。
最常用的缓冲区是 ByteBuffer,对于每一种 Java 基本类型(除了 Boolean)都对应有一种缓冲区,具体如下:
- ByteBuffer
- CharBuffer
- ShortBuffer
- IntBuffer
- LongBuffer
- FloatBuffer
- DoubleBuffer
-
通道 Channel
Channel 是一个通道,它就像自来水管一样,网络数据通过 Channel 读取和写入。通道与流的不同之处在于通道是双向的,流只是在一个方向上移动。
因为 Channel 是全双工的,所以它可以比流更好的映射底层操作系统的 API。特别是 UNIX 网络编程模型中,底层操作系统的通道都是全双工的,同时支持读写操作。
Channel 可以分为两大类:用于网络读写的 SelectableChannel 和用于文件操作的 FileChannel。
-
多路复用器 Selector
多路复用器 selector 是 Java NIO 编程的基础,Seletor 会不断地轮询注册在其上地 Channel,如果某个 Channel 上面发生读或者写事件,这个 Channel 就处于就绪状态,会被 Selector 轮询出来,然后通过 SelectionKey 可以获取就绪 Channel 的集合,进行后续的 I/O 操作。
由于 JDK 使用了 epoll() 代替传统的 select 实现,所以一个Selector 可以接入成千上万的客户端,这是一个巨大的进步。
3.3.2 NIO 示例
示例相比 BIO 要复杂一些,代码说明已经写到注释里面了。
TimeServer:
public class TimeServer {
public static void main(String[] args) {
int port = 8080;
MultiplexerTimeServer multiplexerTimeServer = new MultiplexerTimeServer(port);
new Thread(multiplexerTimeServer).start();
}
}
MultiplexerTimeServer:
package com.ljessie.nio;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.util.Date;
import java.util.Iterator;
import java.util.Set;
/**
* @author Zhao Bowen
* @version 1.0
* @date 2022/2/8 22:42
*/
public class MultiplexerTimeServer implements Runnable {
private Selector selector;
private ServerSocketChannel channel;
private volatile boolean stopFlag;
public MultiplexerTimeServer(int port){
try{
selector = Selector.open();
channel = ServerSocketChannel.open();
//设置为异步非阻塞模式
channel.configureBlocking(false);
//绑定端口,并设置连接队列的最大长度为1024
channel.socket().bind(new InetSocketAddress(port),1024);
//将 channel 注册到 selector 上,并设置操作位 为监听SOCKET
channel.register(selector, SelectionKey.OP_ACCEPT);
System.out.println("The time server starts on port:" + port);
} catch (IOException e){
e.printStackTrace();
System.exit(1);
}
}
public void stop(){
this.stopFlag = true;
}
@Override
public void run() {
while(!stopFlag){
try {
//无论是否有读写等事件发生,selector 每隔1s被唤醒一次
//也可以使用无参的 select 方法,当有处于就绪状态的Channel时,selector返回该channel的SelectionKey集合
selector.select(1000);
Set<SelectionKey> selectionKeys = selector.selectedKeys();
Iterator<SelectionKey> iterator = selectionKeys.iterator();
SelectionKey key;
while (iterator.hasNext()){
key = iterator.next();
iterator.remove();
try {
handleInput(key);
}catch (Exception e){
if(key != null){
key.cancel();
if(key.channel() != null){
key.channel().close();
}
}
}
}
} catch ( Throwable t){
t.printStackTrace();
}
}
//selector 关闭后,所有注册在上面的 channel 都会自动关闭,所以不需要重复释放资源
if(selector != null){
try {
selector.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
private void handleInput(SelectionKey key) throws IOException {
if(key.isValid()){
//处理新接入的请求消息
if(key.isAcceptable()){
//接收新的连接
ServerSocketChannel ssc = (ServerSocketChannel) key.channel();
//接收客户端请求,并完成三次握手
SocketChannel sc = ssc.accept();
//设置异步非阻塞
sc.configureBlocking(false);
//把新的连接注册到 selector 上
sc.register(selector,SelectionKey.OP_READ);
}
//读取数据
if(key.isReadable()){
SocketChannel sc = (SocketChannel) key.channel();
//开辟一个1M的缓冲区,
ByteBuffer readBuffer = ByteBuffer.allocate(1024);
//读取请求码流
int readBytes = sc.read(readBuffer);
//read是非阻塞的,使用返回值判断读取到的字节数
if(readBytes > 0){
//flip()的作用是将缓冲区当前limit设置为position,position设置为0,用于后续读取缓冲区
readBuffer.flip();
//数组大小为,缓冲区可读的字节个数
byte[] bytes = new byte[readBuffer.remaining()];
//缓冲区可读的字节数组,复制到bytes中
readBuffer.get(bytes);
String body = new String(bytes,"UTF-8");
System.out.println("The time server receive order: " + body);
String currentTime = "Query Time Order".equalsIgnoreCase(body) ?
new Date(System.currentTimeMillis()).toString():"BAD ORDER";
//异步发送消息给客户端
doWrite(sc,currentTime);
}else if ( readBytes < 0){
//对端链路关闭
key.cancel();
sc.close();
}else{
//没有读取到字节,属于正常情况,忽略。
}
}
}
}
private void doWrite(SocketChannel channel, String response) throws IOException {
if(response != null && response.trim().length() > 0){
byte[] bytes = response.getBytes();
//根据相应的字节数组大小,创建缓冲区
ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length);
//将字节数组复制到缓冲区
writeBuffer.put(bytes);
writeBuffer.flip();
//将缓冲区的字节数组发送出去,此处是异步非阻塞的,可能出现“写半包”情况
channel.write(writeBuffer);
}
}
}
TimeClient:
public class TimeClient {
public static void main(String[] args) {
new Thread(new TimeClientHandle("127.0.0.1",8080)).start();
}
}
TimeClientHandle:
package com.ljessie.nio;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;
/**
* @author Zhao Bowen
* @version 1.0
* @date 2022/2/9 22:13
*/
public class TimeClientHandle implements Runnable{
private String host;
private int port;
private Selector selector;
private SocketChannel socketChannel;
private volatile boolean stop;
public TimeClientHandle(String host, int port){
this.host = host == null ? "127.0.0.1": host;
this.port = port;
try {
selector = Selector.open();
socketChannel = SocketChannel.open();
socketChannel.configureBlocking(false);
} catch (IOException e) {
e.printStackTrace();
System.exit(1);
}
}
@Override
public void run() {
//发送连接请求
try {
doConnect();
} catch (IOException e) {
e.printStackTrace();
System.exit(1);
}
while (!stop){
try {
selector.select(1000);
Set<SelectionKey> selectionKeys = selector.selectedKeys();
Iterator<SelectionKey> iterator = selectionKeys.iterator();
SelectionKey key;
while (iterator.hasNext()){
key = iterator.next();
iterator.remove();
try{
handleInput(key);
} catch (Exception e){
if(key != null){
key.cancel();
if(key.channel() != null){
key.channel().close();
}
}
}
}
} catch (IOException e) {
e.printStackTrace();
}
}
if(selector != null){
try {
selector.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
private void handleInput(SelectionKey key) throws IOException {
if(key.isValid()){
//处理新接入的请求消息
SocketChannel sc = (SocketChannel) key.channel();
//selectionKey处于连接状态
if(key.isConnectable()){
//判断是否连接成功
if(sc.finishConnect()){
sc.register(selector,SelectionKey.OP_READ);
doWrite(sc);
}else{
//连接失败,进程退出
System.exit(1);
}
}
//读取数据
if(key.isReadable()){
//开辟一个1M的缓冲区,
ByteBuffer readBuffer = ByteBuffer.allocate(1024);
//读取请求码流
int readBytes = sc.read(readBuffer);
//read是非阻塞的,使用返回值判断读取到的字节数
if(readBytes > 0){
//flip()的作用是将缓冲区当前limit设置为position,position设置为0,用于后续读取缓冲区
readBuffer.flip();
//数组大小为,缓冲区可读的字节个数
byte[] bytes = new byte[readBuffer.remaining()];
//缓冲区可读的字节数组,复制到bytes中
readBuffer.get(bytes);
String body = new String(bytes,"UTF-8");
System.out.println("The body from server: " + body);
this.stop = true;
}else if ( readBytes < 0){
//对端链路关闭
key.cancel();
sc.close();
}else{
//没有读取到字节,属于正常情况,忽略。
}
}
}
}
private void doConnect() throws IOException {
//如果直接连接成功,则注册到 selector 上,发送请求消息,读应答
if(socketChannel.connect(new InetSocketAddress(host,port))){
socketChannel.register(selector, SelectionKey.OP_READ);
doWrite(socketChannel);
}else{
//如果没有连接成功,则说明服务端没有返回TCP握手应答消息,并不代表连接失败
//在selector上注册CONNEXT,当服务端返回syn-ack消息后,selector就能轮询到这个channel
socketChannel.register(selector,SelectionKey.OP_CONNECT);
}
}
private void doWrite(SocketChannel sc) throws IOException {
//这些操作和服务端类似,不再赘述
byte[] req = "Query Time Order".getBytes();
ByteBuffer writeBuffer = ByteBuffer.allocate(req.length);
writeBuffer.put(req);
writeBuffer.flip();
sc.write(writeBuffer);
if(!writeBuffer.hasRemaining()){
System.out.println("Send order 2 server succeed.");
}
}
}
依次启动TimeServer和TimeClient:
TimeServer运行结果:
The time server starts on port:8080
The time server receive order: Query Time Order
TimeClient运行结果:
Send order 2 server succeed.
The body from server: Thu Feb 10 21:57:56 CST 2022
这个小程序并没有考虑“半包读”和“半包写”,如果加上这些,代码将会更加复杂,NIO既然这么复杂,为什么应用却越来越广泛呢,NIO编程的优点总结如下:
- 客户端的连接操作是异步的,不用像BIO那样被同步阻塞
- SocketChannel的读写操作都是异步的,如果没有可读写数据,不会同步等待
- JDK的Selector在Linux等主流操作系统上通过epoll实现,没有连接句柄的限制
3.4 AIO
JDK1.7升级了NIO类库,被称为NIO2.0,引人注目的是,Java正式提供了异步文件I/O操作,同时提供了与UNIX网络编程事件驱动I/O对应的AIO。
NIO2.0的异步socket是真正的异步非阻塞 I/O,对应于UNIX网络编程中的异步 I/O,不需要 Selector 对注册的channel进行轮询,即可实现异步读写,从而简化了 NIO的编程模型。
下面使用 NIO2.0 实现一个基础的网络通信示例,相关说明已经写到注释上面了。
TimeServer启动一个线程,来运行服务端程序
public class TimeServer {
public static void main(String[] args) {
//创建异步的 TimeServer 服务端
AsyncTimeServerHandler asyncTimeServerHandler = new AsyncTimeServerHandler(8080);
new Thread(asyncTimeServerHandler).start();
}
}
AsyncTimeServerHandler绑定端口,并接收连接请求。在接收连接请求时,新建一个连接成功的回调AcceptCompletionHandler
package com.ljessie.aio;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.AsynchronousServerSocketChannel;
import java.util.concurrent.CountDownLatch;
/**
* @author Zhao Bowen
* @version 1.0
* @date 2022/2/10 22:22
*/
public class AsyncTimeServerHandler implements Runnable {
CountDownLatch latch;
AsynchronousServerSocketChannel asynchronousServerSocketChannel;
AsyncTimeServerHandler(int port){
try {
//创建异步的服务端 channel
asynchronousServerSocketChannel = AsynchronousServerSocketChannel.open();
//绑定监听端口
asynchronousServerSocketChannel.bind(new InetSocketAddress(port));
System.out.println("The time server starts on port:" + port);
} catch (IOException e) {
e.printStackTrace();
}
}
@Override
public void run() {
latch = new CountDownLatch(1);
//异步操作,需要传入一个CompletionHandler,在accept成功之后接收通知消息。
asynchronousServerSocketChannel.accept(this,new AcceptCompletionHandler());
try {
//防止服务端程序在完成上面accept之前退出
latch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
AcceptCompletionHandler连接回调,接收到请求之后,调用ReadCompletionHandler执行异步读操作
package com.ljessie.aio;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
/**
* @author Zhao Bowen
* @version 1.0
* @date 2022/2/10 22:27
*/
public class AcceptCompletionHandler implements CompletionHandler<AsynchronousSocketChannel,AsyncTimeServerHandler> {
@Override
public void completed(AsynchronousSocketChannel result, AsyncTimeServerHandler attachment) {
/**
* 其实走到这个方法,说明已经有一个客户端连接进来了
* 而一个asynchronousServerSocketChannel可以接收成千上万个客户端
* 调我们自己的AsyncTimeServerHandler的socketChannel继续监听客户端连接
* 最终形成一个循环
*/
attachment.asynchronousServerSocketChannel.accept(attachment,this);
//接收客户端请求
ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
//异步读操作,同样需要传入一个CompletionHandler,接收读操作的通知
result.read(byteBuffer,byteBuffer, new ReadCompletionHandler(result));
}
@Override
public void failed(Throwable exc, AsyncTimeServerHandler attachment) {
attachment.latch.countDown();
}
}
ReadCompletionHandler异步读取客户端请求,并将响应写回给客户端。
package com.ljessie.aio;
import io.netty.util.internal.StringUtil;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
import java.util.Date;
/**
* @author Zhao Bowen
* @version 1.0
* @date 2022/2/10 22:36
*/
public class ReadCompletionHandler implements CompletionHandler<Integer, ByteBuffer> {
private AsynchronousSocketChannel channel;
public ReadCompletionHandler(AsynchronousSocketChannel channel){
this.channel = channel;
}
@Override
public void completed(Integer result, ByteBuffer attachment) {
//flip()的作用是将缓冲区当前limit设置为position,position设置为0,用于后续读取缓冲区
attachment.flip();
//创建缓冲区大小的字节数组
byte[] body = new byte[attachment.remaining()];
//缓冲区可读的字节数组,复制到bytes中
attachment.get(body);
try{
String req = new String(body,"UTF-8");
System.out.println("The time server receives request:" + req);
String currentTime = "Query Time Order".equalsIgnoreCase(req) ?
new Date(System.currentTimeMillis()).toString():"BAD ORDER";
doWrite(currentTime);
}catch (UnsupportedEncodingException e){
e.printStackTrace();
}
}
private void doWrite(String currentTime){
if(!StringUtil.isNullOrEmpty(currentTime)){
byte[] bytes = currentTime.getBytes();
ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length);
writeBuffer.put(bytes);
writeBuffer.flip();
channel.write(writeBuffer, writeBuffer, new CompletionHandler<Integer, ByteBuffer>() {
@Override
public void completed(Integer result, ByteBuffer attachment) {
//如果没有发送完成,继续发送
if(attachment.hasRemaining()){
channel.write(attachment,attachment,this);
}
}
@Override
public void failed(Throwable exc, ByteBuffer attachment) {
try {
channel.close();
}catch (IOException e){
e.printStackTrace();
}
}
});
}
}
@Override
public void failed(Throwable exc, ByteBuffer attachment) {
try {
this.channel.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
TimeClient启动客户端程序
public class TimeClient {
public static void main(String[] args) {
new Thread(new AsyncTimeClientHandler()).start();
}
}
AsyncTimeClientHandler建立连接,并且异步发送请求,异步读取服务端响应
package com.ljessie.aio;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
import java.util.concurrent.CountDownLatch;
/**
* @author Zhao Bowen
* @version 1.0
* @date 2022/2/13 14:52
*/
public class AsyncTimeClientHandler implements CompletionHandler<Void,AsyncTimeClientHandler>, Runnable {
private AsynchronousSocketChannel client;
private CountDownLatch latch;
AsyncTimeClientHandler(){
try {
client = AsynchronousSocketChannel.open();
} catch (IOException e) {
e.printStackTrace();
}
}
@Override
public void run() {
//防止异步操作还没有执行完成,线程就退出
latch = new CountDownLatch(1);
//发起异步操作
client.connect(new InetSocketAddress("127.0.0.1",8080),this,this);
try {
latch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
try {
client.close();
} catch (IOException e) {
e.printStackTrace();
}
}
@Override
public void completed(Void result, AsyncTimeClientHandler attachment) {
byte[] req = "Query Time Order".getBytes();
final ByteBuffer writeBuffer = ByteBuffer.allocate(req.length);
writeBuffer.put(req);
writeBuffer.flip();
//异步写
client.write(writeBuffer, writeBuffer, new CompletionHandler<Integer, ByteBuffer>() {
@Override
public void completed(Integer result, ByteBuffer attachment) {
if(attachment.hasRemaining()){
//尚未发送完成,继续发送
client.write(attachment,attachment,this);
}else{
//已经发送完成,异步读
ByteBuffer readBuffer = ByteBuffer.allocate(1024);
client.read(readBuffer, readBuffer, new CompletionHandler<Integer, ByteBuffer>() {
@Override
public void completed(Integer result, ByteBuffer attachment) {
attachment.flip();
byte[] bytes = new byte[attachment.remaining()];
attachment.get(bytes);
String body;
try {
body = new String(bytes,"UTF-8");
System.out.println("The time from server is:" + body);
latch.countDown();
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
}
@Override
public void failed(Throwable exc, ByteBuffer attachment) {
try {
client.close();
} catch (IOException e) {
e.printStackTrace();
}
}
});
}
}
@Override
public void failed(Throwable exc, ByteBuffer attachment) {
try {
client.close();
} catch (IOException e) {
e.printStackTrace();
}
}
});
}
@Override
public void failed(Throwable exc, AsyncTimeClientHandler attachment) {
exc.printStackTrace();
try {
client.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
3.5 4种 I/O对比
相关概念
-
异步非阻塞I/O
JDK1.7提供的 NIO2.0新增了异步的Socket,是真正的异步I/O,在异步I/O操作的时候可以传递信号变量,当操作完成之后会回调相关方法,异步I/O也成为AIO。
-
多路复用 Selector
JDK1.4提供的 NIO只能被称为非阻塞 I/O,不能叫异步 I/O。在JDK1.4和1.5 update10之前,JDK的Selector基于 select/poll实现。JDK1.5 update10和Linux core2.6以上版本,Sun优化了Selector的实现,它在底层使用 epoll 替代了 select/poll,上层的 API并没有变化,可以认为是JDK NIO的一次性能优化,但是它仍旧没有变化 I/O的模型。
前面已经介绍过Java NIO的实现关键是多路复用技术,多路复用的核心就是通过Selector来轮询注册在其上的Channel,当发现某个或多个Channel处于就绪状态后,从阻塞状态返回就绪的Channel的SelectedKey集合,进行I/O操作。
-
伪异步 I/O
伪异步I/O的概念完全来源于实践,在JDK NIO编程没有流行之前,为了解决 Tomcat通信线程同步 I/O 导致业务线程被挂住的问题,在通信线程和业务线程之间做个缓冲区,这个缓冲区用于隔离 I/O 线程和业务线程间的直接访问,这样业务线程就不会被 I/O线程阻塞
I/O模型对比
同步阻塞I/O | 伪异步I/O | NIO | 异步I/O | |
---|---|---|---|---|
客户端个数:I/O线程 | 1:1 | M:N | M:1(1个selector处理多个channel) | M:0(不需要启动额外的I/O线程,基于事件回调) |
I/O类型(阻塞) | 阻塞I/O | 阻塞I/O | 非阻塞I/O | 非阻塞I/O |
I/O类型(同步) | 同步I/0 | 同步I/0 | 同步I/0 | 异步I/0 |
API使用难度 | 简单 | 简单 | 非常复杂 | 复杂 |
调试难度 | 简单 | 简单 | 复杂 | 复杂 |
可靠性 | 非常差 | 差 | 高 | 高 |
吞吐量 | 低 | 中 | 高 | 高 |
并不是所有的Java网络编程都必须选择NIO和Netty,具体选择什么样的 I/O模型或者 NIO 框架,完全基于业务的实际应用场景和性能诉求,如果客户端并发连接数不多,服务器的负载也不重,完全没有必要选择 NIO 做服务端。
4 选择Netty的理由
开发出高质量的 NIO 程序并不是一件简单的事情,从可维护性角度看,由于 NIO 采用一个 I/O线程处理多条链路,它的调试和追踪非常麻烦,特别是生产环境中的问题,往往定位难度很大。
不选择 Java 原生 NIO 编程的原因:
- NIO类库和API繁杂,使用麻烦。
- 需要较好的Java 多线程基础
- 维护程序的可靠性,工作量和难度都非常大
- JDK NIO 的 BUG,例如 epoll会导致 Selector 空轮询,最终导致 CPU 100%。
选择 Netty 的原因:
- API 使用简单,开发门槛低
- 功能强大
- 定制能力强
- 性能高
- 成熟、稳定,已经修复了 JDK NIO 的 BUG
- 社区活跃
- 经历过大规模商业应用的考验