RSocket协议初识

文章目录

  • 前言
  • RSocket是什么?
  • RSocket设计目标是什么?
  • RSocket与其他协议有什么区别?
    • 对比Http1.x
    • 对比Http2.x
    • 对比grpc
    • 对比TCP
    • 对比WebSocket
    • 结论
  • RSocket适用于哪些场景?
    • 1、移动设备与服务器的连接。
    • 2、微服务场景。
    • 3、由于微服务和移动设备的普及,RSocket火起来应该就是这几年的事儿。

前言

前几天无聊翻SpringBoot官方文档,无意中发现文档中增加了一个章节叫RSocket协议的鬼东西,遂研究了一下。

RSocket是什么?

RSocket是一种二进制字节流传输协议,位于OSI模型中的5~6层,底层可以依赖TCP、WebSocket、Aeron协议。

RSocket设计目标是什么?

1、支持对象传输,包括request\response、request\stream、fire and forget、channel
2、支持应用层流量控制
3、支持单连接双向、多次复用
4、支持连接修复
5、更好的使用WebSocket和Aeron协议

RSocket与其他协议有什么区别?

对比Http1.x

Http1.x只支持request\response,但是现实应用中并不是所有请求都需要有回应(Fire And Forget)、有的需求需要一个请求返回一个数据流(request\stream)、有的还需要双向数据传输(channel)。

对比Http2.x

http2.x不支持应用层流量控制、伪双向传输,即服务端push数据本质上还是对客户端请求的响应,而不是直接推送。RSocket做到了真正的双向传输,使得服务端可以调用客户端服务,使得服务端和客户端在角色上完全对等,即两边同时是Requester和Responder。

对比grpc

grpc需要依赖protobuf,本质上还是http2.x。RSocket不限制编解码,可以是json、protobuf等等。
性能上grpc要差一些:详见压测对比,https://dzone.com/articles/rsocket-vs-grpc-benchmark

对比TCP

其实两者不在一个层面,为啥要作比较呢,因为netty让tcp层的编程也很容易,但是需要自定义传输协议,比如定义header、body长度等等,用起来还是很麻烦的。

对比WebSocket

websocket不支持应用层流量控制,本质上也是一端请求另一端响应,不支持连接修复。
RSocket协议的形式是什么?
连接上传输的数据是流(Stream)
流(Stream)由帧(Frame)组成
帧(Frame)包含了元数据(MetaData)与业务数据(Data)

结论

基于RSocket协议,我们的业务数据会被打包成帧,并以帧流的形式在客户端与服务端互相传输。所以RSocket的所有特性都是基于这个帧流实现的。后续有时间会针对每个帧类型做解析。

RSocket适用于哪些场景?

1、移动设备与服务器的连接。

数据双向传输,且支持流量控制。支持背压,背压的意思:如果客户端请求服务端过快,那么服务端会堆积请求,最终耗光资源。有了背压服务端可以根据自己的资源来控制客户端的请求速度,即调用客户端告诉他别发那么快。
支持连接修复,比如手机进地铁之后,网络断开一段时间,其他协议需要重新建立连接,RSocket则可以修复连接继续传输帧数据。

2、微服务场景。

spring cloud目前支持的http协议,不能fire and forget、不能请求流数据、不能单连接双向调用;替换成RSocket之后可以满足以上需求的同时提高性能。且针对服务治理、负载均衡等RSocket都在慢慢完善。

3、由于微服务和移动设备的普及,RSocket火起来应该就是这几年的事儿。

BB了这么多你给我上个代码
SpringBoot中的使用
step1、构建SpringBoot项目,引入依赖

<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-rsocket</artifactId></dependency>

