Spring Cloud Stream 消息驱动基础入门与实践总结

Spring Cloud Stream是用于构建与共享消息传递系统连接的高度可伸缩的事件驱动微服务框架,该框架提供了一个灵活的编程模型,它建立在已经建立和熟悉的Spring熟语和最佳实践上,包括支持持久化的发布/订阅、消费组以及消息分区这三个核心概念。

【1】概念介绍

① 什么是Spring Cloud Stream

官方定义 Spring Cloud Stream 是一个构建消息驱动微服务的框架。为一些供应商的消息中间件产品提供了个性化的自动化配置实现,引用了发布-订阅、消费组、分区的三个核心概念。简单来讲,就是屏蔽了底层XXMQ,应用层不用关注底层是RabbitMQ还是Kafka 。类似于Spring Data抽离持久层屏蔽底层各种数据库的概念。

应用程序通过 inputs 或者 outputs 来与 Spring Cloud Stream中binder对象交互。通过我们配置来binding(绑定) ,而 Spring Cloud Stream 的 binder对象负责与消息中间件交互。所以,我们只需要搞清楚如何与 Spring Cloud Stream 交互就可以方便使用消息驱动的方式。

通过使用Spring Integration来连接消息代理中间件以实现消息事件驱动。

官网文档:https://spring.io/projects/spring-cloud-stream#overview

在这里插入图片描述

② stream如何统一底层差异

在没有绑定器这个概念的情况下,我们的SpringBoot应用要直接与消息中间件进行信息交互的时候,由于各消息中间件构建的初衷不同,它们的实现细节上会有较大的差异性。

Binder可以生成Binding,Binding用来绑定消息容器的生产者和消费者,它有两种类型,INPUT和OUTPUT,INPUT对应于消费者,OUTPUT对应于生产者。

通过定义绑定器作为中间层,完美地实现了应用程序与消息中间件细节之间的隔离。Stream对消息中间件的进一步封装,可以做到代码层面对中间件的无感知,甚至于动态的切换中间件(rabbitmq切换为kafka),使得微服务开发的高度解耦,服务可以关注更多自己的业务流程。

  • 通过定义绑定器作为中间层,完美地实现了应用程序与消息中间件细节之间的隔离。

  • 通过向应用程序暴露统一的Channel通道,使得应用程序不需要再考虑各种不同的消息中间件实现。

  • 通过定义绑定器Binder作为中间层,实现了应用程序与消息中间件细节之间的隔离。

在这里插入图片描述

③ Spring Cloud Stream标准流程设计

Stream中的消息通信方式遵循了发布-订阅模式。

  • Binder:很方便的连接中间件,屏蔽差异
  • Channel:通道,是队列Queue的一种抽象,在消息通讯系统中就是实现存储和转发的媒介,通过Channel对队列进行配置
  • Source和Sink:简单的可理解为参照对象是Spring Cloud Stream自身,从Stream发布消息就是输出,接受消息就是输入。

在这里插入图片描述

④ 几个API注解

@EnableBinding:指信道channel和exchange绑定在一起。

@StreamListener:监听队列,用于消费者的队列的消息接收

@Input:注解标识输入通道,通过该输入通道接收到的消息进入应用程序。

@Output:注解标识输出通道,发布的消息将通过该通道离开应用程序

下面以RabbitMQ为底层MQ来说明如何使用Stream,当然同样要先安装好RabbitMQ。

【2】消息生产者

① pom依赖

<dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-stream-rabbit</artifactId></dependency>

② yml配置

server:port: 8801spring:application:name: cloud-stream-providercloud:stream:binders: #在此配置要绑定的 rabbitmq的服务信息defaultRabbit:  # 表示定义的名称,用于 binding整合type: rabbit  # 消息组件类型environment:  # 设置rabbitmq相关的环境配置spring:rabbitmq:host: localhostport: 5672username: guestpassword: guestbindings: # 服务的整合处理output: # 输出通道的名称destination: studyExchange  #表示要使用的 Exchange 名称定义content-type: application/json  # 消息类型binder: defaultRabbit
eureka:client:service-url:defaultZone: http://localhost:7001/eurekainstance:lease-renewal-interval-in-seconds: 2 # 设置心跳时间间隔默认30slease-expiration-duration-in-seconds: 5 # 如果超过了5秒的间隔默认90sinstance-id: send-8001.com  #信息列表显示主机名称prefer-ip-address: true # 访问路径变为ip地址

