RSocket协议初识-Springboot中使用(一)

前言

前几天无聊翻SpringBoot官方文档,无意中发现文档中增加了一个章节叫RSocket协议的鬼东西,遂研究了一下。

RSocket是什么?

RSocket是一种二进制字节流传输协议,位于OSI模型中的5~6层,底层可以依赖TCP、WebSocket、Aeron协议。

RSocket设计目标是什么?

1、支持对象传输,包括request\response、request\stream、fire and forget、channel
2、支持应用层流量控制
3、支持单连接双向、多次复用
4、支持连接修复
5、更好的使用WebSocket和Aeron协议

RSocket与其他协议有什么区别?

对比Http1.x

  • Http1.x只支持request\response,但是现实应用中并不是所有请求都需要有回应(Fire And Forget)、有的需求需要一个请求返回一个数据流(request\stream)、有的还需要双向数据传输(channel)。

对比Http2.x

  • http2.x不支持应用层流量控制、伪双向传输,即服务端push数据本质上还是对客户端请求的响应,而不是直接推送。RSocket做到了真正的双向传输,使得服务端可以调用客户端服务,使得服务端和客户端在角色上完全对等,即两边同时是Requester和Responder。

对比grpc

对比TCP

  • 其实两者不在一个层面,为啥要作比较呢,因为netty让tcp层的编程也很容易,但是需要自定义传输协议,比如定义header、body长度等等,用起来还是很麻烦的。

对比WebSocket

  • websocket不支持应用层流量控制,本质上也是一端请求另一端响应,不支持连接修复。

RSocket协议的形式是什么?

  • 连接上传输的数据是流(Stream)
  • 流(Stream)由帧(Frame)组成
  • 帧(Frame)包含了元数据(MetaData)与业务数据(Data)

结论:

基于RSocket协议,我们的业务数据会被打包成帧,并以帧流的形式在客户端与服务端互相传输。所以RSocket的所有特性都是基于这个帧流实现的。后续有时间会针对每个帧类型做解析。

RSocket适用于哪些场景?

1、移动设备与服务器的连接。

  • 数据双向传输,且支持流量控制。支持背压,背压的意思:如果客户端请求服务端过快,那么服务端会堆积请求,最终耗光资源。有了背压服务端可以根据自己的资源来控制客户端的请求速度,即调用客户端告诉他别发那么快。
  • 支持连接修复,比如手机进地铁之后,网络断开一段时间,其他协议需要重新建立连接,RSocket则可以修复连接继续传输帧数据。

2、微服务场景。

  • spring cloud目前支持的http协议,不能fire and forget、不能请求流数据、不能单连接双向调用;替换成RSocket之后可以满足以上需求的同时提高性能。且针对服务治理、负载均衡等RSocket都在慢慢完善。

3、由于微服务和移动设备的普及,RSocket火起来应该就是这几年的事儿。

BB了这么多你给我上个代码

SpringBoot中的使用

  • step1、构建SpringBoot项目,引入依赖
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-rsocket</artifactId>
        </dependency>
  • step2、编写需要传输的消息类和服务器类

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

import java.time.Instant;

@Data
@AllArgsConstructor
@NoArgsConstructor
public class Message {
    private String from;
    private String to;
    private long index;
    private long created = Instant.now().getEpochSecond();

    public Message(String from, String to) {
        this.from = from;
        this.to = to;
        this.index = 0;
    }

    public Message(String from, String to, long index) {
        this.from = from;
        this.to = to;
        this.index = index;
    }
}
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import org.springframework.messaging.handler.annotation.MessageMapping;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.messaging.rsocket.RSocketRequester;
import org.springframework.messaging.rsocket.annotation.ConnectMapping;
import org.springframework.stereotype.Controller;
import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.Flux;

@Slf4j
@Controller
public class RSocketController {

    private final List<RSocketRequester> CLIENTS = new ArrayList<>();

    @MessageMapping("request-response")
    public Message requestResponse(Message request) {
        log.info("收到请求: {}", request);
        return new Message("服务端", "客户端");
    }

    @MessageMapping("fire-and-forget")
    public void fireAndForget(Message request) {
        log.info("收到fire-and-forget请求: {}", request);
    }

    @MessageMapping("stream")
    Flux<Message> stream(Message request) {
        log.info("收到流式请求: {}", request);
        return Flux
                .interval(Duration.ofSeconds(1))
                .map(index -> new Message(”服务端“, "客户端", index))
                .log();
    }