step2、编写需要传输的消息类和服务器类

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;import java.time.Instant;@Data
@AllArgsConstructor
@NoArgsConstructor
public class Message {private String from;private String to;private long index;private long created = Instant.now().getEpochSecond();public Message(String from, String to) {this.from = from;this.to = to;this.index = 0;}public Message(String from, String to, long index) {this.from = from;this.to = to;this.index = index;}}
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import org.springframework.messaging.handler.annotation.MessageMapping;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.messaging.rsocket.RSocketRequester;
import org.springframework.messaging.rsocket.annotation.ConnectMapping;
import org.springframework.stereotype.Controller;
import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.Flux;@Slf4j
@Controller
public class RSocketController {private final List<RSocketRequester> CLIENTS = new ArrayList<>();@MessageMapping("request-response")public Message requestResponse(Message request) {log.info("收到请求: {}", request);return new Message("服务端", "客户端");}@MessageMapping("fire-and-forget")public void fireAndForget(Message request) {log.info("收到fire-and-forget请求: {}", request);}@MessageMapping("stream")Flux<Message> stream(Message request) {log.info("收到流式请求: {}", request);return Flux.interval(Duration.ofSeconds(1)).map(index -> new Message(”服务端“, "客户端", index)).log();}@MessageMapping("channel")Flux<Message> channel(final Flux<Duration> settings) {return settings.doOnNext(setting -> log.info("发射间隔为 {} 秒.", setting.getSeconds())).switchMap(setting -> Flux.interval(setting).map(index -> new Message("服务端", "客户端", index))).log();}}

step3、配置文件里增加配置项

spring.main.lazy-initialization=true
spring.rsocket.server.port=7000

step4、编写客户端代码

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;import java.time.Instant;@Data
@AllArgsConstructor
@NoArgsConstructor
public class Message {private String from;private String to;private long index;private long created = Instant.now().getEpochSecond();public Message(String from, String to) {this.from = from;this.to = to;this.index = 0;}public Message(String from, String to, long index) {this.from = from;this.to = to;this.index = index;}}
import java.time.Duration;
import javax.annotation.PreDestroy;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.rsocket.RSocketRequester;
import org.springframework.messaging.rsocket.RSocketStrategies;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import lombok.extern.slf4j.Slf4j;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;@Slf4j
@RestController
public class RSocketClient {private final RSocketRequester rsocketRequester;private static Disposable disposable;@Autowiredpublic RSocketClient(RSocketRequester.Builder rsocketRequesterBuilder, RSocketStrategies strategies) {this.rsocketRequester = rsocketRequesterBuilder.rsocketStrategies(strategies).connectTcp("localhost", 7000).block();this.rsocketRequester.rsocket().onClose().doOnError(error -> log.warn("发生错误,链接关闭")).doFinally(consumer -> log.info("链接关闭")).subscribe();}@PreDestroyvoid shutdown() {rsocketRequester.rsocket().dispose();}@GetMapping("request-response")public Message requestResponse() {Message message = this.rsocketRequester.route("request-response").data(new Message("客户端", "服务器")).retrieveMono(Message.class).block();log.info("客户端request-response收到响应 {}", message);return message;}@GetMapping("fire-and-forget")public String fireAndForget() {this.rsocketRequester.route("fire-and-forget").data(new Message("客户端", "服务器")).send().block();return "fire and forget";}@GetMapping("stream")public String stream() {disposable = this.rsocketRequester.route("stream").data(new Message("客户端", "服务器")).retrieveFlux(Message.class).subscribe(message -> log.info("客户端stream收到响应 {}", message));return "stream";}@GetMapping("channel")public String channel() {Mono<Duration> setting1 = Mono.just(Duration.ofSeconds(1));Mono<Duration> setting2 = Mono.just(Duration.ofSeconds(3)).delayElement(Duration.ofSeconds(5));Mono<Duration> setting3 = Mono.just(Duration.ofSeconds(5)).delayElement(Duration.ofSeconds(15));Flux<Duration> settings = Flux.concat(setting1, setting2, setting3).doOnNext(d -> log.info("客户端channel发送消息 {}", d.getSeconds()));disposable = this.rsocketRequester.route("channel").data(settings).retrieveFlux(Message.class).subscribe(message -> log.info("客户端channel收到响应 {}", message));return "channel";}}

step5、启动服务端、启动客户端,打开浏览器访问localhost:8080/fire-and-forget等测试效果
代码解析

  • @MessageMapping:Spring提供的注解,用于路由,与@GetMapping等功能类似
  • Mono:响应式编程里用于返回0-1个结果
  • Flux:响应式编程里用于返回0-N个结果
  • Disposable:断流器,为true的时候两边不能传输数据

