Netty实践与NIO原理
一、阻塞IO与非阻塞IO
Linux网络IO模型(5种)
(1)阻塞IO模型
所有文件操作都是阻塞的,以套接字接口为例,在进程空间中调用recvfrom,系统调用直到数据包到达且被复制到应用进程缓冲区或发生错误时才返回,期间会一直等待(阻塞)。模型如图:
(2)非阻塞IO模型
recvfrom从应用层到内核时,如果该缓冲区没数据,直接返回一个EWOULDBLOCK错误,反复轮询检查这个状态,看是否有数据到来。如图:
(3)IO复用模型
Linux提高select/poll,进程通过将一个或多个fd(file descriptor)传递给select或poll系统调用,阻塞在select操作上,侦测多个fd是否处于就绪状态。select/poll顺序扫描fd是否就绪,而且支持的fd数量有限。Linux还提供了一个epoll系统调用,使用基于事件驱动的方式代替顺序扫描,性能更高。当有fd就绪时,立即回调函数rollback。如图:
(4)信号驱动IO模型
首先开启套接口信号驱动IO功能,通过系统调用sigaction执行一个信号处理函数,该函数立即返回,进程继续工作,它是非阻塞的。当数据准备就绪时,就为该进程生成一个SIGIO信号,通过信号回调通知应用程序调用recfrom来读取数据,通知主循环函数处理数据。如图:
(5)异步IO模型
告知内核启动某个操作,让内核在整个操作完成后(包括将数据从内核复制到用户自己的缓冲区)通知我们。它与信号驱动的主要区别是:信号驱动IO由内核告知我们何时开始一个IO操作,异步IO模型由内核通知我们IO操作何时已经完成。如图所示:
IO多路复用的应用:
通过把多个IO的阻塞复用到一个select的阻塞上,使系统在单线程下可处理多个客户端请求。与传统多线程模型相比,最大优势是系统开销小,不需要创建额外进程或线程。主要应用场景如下:
(1)服务器需要同时处理多个处于监听状态或连接状态的套接字
(2)服务器需要同时处理多种网络协议的套接字
Linux最终选择epoll支持IO多路复用的系统调用,优点如下:
(1)支持一个进程打开的socket描述符(FD)不受限制(select单线程默认1024太少,epoll仅受限操作系统最大文件句柄数,1GB内存机器大约10万句柄)
(2)IO效率不会随FD数目增加而线性下降(只对“活跃”的socke进行t操作,活跃socket才会去主动调用callback函数)
(3)使用mmap加速内核与用户空间消息传递(同一块内存,避免不必要复制)
(4)API简单:创建epoll描述符,添加监听事件,阻塞等待监听事件发生,关闭epoll描述符等
二、阻塞IO的例子(结合线程池)
//1.服务端
package com.xbai.io;
import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;
import com.xbai.executor.TimeServerHandlerExecutePool;
import com.xbai.handler.TimeServerHandler;
public class TimeServerExecutor {
public static void main(String[] args)throws IOException {
int port =8080;
if(args !=null && args.length >0){
try {
port = Integer.valueOf(args[0]);
}catch (Exception e) {
// TODO: handle exception
}
}
ServerSocket server =null;
try {
server =new ServerSocket(port);
System.out.println("The time server is started in port : " + port);
TimeServerHandlerExecutePool singleExecutor =new TimeServerHandlerExecutePool(50,10000);
while(true){
Socket socket = server.accept();
singleExecutor.execute(new TimeServerHandler(socket));
}
}finally {
if(server !=null){
System.out.println("The time server closed");
server.close();
server =null;
}
}
}
}
//2.服务端线程池
package com.xbai.executor;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public class TimeServerHandlerExecutePool {
private ExecutorServiceexecutor;
public TimeServerHandlerExecutePool(int maxPoolSize,int queueSize){
executor =new ThreadPoolExecutor(Runtime.getRuntime().availableProcessors(),maxPoolSize,120L,TimeUnit.SECONDS,
new ArrayBlockingQueue(queueSize));//线程池要执行的任务阻塞成一个队列,其内部的机制是等待唤醒生产者和消费者线程,有一个生产就可唤醒一个消费,去看源码的线程池原理
}
public void execute(Runnable task){
executor.execute(task);
}
}
//3.服务端处理器
package com.xbai.handler;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.io.PrintWriter;
import java.net.Socket;
import java.sql.Date;
public class TimeServerHandlerimplements Runnable{
private Socketsocket;
public TimeServerHandler(Socket socket) {
this.socket = socket;
}
@Override
public void run() {
// TODO Auto-generated method stub
BufferedReader br =null;
PrintWriter pw =null;
try {
br =new BufferedReader(new InputStreamReader(socket.getInputStream()));
pw =new PrintWriter(socket.getOutputStream(),true);
String curTime =null;
String msg =null;
while(true){
msg = br.readLine();
if(msg ==null){
break;
}
System.out.println("The time server received order:" + msg);
curTime ="query time order".equalsIgnoreCase(msg) ?new Date(
System.currentTimeMillis()).toString() :"bad order";
pw.println(curTime);//这里不写println,就无法插入换行符,那边就不能readLine,一直阻塞,无法获取数据
}
}catch (IOException e) {
if(br !=null){
try {
br.close();
}catch (IOException e1) {
// TODO Auto-generated catch block
e1.printStackTrace();
}
}
if(pw !=null){
pw.close();
pw =null;
}
if(socket !=null){
try {
socket.close();
}catch (IOException e1) {
// TODO Auto-generated catch block
e1.printStackTrace();
}
socket =null;
}
}
}
}
//4.客户端代码
package com.xbai.io;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.Socket;
import java.net.UnknownHostException;
public class TimeClient {
public static void main(String[] args) {
int port =8080;
if(args !=null && args.length >0){
try {
port = Integer.valueOf(args[0]);
}catch (Exception e) {
// TODO: handle exception
}
}
Socket socket =null;
BufferedReader br =null;
PrintWriter pw =null;
try {
socket =new Socket("localhost",port);
br =new BufferedReader(new InputStreamReader(socket.getInputStream()));
pw =new PrintWriter(socket.getOutputStream(),true);
pw.println("query time order");
System.out.println("send order succeed");
String resp = br.readLine();
System.out.println("Now is :" + resp);
}catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}finally{
if(pw !=null){
pw.close();
pw =null;
}
if(br !=null){
try {
br.close();
}catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
br =null;
}
if(socket !=null){
try {
socket.close();
}catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
socket =null;
}
}
}
}
执行结果
服务端启动及收发:
客户端发送和接收:
三、非阻塞IO的例子(原生Java NIO,目前有写半包等问题,怀疑服务端没有写出去导致的客户端Selector的关闭状态异常)
//1.服务端主程序
package com.xiaobai.nio;
public class NIOServer {
public static void main(String[] args) {
MultiplexerTimeServer timeServer =new MultiplexerTimeServer();
new Thread(timeServer,"NIO-MultiplexerTimeServer-001").start();
}
}
//2.服务端timeServer
package com.xiaobai.nio;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Date;
import java.util.Iterator;
import java.util.Set;
public class MultiplexerTimeServerimplements Runnable {
private Selectorselector;
private ServerSocketChannelservChannel;
private volatile boolean stop;
public MultiplexerTimeServer() {
try {
selector = Selector.open();//建立Selector
servChannel = ServerSocketChannel.open();//建立Channel
servChannel.configureBlocking(false);
servChannel.socket().bind(new InetSocketAddress(2048), 1024);//ServerSocket绑定
servChannel.register(selector, SelectionKey.OP_ACCEPT);//向Selector注册ACCEPT事件
System.out.println("The time server is started in port 2048");
}catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
@Override
public void run() {
while(!stop){
try {
selector.select(1000);//轮询Channel
Set selectedKeys =selector.selectedKeys();
Iterator it = selectedKeys.iterator();
SelectionKey key =null;
while(it.hasNext()){
key = it.next();
it.remove();//移除它
try {
handleInput(key);
}catch (Exception e) {
if(key !=null){
key.cancel();
if(key.channel() !=null){
key.channel().close();
}
}
}
}
}catch (IOException e1) {
// TODO Auto-generated catch block
e1.printStackTrace();
}
}
if(selector !=null){
try {
selector.close();
}catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
private void handleInput(SelectionKey key)throws IOException{
if(key.isValid()){
//处理新接入的请求
if(key.isAcceptable()){//此前已向Selector注册,并已open
//获取server channel
ServerSocketChannel ssc = (ServerSocketChannel) key.channel();
//获取client channel
SocketChannel sc = ssc.accept();
sc.configureBlocking(false);
//第一次捕捉到的客户端向Selector注册READ事件
sc.register(selector, SelectionKey.OP_READ);
}
//处理已注册的读事件
if(key.isReadable()){
//获取客户端Channel
SocketChannel sc = (SocketChannel) key.channel();
ByteBuffer readBuffer = ByteBuffer.allocate(1024);
int readBytes = sc.read(readBuffer);//读到缓冲
if(readBytes >0){
readBuffer.flip();
// Buffer java.nio.Buffer.flip()
//
//
// Flips this buffer. The limit is set to the current position and then the position is set to zero. If the mark is defined then it is discarded.
//
// After a sequence of channel-read or put operations, invoke this method to prepare for a sequence of channel-write or relative get operations. For example:
//
// buf.put(magic); // Prepend header
// in.read(buf); // Read data into rest of buffer
// buf.flip(); // Flip buffer
// out.write(buf); // Write header + data to channel
byte[] bytes =new byte[readBuffer.remaining()];//缓冲中有多少个字节数据
readBuffer.get(bytes);
String body =new String(bytes,"UTF-8");
System.out.println("The time server receive order : " + body);
String currentTime ="QUERY TIME ORDER".equalsIgnoreCase(body) ?new Date(//
System.currentTimeMillis()).toString() :"BAD ORDER";
doWrite(sc,currentTime);
}else if(readBytes <0){
//贵在坚持!
//对端链路关闭
// key.cancel();
// sc.close();
}else{
;//读到0字节,忽略
}
}
}
}
private void doWrite(SocketChannel channel, String response)throws IOException{
if(response !=null && response.trim().length() >0){
byte[] bytes = response.getBytes();
ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length);//根据字节数组容量创建ByteBuffer
writeBuffer.put(bytes);//字节数组复制到缓冲区
writeBuffer.flip();
channel.write(writeBuffer);//SocketChannel是异步非阻塞的,不保证一次发送完,出现“写半包”问题,
//这里缺少注册写操作,不断轮询Selector将没有发送完的ByteBuffer发送完毕
//TODO 这里有问题,没有写出去,导致客户端无法收到消息,显示Selector关闭状态异常
}
}
}
//3.客户端主程序
package com.xiaobai.nio;
public class NIOClient {
public static void main(String[] args) {
TimeClientHandle timeClientHandle =new TimeClientHandle("127.0.0.1",2048);
new Thread(timeClientHandle,"NIO-TimeClient-001").start();
}
}
//4.客户端timeClient
package com.xiaobai.nio;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Date;
import java.util.Iterator;
import java.util.Set;
public class TimeClientHandleimplements Runnable {
private Stringhost;
private int port;
private Selectorselector;
private SocketChannelsocketChannel;
private volatile boolean stop;
public TimeClientHandle(String host,int port) {
this.host = host==null?"127.0.0.1":host;
this.port = port;
try {
selector = Selector.open();
socketChannel = SocketChannel.open();
socketChannel.configureBlocking(false);
}catch (Exception e) {
// TODO: handle exception
}
}
@Override
public void run() {
try {
doConnect();
}catch (Exception e) {
// TODO: handle exception
}
while(!stop){
try {
selector.select(3000);
Set selectedKeys =selector.selectedKeys();
Iterator it = selectedKeys.iterator();
SelectionKey key =null;
while(it.hasNext()){
key = it.next();
it.remove();
try {
handleInput(key);
}catch (Exception e) {
if(key !=null){
key.cancel();
if(key.channel() !=null){
key.channel().close();
}
}
}
}
}catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
if(selector !=null){
try {
selector.close();
}catch (Exception e) {
// TODO: handle exception
}
}
}
}
private void handleInput(SelectionKey key)throws Exception{
if(key.isValid()){
//判断是否连接成功
//连接方法中已有连接不成功注册连接事件的逻辑,反复尝试连接,这里判断,如果成功,注册该客户连接的read事件准备接收数据
SocketChannel sc = (SocketChannel) key.channel();
if(key.isConnectable()){
if(sc.finishConnect()){
sc.register(selector, SelectionKey.OP_READ);
doWrite(sc);//本客户向外写东西
}
}
//下面是从服务器接收数据
if(key.isReadable()){
ByteBuffer readBuffer = ByteBuffer.allocate(1024);
int readBytes = sc.read(readBuffer);//读到缓冲
if(readBytes >0){
readBuffer.flip();
// Buffer java.nio.Buffer.flip()
//
//
// Flips this buffer. The limit is set to the current position and then the position is set to zero. If the mark is defined then it is discarded.
//
// After a sequence of channel-read or put operations, invoke this method to prepare for a sequence of channel-write or relative get operations. For example:
//
// buf.put(magic); // Prepend header
// in.read(buf); // Read data into rest of buffer
// buf.flip(); // Flip buffer
// out.write(buf); // Write header + data to channel
byte[] bytes =new byte[readBuffer.remaining()];//缓冲中有多少个字节数据
readBuffer.get(bytes);
String body =new String(bytes,"UTF-8");
System.out.println("Now is : " + body);
this.stop =true;
}else if(readBytes <0){
//贵在坚持!
//对端链路关闭
key.cancel();
sc.close();
}else{
;//读到0字节,忽略
}
}
}
}
private void doConnect()throws IOException {
//如果连接成功,则直接注册到多路复用器上,发送请求消息,读应答
if(socketChannel.connect(new InetSocketAddress(host, port))){//异步连接,直至成功
socketChannel.register(selector, SelectionKey.OP_READ);
doWrite(socketChannel);
}else{//注册连接事件,轮询直至连接成功
//异步,到底是什么概念?底层是什么原理?TCP/IP层面
socketChannel.register(selector, SelectionKey.OP_CONNECT);
}
}
private void doWrite(SocketChannel sc)throws IOException {
//本客户向外写东西
byte[] req ="QUERY TIME ORDER".getBytes();
ByteBuffer writeBuffer = ByteBuffer.allocate(req.length);
writeBuffer.put(req);
writeBuffer.flip();
sc.write(writeBuffer);
if(!writeBuffer.hasRemaining()){
System.out.println("Send order 2 server succeed.");
}
}
}
四、TCP与UDP
五、网络传输粘包与拆包问题
六、Netty入门案例与原理分析、Reactor模式
第一个例子:
//1.NettyServer
package com.xiaobai.server.netty;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import java.net.InetSocketAddress;
public class NettyServer {
private final int port;
public NettyServer(int port) {
this.port = port;
}
public static void main(String[] args)throws InterruptedException {
if(args.length !=1) {
System.err.println("Usage:" + NettyServer.class.getSimpleName() +" ");
return;
}
int port = Integer.parseInt(args[0]);
new NettyServer(port).start();
}
private void start()throws InterruptedException {
final NettyServerHandler serverHandler =new NettyServerHandler();
EventLoopGroup group =new NioEventLoopGroup();
try {
ServerBootstrap b =new ServerBootstrap();
b.group(group).channel(NioServerSocketChannel.class)
.localAddress(new InetSocketAddress(port))
.childHandler(new ChannelInitializer() {
@Override
protected void initChannel(SocketChannel socketChannel)throws Exception {
socketChannel.pipeline().addLast(serverHandler);
}
});
ChannelFuture f = b.bind().sync();
f.channel().closeFuture().sync();
}catch (InterruptedException e) {
e.printStackTrace();
}finally {
group.shutdownGracefully().sync();
}
}
}
//2.NettyServerHandler
package com.xiaobai.server.netty;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.CharsetUtil;
@ChannelHandler.Sharable
public class NettyServerHandlerextends ChannelInboundHandlerAdapter {
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)throws Exception {
cause.printStackTrace();
ctx.close();
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg)throws Exception {
ByteBuf in = (ByteBuf) msg;
System.out.println("Server received:" + in.toString(CharsetUtil.UTF_8));
ctx.write(in);
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx)throws Exception {
ctx.writeAndFlush(Unpooled.EMPTY_BUFFER)
.addListener(ChannelFutureListener.CLOSE);//关闭该Channel
}
}
//3.NettyClient
package com.xiaobai.server.netty;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import java.net.InetSocketAddress;
public class NettyClient {
private final Stringhost;
private final int port;
public NettyClient(String host, int port) {
this.host = host;
this.port = port;
}
public static void main(String[] args)throws InterruptedException {
if(args.length !=2) {
System.err.println("Usage:" + NettyServer.class.getSimpleName() +" ");
return;
}
String host = args[0];
int port = Integer.parseInt(args[1]);
new NettyClient(host,port).start();
}
private void start()throws InterruptedException {
EventLoopGroup group =new NioEventLoopGroup();
try {
Bootstrap b =new Bootstrap();
b.group(group).channel(NioSocketChannel.class)
.remoteAddress(new InetSocketAddress(host,port))
.handler(new ChannelInitializer() {
@Override
protected void initChannel(SocketChannel socketChannel)throws Exception {
socketChannel.pipeline().addLast(new NettyClientHandler());
}
});
ChannelFuture f = b.connect().sync();
f.channel().closeFuture().sync();
}catch (InterruptedException e) {
e.printStackTrace();
}finally {
group.shutdownGracefully().sync();
}
}
}
//4.NettyClientHandler
package com.xiaobai.server.netty;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelHandlerInvoker;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.util.CharsetUtil;
import io.netty.util.concurrent.EventExecutorGroup;
public class NettyClientHandlerextends SimpleChannelInboundHandler {
@Override
protected void messageReceived(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf)throws Exception {
System.out.println("Client received: " + byteBuf.toString(CharsetUtil.UTF_8));
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)throws Exception {
cause.printStackTrace();
ctx.close();
}
@Override
public void channelActive(ChannelHandlerContext ctx)throws Exception {
ctx.writeAndFlush(Unpooled.copiedBuffer("Netty rocks!",CharsetUtil.UTF_8));
}
}
执行结果:
服务端:
客户端:
七、Netty对粘包拆包的解决方案
八、编码与解码
九、序列化与反序列化
十、网络传输私有协议制定与聊天室业务实现