响应式编程之手写Reactor-Netty

前言

从使用到源码,研究了很久WebFlux及Reactor

响应式编程之Reactor

响应式编程之Reactive streams

响应式编程之手写Reactor

响应式编程之WebFlux

响应式编程之Reactor-Netty

今天准备整合一下知识,自己写出一个类似Reactor-Netty的框架,可以练习一下Reactor的使用,同时回顾一下netty的知识

原材料即ReactorNetty

最终实现如下的效果即可,既可以像Reactor-Netty一样写一个接口,并支持响应式返回,底层使用Netty进行网络通讯

DisposableServer server = HttpServer.create().port(7892) // 绑定端口
        .route( // 路由
                routes -> routes.get("/hello", (request, response) ->
                        response.sendString(Mono.just("Hello World"))
                ).get("/hello2", (request, response) ->
                        response.sendString(Mono.just("Hello World2"))
                )
        )
        .bindNow();
server.onDispose().block();

此时访问端口7892的"/hello"路径就会返回“Hello World”

依赖

要实现出这样的效果,首先就是要引入两个依赖ReactorNetty

<dependency>
    <groupId>io.projectreactor</groupId>
    <artifactId>reactor-core</artifactId>
    <version>3.3.8.RELEASE</version>
</dependency>
<dependency>
    <groupId>io.netty</groupId>
    <artifactId>netty-all</artifactId>
    <version>4.1.51.Final</version>
</dependency>

netty服务

然后思路也并不复杂,不过就是定义一个类:HttpServer,然后create方法时启动一个Netty服务端即可,尝试一下如下

public class HttpServerV1 {

    ServerBootstrap bootstrap; // netty服务构造器

    public static HttpServerV1 create() {// 静态创建
        return new HttpServerV1();
    }

    public HttpServerV1() { // 初始化,开始创建netty服务端构造器
        bootstrap = new ServerBootstrap();
        bootstrap.group(new NioEventLoopGroup(1), new NioEventLoopGroup())
                .channel(NioServerSocketChannel.class)
                .option(ChannelOption.SO_REUSEADDR, true)
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel ch) { // 用一个简单的时间处理器,单纯打印
                        ch.pipeline().addLast(new HttpRequestDecoder(), new HttpResponseEncoder(), new ChannelInboundHandlerAdapter() {
                            @Override
                            public void channelRead(ChannelHandlerContext ctx, Object msg) {
                                if (msg instanceof DefaultHttpRequest) {
                                    DefaultHttpRequest request = (DefaultHttpRequest) msg; // 请求信息
                                    ByteBuf result = Unpooled.copiedBuffer("Hello World: " + request.uri(), CharsetUtil.UTF_8);
                                    DefaultFullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK, result);
                                    ctx.writeAndFlush(response); // 返回
                                    ctx.channel().close(); // 关闭连接
                                }
                            }
                        });
                    }
                });
    }

    public HttpServerV1 port(int port) { // 设置端口
        bootstrap.localAddress(new InetSocketAddress(port));
        return this;
    }

    public HttpServerV1 bindNow() { // 开始绑定端口
        bootstrap.bind();
        return this;
    }
}

有了netty很简单就写完了,一个简单的web接口:请求后返回“hello world”+ 请求路径,使用如下

public static void main(String[] args) {
    HttpServerV1.create().port(7893).bindNow();
}

此时浏览器访问7893端口,输出“Hello world”+ 请求路径

Hello world

守护线程&阻塞

此时再回头看reactor-netty的使用例子,有一句server.onDispose().block(),意思是阻塞至通道服务关闭,如果去掉block()方法则运行的服务很快结束了

去掉block()
程序直接结束

这里我当时比较奇怪,为什么我写的HttpServer会一直运行不需要写什么阻塞

调查了一下,发现原来reactor-netty创建的NioEventLoop都是守护线程,所以main线程如果结束后netty就停止了,至于为什么是守护线程,可能是因为为了回收资源吧

总之不管因为什么,我也这么干吧,先建一个线程工厂,生产的线程都是守护线程

public class ReactorNettyThreadFactory implements ThreadFactory {
    AtomicInteger threadNo = new AtomicInteger(0);
    @Override
    public Thread newThread(Runnable r) {
        Thread thread = new Thread(r, "reactor-nio-" + (threadNo.incrementAndGet()));
        thread.setDaemon(true); // 守护线程
        return thread;
    }
}