③ 消息服务类

public interface IMessageProvider {public String send();
}@EnableBinding(Source.class)//定义消息推送管道
@Slf4j
public class IMessageProviderImpl implements IMessageProvider {@Resourceprivate MessageChannel output;//消息发送通道@Overridepublic String send() {String serial = UUID.randomUUID().toString();output.send(MessageBuilder.withPayload(serial).build());log.info(serial+"***********************");return serial;}
}

编写控制器发送消息:

@RestController
public class IMessageController {@Resourceprivate IMessageProvider provider;@GetMapping("/sendMessage")public String send(){return provider.send();}
}

【3】消息消费者

① pom依赖

<dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-stream-rabbit</artifactId></dependency>

② yml配置

server:port: 8802spring:application:name: cloud-stream-consumercloud:stream:binders: #在此配置要绑定的 rabbitmq的服务信息defaultRabbit:  # 表示定义的名称,用于 binding整合type: rabbit  # 消息组件类型environment:  # 设置rabbitmq相关的环境配置spring:rabbitmq:host: localhostport: 5672username: guestpassword: guestbindings: # 服务的整合处理input: # 输出通道的名称destination: studyExchange  #表示要使用的 Exchange 名称定义content-type: application/json  # 消息类型binder: defaultRabbitgroup: group1
eureka:client:service-url:defaultZone: http://localhost:7001/eurekainstance:lease-renewal-interval-in-seconds: 2 # 设置心跳时间间隔默认30slease-expiration-duration-in-seconds: 5 # 如果超过了5秒的间隔默认90sinstance-id: receive-8002.com  #信息列表显示主机名称prefer-ip-address: true # 访问路径变为ip地址

③ 消息接收服务

@Component
@Slf4j
@EnableBinding(Sink.class)
public class StreamController {@Value("${server.port}")private String serverPort;@StreamListener(Sink.INPUT)public void input(Message<String>message){log.info("消费者1号接收到消息"+message.getPayload()+"\t port:"+serverPort);}}

上面【2】【3】即可实现消息的发送和接收,但是假设有多个消费者,我们还要考虑两个问题:消息的重复消费和消息的持久化。

【4】group分组解决重复消费问题

比如在如下场景中,订单系统我们做集群部署,都会从RabbitMQ中获取订单信息,那如果一个订单同时被两个服务获取到,那么就会造成数据错误,我们得避免这种情况。这时我们就可以使用Stream中的消息分组来解决。

在Stream中处于同一个group中的多个消费者是竞争关系,就能够保证消息只会被其中一个应用消费一次。

  • 不同组是可以全面消费的(重复消费),
  • 同一组内会发生竞争关系,只有其中一个可以消费。

也就是说,两个消费者微服务的group定义为同一个,即可以解决重复消费问题

【5】group分组解决消息丢失问题

即,在你服务停机重启期间,消息在不断发送,而服务启动后并没有接收到发送的消息。这是由于你没有配置group属性导致的。

解决方案:配置group属性

spring:application:name: cloud-stream-consumercloud:stream:binders: #在此配置要绑定的 rabbitmq的服务信息defaultRabbit:  # 表示定义的名称,用于 binding整合type: rabbit  # 消息组件类型environment:  # 设置rabbitmq相关的环境配置spring:rabbitmq:host: localhostport: 5672username: guestpassword: guestbindings: # 服务的整合处理input: # 输出通道的名称destination: studyExchange  #表示要使用的 Exchange 名称定义content-type: application/json  # 消息类型binder: defaultRabbitgroup: group1 # 这个很重要!!!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/diannao/27229.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

激活和禁用Hierarchy面板上的物体

