springboot整合netty推送

千言万语不如撸码实战,begin!

首先说明此篇文章只是重要部分代码示例,完整代码,末尾会有彩蛋。

netty配置类代码块:

创建constant类,描述:常量类,目前存放的是所有推送类型

/**常量类*/
public class Constant {
//测试推送1
    public static final int PUSH_TEST_ONE=1;
    //测试推送2
    public static final int PUSH_TEST_TWO=2;
}

创建ChildChannelHandler类,描述:初始化SokcetChannel(socket通道),添加socket通道到pipeline(管道),注册到WorkGroup(负责读写的nioEventLoop中),注册成功后,会调用pipeline的channelRegistered方法,将registered事件在pipeline上传播。

import io.netty.channel.ChannelInitializer;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
import io.netty.handler.stream.ChunkedWriteHandler;

public class ChildChannelHandler extends ChannelInitializer<SocketChannel>{

    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
/**通过管道,添加handler,HttpServerCodec是由netty自己提供的助手类,可以理解为拦截器,当请求到服务端,我们需要做解码,响应到客户端做解码*/
        ch.pipeline().addLast("http-codec",new HttpServerCodec());
        ch.pipeline().addLast("aggregator",new HttpObjectAggregator(65536));
        ch.pipeline().addLast(new WebSocketServerProtocolHandler("/websocket"));
        ch.pipeline().addLast("http-chunked",new ChunkedWriteHandler());
//添加自定义的助手类
        ch.pipeline().addLast("handler",new NettyServerHandler());
    }

}

创建nettyCache类,描述:处理所有类型的通道保存类

import com.alibaba.fastjson.JSONObject;
import com.king.common.Constant;
import io.netty.channel.ChannelId;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.util.concurrent.GlobalEventExecutor;
import org.apache.commons.codec.digest.DigestUtils;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**所有类型的通道保存类*/
@Component
public class NettyCache {

    public static Map<Integer,ChannelGroup> channels = new HashMap<Integer,ChannelGroup>();
    public static Map<ChannelId,JSONObject> channelMessage = new ConcurrentHashMap<ChannelId,JSONObject>();
    public static ChannelGroup defaultGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);


    @PostConstruct
    public void initAllChannel(){
        String[] pushTypes = getAllPushType();
        for (int i=0;i<pushTypes.length;i++){
            channels.put(Integer.valueOf(pushTypes[i]),new DefaultChannelGroup(GlobalEventExecutor.INSTANCE));
        }
    }

    /**
     * 获取所有推送的类型
     */
    private String[] getAllPushType(){
        Field[] fields = Constant.class.getDeclaredFields();
        String[] allType = null;
        List<String> pushType = new ArrayList<String>();
        for(int i = 0;i<fields.length;i++){
            String fieldName = fields[i].getName();
            if(fieldName.startsWith("PUSH_")){//
                try {
                    Object s = Constant.class.getField(fieldName).get(null);
                    pushType.add(s.toString());
                } catch (IllegalAccessException e) {
                    e.printStackTrace();
                } catch (NoSuchFieldException e) {
                    e.printStackTrace();
                }
            }
        }
        allType = pushType.toArray(new String[pushType.size()]);
        return allType;
    }
}

创建nettyServer类,描述:处理netty服务初始化启动,监听socket等操作。

import javax.annotation.PostConstruct;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;
import com.king.common.CommonConfig;
import com.king.util.ApplicationContextHelper;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
@Service
public class NettyServer {
    private static Logger log=LoggerFactory.getLogger(NettyServer.class);
@PostConstruct
    public void initServer() {
    new Thread() {
        public void run() {
            new NettyServer().run();
        }
    }.start();
}
public void run() {
//定义一对线程组
//主线程组,用于接收客户端的连接,但是不做任何处理,跟老板一样不做事
    EventLoopGroup bossGroup=new NioEventLoopGroup();
//从线程组,老板线程组会把任务丢给它,让手下线程组去做任务
    EventLoopGroup workGroup=new NioEventLoopGroup();
    try {
        //netty服务器的创建,ServerBootstrap 是一个启动类
        ServerBootstrap socket=new ServerBootstrap();
//设置主从线程组
        socket.group(bossGroup,workGroup);
        //设置nio的双向通道
        socket.channel(NioServerSocketChannel.class);
//子处理器,用于处理workGroup
        socket.childHandler(new ChildChannelHandler());
        CommonConfig config=(CommonConfig) ApplicationContextHelper.getBean(CommonConfig.class);
        //启动server,并绑定端口号,同时启动方式为同步
        Channel channel=socket.bind(config.getWebSocketPort()).sync().channel();
        //服务端管道关闭的监听器并同步阻塞,知道channel关闭,线程才会往下执行,结束进程。主线程执行到这里就wait子线程结束,子线程才是真正监听和接收请求的。子线程就是netty启动的监听端口的线程。
        //即closeFuture()是 开启了一个channel的监听器,负责监听channel是否关闭的状态,如果未来监听到channel关闭了,子线程才会释放,syncUninterruptibly()让主线程同步等待子线程结果。
        //补充:channel.close()才是主动关闭通道的方法。
//监听关闭的channel,设置为同步方式
        channel.closeFuture().sync();
        log.info("服务端启动成功");
    } catch (Exception e) {
        log.error(e.getMessage(),e);
    }finally {
//优雅的关闭线程组
        bossGroup.shutdownGracefully();
        workGroup.shutdownGracefully();
    }
}
}

