SpringCloud(17)之SpringCloud Stream

一、Spring Cloud Stream介绍

        Spring Cloud Stream是一个框架,用于构建与共享消息系统连接的高度可扩展的事件驱动微服务。该框架提供了一个灵活的编程模型,该模型建立在已经建立和熟悉的Spring习惯用法和最佳实践之上,包括对持久发布/子语义、使用者组和有状态分区的支持。  

        它可以基于 Spring Boot来创建独立的、可用于生产的  Spring应用程序,Spring Cloud Stream为一些供应商的消息   中间件产品提供了个性化的自动化配置实现,并引入了发布-订阅、消费组、分区这三个核心概念。通   过使用 Spring Cloud Stream ,可以有效简化开发人员对消息中间件的使用复杂度,让系统开发人员可以有更多的精力关注于核心业务逻辑的处理。目前 Spring Cloud Stream 支持 RabbitMQ  Kafka 自动化配置。

        目前Spring Cloud Stream只适配以下中间件信息:

二、Spring Cloud Stream 工作流程

        Spring Cloud Stream应用程序由一个与中间件无关的核心组成。应用程序通过在外部代理公开的目的地和代码中的输入/输出参数之间建立绑定来与外部世界通信。建立绑定所需的特定于Broker的详细信息由特定于中间件的Binder实现来处理。

        通过Stream可以很好的屏蔽各个中间件的API差异,它统一了API,生产者通过OUTPUT向消息中间件发 送消息,此时并不需要关心消息中间件是Kafka还是RabbitMQ,不需要关注他们的API,只需要用到Stream的API,这样可以降低学习成本。消费方通过INPUT消费指定的消息,也不需要关注消息中间件 API,架构图如上图: 

        我们对上图的对象进行说明:

  • Application Core:生产者、消费者;
  • inputs:消费者;
  • ouputs:生产者;
  • Binder:绑定器,主要和消息中间件进行绑定操作;
  • Middleware:消息中间件服务;

        

        我们项目中真正应用到Stream,只需要按照如上流程图操作即可;

 生产者:

        1:使用Source绑定消息的输出管道。

        2:通过MessageChannel输出消息。

        3:通过@EnableBinding开启binder,将生产者绑定到指定的MQ服务。

消费者:      

        1:通过@EnableBinding绑定到MQ。

        2:通过Sink绑定到输入数据管道。

        3:@StreamListener监听指定管道数据。

 2.1 Spring Cloud Stream 实战

        

        如上图,当用户行程结束,用户需进入支付操作,当用户支付完成时,我们需要更新订单状态,此时我 们可以让支付系统将支付状态发送到MQ中,订单系统订阅MQ消息,根据MQ消息修改订单状态。我们 将使用 SpringCloud Stream实现该功能。

2.1.1 生产者

1)引入依赖 

   hailtaxi-pay 中引入依赖:

        <!--stream--><dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-stream-rabbit</artifactId></dependency>

2) 配置MQ服务

 修改 hailtaxi-pay  application.yml 添加如下配置:

server:port: 18083
spring:application:name: hailtaxi-paycloud:#Consul配置consul:host: localhostport: 8500discovery:#注册到Consul中的服务名字service-name: ${spring.application.name}#Streamstream:binders: # 在此处配置要绑定的rabbitmq的服务信息;defaultRabbit: # 表示定义的名称,用于于binding整合type: rabbit # 消息组件类型environment: # 设置rabbitmq的相关的环境配置spring:rabbitmq:host: 192.168.211.145port: 5672username: guestpassword: guestbindings: # 服务的整合处理output: # 这个名字是一个通道的名称destination: payExchange # 表示要使用的Exchange名称定义content-type: application/json # 设置消息类型,本次为json,文本则设置“text/plain”binder: defaultRabbit  # 设置要绑定的消息服务的具体设置

3)消息输出管道绑定 

/**** 负责向MQ发送消息*/
@EnableBinding(Source.class)
public class MessageSender {@Resourceprivate MessageChannel output;//消息发送管道/**** 发送消息* @param message* @return*/public Boolean send(Object message) {//消息发送boolean bo = output.send(MessageBuilder.withPayload(message).build());System.out.println("*******send message: "+message);return bo;}
}

 参数说明:

