【SpringCloud笔记】(11)消息驱动之Stream

Stream

技术背景

底层不同模块可能使用不同的消息中间件,这就导致技术的切换,微服务的维护及开发变得麻烦起来

在这里插入图片描述

概述

官网:
https://spring.io/projects/spring-cloud-stream#overview
https://cloud.spring.io/spring-cloud-static/spring-cloud-stream/3.0.1.RELEASE/reference/html/

Spring Cloud Stream中文指导手册:
https://m.wang1314.com/doc/webapp/topic/20971999.html

什么是SpringCloudStream

官方定义Spring Cloud Stream是一个构建消息驱动微服务的框染

应用程序通过inputs 或者 outputsSpring Cloud Stream中binder对象交互

通过我们配置来binding(绑定),而Spring Cloud Stream的binder对象负责与消息中间件交互

所以,我们只需要搞清楚如何与Spring Cloud Stream交互就可以方便使用消息驱动的方式。

通过使用Spring Integration来连接消息代理中间件以实现消息事件驱动。

Spring Ccloud Stream为一些供应商的消息中间件产品提供了个性化的自动化配置实现,引用了发布-订阅、消费组、分区的三个核心概念。

目前仅支持RabbitMQ、Kafka

Stream设计思想

标准MQ流程

在这里插入图片描述

  • 生产者/消费者之间靠消息媒介传递信息内容(Message)

  • 消息必须走特定的通道(消息通道MessageChannel)

  • 消息通道里的消息如何被消费呢,谁负责收发处理(消息通道MessageChannel的子接口SubscribableChannel,由MessageHandler消息处理器所订阅)

为什么要引入Stream

比方说我们用到了RabbitMQ和Kafka,由于这两个消息中间件的架构上的不同,像RabbitMQ有exchange,kafka有Topic和Partitions分区

这些中间件的差异导致我们实际项目开发给我们造成了一定的困难,我们如果用了两个消息队列的其中一种,后面的业务需求,我们想往另外一种消息队列进行迁移,这时候无疑就是一个灾难性的,一大堆东西都要重新推倒重新做,因为它跟我们的系统耦合了,这时候SpringCloud Stream给我们提供了一种解耦合的方式。

在没有绑定器这个概念的情况下,我们的SpringBoot应用要直接与消息中间件进行信息交互的时候,由于各消息中间件构建的初衷不同,它们的实现细节上会有较大的差异性。

通过定义绑定器Binder作为中间层,完美地实现了应用程序与消息中间件细节之间的隔离

Stream对消息中间件的进一步封装,可以做到代码层面对中间件的无感知,甚至于动态的切换中间件(rabbitmq切换为kafka),使得微服务开发的高度解耦,服务可以关注更多自己的业务流程

Stream的消息通信方式遵循了发布-订阅模式

INPUT对应于生产者,OUTPUT对应于消费者

在这里插入图片描述

Stream标准流程套路

在这里插入图片描述

  • binder:很方便的连接中间件,屏蔽差异
  • Channel:通道,是队列Queue的一种抽象,在消息通讯系统中就是实现存储和转发的媒介,通过channel对队列进行配置
  • Source(生产)和sink(消费):简单地可理解为参照对象是spring cloud stream自身,从stream发布消息就是输出,接收消息就是输入

常用注解

组成说明
Middleware中间件,目前只支FRabbitMQ和Kafka
BinderBinder是应用与消息中间件之间的封装,目前实行了KafKa和RabbitMQ的Binder,通过Binder可以很方便的连接中间件,可以动态的改变消息类型(对应kafka的topic,RabbitMQ的exchange),这些都可以通过配置文件来实现
@Input注解标识输入通道,通过该输入通接收到的消息息进入应用程序
@Output注解标识输出通道,发布的消息将通过该通道离开应用程序
@StreamListener监听队列,用于消费者的队列的消息接收
@EnableBinding指信道channel和exchange绑定在一起

要新建3个子模块

cloud-stream-rabbitmq-consumer8802:作为消息接收模块
cloud-stream-rabbitmq-consumer8802:作为消息接收模块

