springCould中的Stream-从小白开始【12】

🥚今日鸡汤🥚

        见过一些人,他们朝九晚五😭,有时也要加班,却能把生活过得很😎有趣。他们有自己的爱好,不怕独处。他们有自己的坚持,哪怕没人在乎。🤦‍♂️

                                                               开心一点😁

                                                               认真一点🤔

                                                               努力一点🫡

目录

😶‍🌫️1.为什么引入Stream

🥚2.什么是Stream 

🧇3.Steam设计思想 

🥓4.案例说明 

🧂5.重复消费 


1.为什么引入Stream🥚🥚🥚

  • 屏蔽底层消息中间件的差异,降低切换成本,统一消息的编程模型

1.1无感知的使用消息中间件

Stream解决了开发人员无感知的使用消息中间件的问题,因为Stream对消息中间件的进一步封装,可以做到代码层面对中间件的无感知。

1.2中间件和服务的高度解耦

Spring Cloud Stream进行了配置隔离,只需要调整配置,开发中可以动态的切换中间件(如rabbitmq切换为kafka),使得微服务开发的高度解耦,服务可以关注更多自己的业务流程。

2.什么是Stream 🥚🥚🥚

  • 官方定义 Spring Cloud Stream 是一个构建消息驱动微服务的框架
  • 应用程序通过inputs或者outputs来与Spring Cloud Stream中binder对象交互。
  • 通过我们配置binding(绑定),而Spring Cloud Stream的 binder对象负责与消息中间件交互

所以,我们只需要搞清楚如何与Spring Cloud Stream交互就可以方便使用消息驱动的方式。

3.Steam设计思想🥚🥚🥚 

  • 通过定义绑定器作为中间层,完美地实现了应用程序与消息中间件细节之间的隔离。
  • 通过向应用程序暴露统一的Channel通道,使得应用程序不需要再考虑各种不同的消息中间件实现。
  • 通过定义绑定器Binder作为中间层,实现了应用程序与消息中间件细节之间的隔离

4.案例说明 🥚🥚🥚

  • cloud-stream-rabbitmq-provider8801,作为生产者进行发消息模块
  • cloud-stream-rabbitmq-consumer8802,作为消息接收模块
  • cloud-stream-rabbitmq-consumer8803,作为消息接收模块

4.1消息驱动-生产者

1.加pom

   <dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-actuator</artifactId></dependency><dependency><groupId>org.springframework.boot </groupId><artifactId>spring-boot-starter-test</artifactId></dependency><!--基础依赖--><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId></dependency><!--eureka客户端--><dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-netflix-eureka-client</artifactId></dependency><!--消息驱动--><dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-stream-rabbit</artifactId></dependency></dependencies>

2.改yml

  • 注意:小张的Rabbitmq是在Linux上的,所以配置如下:
server:port: 8801spring:application:name: cloud-stream-providerrabbitmq:host: 192.168.20.129port: 5672username: rootpassword: 123456cloud:stream:binders:defaultRabbit:type: rabbitbindings:output:destination: studyExchangecontent-type: application/jsonbinder: defaultRabbiteureka:client: # 客户端进行Eureka注册的配置service-url:defaultZone: http://eureka7001.com:7001/eureka,http://eureka7002.com:7002/eureka,http://eureka7003.com:7003/eurekainstance:lease-renewal-interval-in-seconds: 2 # 设置心跳的时间间隔(默认是30秒)lease-expiration-duration-in-seconds: 5 # 如果现在超过了5秒的间隔(默认是90秒)instance-id: send-8801.com  # 在信息列表时显示主机名称prefer-ip-address: true     # 访问的路径变为IP地址

3.主启动类

@SpringBootApplication
public class StreamMqMain8801 {public static void main(String[] args) {SpringApplication.run(StreamMqMain8801.class);}
}

