千言万语不如撸码实战,begin!
首先说明此篇文章只是重要部分代码示例,完整代码,末尾会有彩蛋。
netty配置类代码块:
创建constant类,描述:常量类,目前存放的是所有推送类型
/**常量类*/
public class Constant {
//测试推送1
public static final int PUSH_TEST_ONE=1;
//测试推送2
public static final int PUSH_TEST_TWO=2;
}
创建ChildChannelHandler类,描述:初始化SokcetChannel(socket通道),添加socket通道到pipeline(管道),注册到WorkGroup(负责读写的nioEventLoop中),注册成功后,会调用pipeline的channelRegistered方法,将registered事件在pipeline上传播。
import io.netty.channel.ChannelInitializer;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
import io.netty.handler.stream.ChunkedWriteHandler;
public class ChildChannelHandler extends ChannelInitializer<SocketChannel>{
@Override
protected void initChannel(SocketChannel ch) throws Exception {
/**通过管道,添加handler,HttpServerCodec是由netty自己提供的助手类,可以理解为拦截器,当请求到服务端,我们需要做解码,响应到客户端做解码*/
ch.pipeline().addLast("http-codec",new HttpServerCodec());
ch.pipeline().addLast("aggregator",new HttpObjectAggregator(65536));
ch.pipeline().addLast(new WebSocketServerProtocolHandler("/websocket"));
ch.pipeline().addLast("http-chunked",new ChunkedWriteHandler());
//添加自定义的助手类
ch.pipeline().addLast("handler",new NettyServerHandler());
}
}
创建nettyCache类,描述:处理所有类型的通道保存类
import com.alibaba.fastjson.JSONObject;
import com.king.common.Constant;
import io.netty.channel.ChannelId;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.util.concurrent.GlobalEventExecutor;
import org.apache.commons.codec.digest.DigestUtils;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
/**所有类型的通道保存类*/
@Component
public class NettyCache {
public static Map<Integer,ChannelGroup> channels = new HashMap<Integer,ChannelGroup>();
public static Map<ChannelId,JSONObject> channelMessage = new ConcurrentHashMap<ChannelId,JSONObject>();
public static ChannelGroup defaultGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
@PostConstruct
public void initAllChannel(){
String[] pushTypes = getAllPushType();
for (int i=0;i<pushTypes.length;i++){
channels.put(Integer.valueOf(pushTypes[i]),new DefaultChannelGroup(GlobalEventExecutor.INSTANCE));
}
}
/**
* 获取所有推送的类型
*/
private String[] getAllPushType(){
Field[] fields = Constant.class.getDeclaredFields();
String[] allType = null;
List<String> pushType = new ArrayList<String>();
for(int i = 0;i<fields.length;i++){
String fieldName = fields[i].getName();
if(fieldName.startsWith("PUSH_")){//
try {
Object s = Constant.class.getField(fieldName).get(null);
pushType.add(s.toString());
} catch (IllegalAccessException e) {
e.printStackTrace();
} catch (NoSuchFieldException e) {
e.printStackTrace();
}
}
}
allType = pushType.toArray(new String[pushType.size()]);
return allType;
}
}
创建nettyServer类,描述:处理netty服务初始化启动,监听socket等操作。
import javax.annotation.PostConstruct;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;
import com.king.common.CommonConfig;
import com.king.util.ApplicationContextHelper;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
@Service
public class NettyServer {
private static Logger log=LoggerFactory.getLogger(NettyServer.class);
@PostConstruct
public void initServer() {
new Thread() {
public void run() {
new NettyServer().run();
}
}.start();
}
public void run() {
//定义一对线程组
//主线程组,用于接收客户端的连接,但是不做任何处理,跟老板一样不做事
EventLoopGroup bossGroup=new NioEventLoopGroup();
//从线程组,老板线程组会把任务丢给它,让手下线程组去做任务
EventLoopGroup workGroup=new NioEventLoopGroup();
try {
//netty服务器的创建,ServerBootstrap 是一个启动类
ServerBootstrap socket=new ServerBootstrap();
//设置主从线程组
socket.group(bossGroup,workGroup);
//设置nio的双向通道
socket.channel(NioServerSocketChannel.class);
//子处理器,用于处理workGroup
socket.childHandler(new ChildChannelHandler());
CommonConfig config=(CommonConfig) ApplicationContextHelper.getBean(CommonConfig.class);
//启动server,并绑定端口号,同时启动方式为同步
Channel channel=socket.bind(config.getWebSocketPort()).sync().channel();
//服务端管道关闭的监听器并同步阻塞,知道channel关闭,线程才会往下执行,结束进程。主线程执行到这里就wait子线程结束,子线程才是真正监听和接收请求的。子线程就是netty启动的监听端口的线程。
//即closeFuture()是 开启了一个channel的监听器,负责监听channel是否关闭的状态,如果未来监听到channel关闭了,子线程才会释放,syncUninterruptibly()让主线程同步等待子线程结果。
//补充:channel.close()才是主动关闭通道的方法。
//监听关闭的channel,设置为同步方式
channel.closeFuture().sync();
log.info("服务端启动成功");
} catch (Exception e) {
log.error(e.getMessage(),e);
}finally {
//优雅的关闭线程组
bossGroup.shutdownGracefully();
workGroup.shutdownGracefully();
}
}
}
创建nettyServerHandler类,描述:处理打开关闭通道,接收消息分配处理等操作
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.alibaba.fastjson.JSONObject;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.websocketx.WebSocketFrame;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
/**通道打开,关闭,接收消息处理类,异常捕获*/
public class NettyServerHandler extends SimpleChannelInboundHandler<Object>{
private static Logger log=LoggerFactory.getLogger(NettyServerHandler.class);
//捕获异常
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
//打开通道
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
//连接默认添加
NettyCache.defaultGroup.add(ctx.channel());
log.info("客户端与服务端连接开启:"+ctx.channel().remoteAddress().toString());
}
//关闭通道
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
//移除通道以及通道对应的消息
if(NettyCache.defaultGroup.contains(ctx.channel())) {
NettyCache.defaultGroup.remove(ctx.channel());
}
if(NettyCache.channelMessage.containsKey(ctx.channel().id())) {
NettyCache.channels.get(NettyCache.channelMessage.get(ctx.channel().id()).getIntValue("pushType"))
.remove(ctx.channel());
}
if(NettyCache.channelMessage.containsKey(ctx.channel().id())) {
NettyCache.channelMessage.remove(ctx.channel().id());
log.info("客户端与服务端连接关闭:"+ctx.channel().remoteAddress().toString());
}
}
//通道读取数据完成
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
ctx.flush();
}
//接收消息
@Override
protected void messageReceived(ChannelHandlerContext ctx, Object msg) throws Exception {
if(msg instanceof WebSocketFrame) {
handlerWebocketFrame(ctx, (WebSocketFrame) msg);
}
}
//返回应答消息
private void handlerWebocketFrame(ChannelHandlerContext ctx,WebSocketFrame frame) {
String requestMessage=((TextWebSocketFrame)frame).text();
JSONObject jsonMessage=JSONObject.parseObject(requestMessage);
log.info("接收到客户端发送的消息"+requestMessage);
if(NettyCache.channelMessage.containsKey(ctx.channel().id())) {
NettyCache.channels.get(NettyCache.channelMessage.get(ctx.channel().id()).getIntValue("pushType")).remove(ctx.channel());
}
NettyCache.channels.get(jsonMessage.getIntValue("pushType")).add(ctx.channel());
NettyCache.channelMessage.put(ctx.channel().id(), jsonMessage);
if(NettyCache.defaultGroup.contains(ctx.channel())) {
NettyCache.defaultGroup.remove(ctx.channel());
}
}
}
创建html,很简单一个页面:websocket.html
<html>
<head>
<title>推送消息页面</title>
</head>
<body>
<div>
你好!<p th:text="${name}"></p>
</div>
<input id="text" type="text" /><button onclick="send()">Send</button> <button onclick="closeWebSocket()">Close</button>
<div id="message"></div>
<script type="text/javascript">
var websocket = null;
//判断当前浏览器是否支持WebSocket
if('WebSocket' in window){
websocket = new WebSocket("ws://localhost:28095/websocket");
}
else{
alert('Not support websocket')
}
//连接发生错误的回调方法
websocket.onerror = function(){
setMessageInnerHTML("error");
};
//连接成功建立的回调方法
websocket.onopen = function(event){
setMessageInnerHTML("open");
}
//接收到消息的回调方法
websocket.onmessage = function(event){
setMessageInnerHTML(event.data);
}
//连接关闭的回调方法
websocket.onclose = function(){
setMessageInnerHTML("close");
}
//监听窗口关闭事件,当窗口关闭时,主动去关闭websocket连接,防止连接还没断开就关闭窗口,server端会抛异常。
window.onbeforeunload = function(){
websocket.close();
}
//将消息显示在网页上
function setMessageInnerHTML(innerHTML){
document.getElementById('message').innerHTML += innerHTML + '<br/>';
}
//关闭连接
function closeWebSocket(){
websocket.close();
}
//发送消息
function send(){
var zhi = document.getElementById('text').value;
var message={
"pushType":"1",
"message":zhi
}
websocket.send(JSON.stringify(message));
}
</script>
</body>
</html>
好了,到此主要代码完毕,完整代码,请移步:
https://github.com/wangdonghuihuang/HappyKing
敬请Fork跟Star。有问题可企鹅群--535296702,群里是一群可爱的小伙伴,探讨技术与吹牛打咖样样俱全哦!