消息驱动之生产者

新建cloud-stream-rabbitmq-provide8801:作为生产者进行发消息模块

pom文件

<artifactId>cloud-stream-rabbitmq-provide8801</artifactId><dependencies><dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-stream-rabbit</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-actuator</artifactId></dependency><dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-netflix-eureka-client</artifactId></dependency><!-- 引入自己定义的api通用包,可以使用Payment支付Entity --><dependency><groupId>com.mzr.springcloud</groupId><artifactId>cloud-api-commons</artifactId><version>${project.version}</version></dependency></dependencies>

yml文件

server:port: 8801spring:application:name: cloud-stream-providercloud:stream:binders: # 在此处配置要绑定的rabbitmq的服务信息;defaultRabbit: # 表示定义的名称,用于于binding整合type: rabbit # 消息组件类型environment: # 设置rabbitmq的相关的环境配置spring:rabbitmq:host: localhostport: 5672username: guestpassword: guestbindings: # 服务的整合处理output: # 这个名字是一个通道的名称destination: studyExchange # 表示要使用的Exchange名称定义content-type: application/json # 设置消息类型,本次为json,文本则设置“text/plain”binder: defaultRabbit # 设置要绑定的消息服务的具体设置eureka:client: # 客户端进行Eureka注册的配置service-url:defaultZone: http://localhost:7001/eurekainstance:lease-renewal-interval-in-seconds: 2 # 设置心跳的时间间隔(默认是30秒)lease-expiration-duration-in-seconds: 5 # 如果现在超过了5秒的间隔(默认是90秒)instance-id: send-8801.com  # 在信息列表时显示主机名称prefer-ip-address: true     # 访问的路径变为IP地址

主启动类

@SpringBootApplication
public class StreamMQMain8801
{public static void main(String[] args){SpringApplication.run(StreamMQMain8801.class,args);}
}

业务类

定义发送消息的接口

public interface IMessageProvider
{public String send();
}

定义发送消息的接口

//@Service 这个地方不再需要该注解,不再是和controller打交道的service,而是发送消息的推送服务类与Stream打交道,可结合Stream标准流程图来看
@EnableBinding(Source.class) //定义消息的推送管道
public class MessageProviderImpl implements IMessageProvider
{@Resourceprivate MessageChannel output; // 消息发送管道@Overridepublic String send(){String serial = UUID.randomUUID().toString();//将消息与绑定器绑定output.send(MessageBuilder.withPayload(serial).build());System.out.println("*****serial: "+serial);return null;}
}

controller

@RestController
public class SendMessageController
{@Resourceprivate IMessageProvider messageProvider;@GetMapping(value = "/sendMessage")public String sendMessage(){return messageProvider.send();}
}

启动rabbitMQ

在这里插入图片描述
这里的studyExchange便是yml文件配置的

在这里插入图片描述
启动eureka7001、生产者8801,多次访问localhost:8801/sendMessage

可以看到rabbitMQ中也有流量的起伏

在这里插入图片描述

消息驱动之消费者

新建模块 cloud-stream-rabbitmq-consumer8802

pom文件与8801一样

省略..

yml文件

server:port: 8802spring:application:name: cloud-stream-consumercloud:stream:binders: # 在此处配置要绑定的rabbitmq的服务信息;defaultRabbit: # 表示定义的名称,用于于binding整合type: rabbit # 消息组件类型environment: # 设置rabbitmq的相关的环境配置spring:rabbitmq:host: localhostport: 5672username: guestpassword: guestbindings: # 服务的整合处理#output: # 这个名字是一个通道的名称,这是8802与8801唯一的区别,一个是消息发送一个是消息接收input: # 这个名字是一个通道的名称destination: studyExchange # 表示要使用的Exchange名称定义content-type: application/json # 设置消息类型,本次为对象json,如果是文本则设置“text/plain”binder: defaultRabbit # 设置要绑定的消息服务的具体设置eureka:client: # 客户端进行Eureka注册的配置service-url:defaultZone: http://localhost:7001/eurekainstance:lease-renewal-interval-in-seconds: 2 # 设置心跳的时间间隔(默认是30秒)lease-expiration-duration-in-seconds: 5 # 如果现在超过了5秒的间隔(默认是90秒)instance-id: receive-8802.com  # 在信息列表时显示主机名称prefer-ip-address: true     # 访问的路径变为IP地址