4.业务类

  • 1.创建接口
  • 2.创建接口实现类
  • @EnableBinding:Spring Cloud Stream中用来启用消息传递功能的注释。
  • 它用于将应用程序绑定到消息传递系统(例如,Apache Kafka, RabbitMQ),并声明用于发送和接收消息的输入和输出通道。
  • 通过使用@EnableBinding,您可以定义应用程序所需的通道和消息处理程序
@EnableBinding(Source.class)//定义消息的推送管道
public class IMessageProviderImpl implements IMessageProvider {@Autowiredprivate MessageChannel output; //消息发送管道@Overridepublic String send() {String serial= UUID.randomUUID().toString();output.send(MessageBuilder.withPayload(serial).build());System.out.println("======serial:"+serial);return null;}
}

5.测试

  • 1.浏览器192.168.20.129:15672访问RabbitMQ
  • 2.localhost:8801/sendMessage访问

4.2消息驱动-消费者

1.建模块

  • 1.在父工程下创建模块cloud-stream-rabbitmq-consumer8802,作为消息接收模块
  • 2.注意jdk和maven版本号

2.加pom

  • 1.springboot依赖
  • 2.通用依赖
  • 3.eureka客户端依赖
  • 4.消息驱动rabbitmq
<dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-actuator</artifactId></dependency><dependency><groupId>org.springframework.boot </groupId><artifactId>spring-boot-starter-test</artifactId></dependency><!--基础依赖--><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId></dependency><!--eureka客户端--><dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-netflix-eureka-client</artifactId></dependency><!--消息驱动--><dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-stream-rabbit</artifactId></dependency></dependencies>

3.添yml

server:port: 8802spring:application:name: cloud-stream-consumerrabbitmq:host: 192.168.20.129port: 5672username: rootpassword: 123456cloud:stream:binders:defaultRabbit:type: rabbitbindings:input:destination: studyExchangecontent-type: application/jsonbinder: defaultRabbiteureka:client: # 客户端进行Eureka注册的配置service-url:defaultZone: http://eureka7001.com:7001/eureka,http://eureka7002.com:7002/eureka,http://eureka7003.com:7003/eurekainstance:lease-renewal-interval-in-seconds: 2 # 设置心跳的时间间隔(默认是30秒)lease-expiration-duration-in-seconds: 5 # 如果现在超过了5秒的间隔(默认是90秒)instance-id: receive-8802.com  # 在信息列表时显示主机名称prefer-ip-address: true     # 访问的路径变为IP地址

4.主启动类

@SpringBootApplication
public class StreamMqMain8802 {public static void main(String[] args) {SpringApplication.run(StreamMqMain8802.class);}
}

5.业务类

  • 1.@StreamListener注解是Spring Cloud Stream框架提供的一个注解,用于定义一个消息监听器
  • 2.通过使用@StreamListener注解,可以将一个方法标记为消息的消费者并指定该方法要监听的消息通道
  • 3.当有消息到达指定的通道时,该方法会被自动触发执行,从而处理这个消息。
  • 4.@StreamListener注解通常与@EnableBinding注解一起使用,用于指定所要绑定的消息通道。
  • 5.@EnableBinding注解用于绑定消息通道与应用程序中的输入输出接口,@StreamListener注解则用于标记一个方法作为消息的消费者。
@Component
@EnableBinding(Sink.class)
public class ReceiveMessageListenerController {@Value("${server.port}")private String serverPort;@StreamListener(Sink.INPUT)public void input(Message<String> message){System.out.println("消费者1---接受消息:"+message.getPayload()+",port:"+serverPort);}}

6.测试

  • 1.使用8801生产者发送消息
  • 2.使用8802消费者接受消息

5.重复消费 🥚🥚🥚

问题描述:

  • 1.根据8802,重新创建cloud-stream-rabbitmq-consumer8803,作为消息接收模块
  • 2.8801生产者发送消息
  • 3.8802,8803都可以接收到

 如果一个订单同时被两个服务获取到,就会造成数据错误

注意:在Stream中处于同一个group中的多个消费者竞争关系,就能够保证消息只会被其中一个应用消费一次。不同组是可以全面消费的(重复消费)。

5.1自定义分组