Source.class:绑定一个输出消息管道Channel。

MessageChannel:发送消息对象,默认是DirectWithAttributesChannel,发消息在 AbstractMessageChannel中完成。

MessageBuilder.withPayload:构建消息。

        此时大家可能会有一个疑问?如果我们多个channel,在rabbitMQ中就是说我一个服务有多个交换机该怎么办?

        我们来看下 Source.class里面定义的内容是什么,定义的内容如下:

public interface Source {String OUTPUT = "output";@Output("output")MessageChannel output();
}

        所以说如果此时我们要新的管道的话,我们就可以参考Source来定义新的类,然后OUTPUT就定义新的管道名称,然后再配置文件中我们就定义这个新的管道名称。 

4)消息发送 

  com.itheima.pay.controller.TaxiPayController 中创建支付方法用于发送消息,代码如下:

    /**** 支付  http://localhost:18083/pay/wxpay/1* @return*/@GetMapping(value = "/wxpay/{id}")public TaxiPay pay(@PathVariable(value = "id")String id){//支付操作TaxiPay taxiPay = new TaxiPay(id,310,3);//发送消息messageSender.send(taxiPay);return taxiPay;}

2.1.2 消费者 

1)修改配置 

 修改 hailtaxi-order 的核心配置文件 application.yml ,在文件中配置要监听MQ信息:

server:port: 18082
spring:application:name: hailtaxi-orderzipkin:#zipkin服务地址base-url: http://localhost:9411sleuth:sampler:probability: 1  #采样值,0~1之间,1表示全部信息都手机,值越大,效率越低cloud:#Consul配置consul:host: localhostport: 8500discovery:#注册到Consul中的服务名字service-name: ${spring.application.name}#Streamstream:binders: # 在此处配置要绑定的rabbitmq的服务信息;defaultRabbit: # 表示定义的名称,用于于binding整合type: rabbit # 消息组件类型environment: # 设置rabbitmq的相关的环境配置spring:rabbitmq:host: 192.168.211.145port: 5672username: guestpassword: guestbindings: # 服务的整合处理input: # 这个名字是一个通道的名称destination: payExchange # 表示要使用的Exchange名称定义content-type: application/json # 设置消息类型,本次为json,文本则设置“text/plain”binder: defaultRabbit  # 设置要绑定的消息服务的具体设置group: paygroup #所属分组

2)消息监听 

  hailtaxi-order 中创建消息监听对象 com.itheima.order.mq.MessageReceiver ,代码如下:

@EnableBinding(Sink.class)
public class MessageReceiver {@Value("${server.port}")private String port;/***** 消息监听* @param message*/@StreamListener(Sink.INPUT)public void receive(String message) {System.out.println("消息监听(增加用户积分、修改订单状态)-->" + message+"-->port:"+port);}
}

参数说明:

Sink.class:绑定消费者管道。

@StreamListener(Sink.INPUT):监听消息配置,指定了消息为application中的input


1.3 消息分组

         消息分组有2个好处,分别是集群合理消费、数据持久化。

 1.3.1集群消费下的分组

1)分组的意义

        分组在项目中是有非常重大的意义,通常应用于消息并发高、消息堆积的场景,这些场景服务消费方通 常会做集群操作,一旦做集群操作,我们又需要项目中的消费者合理消费,比如用户打车支付完成后, 我们需要增加用户积分同时修改订单状态,如果集群环境中有2台服务器都执行该消费操作,此时用户  积分会增加两次,就会造成非幂等问题。 

 

        此时集群中相同服务应该属于同一个组,同一个组中只允许有一个足节点消费某一个信息,这样就可以 避免费幂等问题的出现。

2)分组实战 

        新增一个 hailtaxi-order消费者节点:

  

        此时运行起来,  18082  18182 节点会同时消费所有数据。 

        修改 hailtaxi-order 的核心配置文件 application.yml ,添加分组: 

 

        此时再次测试,可以发现消费者不会重复消费数据。 

1.3.2 数据持久化

        我们把分组去掉,停掉 hailtaxi-order 服务,然后请求 http://localhost:18083/pay/wxpay/1 送数据,发送完数据后,再启动 hailtaxi-order服务,此时发现没有数据可以消费,这是因为数据没 有持久化,是一种广播模式,如果需要数据持久化,得给每个消费节点添加group组即可。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/706064.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