主启动类与8801一致

省略..

业务类

因为这是消费者是接受消息,所以只需关注controller层业务逻辑

controller层

@Component
@EnableBinding(Sink.class)
public class ReceiveMessageListenerController
{@Value("${server.port}")private String serverPort;@StreamListener(Sink.INPUT)//监听输入源public void input(Message<String> message){System.out.println("消费者1号,----->接受到的消息: "+message.getPayload()+"\t  port: "+serverPort);}
}

启动8802测试

可以看到这个通道已经被监听了

在这里插入图片描述

可以看到8801发送的流水号与8802接受到的流水号对应

在这里插入图片描述

在这里插入图片描述

分组消费及持久化

依照8802,clone出来一份cloud-stream-rabbitmq-consumer8802作为消费者并启动

运行后有两个问题

  • 有重复消费问题
  • 消息持久化

消息重复消费

8801发送的消息,8802及8803同时都收到了,存在着重复消费问题

比如在如下场景中,订单系统我们做集群部署,都会从RabbitMQ中获取订单信息,那如果一个订单同时被两个服务获取到,那么就会造成数据错误,我们得避免这种情况。这时我们就可以使用Stream中的消息分组来解决

在这里插入图片描述

注意在Stream中处于同一个group中的多个消费者是竞争关系,就能够保证消息只会被其中一个应用消费一次不同组是可以全面消费的(重复消费)

默认分组group是不同的,组流水号不一样,被认为不同组,可以消费,所以导致8002 8003同时收到相同的消息

在这里插入图片描述

解决消息重复消费

原理:微服务应用放置于同一个group中,就能够保证消息只会被其中一个应用消费一次。

实现步骤:

1、8802/8803都变成不同组,group两个不同

8802/8803配置yml文件

	# 8802bindings:input:   destination: studyExchange  content-type: application/json  binder: defaultRabbit  group: atguiguA  # 自定义分组配置# 8803bindings:input:   destination: studyExchange  content-type: application/json  binder: defaultRabbit  group: atguiguB  # 自定义分组配置

可以看到RabbitMQ上已经变成了我们自定义的分组

在这里插入图片描述
此时8002 8003同时收到相同的消息

2、8802/8803都变成相同组,group两个相同

将8802/8803yml文件中group修改为atguiguA

在这里插入图片描述

访问2次localhost:8801/sendMessage触发生产者发送2条信息

在这里插入图片描述

消费者8802接收到1条

在这里插入图片描述

消费者8803接收到1条

在这里插入图片描述

8802/8803实现了轮询分组,每次只有一个消费者,8801模块的发的消息只能被8802或8803其中一个接收到,这样避免了重复消费。

消息持久化

做个小测试感受一下消息持久化的重要性

停止8802/8803并去除掉8802的分组group: atguiguA,此时8803的分组group: atguiguA并没有去掉

8801先发送4条消息到rabbitmq

在这里插入图片描述

重新启动8802,因为没有分组属性配置,后台没有打出来消息

重新启动8803,因为有分组属性配置,后台打出来了8801生产者发送的消息

在这里插入图片描述

group分组属性对于避免消息重复消费 及 消息持久化避免消息丢失是非常重要的

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/580052.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

最小覆盖子串(LeetCode 76)

文章目录 1.问题描述2.难度等级3.热门指数4.解题思路参考文献 1.问题描述 给你一个字符串 s 、一个字符串 t 。返回 s 中涵盖 t 所有字符的最小子串。如果 s 中不存在涵盖 t 所有字符的子串&#xff0c;则返回空字符串 “” 。 注意&#xff1a; 对于 t 中重复字符&#xff…