What Next?

  • 协议原理解析
  • 由于RSocket社区还不够活跃,Git上的代码也是刚刚起步,还在不断更新中,相关功能也在不断完善中,后续随着官方新内容的更新我也会跟着更新。
  • RSocket中很多概念如Mono、Flux、Disposable、背压、流式处理等都是响应式编程中的概念,想了解响应式编程可以查看:http://reactivex.io/ 中的文档,其中包括了RXJava等RX系列的各种语言的Demo。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/509608.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

HugeGraph Server/Hubble安装使用

文章目录HugeGraph Server1 概述2 依赖2.1 安装JDK-1.83 部署3.1 下载tar包4 安装启动4.1 解压4.2 配置Hbase5 访问Server5.1 服务启动状态校验6 停止Server7 多图配置HugeGraph-Hubble 基于Web的可视化图形界面1.概述2.安装3 使用3.1创建图HugeGraph Server 1 概述 HugeGrap…

Socket模型

两种I/O模式 一.选择模型 二.异步选择 三.事件选择 四.重叠I/O模型 五.完成端口模型 五种I/O模型的比较 两种I/O模式 1、 两种I/O模式 阻塞模式&#xff1a;执行I/O操作完成前会一直进行等待&#xff0c;不会将控制权交给程序。套接字默认为阻塞模式。可以通过多线程技术进行处…

数据结构实验之二叉树的建立与遍历

题目描述 已知一个按先序序列输入的字符序列&#xff0c;如abc,,de,g,,f,,,(其中逗号表示空节点)。请建立二叉树并按中序和后序方式遍历二叉树&#xff0c;最后求出叶子节点个数和二叉树深度。 输入 输入一个长度小于50个字符的字符串。输出 输出共有4行&#xff1a; 第1行输出…

Mysql 集群双主双从安装使用详细讲解

文章目录下载Mysql安装单机Mysql配置Mysql集群双Master配置master1配置master2配置配置说明双Slave配置Slave1配置Slave2配置双 Master 机上创建账号&#xff0c;并授权远程复制查询Master1的状态查询Master2的状态双Slave机上执行 change master 同步Master数据Slave1 复制 Ma…

vs2010常见错误记录

&#xff08;1&#xff09;在Debug模式下正常&#xff0c;在Release模式下程序出现异常 可能原因&#xff1a;配置的链接dll有问题&#xff0c;后缀带d和不带d混淆&#xff1b;在Release模式下类中的变量不会自动进行初始化&#xff0c;需要手动初始化&#xff1b;

java中的IO操作之File类