此时Netty服务初始化代码变为

 ThreadFactory threadFactory = new ReactorNettyThreadFactory();
 bootstrap
    .group(new NioEventLoopGroup(1, threadFactory), new NioEventLoopGroup(threadFactory)

这是所有的EventLoop的线程都是守护线程,如果main方法执行完毕程序就结束了,这样肯定不行,所以main方法中一定要加上阻塞才能让服务一直运行

阻塞到什么时候呐,我们是web服务程序,应该阻塞到服务通道关闭,而刚好Netty的bind()方法可以获取到channel关闭的Future,此时bindNow方法变为如下

private ChannelFuture closeFuture; // 通道的关闭的Future
public HttpServer bindNow() {
    closeFuture = bootstrap.bind().channel().closeFuture();
    return this;
}

main方法如何阻塞到channel关闭呐,一个closeFuture.sync()其实就可以,但我们使用Reactor,当然要发挥Reactor的优势,因为我们可能还会在close事件发生时订阅一些操作,所以我们把closeFuture转换为Reactor的Mono发布者,发布得就是通道关闭事件,取名为onDispose,即服务关闭的发布者

public Mono<Void> onDispose() { // 这里源码实现更复杂,简化一下
    return Mono.create(sink->{
        closeFuture.addListener((ChannelFutureListener) future -> sink.success());
    });
}

此时回到使用,使用代码如下:

public static void main(String[] args) {
    HttpServer httpServer = HttpServer.create()
            .port(7893)
            .bindNow();
    httpServer.onDispose().block();
}

感觉上就和reactor-netty的使用很像了,如果不block(),程序立马结束

但此时我们的web服务只有一个,无法根据路径走不同的方法,所以下一步:加路由

路由

路由也好理解,就是一个path到方法的映射map,先对照reactor-netty学一下我们的方法应该是如何抽象

首先有两个参数:request(用于获取请求的参数),response(用于写回响应)

request简单一点直接用netty的DefaultHttpRequest

但response可不简单,它有一个send方法用于写回数据,它接受的参数是一个Publisher,所以这个方法的作用是在Publisher发布时能写回数据至客户端channel,所以send方法本质是订阅一个程序数据准备好后,发布数据至客户端的步骤,由于writeAndFlush也是异步操作,所以要再返回一个Publisher发布写完事件,以便后续关闭通道的相关处理,由于这个发布者只是事件没有数据所以是Void,整个过程使用flatMap即可实现,如下

public class HttpServerResponse {

    private ChannelHandlerContext ctx;

    public HttpServerResponse(ChannelHandlerContext ctx) {
        this.ctx = ctx;
    }

    public Mono<Void> sendString(Mono<String> publisher) {
        return send(publisher.flatMap(content-> Mono.just(Unpooled.copiedBuffer(content, CharsetUtil.UTF_8))));
    }

    public Mono<Void> send(Mono<ByteBuf> publisher) {
        return publisher.flatMap(content-> Mono.create(sink-> {
            ChannelFuture channelFuture = ctx.writeAndFlush(new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK, content));
            channelFuture.addListener(future -> {
                sink.success();
            });
        }));
    }
}

此时我们的自定义方法的结构出来了,两个参数:netty的HttpRequest和自己封装的HttpServerResponse,一个返回结果:Publisher<Void>

可以用JDK的BiFunction代表方法的抽象

BiFunction<? super HttpRequest, ? super HttpServerResponse, ? extends Publisher<Void>> handler

我们把一个映射和方法的对应用实体描述一下:

@AllArgsConstructor
static final class HttpRouteHandler {
    private String path; // 路径
    private BiFunction<? super HttpRequest, ? super HttpServerResponse, ? extends Publisher<Void>> handler; // 方法

    public Publisher<Void> apply(HttpRequest request,
                                 HttpServerResponse response) { // 执行方法
        return handler.apply(request, response);
    }

    public boolean test(HttpRequest request) { // 是否是某个请求
        return request.uri().equals(path);
    }
}

再用一个集合存储所有path->方法的映射

public class HttpServerRoutes {

    private List<HttpRouteHandler> handlers = new ArrayList<>(); // 映射集合

    // 添加get请求path和方法映射
    public HttpServerRoutes get(String path,
                                BiFunction<? super HttpRequest, ? super HttpServerResponse, ? extends Publisher<Void>> handler) {
        handlers.add(new HttpRouteHandler(path, handler));
        return this;
    }

    // 选择路由对应的处理方法执行
    public Publisher<Void> apply(HttpRequest request, HttpServerResponse response) {
        for (HttpRouteHandler handler : handlers) {
            if (handler.test(request)) { // 路径对应上
                return handler.apply(request, response); // 执行
            }
        }
        return Mono.empty();
    }

}

最终

最后就是我们的HttpServer构建器,要可以配置路由,并再请求到达时执行路由的方法,完整代码如下

