Spring Cloud Stream 消息驱动基础入门与实践总结

Spring Cloud Stream是用于构建与共享消息传递系统连接的高度可伸缩的事件驱动微服务框架,该框架提供了一个灵活的编程模型,它建立在已经建立和熟悉的Spring熟语和最佳实践上,包括支持持久化的发布/订阅、消费组以及消息分区这三个核心概念。

【1】概念介绍

① 什么是Spring Cloud Stream

官方定义 Spring Cloud Stream 是一个构建消息驱动微服务的框架。为一些供应商的消息中间件产品提供了个性化的自动化配置实现,引用了发布-订阅、消费组、分区的三个核心概念。简单来讲,就是屏蔽了底层XXMQ,应用层不用关注底层是RabbitMQ还是Kafka 。类似于Spring Data抽离持久层屏蔽底层各种数据库的概念。

应用程序通过 inputs 或者 outputs 来与 Spring Cloud Stream中binder对象交互。通过我们配置来binding(绑定) ,而 Spring Cloud Stream 的 binder对象负责与消息中间件交互。所以,我们只需要搞清楚如何与 Spring Cloud Stream 交互就可以方便使用消息驱动的方式。

通过使用Spring Integration来连接消息代理中间件以实现消息事件驱动。

官网文档:https://spring.io/projects/spring-cloud-stream#overview

在这里插入图片描述

② stream如何统一底层差异

在没有绑定器这个概念的情况下,我们的SpringBoot应用要直接与消息中间件进行信息交互的时候,由于各消息中间件构建的初衷不同,它们的实现细节上会有较大的差异性。

Binder可以生成Binding,Binding用来绑定消息容器的生产者和消费者,它有两种类型,INPUT和OUTPUT,INPUT对应于消费者,OUTPUT对应于生产者。

通过定义绑定器作为中间层,完美地实现了应用程序与消息中间件细节之间的隔离。Stream对消息中间件的进一步封装,可以做到代码层面对中间件的无感知,甚至于动态的切换中间件(rabbitmq切换为kafka),使得微服务开发的高度解耦,服务可以关注更多自己的业务流程。

  • 通过定义绑定器作为中间层,完美地实现了应用程序与消息中间件细节之间的隔离。

  • 通过向应用程序暴露统一的Channel通道,使得应用程序不需要再考虑各种不同的消息中间件实现。

  • 通过定义绑定器Binder作为中间层,实现了应用程序与消息中间件细节之间的隔离。

在这里插入图片描述

③ Spring Cloud Stream标准流程设计

Stream中的消息通信方式遵循了发布-订阅模式。

  • Binder:很方便的连接中间件,屏蔽差异
  • Channel:通道,是队列Queue的一种抽象,在消息通讯系统中就是实现存储和转发的媒介,通过Channel对队列进行配置
  • Source和Sink:简单的可理解为参照对象是Spring Cloud Stream自身,从Stream发布消息就是输出,接受消息就是输入。

在这里插入图片描述

④ 几个API注解

@EnableBinding:指信道channel和exchange绑定在一起。

@StreamListener:监听队列,用于消费者的队列的消息接收

@Input:注解标识输入通道,通过该输入通道接收到的消息进入应用程序。

@Output:注解标识输出通道,发布的消息将通过该通道离开应用程序

下面以RabbitMQ为底层MQ来说明如何使用Stream,当然同样要先安装好RabbitMQ。

【2】消息生产者

① pom依赖

<dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-stream-rabbit</artifactId></dependency>

② yml配置

server:port: 8801spring:application:name: cloud-stream-providercloud:stream:binders: #在此配置要绑定的 rabbitmq的服务信息defaultRabbit:  # 表示定义的名称,用于 binding整合type: rabbit  # 消息组件类型environment:  # 设置rabbitmq相关的环境配置spring:rabbitmq:host: localhostport: 5672username: guestpassword: guestbindings: # 服务的整合处理output: # 输出通道的名称destination: studyExchange  #表示要使用的 Exchange 名称定义content-type: application/json  # 消息类型binder: defaultRabbit
eureka:client:service-url:defaultZone: http://localhost:7001/eurekainstance:lease-renewal-interval-in-seconds: 2 # 设置心跳时间间隔默认30slease-expiration-duration-in-seconds: 5 # 如果超过了5秒的间隔默认90sinstance-id: send-8001.com  #信息列表显示主机名称prefer-ip-address: true # 访问路径变为ip地址