腾讯云4核8G服务器优惠价格表(轻量+CVM)

腾讯云4核8G服务器多少钱&#xff1f;轻量应用服务器4核8G12M带宽一年446元、646元15个月&#xff0c;云服务器CVM标准型S5实例4核8G配置价格15个月1437.3元&#xff0c;5年6490.44元&#xff0c;标准型SA2服务器1444.8元一年&#xff0c;在txy.wiki可以查询详细配置和精准报价…

ChatGPT带火的HBM是什么?

“ChatGPT是人工智能领域的iPhone时刻&#xff0c;也是计算领域有史以来最伟大的技术之一。” 英伟达创始人兼CEO黄仁勋此前这样盛赞ChatGPT。 ChatGPT突然爆火&#xff0c;对大算力芯片提出了更高更多的要求。近日&#xff0c;据韩国经济日报报道&#xff0c;受惠于ChatGPT&am…

[rust] 10 project, crate, mod, pub, use: 项目目录层级组织, 概念和实战

文章目录 一 项目目录层级组织概念1.1 cargo new 创建同名 的 Project 和 crate1.2 多 crate 的 package1.3 mod 模块1.3.1 创建嵌套 mod1.3.2 mod 树1.3.3 用路径引用 mod1.3.3.1 使用绝对还是相对? 1.3.4 代码可见性1.3.4.1 pub 关键字1.3.4.2 用 super 引用 mod1.3.4.3 用 …

Linux之安装jdk,tomcat,mysql,部署项目

目录 一、操作流程 1.1安装jdk 1.2安装tomcat&#xff08;加创建自启动脚本&#xff09; 1.3 安装mysql 1.4部署项目 一、操作流程 首先把需要用的包放进opt文件下 1.1安装jdk 把jdk解压到/usr/local/java里 在刚刚放解压包的文件夹打开vim /etc/profile编辑器&#xff0c…

普中51单片机学习(8*8LED点阵)

8*8LED点阵 实验代码 #include "reg52.h" #include "intrins.h"typedef unsigned int u16; typedef unsigned char u8; u8 lednum0x80;sbit SHCPP3^6; sbit SERP3^4; sbit STCPP3^5;void HC595SENDBYTE(u8 dat) {u8 a;SHCP1;STCP1;for(a0;a<8;a){SERd…

【GameFramework框架内置模块】4、内置模块之调试器(Debugger)

推荐阅读 CSDN主页GitHub开源地址Unity3D插件分享简书地址QQ群&#xff1a;398291828 大家好&#xff0c;我是佛系工程师☆恬静的小魔龙☆&#xff0c;不定时更新Unity开发技巧&#xff0c;觉得有用记得一键三连哦。 一、前言 【GameFramework框架】系列教程目录&#xff1a;…

MATLAB_ESP32有限脉冲响应FIR无限脉冲响应IIR滤波器

要点 ESP32闪烁LED&#xff0c;计时LEDESP32基础控制&#xff1a;温控输出串口监控&#xff0c;LCD事件计数器&#xff0c;SD卡读写&#xff0c;扫描WiFi网络&#xff0c;手机控制LED&#xff0c;经典蓝牙、数字麦克风捕捉音频、使用放大器和喇叭、播放SD卡和闪存MP3文件、立体…

如何多环境切换?如何在微服务配置多环境?

问题本质: nacos配置中心的配置是如何被项目读取到的&#xff1f;(nacos的配置中心和项目是如何联系的&#xff1f;) 注意&#xff1a;nacos有配置管理和服务管理&#xff0c;别弄混。自动注册的是服务管理&#xff01;&#xff01;&#xff01; 1. 如何注册到nacos服务管理中心…

蓝桥杯备战刷题one(自用)

1.被污染的支票 #include <iostream> #include <vector> #include <map> #include <algorithm> using namespace std; int main() {int n;cin>>n;vector<int>L;map<int,int>mp;bool ok0;int num;for(int i1;i<n;i){cin>>nu…

玩转ChatGPT:参考文献速查