Java的集合框架: 类和接口存在于java.util包中. Java的IO: 类和接口存在于java.io包中. 学习方法: 文档在手,天下我有! ----------------------------------------------------------------------------------------- 讲IO操作之前,必须要先讲File类(文件/目录…

数据结构实验之二叉树三:统计叶子数

题目描述 已知二叉树的一个按先序遍历输入的字符序列&#xff0c;如abc,,de,g,,f,,, (其中,表示空结点)。请建立二叉树并求二叉树的叶子结点个数。 输入 连续输入多组数据&#xff0c;每组数据输入一个长度小于50个字符的字符串。 输出 输出二叉树的叶子结点个数。 示例输入 ab…

企业需要的C++程序员

今天对智联招聘网站和前程无忧招聘网站对“任职要求"一栏进行了相关技术要求作了简要的统计&#xff0c;主要提取的是在任职要求中明确提到的信息&#xff0c;招聘城市针对长沙和深圳&#xff0c;招聘公司为随机抽取&#xff0c;列了以下技术要求出现的次数。希望在自己的…

ElasticSearch 新增节点,横向扩容

文章目录查看当前ES状态新增节点配置遇到的问题查看当前ES状态 这里默认都是在Kibana进行操作 GET _cluster/health{"cluster_name" : "bjga-gz","status" : "yellow","timed_out" : false,"number_of_nodes" :…

输入和输出(IO)概述

什么是IO:(Input/Output):输入和输出. IO设备: 和电脑通信的设备. 输入设备:麦克风,扫描器,键盘,鼠标等. 输出设备:显示器,打印机,投影仪,耳机,音响等. 为什么程序需要IO呢? 案例1:打游戏操作,得分比较高,存储游戏的信息(XXX-888分). 此时需要把游戏中的数据存储起来,只能…

求二叉树的深度

题目描述 已知一颗二叉树的中序遍历序列和后序遍历序列&#xff0c;求二叉树的深度。 输入 输入数据有多组&#xff0c;输入T组数据。每组数据包括两个长度小于<font face"\"Times" new"" roman,"" serif\""" style"…

优秀的程序员怎么做

我觉得优秀的程序员&#xff0c;不仅优秀在代码上&#xff0c;更重要在思维等方面。 我认为一个优秀程序员是谨慎的&#xff0c;在有需求与任务时&#xff0c;会不断的澄清需求与任务&#xff0c;并且多次确认想要的结果&#xff0c;而非闷头听着或者看着需求与任务列表。 我…

Linux 挂载磁盘目录

文章目录查看Linux磁盘信息格式化磁盘挂载修改重启后自动挂载fstab 文件格式介绍卸载查看Linux磁盘信息 [rootb001 ~]# fdisk -lDisk /dev/vda: 53.7 GB, 53687091200 bytes, 104857600 sectors Units sectors of 1 * 512 512 bytes Sector size (logical/physical): 512 by…

linux学习路线

很多同学接触Linux不多&#xff0c;对Linux平台的开发更是一无所知。而现在的趋势越来越表明&#xff0c;作为一个优秀的软件开发人员&#xff0c;或计算机IT行业从业人员&#xff0c;掌握Linux是一种很重要的谋生资源与手段。下来我将会结合自己的几年的个人开发经验&#xff…

java中有关文件流的操作

文件流: 顾名思义,程序和文件打交道. 此时我们谈及的文件,值得是纯文本文件(txt的,不要使用Word,Excel), 在字节流中,暂时不要使用中文. FileInputStream: 文件的字节输入流 FileOutputStream: 文件的字节输出流 FileReader:文件的字符输入流 FileWriter:文件的字符输出流…

数据结构实验之二叉树一:树的同构

题目描述 给定两棵树T1和T2。如果T1可以通过若干次左右孩子互换就变成T2&#xff0c;则我们称两棵树是“同构”的。例如图1给出的两棵树就是同构的&#xff0c;因为我们把其中一棵树的结点A、B、G的左右孩子互换后&#xff0c;就得到另外一棵树。而图2就不是同构的。 图1 …

Linxu服务器文件双向同步 rsync+sersync 详细讲解

文章目录Linxu服务器文件双向同步 rsyncsersync基础信息安装rsync安装配置根据配置文件创建相应的目录、文件、防火墙规则创建需要同步的目录创建同步用户防火墙启动rsyncd服务测试异常处理sersync安装sersync配置test21配置test22配置sersync启动test21启动test22启动测试新增…

resize和reverse的区别

首先&#xff0c;两个函数的功能是有区别的&#xff1a; reserve是容器预留空间&#xff0c;但并不真正创建元素对象&#xff0c;在创建对象之前&#xff0c;不能引用容器内的元素&#xff0c;因此当加入新的元素时&#xff0c;需要用push_back()/insert()函数。 resize是…

数据结构实验之二叉树五:层序遍历

题目描述 已知一个按先序输入的字符序列&#xff0c;如abd,,eg,,,cf,,,(其中,表示空结点)。请建立二叉树并求二叉树的层次遍历序列。 输入 输入数据有多行&#xff0c;第一行是一个整数t (t<1000)&#xff0c;代表有t行测试数据。每行是一个长度小于50个字符的字符串。输出 …

java中字符编码详解

字符编码的发展历程: 阶段1: 计算机只认识数字,我们在计算机里一切数据都是以数字来表示,因为英文符号有限, 所以规定使用的字节的最高位是0.每一个字节都是以0~127之间的数字来表示,比如A对应65,a对应97. 这就是美国标准信息交换码-ASCII. 阶段2: 随着计算机在全球的普及…