③ 消息服务类

public interface IMessageProvider {public String send();
}@EnableBinding(Source.class)//定义消息推送管道
@Slf4j
public class IMessageProviderImpl implements IMessageProvider {@Resourceprivate MessageChannel output;//消息发送通道@Overridepublic String send() {String serial = UUID.randomUUID().toString();output.send(MessageBuilder.withPayload(serial).build());log.info(serial+"***********************");return serial;}
}

编写控制器发送消息:

@RestController
public class IMessageController {@Resourceprivate IMessageProvider provider;@GetMapping("/sendMessage")public String send(){return provider.send();}
}

【3】消息消费者

① pom依赖

<dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-stream-rabbit</artifactId></dependency>

② yml配置

server:port: 8802spring:application:name: cloud-stream-consumercloud:stream:binders: #在此配置要绑定的 rabbitmq的服务信息defaultRabbit:  # 表示定义的名称,用于 binding整合type: rabbit  # 消息组件类型environment:  # 设置rabbitmq相关的环境配置spring:rabbitmq:host: localhostport: 5672username: guestpassword: guestbindings: # 服务的整合处理input: # 输出通道的名称destination: studyExchange  #表示要使用的 Exchange 名称定义content-type: application/json  # 消息类型binder: defaultRabbitgroup: group1
eureka:client:service-url:defaultZone: http://localhost:7001/eurekainstance:lease-renewal-interval-in-seconds: 2 # 设置心跳时间间隔默认30slease-expiration-duration-in-seconds: 5 # 如果超过了5秒的间隔默认90sinstance-id: receive-8002.com  #信息列表显示主机名称prefer-ip-address: true # 访问路径变为ip地址

③ 消息接收服务

@Component
@Slf4j
@EnableBinding(Sink.class)
public class StreamController {@Value("${server.port}")private String serverPort;@StreamListener(Sink.INPUT)public void input(Message<String>message){log.info("消费者1号接收到消息"+message.getPayload()+"\t port:"+serverPort);}}

上面【2】【3】即可实现消息的发送和接收,但是假设有多个消费者,我们还要考虑两个问题:消息的重复消费和消息的持久化。

【4】group分组解决重复消费问题

比如在如下场景中,订单系统我们做集群部署,都会从RabbitMQ中获取订单信息,那如果一个订单同时被两个服务获取到,那么就会造成数据错误,我们得避免这种情况。这时我们就可以使用Stream中的消息分组来解决。

在Stream中处于同一个group中的多个消费者是竞争关系,就能够保证消息只会被其中一个应用消费一次。

  • 不同组是可以全面消费的(重复消费),
  • 同一组内会发生竞争关系,只有其中一个可以消费。

也就是说,两个消费者微服务的group定义为同一个,即可以解决重复消费问题

【5】group分组解决消息丢失问题

即,在你服务停机重启期间,消息在不断发送,而服务启动后并没有接收到发送的消息。这是由于你没有配置group属性导致的。

解决方案:配置group属性

spring:application:name: cloud-stream-consumercloud:stream:binders: #在此配置要绑定的 rabbitmq的服务信息defaultRabbit:  # 表示定义的名称,用于 binding整合type: rabbit  # 消息组件类型environment:  # 设置rabbitmq相关的环境配置spring:rabbitmq:host: localhostport: 5672username: guestpassword: guestbindings: # 服务的整合处理input: # 输出通道的名称destination: studyExchange  #表示要使用的 Exchange 名称定义content-type: application/json  # 消息类型binder: defaultRabbitgroup: group1 # 这个很重要!!!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/diannao/27229.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