git 常用基本命令, reset 回退撤销commit,解决gitignore无效,忽略记录或未记录远程仓库的文件,删除远程仓库文件

git 基本命令 reset 撤销commit https://blog.csdn.net/a704397849/article/details/135220091 idea 中 rest 撤销commit过程如下&#xff1a; Git -> Rest Head… 在To Commit中的HEAD后面加上^&#xff0c;点击Reset即可撤回最近一次的尚未push的commit Reset Type 有三…

Flink Has Become the De-facto Standard of Streaming Compute

摘要&#xff1a;本文整理自 Apache Flink 中文社区发起人、阿里巴巴开源大数据平台负责人王峰&#xff08;莫问&#xff09;&#xff0c;在 Flink Forward Asia 2023 主会场的分享。Flink 从 2014 年诞生之后&#xff0c;已经发展了将近 10 年&#xff0c;尤其是最近这些年得到…

爬虫系列----Python解析Json网页并保存到本地csv

Python解析JSON 1 知识小课堂1.1 爬虫1.2 JSON1.3 Python1.4 前言技术1.4.1 range1.4.2 random1.4.3 time.sleep1.4.4 with open() as f: 2 解析过程2.1 简介2.2 打开调试工具2.3 分析网址2.3.1 网址的规律2.3.2 网址的参数 2.4 爬取第一页内容2.5 存入字典并获取2.6 循环主体数…

7-2 设计一元二次方程求解类(高教社,《Python编程基础及应用》习题9-4)——python

设计一个类Root来计算ax2bxc0的根。该类包括&#xff1a;a、b、c共3个属性表示方程的3个系数&#xff0c;getDiscriminant()方法返回b2-4ac, getRoot1()和getRoot2()返回方程的两个根。 其中&#xff0c;getRoot1()返回的根对应&#xff1a; getRoot2()返回的根对应&#xff1a…

百度沧海文件存储CFS推出新一代Namespace架构

随着移动互联网、物联网、AI 计算等技术和市场的迅速发展&#xff0c;数据规模指数级膨胀&#xff0c;对于分布式文件系统作为大规模数据场景的存储底座提出了更高的要求。已有分布式文件系统解决方案存在着短板&#xff0c;只能适应有限的场景&#xff1a; >> 新型分布式…

格密码:傅里叶矩阵

目录 一. 铺垫性介绍 1.1 傅里叶级数 1.2 傅里叶矩阵的来源 二. 格基与傅里叶矩阵 2.1 傅里叶矩阵详细解释 2.2 格基与傅里叶矩阵 写在前面&#xff1a;有关傅里叶变换的解释太多了&#xff0c;这篇博客主要总结傅里叶矩阵在格密码中的运用。对于有一定傅里叶变换基础的同…

IntelliJ IDEA [设置] 隐藏 .idea 等 .XXX 文件夹

文章目录 1. 问题描述2. 解决办法3. 最后效果4. 特殊处理&#xff08;正常不需要此步骤&#xff09;总结 我们使用 IntelliJ IDEA 导入项目的时候&#xff0c;经常会看到一些 .XXX 的文件夹&#xff08;例如&#xff1a;.idea&#xff0c;.mvn&#xff0c;.gradle 等&#xff0…

支付宝、学习强国小程序input、textarea数据双向绑定

前言 和 vue 的绑定有些区别&#xff0c;需要注意。直接 value"{{inputValue}}" 是无法双向绑定的。 正确思路 文档说的比较详细&#xff0c;不过没有组合使用的案例&#xff0c;需要自行理解。这里正确的方法是先用 value 绑定数据&#xff0c;再使用 onInput 事件…

鸿蒙的基本项目_tabbar,首页,购物车,我的

以上效果&#xff0c;由四个ets文件实现&#xff0c;分别是容器页面。首页&#xff0c;购物车&#xff0c;我的。 页面里的数据&#xff0c;我是用json-server进行模拟的数据。 一、容器页面 使用组件Tabs和Tabcontent结合。 import Home from "./Home"; import …

短剧付费变现小程序源码系统:开通会员+在线充值+风口项目,变现利器+完整的代码包 附带部署安装教程