1、准备工作&#xff1a; (1) 在HIerarchy上添加待隐藏/显示的物体&#xff0c;名字自取。如&#xff1a;endImage (2) 在Inspector面板&#xff0c;该物体的名称前取消勾选&#xff08;隐藏&#xff09; (3) 在HIerarchy上添加按钮&#xff0c;名字自取。如&#xff1a;tip…

【机器学习300问】117、序列模型中的符号表示方法?以命名实体识别(NER)任务为例。

在序列模型中&#xff0c;特别是在命名实体识别(NER)任务中&#xff0c;我们通常会用一系列符号来表示输入序列、目标标签以及模型的结构和操作。本文列出一些常见的符号表示方法&#xff0c;结合NER任务进行解释。 一、什么是命名实体识别任务&#xff1f; &#xff08;1&am…

YUV格式与RGB格式详解

图像处理 文章目录 图像处理前言YUV 格式YUV 采样 前言 像素格式描述了像素数据存储所用的格式&#xff0c;定义了像素在内存中的编码方式。RGB 和 YUV 为两种经常使用的像素格式。/ 1024 / 1024 2.63 MB 存储空间。 RGB 和 RGBA 格式 RGB 图像具有三个通道 R、G、B&#xff…

HyperBDR新版本上线,自动化容灾兼容再升级!

本次HyperBDR v5.5.0版本新增完成HCS&#xff08;Huawei Cloud Stack&#xff09;8.3.x和HCSO&#xff08;Huawei Cloud Stack Online&#xff09;自动化对接&#xff0c;另外还突破性完成了Oracle云(块存储模式)的自动化对接。 HyperBDR&#xff0c;云原生业务级别容灾工具。支…

确定性网络_v0

目录 一、背景二、技术参考文献 一、背景 确定性网络&#xff08;Deterministic Networking&#xff09;是提供确定性服务质量的网络技术&#xff0c;是在以太网的基础上为多种业务提供端到端确定性服务质量保障的一种新技术。通过对网络数据转发行为的控制&#xff0c;将时延…

【渗透测试】|dvwa命令注入乱码问题

法一&#xff1a; 解决方法如下&#xff1a; 1、按住winr&#xff0c;在运行框中输入cmd弹出命令行&#xff0c;在命令行中输入“control intl.cpl” 2、这个命令是使用control命令行工具来打开"区域和语言设置"对话框 3、选中对话框中的管理选项卡 4、可以看到这里…

linux 安装 Nginx 并部署 vue 项目

1、安装 yum install nginx2、使用 nginx 命令 查看nginx状态 systemctl status nginx 启动服务 systemctl start nginx停止服务 systemctl stop nginx重启服务 systemctl restart nginx修改配置后重载 systemctl reload nginx 加入开机自启动 systemctl enable ngin…

企业应该先上ERP系统还是先实施MES管理系统

在当今日益激烈的市场竞争中&#xff0c;企业信息化已成为提升竞争力的关键。ERP系统与MES管理系统作为企业信息化建设的两大核心系统&#xff0c;各自扮演着不可或缺的角色。然而&#xff0c;在资源有限的情况下&#xff0c;企业往往需要在两者之间做出选择。本文将深入探讨ER…

跨境电商卖家入驻美国线下商超困难吗?

对于跨境电商卖家来说&#xff0c;入驻美国线下商超确实具有一定的挑战性&#xff0c;但并非不可能。成功的关键在于卖家是否具备必要的条件和资质&#xff0c;以及是否能够有效应对美国市场的挑战。 1、卖家需要满足美国相关法律法规的要求 需要拥有合法的经营执照、提供准确…

智能创作引领潮流,抓住时机!TikTok矩阵源码带来自动定时发布的成功策略

智能创作是当今社交媒体平台发展的重要趋势&#xff0c;而TikTok作为最受欢迎的短视频平台之一&#xff0c;通过其独特的创作方式和大量的用户&#xff0c;已经成为广告主和内容创作者的首选平台。在这个竞争激烈的市场&#xff0c;抓住时机并实现成功的关键是自动定时发布&…

Photoshop 2024 mac/win版:探索图像处理的全新境界

