nio为所有的原始类型(boolean类型除外)提供缓存支持的数据容器,提供多路(no-blocking)非阻塞式的高伸缩性网络I/O
本文先介绍NIO三大组件 Buffr、Channel、Selector
Buffer
Buffer其实本质就是内存块,所有的数据读写都要依赖这个,我们可以先将数据读取到这个内存块,然后从这个内存块读取数据。
我们可以把Buffer理解为数组,Buffer有几个重要的属性:position、limit、capacity。
position 的初始值是 0,每往 Buffer 中写入一个值,position 就自动加 1,代表下一次的写入位置。读操作的时候也是类似的,每读一个值,position 就自动加 1。
Buffer可以读写模式切换,从写操作模式到读操作模式切换的时候(flip),position 都会归零,这样就可以从头开始读写了。
Limit:写操作模式下,limit 代表的是最大能写入的数据,这个时候 limit 等于 capacity。写结束后,切换到读模式,此时的 limit 等于 Buffer 中实际的数据大小,因为 Buffer 不一定被写满了。
读写模式切换flip,实际就是limit和position值交换,position值清零
public final Buffer flip() {
limit = position; // 将 limit 设置为实际写入的数据数量
position = 0; // 重置 position 为 0
mark = -1; // mark 之后再说
return this;
}
初始化Buffer方法
public static ByteBuffer wrap(byte[] array) {
...
}
ByteBuffer buffer = ByteBuffer.allocate(1024);
读取数据到Bufer方法
int num = channel.read(buf);
将Buffer数据写入channel
int num = channel.write(buf);
附加一个Buffer经常使用方法
new String(buffer.array()).trim();
Channel
所有NIO操作都始于通过,发起请求时会选择相应的通道通信,通道是数据来源地或写入的目的地。
*FileChannel:文件通道,用于文件的读写
*DatagramChannel:用于UDP连接的接收和发送
*SocketChannel:TCP客户端
*ServerSocketChannel:TCP服务端
Selector
Selector建立在非阻塞的模式下,所以注册到Selector的channel必须要支持非阻塞模式。Selector实现了多路复用,用于一个线程可以管理多个Channel。
Selector selector = Selector.open();
// 将通道设置为非阻塞模式,因为默认都是阻塞模式的
channel.configureBlocking(false);
// 注册
SelectionKey key = channel.register(selector, SelectionKey.OP_READ);
SelectKey有四个事件
- SelectKey.OP_ACCEPT:接收TCP连接
- SelectKey.OP_READ:通道有数据可以读
- SelectKey.OP_WRITE:通道有数据可以写
- SelectKey.OP_CONNECT:成功建立TCP连接
上面介绍了NIO三大组件,这边介绍NIO特性以及阻塞和异步概念以及实现。
NIO,JDK1.4,New IO,Non-Blocking IO
NIO.2,JDK7,More New IO,Asynchronous IO,严格地说 NIO.2 不仅仅引入了 AIO
NIO的工作原理
- 由一个专门的线程来处理所有的 IO 事件,并负责分发。
- 事件驱动机制:事件到的时候触发,而不是同步的去监视事件。
- 线程通讯:线程之间通过 wait,notify 等方式通讯。保证每次上下文切换都是有意义的。减少无谓的线程切换。
阻塞IO模式
服务端
public class Server {
public static void main(String[] args) throws IOException {
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
// 监听 8080 端口进来的 TCP 链接
serverSocketChannel.socket().bind(new InetSocketAddress(8080));
while (true) {
// 这里会阻塞,直到有一个请求的连接进来
SocketChannel socketChannel = serverSocketChannel.accept();
// 开启一个新的线程来处理这个请求,然后在 while 循环中继续监听 8080 端口
SocketHandler handler = new SocketHandler(socketChannel);
new Thread(handler).start();
}
}
}
SocketHandler实现
public class SocketHandler implements Runnable {
private SocketChannel socketChannel;
public SocketHandler(SocketChannel socketChannel) {
this.socketChannel = socketChannel;
}
@Override
public void run() {
ByteBuffer buffer = ByteBuffer.allocate(1024);
try {
// 将请求数据读入 Buffer 中
int num;
while ((num = socketChannel.read(buffer)) > 0) {
// 读取 Buffer 内容之前先 flip 一下
buffer.flip();
// 提取 Buffer 中的数据
byte[] bytes = new byte[num];
buffer.get(bytes);
String re = new String(bytes, "UTF-8");
System.out.println("收到请求:" + re);
// 回应客户端
ByteBuffer writeBuffer = ByteBuffer.wrap(("我已经收到你的请求,你的请求内容是:" + re).getBytes());
socketChannel.write(writeBuffer);
buffer.flip();
}
} catch (IOException e) {
IOUtils.closeQuietly(socketChannel);
}
}
}
客户端
public class SocketChannelTest {
public static void main(String[] args) throws IOException {
SocketChannel socketChannel = SocketChannel.open();
socketChannel.connect(new InetSocketAddress("localhost", 8080));
// 发送请求
ByteBuffer buffer = ByteBuffer.wrap("1234567890".getBytes());
socketChannel.write(buffer);
// 读取响应
ByteBuffer readBuffer = ByteBuffer.allocate(1024);
int num;
if ((num = socketChannel.read(readBuffer)) > 0) {
readBuffer.flip();
byte[] re = new byte[num];
readBuffer.get(re);
String result = new String(re, "UTF-8");
System.out.println("返回值: " + result);
}
}
}
阻塞模式的IO其实就是服务端为每次客户端请求分配一个线程去执行,首先accept是个阻塞操作,当有请求到达时才会返回。然后立即分配一个线程去处理这个请求。请注意这个线程不会立即读写,还需要等到通道读写准备就绪才可以读写,在这之前会一直阻塞。在多线程高并发的情况,线程创建过多,内存开销过大,以及线程切换上下文开销太大,导致系统假死。这种方式不可取。
非阻塞IO
非阻塞IO核心是一个Selector管理多个通道,将各个通道注册到 Selector 上,指定监听的事件,之后可以只用一个线程来轮询这个 Selector,看看上面是否有通道是准备好的,当通道准备好可读或可写,然后才去开始真正的读写,这样速度就很快了。我们就完全没有必要给每个通道都起一个线程。
Selector底层实现的三种方式
- select
缺点:
1.单个进程能够监视的文件描述符的数量存在最大限制
2.内核 / 用户空间内存拷贝问题,select需要复制大量的句柄数据结构,产生巨大的开销
3.select返回的是含有整个句柄的数组,应用程序需要遍历整个数组才能发现哪些句柄发生了事件
4.select的触发方式是水平触发,应用程序如果没有完成对一个已经就绪的文件描述符进行IO操作,那么之后每次select调用还是会将这些文件描述符通知进程。
- poll
相比select模型,poll使用链表保存文件描述符,因此没有了监视文件数量的限制,但其他三个缺点依然存在。
- epoll
通过红黑树和双链表数据结构,并结合回调机制,造就了epoll的高效,解决了select/poll缺点。
每一个epoll对象都有一个独立的eventpoll结构体,用于存放通过epoll_ctl方法向epoll对象中添加进来的事件,这些事件都会挂载在红黑树中。而所有添加到epoll中的事件都会与设备(网卡)驱动程序建立回调关系,也就是说,当相应的事件发生时会调用这个回调方法。这个回调方法在内核中叫ep_poll_callback,它会将发生的事件添加到rdlist双链表中。
1)调用epoll_create()建立一个epoll对象(在epoll文件系统中为这个句柄对象分配资源)
2)调用epoll_ctl向epoll对象中添加这100万个连接的套接字
3)调用epoll_wait收集发生的事件的连接
select 和 poll 都有一个共同的问题,那就是它们都只会告诉你有几个通道准备好了,但是不会告诉你具体是哪几个通道。所以,一旦知道有通道准备好以后,自己还是需要进行一次扫描,显然这个不太好,通道少的时候还行,一旦通道的数量是几十万个以上的时候,扫描一次的时间都很可观了,时间复杂度 O(n)。
服务端
public class SelectorServer {
public static void main(String[] args) throws IOException {
Selector selector = Selector.open();
ServerSocketChannel server = ServerSocketChannel.open();
server.socket().bind(new InetSocketAddress(8080));
// 将其注册到 Selector 中,监听 OP_ACCEPT 事件
server.configureBlocking(false);
server.register(selector, SelectionKey.OP_ACCEPT);
while (true) {
// 需要不断地去调用 select() 方法获取最新的准备好的通道
int readyChannels = selector.select();
if (readyChannels == 0) {
continue;
}
Set<SelectionKey> readyKeys = selector.selectedKeys();
// 遍历
Iterator<SelectionKey> iterator = readyKeys.iterator();
while (iterator.hasNext()) {
SelectionKey key = iterator.next();
iterator.remove();
if (key.isAcceptable()) {
// 有已经接受的新的到服务端的连接
SocketChannel socketChannel = server.accept();
// 有新的连接并不代表这个通道就有数据,
// 这里将这个新的 SocketChannel 注册到 Selector,监听 OP_READ 事件,等待数据
socketChannel.configureBlocking(false);
socketChannel.register(selector, SelectionKey.OP_READ);
} else if (key.isReadable()) {
// 有数据可读
// 上面一个 if 分支中注册了监听 OP_READ 事件的 SocketChannel
SocketChannel socketChannel = (SocketChannel) key.channel();
ByteBuffer readBuffer = ByteBuffer.allocate(1024);
int num = socketChannel.read(readBuffer);
if (num > 0) {
// 处理进来的数据...
System.out.println("收到数据:" + new String(readBuffer.array()).trim());
socketChannel.register(selector, SelectionKey.OP_WRITE);
} else if (num == -1) {
// -1 代表连接已经关闭
socketChannel.close();
}
}
else if (key.isWritable()) {
// 通道可写
// 给用户返回数据的通道可以进行写操作了
SocketChannel socketChannel = (SocketChannel) key.channel();
ByteBuffer buffer = ByteBuffer.wrap("返回给客户端的数据...".getBytes());
socketChannel.write(buffer);
// 重新注册这个通道,监听 OP_READ 事件,客户端还可以继续发送内容过来
socketChannel.register(selector, SelectionKey.OP_READ);
}
}
}
}
}
客户端代码同阻塞模式客户端代码
NIO.2 异步 IO
异步IO两种实现方式
1.返回 Future 实例
2.提供 CompletionHandler 回调函数
代码实现
服务端
public class Server {
public static void main(String[] args) throws IOException {
// 实例化,并监听端口
AsynchronousServerSocketChannel server =
AsynchronousServerSocketChannel.open().bind(new InetSocketAddress(8080));
// 自己定义一个 Attachment 类,用于传递一些信息
Attachment att = new Attachment();
att.setServer(server);
server.accept(att, new CompletionHandler<AsynchronousSocketChannel, Attachment>() {
@Override
public void completed(AsynchronousSocketChannel client, Attachment att) {
try {
SocketAddress clientAddr = client.getRemoteAddress();
System.out.println("收到新的连接:" + clientAddr);
// 收到新的连接后,server 应该重新调用 accept 方法等待新的连接进来
att.getServer().accept(att, this);
Attachment newAtt = new Attachment();
newAtt.setServer(server);
newAtt.setClient(client);
newAtt.setReadMode(true);
newAtt.setBuffer(ByteBuffer.allocate(2048));
// 这里也可以继续使用匿名实现类,不过代码不好看,所以这里专门定义一个类
client.read(newAtt.getBuffer(), newAtt, new ChannelHandler());
} catch (IOException ex) {
ex.printStackTrace();
}
}
@Override
public void failed(Throwable t, Attachment att) {
System.out.println("accept failed");
}
});
// 为了防止 main 线程退出
try {
Thread.currentThread().join();
} catch (InterruptedException e) {
}
}
}
ChannelHandler
public class ChannelHandler implements CompletionHandler<Integer, Attachment> {
@Override
public void completed(Integer result, Attachment att) {
if (att.isReadMode()) {
// 读取来自客户端的数据
ByteBuffer buffer = att.getBuffer();
buffer.flip();
byte bytes[] = new byte[buffer.limit()];
buffer.get(bytes);
String msg = new String(buffer.array()).toString().trim();
System.out.println("收到来自客户端的数据: " + msg);
// 响应客户端请求,返回数据
buffer.clear();
buffer.put("Response from server!".getBytes(Charset.forName("UTF-8")));
att.setReadMode(false);
buffer.flip();
// 写数据到客户端也是异步
att.getClient().write(buffer, att, this);
} else {
// 到这里,说明往客户端写数据也结束了,有以下两种选择:
// 1. 继续等待客户端发送新的数据过来
// att.setReadMode(true);
// att.getBuffer().clear();
// att.getClient().read(att.getBuffer(), att, this);
// 2. 既然服务端已经返回数据给客户端,断开这次的连接
try {
att.getClient().close();
} catch (IOException e) {
}
}
}
@Override
public void failed(Throwable t, Attachment att) {
System.out.println("连接断开");
}
}
客户端
public class Client {
public static void main(String[] args) throws Exception {
AsynchronousSocketChannel client = AsynchronousSocketChannel.open();
// 来个 Future 形式的
Future<?> future = client.connect(new InetSocketAddress(8080));
// 阻塞一下,等待连接成功
future.get();
Attachment att = new Attachment();
att.setClient(client);
att.setReadMode(false);
att.setBuffer(ByteBuffer.allocate(2048));
byte[] data = "I am obot!".getBytes();
att.getBuffer().put(data);
att.getBuffer().flip();
// 异步发送数据到服务端
client.write(att.getBuffer(), att, new ClientChannelHandler());
// 这里休息一下再退出,给出足够的时间处理数据
Thread.sleep(2000);
}
}
ClientChannelHandler
public class ClientChannelHandler implements CompletionHandler<Integer, Attachment> {
@Override
public void completed(Integer result, Attachment att) {
ByteBuffer buffer = att.getBuffer();
if (att.isReadMode()) {
// 读取来自服务端的数据
buffer.flip();
byte[] bytes = new byte[buffer.limit()];
buffer.get(bytes);
String msg = new String(bytes, Charset.forName("UTF-8"));
System.out.println("收到来自服务端的响应数据: " + msg);
// 接下来,有以下两种选择:
// 1. 向服务端发送新的数据
// att.setReadMode(false);
// buffer.clear();
// String newMsg = "new message from client";
// byte[] data = newMsg.getBytes(Charset.forName("UTF-8"));
// buffer.put(data);
// buffer.flip();
// att.getClient().write(buffer, att, this);
// 2. 关闭连接
try {
att.getClient().close();
} catch (IOException e) {
}
} else {
// 写操作完成后,会进到这里
att.setReadMode(true);
buffer.clear();
att.getClient().read(buffer, att, this);
}
}
@Override
public void failed(Throwable t, Attachment att) {
System.out.println("服务器无响应");
}
}
从代码可以看出来,阻塞IO从连接请求(accept)就开始阻塞一直到通道读写完成;非阻塞IO从连接请求(accept)之前一直阻塞,连接之后非阻塞,通过注册读写事件,委托工作线程执行;异步IO从连接请求(accept)之前就是异步的,通过回调函数或者Future实现。
这边开始介绍netty 一个高性能NIO网络通信框架,这p边以应用为主,理论上文已经说得很多了。
Netty搭建一个构建Http RPC框架
服务端NettyHttpServer
@Component
public class NettyHttpServer implements ApplicationListener<ContextRefreshedEvent>,Ordered{
public void start() {
ServerBootstrap serverBootstrap = new ServerBootstrap();
EventLoopGroup childGroup = new NioEventLoopGroup();
EventLoopGroup parentGroup = new NioEventLoopGroup();
//accept,read,write
serverBootstrap.group(parentGroup, childGroup);
serverBootstrap.channel(NioServerSocketChannel.class) // (3)
.childHandler(new ChannelInitializer<SocketChannel>() { // (4)
@Override
public void initChannel(SocketChannel ch) throws Exception {
//http解码,编码器
ch.pipeline().addLast(new HttpRequestDecoder());
ch.pipeline().addLast(new HttpResponseEncoder());
ch.pipeline().addLast(new HttpObjectAggregator(1048576));
ch.pipeline().addLast(new HttpServerHandler());
}
})
.option(ChannelOption.SO_BACKLOG, 128) // (5)
.childOption(ChannelOption.SO_KEEPALIVE, true); //
ChannelFuture future = null;
try {
future = serverBootstrap.bind(8080).sync();
future.channel().closeFuture().sync();
} catch (InterruptedException e) {
}
}
public int getOrder() {
return 20;
}
public void onApplicationEvent(ContextRefreshedEvent arg0) {
start();
}
}
HttpServerHandler
public class HttpServerHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
Object result = new Object();
try {
if(msg instanceof FullHttpRequest){
String content = ((FullHttpRequest)msg).content().toString(Charset.defaultCharset());
System.out.println(content);
//首先根据request content获取是哪个controller,并且要获取对应的请求方法
RequestParam requestParam = JSONObject.parseObject(content, RequestParam.class);
String command = requestParam.getCommand();
//然后去执行相对应的 方法
BeanMethod beanMethod = Media.commandBeans.get(command);
if(beanMethod !=null){
Object bean = beanMethod.getBean();
Method m = beanMethod.getM();
Class<?> paramType = m.getParameterTypes()[0];
Object param=null;
if(paramType.isAssignableFrom(List.class)){
param = JSONArray.parseArray(JSONArray.toJSONString(requestParam.getContent()), paramType);
}else{
param = JSON.parseObject(JSONObject.toJSONString(requestParam.getContent()), paramType);
}
result = m.invoke(bean, param);
ResponseParam responseParam = new ResponseParam();
responseParam.setCode("00000");
responseParam.setResult(result);
result = responseParam;
}
}
} catch (Exception e) {
e.printStackTrace();
ResponseParam responseParam = new ResponseParam();
String failMsg = "您的请求异常!";
responseParam.setCode("33333");
responseParam.setResult(failMsg);
result = responseParam;
}
DefaultFullHttpResponse response =new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus .OK, Unpooled.wrappedBuffer(JSONObject.toJSONString(result).getBytes(Charset.defaultCharset()))); response.headers().set(HttpHeaderNames.CONNECTION,HttpHeaderValues.KEEP_ALIVE); response.headers().set(HttpHeaderNames.CONTENT_LENGTH,response.content().readableBytes()); response.headers().set(HttpHeaderNames.CONTENT_TYPE,"text/plain");
ctx.channel().writeAndFlush(response);
}
}
MediaInit 初始化Mapping
@Component
public class MediaInit implements ApplicationListener<ContextRefreshedEvent>,Ordered{
public void onApplicationEvent(ContextRefreshedEvent event) {
//根据Spring容器,找到包含有Controller注解的所有bean
Map<String,Object> beans = event.getApplicationContext().getBeansWithAnnotation(Controller.class);
Map<String,BeanMethod> commandBeans = Media.commandBeans;
for(String key : beans.keySet()){
Object bean = beans.get(key);
Method[] ms = bean.getClass().getDeclaredMethods();
for(Method m : ms){
if(m.isAnnotationPresent(Remote.class)){
Remote remote = m.getAnnotation(Remote.class);
String command = remote.value();
BeanMethod beanMethod = new BeanMethod();
beanMethod.setBean(bean);
beanMethod.setM(m);
commandBeans.put(command, beanMethod);
}
}
}
}
public int getOrder() {
return 0;
}
}
客户端NettyClient
public class NettyHttpClient {
public static void main(String[] args) {
Bootstrap b = new Bootstrap();
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
b.group(workerGroup); // (2)
b.channel(NioSocketChannel.class); // (3)
b.option(ChannelOption.SO_KEEPALIVE, true); // (4)
b.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new HttpRequestEncoder());
ch.pipeline().addLast(new HttpResponseDecoder());
ch.pipeline().addLast(new HttpObjectAggregator(1048576));
ch.pipeline().addLast(new HttpClientHandler());
}
});
// Start the client.
ChannelFuture f = b.connect("localhost", 8080).sync(); // (5)
String uri ="http://localhost:8080/";
RequestParam requestParam = new RequestParam();
requestParam.setCommand("productPlanSearch");
requestParam.setContent("1");
String requestContent = JSONObject.toJSONString(requestParam);
ByteBuf content = Unpooled.wrappedBuffer(requestContent.getBytes(Charset.defaultCharset()));
DefaultFullHttpRequest request = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1,
HttpMethod.POST, uri , content );
request.headers().set(HttpHeaderNames.CONNECTION,HttpHeaderValues.KEEP_ALIVE);
request.headers().set(HttpHeaderNames.CONTENT_LENGTH,request.content().readableBytes());
request.headers().set(HttpHeaderNames.CONTENT_TYPE,"text/plain");
f.channel().writeAndFlush(request);
f.channel().closeFuture().sync();
ResponseParam response = (ResponseParam)f.channel().attr(AttributeKey.valueOf("httpResultKey")).get();
if(response.getCode().equals("00000")){
System.out.println(response.getResult());
}
} catch (InterruptedException e) {
e.printStackTrace();
}finally{
workerGroup.shutdownGracefully();
}
}
}
HttpClientHandler
public class HttpClientHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
if(msg instanceof DefaultHttpResponse){
}
if(msg instanceof FullHttpResponse){
String result = ((FullHttpResponse)msg).content().toString(Charset.defaultCharset());
ResponseParam response = JSONObject.parseObject(result,ResponseParam.class);
ctx.channel().attr(AttributeKey.valueOf("httpResultKey")).set(response);
ctx.channel().close();
}
}
}
补充:dubbo底层网络通信也用得是netty,dubbo协议默认是长连接,客户端一次连接会发送多个数据包,当客户端闲置的时候通过心跳检测来维持长连接通信。至于dubbo如何解决多线程 粘包拆包问题,这个会在下一个文章介绍。