在当今数字化时代&#xff0c;短剧付费变现小程序源码系统已经成为了一个热门的风口项目。它以开通会员、在线充值、完整的代码包等特色功能&#xff0c;成为了一种有效的变现利器&#xff0c;受到了广泛的关注和应用。本文将详细介绍这个源码系统的背景和特色功能&#xff0c;…

实现阿里云oss云存储,简单几步

一、前言 虽然平常学习用的不多&#xff0c;但是用的时候再去找官方文档&#xff0c;也很繁琐&#xff0c;不如直接整理以下&#xff0c;方便粘贴复制&#xff0c;本文介绍两种图片上传方式①普通上传②服务端签名直传 1.普通上传 加载maven依赖 <dependency><grou…

centos 安装oracle 11.2.04 并配置数据库自启动操作记录,一次完成

环境&#xff1a; centos版本7.3&#xff0c;安装的有图形化界面 Oracle11.2.04&#xff0c;之所以选择这个版本是因为网上有人说11其他版本的在安装的过程中会出现这样或那样的问题&#xff0c;下载地址放到文章下面 步骤&#xff0c;按顺序&#xff1a; 1、创建安装Oracle…

万用表测接地电阻方法

万用表测接地电阻方法 用万用表在不同土质的土壤对接地电阻进行了实验&#xff0c;并将万用表所测数据和专用接地电阻测试仪所测数据进行了比较&#xff0c;两者十分接近。具体测量方法如下&#xff1a; 找两根8mm、1m长的圆钢&#xff0c;将其一端磨尖作为辅助测试棒&#x…

Mysql之视图

Mysql之视图 常见的数据库对象视图概述为什么使用视图视图的理解创建视图创建单表视图别名的运用 创建多表联合视图利用视图对数据进行格式化contact 函数以视图为基&#xff0c;再创建新的视图 查看视图更新视图的数据一般情况不可更新的视图 修改和删除视图修改视图删除视图注…

【C#】Visual Studio 2022 远程调试配置教程

在某些特殊的情况下&#xff0c;开发机和调试机可能不是同一台设备&#xff0c;此时就需要远程调试了。 开发机配置 首先需要确保两台机器在同一局域网下。 创建共享文件夹 随便找个地方新建一个文件夹&#xff0c;用来放编译结果。例如我这里是 D:\DebuggingWorkspace\。 …

什么是阿里云负载均衡SLB?

目录 硬件或软件负载均衡的区别是什么&#xff1f; 什么是阿里云负载均衡SLB&#xff1f; 阿里云传统型负载均衡CLB 硬件或软件负载均衡的区别是什么&#xff1f; 通过专用硬件实现负载均衡&#xff0c;那么整体成本会较高&#xff0c;而且设备容易出现单点故障&#xff0c;…

【MySQL】InnoDB和MyISAM区别

文章目录 一、索引不同1 InnoDB聚簇索引&#xff0c;MyISAM非聚簇索引1 InnoDB聚簇索引2 MyISAM非聚簇索引 2 InnoDB必须要有主键&#xff0c;MyISAM允许没有主键3 InnoDB支持外键4 InnoDB不支持全文索引5 索引保存位置不同 二、对事物的支持三、存储结构不同四、存储空间不同五…

Elasticsearch中复制一个索引数据到新的索引中

问题 我有时候&#xff0c;需要调试一个已经存在的ES索引&#xff0c;需要从已有的索引复制数据到新的索引中去。 解决 这里我借助一个GUI工具&#xff0c;来解决这个问题&#xff0c;底层它是使用Reindex的API实现索引数据复制的。 步骤 选中已存在的redix菜单&#xff0…

00-Git 应用

Git 应用 一、Git概述 1.1 什么是Git git 是一个代码协同管理工具&#xff0c;也称之为代码版本控制工具&#xff0c;代码版本控制或管理的工具用的最多的&#xff1a; svn、 git。 SVN 是采用的 同步机制&#xff0c;即本地的代码版本和服务器的版本保持一致&#xff08;提…