一、写在前面 各位大佬&#xff0c;我又回来了&#xff0c;最近2月太忙啦&#xff08;过年、奶娃、本子、材料、结题&#xff09;&#xff0c;断更了。现水一篇证明我还活着&#xff01;&#xff01;&#xff01; 最近在写国自然本子&#xff0c;遇到一个估计大家都会遇到的问…

Unity将4个纹理图拼接成1个纹理

需要的效果 最终实现的效果大概如下: 4个贴图上去 这里随便放一个切分的图。 Shader代码如下 直接上代码: // Unity built-in shader source. Copyright (c) 2016 Unity Technologies. MIT license (see license.txt)// Unlit shader. Simplest possible textured shad…

UE5 C++ Widget练习 Button 和 ProgressBar创建血条

一. 1.C创建一个继承Widget类的子类&#xff0c; 命名为MyUserWidget 2.加上Button 和 UserWidget的头文件 #include "CoreMinimal.h" #include "Components/Button.h" #include "Blueprint/UserWidget.h" #include "MyUserWidget.genera…

Python实现自动检测设备连通性并发送告警到企业微信

背景&#xff1a;门禁机器使用的WiFi连接&#xff0c;因为某些原因会不定期自动断开连接&#xff0c;需要人工及时干预&#xff0c;以免影响门禁数据同步&#xff0c;故写此脚本&#xff0c;定时检测门禁网络联通性。 #首次使用要安装tcping模块 pip install tcpingfrom tcpin…

【学习笔记】Serdes中的高速接口设计

参考文献&#xff1a; 一、绪论 1.1 背景 “串行替代并行”&#xff1a; 串行传输使用差分信号传输以传输更长距离&#xff1b; 并行传输因串扰无法长距离传输&#xff1b;并行线路对信号偏斜量的要求&#xff0c;限制了最大的传输速率。 SerDesSerializer Deserializer S…

欧拉函数性质和快速幂算法及python实现

目录 欧拉函数 快速幂算法 快速模幂算法 欧拉函数 两个不同的正整数a,b&#xff0c;若gcd(a,b)1,则a和b互质&#xff0c;1与任何正整数都互质 欧拉函数的意义 φ(n) 表示小于或等于正整数n的所有正整数中与n互质的数的个数 如φ(32) 16&#xff0c;即小于32的数中有16个…

Prompt 编程的优化技巧

一、为什么要优化 一&#xff09;上下文限制 目前 GPT-3.5 以及 GPT-4最大支持 16K 上下文&#xff0c;比如你输入超过 16k 的长文本&#xff0c;ChatGPT 会提示文本过大&#xff0c;为了避免 GPT 无法回复&#xff0c;需要限制 上下文在16k 以内 上下文对于 GPT 来说是非常重…

STL常用容器(vector容器)---C++

STL常用容器目录 2.vector容器2.1 vector基本概念2.2 vector构造函数2.3 vector赋值操作2.4 vector容量和大小2.5 vector插入和删除2.6 vector数据存取2.7 vector互换容器2.7.1 vector互换容器收缩内存空间 2.8 vector预留空间 2.vector容器 2.1 vector基本概念 功能&#xf…

自然语言处理(NLP)—— 神经网络自然语言处理(2)实际应用

本篇文章的第一部分是关于探索词嵌入&#xff08;word embedding&#xff09;向量空间。词嵌入是一种语言模型和文本表示技术&#xff0c;其中单词或短语从词汇表被映射到向量的高维空间中。通过这种方式&#xff0c;可以通过计算向量之间的距离来捕捉单词之间的语义关系。 1.…

2024-02-23(Spark)

1.RDD的数据是过程数据 RDD之间进行相互迭代计算&#xff08;Transaction的转换&#xff09;&#xff0c;当执行开启后&#xff0c;代表老RDD的消失 RDD的数据是过程数据&#xff0c;只在处理的过程中存在&#xff0c;一旦处理完成&#xff0c;就不见了。 这个特性可以最大化…

R语言空间分析、模拟预测与可视化

随着地理信息系统&#xff08;GIS&#xff09;和大尺度研究的发展&#xff0c;空间数据的管理、统计与制图变得越来越重要。R语言在数据分析、挖掘和可视化中发挥着重要的作用&#xff0c;其中在空间分析方面扮演着重要角色&#xff0c;与空间相关的包的数量也达到130多个。在本…