激活和禁用Hierarchy面板上的物体

1、准备工作&#xff1a; (1) 在HIerarchy上添加待隐藏/显示的物体&#xff0c;名字自取。如&#xff1a;endImage (2) 在Inspector面板&#xff0c;该物体的名称前取消勾选&#xff08;隐藏&#xff09; (3) 在HIerarchy上添加按钮&#xff0c;名字自取。如&#xff1a;tip…

前端开发之TCP与UDP认识

上一篇&#x1f449;: 前端开发之性能优化 TCP与UDP 三次握手 1. 初始状态&#xff1a; 客户端开始时处于CLOSED状态&#xff0c;表明没有活动的连接。服务器监听特定端口&#xff0c;处于LISTEN状态&#xff0c;等待连接请求。 2. 第一次握手&#xff08;SYN_SENT状态&am…

sklearn(Scikit-learn)入门学习教程

sklearn&#xff08;Scikit-learn&#xff09;是一个功能强大的Python机器学习库&#xff0c;它提供了丰富的工具和方法&#xff0c;用于数据挖掘、数据分析和预测建模。以下是一个关于sklearn的清晰教程&#xff0c;涵盖了其主要特点和功能&#xff1a; 1. sklearn简介 定义…

FPGA “+:”、“-:“语法

“:”变量[起始地址 : 数据位宽] <–等价于–> 变量[(起始地址数据位宽-1)&#xff1a;起始地址] data[0 : 8] <–等价于–> data[7:0] data[15 : 2] <–等价于–> data[16:15] “-:”变量[结束地址 -: 数据位宽] <–等价于–> 变量[结束地址&#xf…

【机器学习300问】117、序列模型中的符号表示方法?以命名实体识别(NER)任务为例。

在序列模型中&#xff0c;特别是在命名实体识别(NER)任务中&#xff0c;我们通常会用一系列符号来表示输入序列、目标标签以及模型的结构和操作。本文列出一些常见的符号表示方法&#xff0c;结合NER任务进行解释。 一、什么是命名实体识别任务&#xff1f; &#xff08;1&am…

mysql8.0 sql_mode与ONLY_FULL_GROUP_BY报错

如果你的项目出现如下类似的错误 ### Error querying database. Cause: java.sql.SQLSyntaxErrorException: Expression #2 of SELECT list is not in GROUP BY clause and contains nonaggregated column 字段名 which is not functionally dependent on columns in GROUP BY…

PostgreSQL导出导出压缩文件大小

1、导出 pg_dump -h [你的IP地址] -p [你的端口号名称] -U [你的用户名称] -d [你的数据库名称] -t [将要导出数据的表] -F c > [保存路径] 注意&#xff1a; 据说会话创建密码即可不需要输入密码&#xff0c;我试了下不行&#xff1a; export PGPASSWORD你的密码 2…

JSON、yam|fIProperties

JSON、YAML和Properties都是数据序列化和存储的格式&#xff0c;它们各自有独特的特点和适用场景。 1. JSON (JavaScript Object Notation) : 特点&#xff1a;JSON是一种轻量级的数据交换格式&#xff0c;易于人阅读和编写&#xff0c;同时也易于机器解析和生成。它基于ECMA…

YUV格式与RGB格式详解

图像处理 文章目录 图像处理前言YUV 格式YUV 采样 前言 像素格式描述了像素数据存储所用的格式&#xff0c;定义了像素在内存中的编码方式。RGB 和 YUV 为两种经常使用的像素格式。/ 1024 / 1024 2.63 MB 存储空间。 RGB 和 RGBA 格式 RGB 图像具有三个通道 R、G、B&#xff…

Gobject tutorial 一

参考&#xff1a; https://github.com/ToshioCP/Gobject-tutorial/tree/main?tabreadme-ov-file Gobject class and instance Gobject 实例是通过函数g_object_new创建的。Gobject不仅包含实例还包含类。 Gobject的类是在第一次调用g_object_new函数时被创建的。并且对于一…

HyperBDR新版本上线,自动化容灾兼容再升级!

本次HyperBDR v5.5.0版本新增完成HCS&#xff08;Huawei Cloud Stack&#xff09;8.3.x和HCSO&#xff08;Huawei Cloud Stack Online&#xff09;自动化对接&#xff0c;另外还突破性完成了Oracle云(块存储模式)的自动化对接。 HyperBDR&#xff0c;云原生业务级别容灾工具。支…

确定性网络_v0

目录 一、背景二、技术参考文献 一、背景 确定性网络&#xff08;Deterministic Networking&#xff09;是提供确定性服务质量的网络技术&#xff0c;是在以太网的基础上为多种业务提供端到端确定性服务质量保障的一种新技术。通过对网络数据转发行为的控制&#xff0c;将时延…

【渗透测试】|dvwa命令注入乱码问题

法一&#xff1a; 解决方法如下&#xff1a; 1、按住winr&#xff0c;在运行框中输入cmd弹出命令行&#xff0c;在命令行中输入“control intl.cpl” 2、这个命令是使用control命令行工具来打开"区域和语言设置"对话框 3、选中对话框中的管理选项卡 4、可以看到这里…

linux 安装 Nginx 并部署 vue 项目

1、安装 yum install nginx2、使用 nginx 命令 查看nginx状态 systemctl status nginx 启动服务 systemctl start nginx停止服务 systemctl stop nginx重启服务 systemctl restart nginx修改配置后重载 systemctl reload nginx 加入开机自启动 systemctl enable ngin…

前端开发之HTTP协议认识

上一篇&#x1f449;: 前端开发之WebSocket通信 文章目录 1. HTTP 1.0 和 HTTP 1.1 之间有哪些区别1.连接方面&#xff1a;2.资源传输优化&#xff1a;3.缓存机制增强&#xff1a;4.主机头识别5.请求方法扩展 2.HTTP 1.1 和 HTTP 2.0 的区别1. 二进制分帧层&#xff1a;2.多路…

企业应该先上ERP系统还是先实施MES管理系统

在当今日益激烈的市场竞争中&#xff0c;企业信息化已成为提升竞争力的关键。ERP系统与MES管理系统作为企业信息化建设的两大核心系统&#xff0c;各自扮演着不可或缺的角色。然而&#xff0c;在资源有限的情况下&#xff0c;企业往往需要在两者之间做出选择。本文将深入探讨ER…

跨境电商卖家入驻美国线下商超困难吗?

对于跨境电商卖家来说&#xff0c;入驻美国线下商超确实具有一定的挑战性&#xff0c;但并非不可能。成功的关键在于卖家是否具备必要的条件和资质&#xff0c;以及是否能够有效应对美国市场的挑战。 1、卖家需要满足美国相关法律法规的要求 需要拥有合法的经营执照、提供准确…

智能创作引领潮流,抓住时机!TikTok矩阵源码带来自动定时发布的成功策略

智能创作是当今社交媒体平台发展的重要趋势&#xff0c;而TikTok作为最受欢迎的短视频平台之一&#xff0c;通过其独特的创作方式和大量的用户&#xff0c;已经成为广告主和内容创作者的首选平台。在这个竞争激烈的市场&#xff0c;抓住时机并实现成功的关键是自动定时发布&…

人脑神经元与AI神经网络的奥秘

神经元是赋予我们思考力的生物学奇迹。大脑中藏着近千亿个这样的神经元&#xff0c;它们通过错综复杂的连接形成了我们的神经系统。每个神经元由细胞体、树突和轴突构成&#xff0c;这些部分使得神经元能够接收、处理和传递信息。 在人工智能领域&#xff0c;神经网络其实是模仿…

总费用大于20万患者详细信息

select t.住院号 病案号, t.入院日期, t.出院日期, b.名称 出院科室, (select x1.编码 from 病人诊断记录 x,疾病编码目录 x1 where x.疾病idx1.id and x.病人idt.病人id and x.主页idt.主页id and x.记录来源3 and x.诊断次序1 and x.编码序号1 …