NIO来源:
传统的io操作性能低,从jdk1.4开始引入nio概念, Nio顾名思义就是Non-Blocking IO,非阻塞型IO操作,与传统的java io操作一样,NIO也提供SocketChannel和ServerSocketChannel两种不同的套接字通道实现,这两种都支持阻塞和非阻塞两种模式。
与IO的区别:
标准的IO基于字节流和字符流进行操作的,而NIO是基于通道(Channel)和缓冲区(Buffer)进行操作,数据总是从通道读取到缓冲区中,或者从缓冲区写入到通道中。当线程从通道读取数据到缓冲区时,线程还是可以进行其他事情。当数据被写入到缓冲区时,线程可以继续处理它。从缓冲区写入通道也类似.
核心概念:
一、缓冲区Buffer
在传统的io操作中,请求是面向流的操作,数据直接写入到Stream或者程序直接从Stream中读取数据,在NIO操作中,所有数据都是用缓冲区处理,任何访问NIO的数据都是通过缓冲区进行。
缓冲区在NIO中抽象为Buffer类.
我们看下Buffer类的注释信息
/**
* A container for data of a specific primitive type.
*
* <p> A buffer is a linear, finite sequence of elements of a specific
* primitive type. Aside from its content, the essential properties of a
* buffer are its capacity, limit, and position: </p>
Buffer是一个存储数据的特殊原始类型,并且是线性、存储有限元素的数据结构(数组),Buffer核心的属性除了存储内容之外还有容量capacity,限制Limit,和当前位置position
Buffer是一个顶层抽象,最常用的API有以下几个:
1、Buffer clear()
清除buffer,将读写位置position置为0,将最大可存储数量limit置为最大容量capacity,一般在读写操作之前都需要调用改方法
2、Buffer flip()
翻转Buffer.意思大概是这样的:调换这个buffer的当前位置,并且设置当前位置是0。说的意思就是:将缓存字节数组的指针设置为数组的开始序列即数组下标0。这样就可以从buffer开头,对该buffer进行遍历(读取)了。 一般用于当读写操作后调用改方法进行写和读操作。
说白了就是当向buffer写入数据完毕,如果要从Buffer中读取数据,那么需要调用flip翻转成可读状态,否则读取的数据不正确
更详细的结束 大家请阅读这篇文章 Java nio 之 Buffer反转flip - 枯鸦专栏 - CSDN博客
由于传输的数据结构不同具体实现的Buffer也很多,其中最常用的就是ByteBuffer
下面通过一个简历的例子熟悉下ByteBuffer的常用的API
```
public static void main(String[] args) {
//分配88字节空间
ByteBuffer byteBuffer = ByteBuffer.allocate(88);
System.out.println("此时 limit = "+byteBuffer.limit()+" position="+byteBuffer.position()+" capacity="+byteBuffer.capacity());
String value ="Netty权威指南";
//向buffer写数据
byteBuffer.put(value.getBytes());
System.out.println("写入数据后 limit = "+byteBuffer.limit()+" position="+byteBuffer.position()+" capacity="+byteBuffer.capacity());
//翻转buffer
byteBuffer.flip();
System.out.println("翻转后 limit = "+byteBuffer.limit()+" position="+byteBuffer.position()+" capacity="+byteBuffer.capacity());
byte[] tbyte =new byte[17];
//从buffer读数据
byteBuffer.get(tbyte);
System.out.println("读后 limit = "+byteBuffer.limit()+" position="+byteBuffer.position()+" capacity="+byteBuffer.capacity());
}
```
执行结果:
二、通道Channel
Channel是一个通道,可以通过他读取或者写入数据,并且可以全双工同时读写操作,Channel的状态除了打开就是关闭,在NIO中Channel一般注册在多路复用器Selector中
主要有以下API
1、isOpen 通道是否关闭
2、close 关闭通道
Channel主要有两类,分别用于网络读写SelectableChannel和文件读写的FileChannel
三、多路复用器Selector
Selector提供选择已经就绪任务的能力,它会不断轮询注册在其上的Channel,如果某个Channel上面有新的Tcp连接,读写事件,这个Channel就处于就绪状态会被Selector选择出来,然后通过SelectionKey获取就绪Channel的集合后续进行对应IO操作。使用Selector的好处在于: 使用更少的线程来就可以来处理通道了, 相比使用多个线程,避免了线程上下文切换带来的开销。
常用API:
1、Selector open() 创建一个Selector
2、select() 返回准备好的channel个数,该方法会阻塞直到有Channel就绪
3、selectedKeys 返回就绪的selectedKeys
将Channel注册到Selector上:
serverChannel.configureBlocking(false);//开启非阻塞模式
serverChannel.socket().bind(new InetSocketAddress(port),1024);//绑定端口
serverChannel.register(selector, SelectionKey.OP_ACCEPT);//将channel 注册到selector,监听客户端连接ACCEPT请求
其中register第二个参数是一个事件的集合。
可以监听四种不同类型的事件:Connect、Accept、Read、Write,通过SelectKey封装实现
示例:
public class Server {
private static int DEFAULT_PORT =12345;
private static ServerHandlerserverHandle;
public static void start() {
start(DEFAULT_PORT);
}
public static synchronized void start(int port) {
if (serverHandle !=null) {serverHandle.stop(); }
serverHandle =new ServerHandler(port);
new Thread(serverHandle, "Server").start();
}
public static void main(String[] args) {
start();
}
}
```
```
/**
* NIO服务端
* @author wcs
* @version 1.0
*/
public class ServerHandlerimplements Runnable{
private Selectorselector;
private ServerSocketChannelserverChannel;
private volatile boolean started;
/**
* 构造方法
* @param port 指定要监听的端口号
*/
public ServerHandler(int port) {
try{
//创建reactor线程,创建多路复用器
selector = Selector.open();
//打开监听通道 监听客户端连接
serverChannel = ServerSocketChannel.open();
//如果为 true,则此通道将被置于阻塞模式;如果为 false,则此通道将被置于非阻塞模式
serverChannel.configureBlocking(false);//开启非阻塞模式
//绑定监听端口 设置连接为非阻塞模式 backlog设为1024
serverChannel.socket().bind(new InetSocketAddress(port),1024);
//将channel 注册到selector,监听客户端连接ACCEPT请求
serverChannel.register(selector, SelectionKey.OP_ACCEPT);
//标记服务器已开启
started =true;
System.out.println("服务器已启动,端口号:" + port);
}catch(IOException e){
e.printStackTrace();
System.exit(1);
}
}
public void stop(){
started =false;
}
@Override
public void run() {
//循环遍历selector
while(started){
try{
//无论是否有读写事件发生,selector每隔1s被唤醒一次
selector.select(1000);
//阻塞,只有当至少一个注册的事件发生的时候才会继续.
// selector.select();
Set keys =selector.selectedKeys();
Iterator it = keys.iterator();
SelectionKey key =null;
while(it.hasNext()){
key = it.next();
it.remove();
try{
handleInput(key);
}catch(Exception e){
if(key !=null){
key.cancel();
if(key.channel() !=null){
key.channel().close();
}
}
}
}
}catch(Throwable t){
t.printStackTrace();
}
}
//selector关闭后会自动释放里面管理的资源
if(selector !=null)
try{
selector.close();
}catch (Exception e) {
e.printStackTrace();
}
}
private void handleInput(SelectionKey key)throws IOException{
if(key.isValid()){
//处理新接入的请求消息
if(key.isAcceptable()){
ServerSocketChannel ssc = (ServerSocketChannel) key.channel();
//通过ServerSocketChannel的accept创建SocketChannel实例
//完成该操作意味着完成TCP三次握手,TCP物理链路正式建立
SocketChannel sc = ssc.accept();
//设置为非阻塞的
sc.configureBlocking(false);
//注册为读
sc.register(selector, SelectionKey.OP_READ);
}
//读消息
if(key.isReadable()){
SocketChannel sc = (SocketChannel) key.channel();
//创建ByteBuffer,并开辟一个1K的缓冲区
ByteBuffer buffer = ByteBuffer.allocate(1024);
//读取请求码流,返回读取到的字节数
int readBytes = sc.read(buffer);
//读取到字节,对字节进行编解码
if(readBytes>0){
//将缓冲区当前的limit设置为position=0,用于后续对缓冲区的读取操作
buffer.flip();
//根据缓冲区可读字节数创建字节数组
byte[] bytes =new byte[buffer.remaining()];
//将缓冲区可读字节数组复制到新建的数组中
buffer.get(bytes);
String expression =new String(bytes,"UTF-8");
System.out.println("服务器收到消息:" + expression);
//处理数据
String result =null;
try{
result = expression+"nio hshs";
}catch(Exception e){
result ="计算错误:" + e.getMessage();
}
//发送应答消息
doWrite(sc,result);
}
//没有读取到字节 忽略
// else if(readBytes==0);
//链路已经关闭,释放资源
else if(readBytes<0){
key.cancel();
sc.close();
}
}
}
}
//异步发送应答消息
private void doWrite(SocketChannel channel,String response)throws IOException{
//将消息编码为字节数组
byte[] bytes = response.getBytes();
//根据数组容量创建ByteBuffer
ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length);
//将字节数组复制到缓冲区
writeBuffer.put(bytes);
//flip操作
writeBuffer.flip();
//发送缓冲区的字节数组
channel.write(writeBuffer);
//****此处不含处理“写半包”的代码
}
}
```
客户端代码
```
public class Client {
private static StringDEFAULT_HOST ="127.0.0.1";
private static int DEFAULT_PORT =12345;
private static ClientHandlerclientHandle;
public static void start(){
start(DEFAULT_HOST,DEFAULT_PORT);
}
public static synchronized void start(String ip,int port){
if(clientHandle!=null)
clientHandle.stop();
clientHandle =new ClientHandler(ip,port);
new Thread(clientHandle,"Server").start();
}
//向服务器发送消息
public static boolean sendMsg(String msg)throws Exception{
if(msg.equals("q"))return false;
clientHandle.sendMsg(msg);
return true;
}
public static void main(String[] args){
start();
}
}
```
```
package com.wcs.learn.NIO;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;
/**
* NIO客户端
* @author wcs
* @version 1.0
*/
public class ClientHandlerimplements Runnable{
private Stringhost;
private int port;
private Selectorselector;
private SocketChannelsocketChannel;
private volatile boolean started;
public ClientHandler(String ip,int port) {
this.host = ip;
this.port = port;
try{
//创建选择器
selector = Selector.open();
//打开监听通道
socketChannel = SocketChannel.open();
//如果为 true,则此通道将被置于阻塞模式;如果为 false,则此通道将被置于非阻塞模式
socketChannel.configureBlocking(false);//开启非阻塞模式
started =true;
}catch(IOException e){
e.printStackTrace();
System.exit(1);
}
}
public void stop(){
started =false;
}
@Override
public void run() {
try{
doConnect();
}catch(IOException e){
e.printStackTrace();
System.exit(1);
}
//循环遍历selector
while(started){
try{
//无论是否有读写事件发生,selector每隔1s被唤醒一次
selector.select(1000);
//阻塞,只有当至少一个注册的事件发生的时候才会继续.
// selector.select();
Set keys =selector.selectedKeys();
Iterator it = keys.iterator();
SelectionKey key =null;
while(it.hasNext()){
key = it.next();
it.remove();
try{
handleInput(key);
}catch(Exception e){
if(key !=null){
key.cancel();
if(key.channel() !=null){
key.channel().close();
}
}
}
}
}catch(Exception e){
e.printStackTrace();
System.exit(1);
}
}
//selector关闭后会自动释放里面管理的资源
if(selector !=null)
try{
selector.close();
}catch (Exception e) {
e.printStackTrace();
}
}
private void handleInput(SelectionKey key)throws IOException{
if(key.isValid()){
SocketChannel sc = (SocketChannel) key.channel();
if(key.isConnectable()){
if(sc.finishConnect());
else System.exit(1);
}
//读消息
if(key.isReadable()){
//创建ByteBuffer,并开辟一个1M的缓冲区
ByteBuffer buffer = ByteBuffer.allocate(1024);
//读取请求码流,返回读取到的字节数
int readBytes = sc.read(buffer);
//读取到字节,对字节进行编解码
if(readBytes>0){
//将缓冲区当前的limit设置为position=0,用于后续对缓冲区的读取操作
buffer.flip();
//根据缓冲区可读字节数创建字节数组
byte[] bytes =new byte[buffer.remaining()];
//将缓冲区可读字节数组复制到新建的数组中
buffer.get(bytes);
String result =new String(bytes,"UTF-8");
System.out.println("客户端收到消息:" + result);
}
//没有读取到字节 忽略
// else if(readBytes==0);
//链路已经关闭,释放资源
else if(readBytes<0){
key.cancel();
sc.close();
}
}
}
}
//异步发送消息
private void doWrite(SocketChannel channel,String request)throws IOException{
//将消息编码为字节数组
byte[] bytes = request.getBytes();
//根据数组容量创建ByteBuffer
ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length);
//将字节数组复制到缓冲区
writeBuffer.put(bytes);
//flip操作
writeBuffer.flip();
//发送缓冲区的字节数组
channel.write(writeBuffer);
//****此处不含处理“写半包”的代码
}
private void doConnect()throws IOException{
if(socketChannel.connect(new InetSocketAddress(host,port)));
else socketChannel.register(selector, SelectionKey.OP_CONNECT);
}
public void sendMsg(String msg)throws Exception{
socketChannel.register(selector, SelectionKey.OP_READ);
doWrite(socketChannel, msg);
}
}
```