    @MessageMapping("channel")
    Flux<Message> channel(final Flux<Duration> settings) {
        return settings
                .doOnNext(setting -> log.info("发射间隔为 {} 秒.", setting.getSeconds()))
                .switchMap(setting -> Flux.interval(setting)
                        .map(index -> new Message("服务端", "客户端", index)))
                .log();
    }
}
  • step3、配置文件里增加配置项
spring.main.lazy-initialization=true
spring.rsocket.server.port=7000
  • step4、编写客户端代码

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

import java.time.Instant;

@Data
@AllArgsConstructor
@NoArgsConstructor
public class Message {
    private String from;
    private String to;
    private long index;
    private long created = Instant.now().getEpochSecond();

    public Message(String from, String to) {
        this.from = from;
        this.to = to;
        this.index = 0;
    }

    public Message(String from, String to, long index) {
        this.from = from;
        this.to = to;
        this.index = index;
    }
}
import java.time.Duration;
import javax.annotation.PreDestroy;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.rsocket.RSocketRequester;
import org.springframework.messaging.rsocket.RSocketStrategies;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import lombok.extern.slf4j.Slf4j;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

@Slf4j
@RestController
public class RSocketClient {

    private final RSocketRequester rsocketRequester;
    private static Disposable disposable;

    @Autowired
    public RSocketClient(RSocketRequester.Builder rsocketRequesterBuilder, RSocketStrategies strategies) {
        this.rsocketRequester = rsocketRequesterBuilder
                .rsocketStrategies(strategies)
                .connectTcp("localhost", 7000)
                .block();

        this.rsocketRequester.rsocket()
                .onClose()
                .doOnError(error -> log.warn("发生错误,链接关闭"))
                .doFinally(consumer -> log.info("链接关闭"))
                .subscribe();
    }

    @PreDestroy
    void shutdown() {
        rsocketRequester.rsocket().dispose();
    }

    @GetMapping("request-response")
    public Message requestResponse() {
        Message message = this.rsocketRequester
                .route("request-response")
                .data(new Message("客户端", "服务器"))
                .retrieveMono(Message.class)
                .block();
        log.info("客户端request-response收到响应 {}", message);
        return message;
    }

    @GetMapping("fire-and-forget")
    public String fireAndForget() {
        this.rsocketRequester
                .route("fire-and-forget")
                .data(new Message("客户端", "服务器"))
                .send()
                .block();
        return "fire and forget";
    }

    @GetMapping("stream")
    public String stream() {
        disposable = this.rsocketRequester
                .route("stream")
                .data(new Message("客户端", "服务器"))
                .retrieveFlux(Message.class)
                .subscribe(message -> log.info("客户端stream收到响应 {}", message));
        return "stream";
    }

    @GetMapping("channel")
    public String channel() {
        Mono<Duration> setting1 = Mono.just(Duration.ofSeconds(1));
        Mono<Duration> setting2 = Mono.just(Duration.ofSeconds(3)).delayElement(Duration.ofSeconds(5));
        Mono<Duration> setting3 = Mono.just(Duration.ofSeconds(5)).delayElement(Duration.ofSeconds(15));
        Flux<Duration> settings = Flux.concat(setting1, setting2, setting3)
                .doOnNext(d -> log.info("客户端channel发送消息 {}", d.getSeconds()));
        disposable = this.rsocketRequester
                .route("channel")
                .data(settings)
                .retrieveFlux(Message.class)
                .subscribe(message -> log.info("客户端channel收到响应 {}", message));
        return "channel";
    }

}
  • step5、启动服务端、启动客户端,打开浏览器访问localhost:8080/fire-and-forget等测试效果

代码解析

  • @MessageMapping:Spring提供的注解,用于路由,与@GetMapping等功能类似
  • Mono:响应式编程里用于返回0-1个结果
  • Flux:响应式编程里用于返回0-N个结果
  • Disposable:断流器,为true的时候两边不能传输数据

What Next?

  • 协议原理解析
  • 由于RSocket社区还不够活跃,Git上的代码也是刚刚起步,还在不断更新中,相关功能也在不断完善中,后续随着官方新内容的更新我也会跟着更新。
  • RSocket中很多概念如Mono、Flux、Disposable、背压、流式处理等都是响应式编程中的概念,想了解响应式编程可以查看:http://reactivex.io/ 中的文档,其中包括了RXJava等RX系列的各种语言的Demo。
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 218,036评论 6 506
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 93,046评论 3 395
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 164,411评论 0 354
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,622评论 1 293
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,661评论 6 392
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,521评论 1 304
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,288评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,200评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,644评论 1 314
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,837评论 3 336
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,953评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,673评论 5 346
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,281评论 3 329
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,889评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 33,011评论 1 269
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 48,119评论 3 370
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,901评论 2 355