在消费者端添加group配置:分为xzA,xzB

5.2轮询分组 

8802/8803实现了轮询分组,每次只有一个消费者8801模块的发的消息只能被8802或8803其中一个接收到,这样避免了重复消费。

8802,8803的group配置相同名称,重新启动 ,使用8801发送两条消息,8802接受一条,8803接收一条

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/625259.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

FASTQ 文件压缩格式有哪些?

FASTQ 文件压缩格式 .gz .bz2 .xz .rfq .rfq.xz FASTQ 文件是用于存储测序数据的一种格式&#xff0c;它包含了大量的文本信息&#xff0c;因此通常占用大量的存储空间。为了有效地处理和传输这些数据&#xff0c;通常需要对 FASTQ 文件进行压缩来节省存储空间及传输带宽。以下…

Qt点击按钮在其附近弹出一个窗口

效果 FS_PopupWidget.h #ifndef FS_POPUPWIDGET_H #define FS_POPUPWIDGET_H#pragma once#include <QToolButton> #include <QWidgetAction> #include <QPointer>class QMenu;class FS_PopupWidget : public QToolButton {Q_OBJECTpublic:FS_PopupWidget(QW…

PXE 高效批量网络装机

前提&#xff1a; 虚拟机恢复到初始化 调整网卡为vm1 关闭防火墙 安全linux systemctl stop firewalld vim /etc/selinux/config 配置IP地址 vim /etc/sysconfig/network-scripts/ifcfg-ens33 重启网卡 systemctl restart network 挂载磁盘 安装yum源 安装服务 yum install vs…

Redis相关命令详解及其原理

Redis概念 Redis&#xff0c;英文全称是remote dictionary service&#xff0c;也就是远程字典服务。这是kv存储数据库。Redis&#xff0c;包括所有的数据库&#xff0c;都是请求-回应模式&#xff0c;通俗来说就是数据库不会主动地要给前台推送数据&#xff0c;只有前台发送了…

美团2024届秋招笔试第一场编程真题(js版本)

1.小美的外卖订单 简单的加法逻辑&#xff0c;需要注意的是各个数据的边界问题 折扣价不能超过原价减的价格不能超过满的价格满减优惠仅限原价购入 const rl require("readline").createInterface({ input: process.stdin }); void (async function () {let count…

ENVI5.6版本中规则与不规则图像裁剪操作

图像裁剪的目的是将研究之外的区域去除&#xff0c;常用的是按照行政区划边界或自然区划边界进行图像的裁剪&#xff0c;在基础数据生产中&#xff0c;还经常要做标准分幅裁剪。按照ENVI的图像裁剪过程&#xff0c;可分为规则裁剪和不规则裁剪。 ENVI5.6之前版本的图像裁剪工具…

linux基础学习(3):挂载

挂载可以理解为给磁盘空间一个可访问的入口&#xff0c;那个入口称为挂载点&#xff0c;相当于windows中的盘符。 1.挂载命令mount 1.1直接输入mount 查看系统已挂载的设备 1.2挂载与卸载命令 mount -t 文件系统名 设备文件名 挂载点 | umount 挂载点 或 umount 设…

衡水学院新人真题百练2022(1-20)修订版

​ 1 重要的话说三遍 分数 5 作者 陈越 单位 浙江大学 这道超级简单的题目没有任何输入。 你只需要把这句很重要的话 —— “I’m gonna WIN!”——连续输出三遍就可以了。 注意每遍占一行&#xff0c;除了每行的回车不能有任何多余字符。 #include<stdio.h> int…

QT-day6

作业1&#xff1a;数据库增删查改 #include "widget.h" #include "ui_widget.h"Widget::Widget(QWidget *parent): QWidget(parent), ui(new Ui::Widget) {ui->setupUi(this);if (!db.contains("stu.db")){db QSqlDatabase::addDatabase(&q…

The Plant cell:DAP-seq技术助力揭示BBR/BPC家族的MdBPC2转录因子调控苹果生长素的生物合成从而促进苹果生长及矮化