创建nettyServerHandler类,描述:处理打开关闭通道,接收消息分配处理等操作

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.alibaba.fastjson.JSONObject;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.websocketx.WebSocketFrame;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
/**通道打开,关闭,接收消息处理类,异常捕获*/
public class NettyServerHandler extends SimpleChannelInboundHandler<Object>{
    private static Logger log=LoggerFactory.getLogger(NettyServerHandler.class);
    //捕获异常
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
        }
    //打开通道
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
//连接默认添加
        NettyCache.defaultGroup.add(ctx.channel());
        log.info("客户端与服务端连接开启:"+ctx.channel().remoteAddress().toString());
    }
    //关闭通道
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
//移除通道以及通道对应的消息
        if(NettyCache.defaultGroup.contains(ctx.channel())) {
            NettyCache.defaultGroup.remove(ctx.channel());
        }
        if(NettyCache.channelMessage.containsKey(ctx.channel().id())) {
            NettyCache.channels.get(NettyCache.channelMessage.get(ctx.channel().id()).getIntValue("pushType"))
            .remove(ctx.channel());
        }
        if(NettyCache.channelMessage.containsKey(ctx.channel().id())) {
            NettyCache.channelMessage.remove(ctx.channel().id());
            
            log.info("客户端与服务端连接关闭:"+ctx.channel().remoteAddress().toString());
        }
    }
    //通道读取数据完成
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        ctx.flush();
    }
    //接收消息
    @Override
    protected void messageReceived(ChannelHandlerContext ctx, Object msg) throws Exception {
        if(msg instanceof WebSocketFrame) {
            handlerWebocketFrame(ctx, (WebSocketFrame) msg);
        }
    }
    
    //返回应答消息
    private void handlerWebocketFrame(ChannelHandlerContext ctx,WebSocketFrame frame) {
        String requestMessage=((TextWebSocketFrame)frame).text();
        JSONObject jsonMessage=JSONObject.parseObject(requestMessage);
        log.info("接收到客户端发送的消息"+requestMessage);
        if(NettyCache.channelMessage.containsKey(ctx.channel().id())) {
            NettyCache.channels.get(NettyCache.channelMessage.get(ctx.channel().id()).getIntValue("pushType")).remove(ctx.channel());
        }
        NettyCache.channels.get(jsonMessage.getIntValue("pushType")).add(ctx.channel());
        NettyCache.channelMessage.put(ctx.channel().id(), jsonMessage);
        if(NettyCache.defaultGroup.contains(ctx.channel())) {
            NettyCache.defaultGroup.remove(ctx.channel());
        }
    }
}

创建html,很简单一个页面:websocket.html

<html>
<head>
<title>推送消息页面</title>
</head>
<body>
<div>
你好!<p th:text="${name}"></p>
</div>
<input id="text" type="text" /><button onclick="send()">Send</button>    <button onclick="closeWebSocket()">Close</button>
 <div id="message"></div>
 <script type="text/javascript">
    var websocket = null;
    //判断当前浏览器是否支持WebSocket
    if('WebSocket' in window){
        websocket = new WebSocket("ws://localhost:28095/websocket");
    }
    else{
        alert('Not support websocket')
    }
    //连接发生错误的回调方法
    websocket.onerror = function(){
        setMessageInnerHTML("error");
    };
    //连接成功建立的回调方法
    websocket.onopen = function(event){
        setMessageInnerHTML("open");
    }
    //接收到消息的回调方法
    websocket.onmessage = function(event){
        setMessageInnerHTML(event.data);
    }
    //连接关闭的回调方法
    websocket.onclose = function(){
        setMessageInnerHTML("close");
    }
    //监听窗口关闭事件,当窗口关闭时,主动去关闭websocket连接,防止连接还没断开就关闭窗口,server端会抛异常。
    window.onbeforeunload = function(){
        websocket.close();
    }
    //将消息显示在网页上
    function setMessageInnerHTML(innerHTML){
        document.getElementById('message').innerHTML += innerHTML + '<br/>';
    }
    //关闭连接
    function closeWebSocket(){
        websocket.close();
    }
    //发送消息
    function send(){
        var zhi = document.getElementById('text').value;
        var message={
            "pushType":"1",
            "message":zhi
        }
        websocket.send(JSON.stringify(message));
    }
</script>
</body>
</html>

好了,到此主要代码完毕,完整代码,请移步:
https://github.com/wangdonghuihuang/HappyKing
敬请Fork跟Star。有问题可企鹅群--535296702,群里是一群可爱的小伙伴,探讨技术与吹牛打咖样样俱全哦!

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 215,294评论 6 497
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,780评论 3 391
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 161,001评论 0 351
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,593评论 1 289
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,687评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,679评论 1 294
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,667评论 3 415
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,426评论 0 270
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,872评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,180评论 2 331
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,346评论 1 345
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,019评论 5 340
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,658评论 3 323
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,268评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,495评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,275评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,207评论 2 352

推荐阅读更多精彩内容