Photoshop 2024是Adobe推出的最新图像处理与设计软件&#xff0c;它在继承了前作所有优秀特性的基础上&#xff0c;实现了多个方面的质的飞跃。这款软件凭借其卓越的图像处理性能、丰富的创意工具以及精确的选区编辑功能&#xff0c;成为了图像处理领域的佼佼者。 Photoshop 2…

初始化三板斧 - centos7

1、关闭防火墙、关闭SELinux ① 立即关闭防火墙 systemctl stop firewalld ② 设置开机关闭防火墙 systemctl disable firewalld ③ 立即关闭SELinxu setenforce 0 ④ 设置开机关闭SELinux 将SELINUXenforcing 修改替换为 SELINUXdisabled vim /etc/selinux/config se…

adb shell pm path packageName

在Android命令行中&#xff0c;如果你想要查询某个应用程序的安装位置&#xff0c;可以使用pm命令&#xff08;Package Manager的缩写&#xff09;。这个命令提供了很多关于软件包管理的操作&#xff0c;查询应用安装路径&#xff0c;可以使用path选项。 具体命令如下&#xf…

Nginx+Tomcat负载均衡、动静分离群集方案

一、Tomcat简介 在现代 Web 服务架构中&#xff0c;Tomcat 和 Nginx 是两个至关重要的组件&#xff0c;负责处理用户请求并实现高性能的服务。本篇博客将深入探讨这些技术的原理和部署配置方法。 最初是由Sun的软件构架师詹姆斯邓肯戴维森开发。安装Tomcat后&#xff0c;安装…

立项 |上海城投《污染土壤修复工厂设计指南》

由上海城投上境生态修复科技有限公司提出申请 主要起草单位有上海城投上境生态修复科技有限公司、上海大学、中华环保联合会水环境治理专业委员会、中华环保联合会固危废及土壤污染治理专业委员会等单位 本指南规定了污染土壤修复工厂设计的技术要求。 本指南适用于国内污染土…

基于Matlab的纸币币值检测系统设计(GUI界面) 【含Matlab源码 MX_004期】

简介&#xff1a; 基于Matlab的纸币币值检测系统是一种利用数字图像处理技术来自动识别和鉴别纸币面额的系统。 图像获取&#xff1a;获取纸币的图像。 预处理&#xff1a;对获取到的图像进行预处理&#xff0c;包括去噪、灰度化、边缘检测等操作&#xff0c;以便后续的处理。…

哈尔滨等保测评驱动下的智慧城市建设思考

面对滚滚而来的大数据时代&#xff0c;信息安全等级保护测评&#xff08;简称等保测评&#xff09;对城市发展的推动作用不容忽视。作为黑龙江省的省会&#xff0c;哈尔滨在智慧城市建设上的积极探索和实践&#xff0c;必须以完善的等保测评体系为前提&#xff0c;确保信息的安…

算法day30

第一题 433. 最小基因变化 题型转化&#xff1a;可以转化为边权为一的最短路问题 将最开始的字符串定义为起点&#xff0c;我们将初识字符串每一个元素改变一次定义为移动一个位置&#xff0c;最后的字符串定义为中点&#xff0c;就这样每一次改变一个元素&#xff0c;最后成功…

aac如何转化mp3?超好用的四种音频转换方法!

aac如何转化mp3&#xff1f;AAC格式可能鲜为人知&#xff0c;但实际上它是一种音频文件格式&#xff0c;然而&#xff0c;AAC的应用却不太广泛&#xff0c;这并非偶然&#xff0c;首先&#xff0c;使用AAC需要支付专利费用&#xff0c;这对于个人和公司都可能是一笔不小的开支&…

2024【大模型】国内市场如何?程序员该何时入局?

1.市场形势 根据最新的市场研究报告&#xff0c;2023年中国的大模型市场呈现出显著的发展趋势和广阔的前景。以下是关于中国大模型市场的几个关键点&#xff1a; 市场规模和增长&#xff1a;2023年&#xff0c;中国AI大模型行业的市场规模达到了147亿元人民币&#xff0c;预计…