植物生长素&#xff08;IAA&#xff09;在植物生长发育过程中起着重要的作用。其化学本质是吲哚乙酸。主要作用是使植物细胞壁松弛&#xff0c;从而使细胞生长伸长&#xff0c;在许多植物中还能增加RNA和蛋白质的合成。 目前BARLEY B RECOMBINANT/BASIC PENTACYSTEINE (BBR/BP…

Unity对应SDK和NDK版本的对照表

官网&#xff1a;Unity - Manual: Android environment setup 本人安装的是2022版本长期支持版本2022.3.15f1c1 安装Java的JDK环境就不在这里展开了&#xff0c;就记录下对Android SDK的设置&#xff0c;要与Unity的版本对应&#xff0c;否则会出现很多莫名奇妙的问题。 打开…

自定义vector的实现

实现前需要思考的一个问题 为什么需要将空间的申请与对象的构建分开 查看vector的模板参数时可以看到其有第三个参数是空间适配器allocator&#xff0c;查找其对外提供的成员函数不难发现它的实现逻辑是将空间的申请与对象的构建分开的&#xff0c;为什么呢&#xff1f;不弄清…

SpringBoot异常处理(Whitelabel Error Page和自定义全局异常处理页面)和整合ajax异常处理

SpringBoot异常处理&#xff08;Whitelabel Error Page和自定义全局异常处理页面&#xff09;和整合ajax异常处理 1、springboot自带的异常处理页面Whitelabel Error Page SpringBoot默认的处理异常的机制&#xff1a;SpringBoot 默认的已经提供了一套处理异常的机制。一旦程…

MongoDB 索引管理

文章目录 前言1. 术语介绍1.1 index / key1.2 Coverd Query1.3 IXSCAN / COLLSCAN1.4 Selectivity1.5 Index Prefix 2. 索引原理3. 索引的维护3.1 创建索引语法3.2 单字段索引3.3 多字段复合索引3.4 数组的多列索引3.5 全文索引3.6 Hash 索引3.7 TTL 索引3.8 删除索引3.9 后台创…

基于springboot书籍学习平台源码和论文

首先,论文一开始便是清楚的论述了平台的研究内容。其次,剖析平台需求分析,弄明白“做什么”,分析包括业务分析和业务流程的分析以及用例分析,更进一步明确平台的需求。然后在明白了平台的需求基础上需要进一步地设计平台,主要包罗软件架构模式、整体功能模块、数据库设计。本项…

MongoDB安装与基本使用

一、简介 1.1 Mongodb 是什么 MongoDB 是一个基于分布式文件存储的数据库&#xff0c;官方地址 https://www.mongodb.com/ 1.2 数据库是什么 数据库&#xff08; DataBase &#xff09;是按照数据结构来组织、存储和管理数据的 应用程序 1.3 数据库的作用 数据库的…

创建React步骤

确保电脑已经安装了node.js以后&#xff0c;打开终端进入目标文件夹 cd xxx(文件夹 npx create-react-app react01(替换为你自己的react名称) 可能会出现是否安装xxx,输入y即可 没有报错信息后&#xff0c;输入 cd react01 npm start 会自动跳转到react界面&#xff0c;就…

翻译: Streamlit从入门到精通 部署一个机器学习应用程序 四

Streamlit从入门到精通 系列&#xff1a; 翻译: Streamlit从入门到精通 基础控件 一翻译: Streamlit从入门到精通 显示图表Graphs 地图Map 主题Themes 二翻译: Streamlit从入门到精通 构建一个机器学习应用程序 三 1. 5. 如何部署一个Streamlit应用 部署是将应用程序从开发…

Linux操作系统——重定向与缓冲区

1.理解一下struct file内核对象 上一篇文章&#xff08;文件详解&#xff09;我们一直在谈&#xff0c;一个文件要被访问就必须要先被打开&#xff0c;打开之前就必须要先把文件加载到内存&#xff0c;同时呢我们的操作系统为了管理文件也会为我们的文件创建相对应的struct fi…