public class HttpServer {

    ServerBootstrap bootstrap; // netty服务构造器

    ChannelFuture closeFuture; // 通道的关闭的Future

    HttpServerRoutes handler; // 路由

    public static HttpServer create() {
        return new HttpServer();
    }

    /**
     * 初始化,开始创建netty服务端构造器
     */
    public HttpServer() {
        bootstrap = new ServerBootstrap();
        ThreadFactory threadFactory = new ReactorNettyThreadFactory();
        bootstrap.group(new NioEventLoopGroup(1, threadFactory), new NioEventLoopGroup(threadFactory))
                .channel(NioServerSocketChannel.class)
                .option(ChannelOption.SO_REUSEADDR, true)
                .childHandler(new ChannelInitializer<SocketChannel>() {//创建通道初始化对象,设置初始化参数
                    @Override
                    protected void initChannel(SocketChannel ch) { // 用一个简单的时间处理器,单纯打印
                        ch.pipeline().addLast(new HttpRequestDecoder(), new HttpResponseEncoder(), new ChannelInboundHandlerAdapter() {
                            @Override
                            public void channelRead(ChannelHandlerContext ctx, Object msg) {
                                if (msg instanceof DefaultHttpRequest) {
                                    DefaultHttpRequest request = (DefaultHttpRequest) msg; // 请求
                                    HttpServerResponse response = new HttpServerResponse(ctx); // 响应
                                    handler.apply(request, response) // 执行方法
                                    .subscribe(new ChannelDisposeSubscriber(ctx)); // 订阅
                                }
                            }
                        });
                    }
                });
    }

    public HttpServer port(int port) {
        bootstrap.localAddress(new InetSocketAddress(port));
        return this;
    }


    /**
     * 设置路由
     * @return
     */
    public HttpServer route(Consumer<? super HttpServerRoutes> routesBuilder) {
        handler = new HttpServerRoutes();
        routesBuilder.accept(handler);
        return this;
    }

    public HttpServer bindNow() {
        closeFuture = bootstrap.bind().channel().closeFuture();
        return this;
    }

    public Mono<Void> onDispose() {
        return Mono.create(sink->{
            closeFuture.addListener((ChannelFutureListener) future -> sink.success());
        });
    }
}

其中handler.apply方法完成了订阅操作,订阅的就是响应已写回客户端的事件,所以对应的处理就是关闭客户端通道

@AllArgsConstructor
public class ChannelDisposeSubscriber implements Subscriber<Void> {

    private ChannelHandlerContext ctx;

    @Override
    public void onComplete() {
        ctx.close(); // 写回响应数据后关闭通道
    }
}

到此一个基于基于Netty的http服务就写完了,可以接受响应式的返回结果,使用如下

public static void main(String[] args) {
    HttpServer httpServer = HttpServer.create()
            .port(7893)
            .route(routes -> routes
                    .get("/hello",
                            (request, response) -> response.sendString(Mono.just("Hello World"))
                    ).get("/hello2",
                            (request, response) -> response.send(Mono.just(Unpooled.copiedBuffer("Hello World2", CharsetUtil.UTF_8)))
                    ).get("/hello3",
                            (request, response) -> response.sendString(Mono.create(sink->{
                                try {Thread.sleep(1000);} catch (InterruptedException e) {}
                                sink.success("Hello World3");
                            }))
                    )
            )
            .bindNow();
    httpServer.onDispose().block();
}

测试结果如下

测试

小结

不得不说,初次使用Reactor写功能,跟原命令行写法的思维差异真的很大,总结如下

  • 服务维护一个path至方法的映射
  • 请求到达执行对应方法,反回的是一个发布者,发布的事件是请求处理结束
  • 执行方法后得到返回的发布者后立即订阅,订阅的处理是关闭连接
  • 方法内部通过执行response.send方法可以给执行结果发布者(类似Mono和Flux)添加一个把结果发送到客户端的处理过程

个人认为response.send也应该封装进框架中,而不是让用户自己写,因为我们写一个接口一定是要有返回值的,就像如果使用的是WebFlux,一般请求是不需要管response的,方法直接返回Mono就可以了

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 206,968评论 6 482
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 88,601评论 2 382
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 153,220评论 0 344
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 55,416评论 1 279
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 64,425评论 5 374
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,144评论 1 285
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,432评论 3 401
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,088评论 0 261
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 43,586评论 1 300
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,028评论 2 325
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,137评论 1 334
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,783评论 4 324
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,343评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,333评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,559评论 1 262
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,595评论 2 355
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,901评论 2 345

推荐阅读更多精彩内容