Spring Boot整合RabbitMQ之发布与订阅模式

RabbitMQ的模式中,常用的模式有:简单模式,发布与订阅模式,工作模式,路由模式,主题模式。简单模式不太会运用到工作中,我们可以使用 RabbitMQ 的发布订阅模式,实现:

  1. 用户发布动态,其“粉丝”收到其发布动态的消息
  2. 用户下订单,库存模块、支付模块等收到消息并处理
  3. 等等

1. 创建RabbitMQ的生产者

创建一个springboot项目,项目创建idea的默认创建springboot项目

然后进行rabbitMq的整合过程

1.1 引入rabbitmq的jar包

在项目的pom.xml中引入rabbitmq的jar包,详情如下:

<dependency>	<groupId>org.springframework.boot</groupId>	<artifactId>spring-boot-starter-amqp</artifactId>	<version>2.3.12.RELEASE</version>
</dependency>

1.2 配置文件中添加配置

在项目的配置文件中添加rabbitmq的相关配置,配置详情如下:

server:port: 10001# rabbitMq 相关配置
spring:application:name: springboot-rabbitmq-s1rabbitmq:host: 127.0.0.1port: 5672virtual-host: /username: guestpassword: guest

guest是rabbitmq的默认密码,不需要重新设置,不过在生产中为了安全是需要改密码的
1.3 创建配置类

配置类用于将队列和交换机进行绑定,该操作也可以使用rabbitmq的管理界面操作,并不是一定需要的步骤。配置类详情如下:

package com.study.rabbitmq.config;import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;/*** @Author alen* @DATE 2022/6/7 23:50*/
@Configuration
public class RabbitMQConfig {public static final String EXCHANGE_NAME = "fanout-order-exchange";public static final String SMS_QUEUE = "sms-fanout-queue";public static final String EMAIL_QUEUE = "email-fanout-queue";public static final String WECHAT_QUEUE = "wechat-fanout-queue";/*** 1.* 声明交换机* @return*/@Beanpublic FanoutExchange fanoutExchange() {/*** FanoutExchange的参数说明:* 1. 交换机名称* 2. 是否持久化 true:持久化,交换机一直保留, false:不持久化,用完就删除* 3. 是否自动删除 false:不自动删除, true:自动删除*/return new FanoutExchange(EXCHANGE_NAME, true, false);}/*** 2.* 声明队列* @return*/@Beanpublic Queue smsQueue() {/*** Queue构造函数参数说明* 1. 队列名* 2. 是否持久化 true:持久化, false:不持久化*/return new Queue(SMS_QUEUE, true);}@Beanpublic Queue emailQueue() {return new Queue(EMAIL_QUEUE, true);}@Beanpublic Queue wechatQueue() {return new Queue(WECHAT_QUEUE, true);}/*** 3.* 队列与交换机绑定*/@Beanpublic Binding smsBinding() {return BindingBuilder.bind(smsQueue()).to(fanoutExchange());}@Beanpublic Binding emailBinding() {return BindingBuilder.bind(emailQueue()).to(fanoutExchange());}@Beanpublic Binding wechatBinding() {return BindingBuilder.bind(wechatQueue()).to(fanoutExchange());}
}

1.4 模拟发送消息

创建一个service类,在类中进行rabbitMq消息的发送,源码如下:

package com.study.rabbitmq.service;import cn.hutool.json.JSONUtil;
import com.study.rabbitmq.entity.Order;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;/*** @Author alen* @DATE 2022/6/7 23:31*/
@Service
@Slf4j
public class OrderService {@Autowiredprivate RabbitTemplate rabbitTemplate;public void createOrder(Order order) {String body = JSONUtil.toJsonStr(order);log.info("订单信息:{}", body);//交换机名称String exchangeName = "fanout-order-exchange";//路由key 由于我们实现的是fanout模式(广播模式),不需要路由key,所有的消费者都可以进行监听和消费String routeKey = "";//发送mq消息rabbitTemplate.convertAndSend(exchangeName, routeKey, body);log.info("rabbitmq发送广播模式消息成功。。。");}
}

使用单元测试模拟消息发送,单元测试详情如下:

package com.study.rabbitmq;import com.study.rabbitmq.entity.Order;
import com.study.rabbitmq.service.OrderService;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;import java.util.UUID;@SpringBootTest
class SpringbootRabbitmqS1ApplicationTests {@Autowiredprivate OrderService orderService;@Testvoid contextLoads() {for (long i = 1; i < 50; i++) {Order order = new Order();order.setRequestId(i);order.setUserId(i);order.setOrderNo(UUID.randomUUID().toString());order.setAmount(10L);order.setGoodsNum(1);order.setTotalAmount(10L);orderService.createOrder(order);}}
}

发送完后,我们可以在rabbitMq的管理后台看到已经发送成功的消息,效果如下:

可见消息已经全部发送完毕,因为前面的三个队列都是绑定在同一个交换机上,所以三个队列都会收到消息。

2. 创建RabbitMQ的消费者

创建消费者服务S2,项目结构参考生产者项目结构,然后进行消息消费的相关代码的实现,实现过程如下

2.1 引入RabbitMQ的jar包

<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId><version>2.3.12.RELEASE</version>
</dependency>

2.2 在项目配置文件中添加配置

配置详情如下

server:port: 10002# rabbitmq 相关配置
spring:application:name: springboot-rabbitmq-s2rabbitmq:host: 127.0.0.1port: 5672virtual-host: /username: adminpassword: admin

2.3 创建MQ消息消费者

消费者类详情如下

package com.study.rabbitmq.service;import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;/*** @Author alen* @DATE 2022/6/8 8:15*/
@Slf4j
@Service
@RabbitListener(queues = {"email-fanout-queue"}) //监听队列
public class FanoutEmailConsumer {@RabbitHandlerpublic void emailMessage(String message) {log.info("Email fanout --接收到消息:{}", message);}
}

启动消费者项目,消费效果如下:

登录rabbitMq后台查看队列的消息情况如下

到此,似乎感觉整合得很顺利,没啥毛病。但是实际的运用中,以上演示过程中忽略了两个很重要的问题,一是我如何知道消息被顺利的发送到了队列,因为实际的工作中,不大可能每个消息都去rabbitmq管理后台查看。二是如果消息在消费的过程中出现了异常导致消息丢失,不重要的数据还好,如果是支付类的消息呢?就会产生严重的线上问题。那么这两个问题需要怎么处理呢?其实rabbitmq提供了消息发送结果回调和消息消费手动确认来处理这两个问题。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/50527.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

android cocoscreator 检测模拟器还是真机

转载至 一行代码帮你检测Android模拟器 具体原理看原博主文章&#xff0c;这里只讲cocoscreator3.6的安卓工程怎么使用 1.新建一个com.lahm.library包&#xff0c;和com.cocos.game同目录&#xff0c;如图示 那四个文件的代码如下&#xff1a; EmulatorCheckUtil类&#…

【Unity】制作一个简单的菜单栏页面并实现其功能

这是一个简单的菜单页面制作&#xff0c;接下来我们将制作一个完整的菜单页面&#xff0c;并且通过一定的代码去实现它对应的效果。这个主要的功能就是我们在游戏中如果想暂停一下或者重新开始&#xff0c;那么就要用到我们这个功能。接下来我们将实现在游戏中按ESC退出键可以调…

Linux Kernel的local_irq_enable()和local_irq_disable()函数

代码如下图所示&#xff0c;最终操作的是msr daifset, #3 和 msr daifclr, #3 寄存器。 (include/linux/irqflags.h) #define local_irq_enable() do { raw_local_irq_enable(); } while (0) #define local_irq_disable() do { raw_local_irq_disable(); } while (0)#define ra…

解决SEGGER Embedded Studio无法显示Nordic MCU外设寄存器问题

如果使用SES调试NRF52840的时候发现&#xff0c;官方例程只能显示CPU寄存器&#xff0c;但是无法显示外设寄存器时&#xff0c;解决办法如下&#xff1a; 1.在解决方案右键→Options→Debug→Debugger&#xff0c;然后Target Device选择正确的型号。 2.Register Definition Fil…

10个最好的云GPU服务

随着深度学习、人工智能和机器学习等新技术的出现&#xff0c;云 GPU 的需求量很大。 GPU&#xff08;图形处理单元&#xff09;是专用处理器&#xff0c;用于处理计算机图形和游戏等活动所需的大量数据集和复杂计算。不过&#xff0c;它们现在对人工智能&#xff08;A.I.&…

【C++】命名空间 namespace 与 标准流 iostream ( 命名空间概念简介 | 命名空间定义 | 命名空间使用 | iostream 中的命名空间分析 )

文章目录 一、命名空间 namespace1、命名空间基本概念2、名称概念4、C 语言的命名空间3、命名空间避免标识符冲突 二、命名空间定义1、命名空间基本概念2、命名空间定义语法3、代码示例 - 命名空间定义使用 三、命名空间使用1、命名空间默认访问方式2、使用命名空间3、使用默认…

实战项目ssm权限系统 3-总结篇,权限模块保护业务模块

一 工程模块介绍 1.1 工程模块关系 在业务微服务模块中引入安全认证模块&#xff0c;起到对业务模块的认证授权保护

在线设计APP ui的网站,分享这7款

在数字时代&#xff0c;用户界面&#xff08;UI&#xff09;设计变得非常重要&#xff0c;因为良好的UI设计可以改善用户体验&#xff0c;增强产品吸引力。随着科学技术的发展&#xff0c;越来越多的应用在线设计网站出现&#xff0c;为设计师和团队提供了一种新的创作方式。本…

postgresql基于postgis常用空间函数

1、ST_AsGeoJSON 图元转geojson格式 select ST_AsGeoJSON(l.geom) from g_zd l limit 10 2、 ST_Transform 坐标转换 select st_transform(l.shape, 3857) from sde_wf_cyyq l limit 10select st_astext(st_transform(l.shape, 3857)) from sde_wf_cyyq l limit 103、st_aste…

使用动态IP是否会影响网络

今天我们要谈论的话题是关于动态IP和网络的关系。也许有些小伙伴对这个概念还比较陌生&#xff0c;但别担心&#xff0c;我会简单明了的给你理清楚。让我们一起看看动态IP到底能否影响到网络。 首先&#xff0c;我们先来搞明白什么是动态IP。在互联网世界中&#xff0c;每一个连…

【NLP】1、BERT | 双向 transformer 预训练语言模型

文章目录 一、背景二、方法 论文&#xff1a;BERT: Pre-training of Deep Bidirectional Transformers for Language Understanding 出处&#xff1a;Google 一、背景 在 BERT 之前的语言模型如 GPT 都是单向的模型&#xff0c;但 BERT 认为虽然单向&#xff08;从左到右预测…

WPF中手写地图控件(3)——动态加载地图图片

瓦片增加一个Loading动画 可以查看我的另一个博客WPF中自定义Loading图 从中心扩散 进行从里到外的扩散&#xff0c;方向是上左下右。如下图所示 于是我们可以定义一个拥有坐标X跟Y的集合&#xff0c;他允许这个集合&#xff0c;内部使用枚举器的MoveNext自动排序&#xf…

stm32之5.长按按键(使用时钟源)调整跑马灯速度

------------------------------ 源码 #include <stm32f4xx.h> #include "led.h" #include "delay.h" #include "my_str.h" #include "beep.h" #include "key.h" int main(void) { key_init(); Led_init();…

Ubuntu-Server 22.04安装详细过程-图文版

一.下载Ubuntu Server镜像&#xff0c;官方地址下载即可 https://ubuntu.com/download/server 乌班图镜像网址&#xff0c;点击下载即可 二.安装乌班图镜像&#xff0c;最好自己准备u盘在ISO软件内制作完成 1.选择 Install Ubuntu Server 2.选择安装语言为英语 3.安装程序更新选…

数据库第十五课-------------非关系型数据库----------Redis

作者前言 &#x1f382; ✨✨✨✨✨✨&#x1f367;&#x1f367;&#x1f367;&#x1f367;&#x1f367;&#x1f367;&#x1f367;&#x1f382; ​&#x1f382; 作者介绍&#xff1a; &#x1f382;&#x1f382; &#x1f382; &#x1f389;&#x1f389;&#x1f389…

港联证券|指数或进入磨底阶段 短期关注环保、煤炭等板块

磨底历来都不是一天能达到的&#xff0c;比方2018年的政策底到商场底&#xff0c;半途也阅历两个多月时间。当下政策底出现之后至今也有近一个月时间&#xff0c;并且下跌量能不断缩短&#xff0c;心情面也降至冰点&#xff0c;种种迹象阐明离真正商场底的构成已经不远了。此时…

ElasticSearch常用方法

ElasticSearch:是一个储存、检索、数据分析引擎。 在互联网项目中我们经常会按一定的条件去索引我们指定的数据&#xff0c;但是在大量的数据中我们如果直接查询数据库效率是非常低的&#xff0c;ElasticSearch就可以很好的帮我们完成检索。 es封装了api提供给我我们直接操作…

测试框架pytest教程(3)夹具-@pytest.fixture

内置fixture Fixture使用pytest.fixture装饰&#xff0c;pytest有一些内置的fixture 命令可以查看内置fixture pytest --fixtures fixture范围 在pytest中&#xff0c;夹具&#xff08;fixtures&#xff09;具有不同的作用范围&#xff08;scope&#xff09;&#xff0c;用于…

【MySQL】JSON 格式字段处理

MySQL 5.7 版本后已支持 JSON 格式&#xff0c;这虽是 MySQL 的一小步&#xff0c;但可以说是程序开发的一大步&#xff0c;再也不用将 JSON 内容塞到 VARCHAR 类型字段了&#xff0c;程序设计也会变得更加灵活。网上大多只针对JSONObject 对象类型&#xff0c;本文也将详解 JS…

RabbitMQ 消费者

RabbitMQ的消费模式分两种&#xff1a;推模式和拉模式&#xff0c;推模式采用Basic.Consume进行消费&#xff0c;拉模式则是调用Basic.Get进行消费。   消费者通过订阅队列从RabbitMQ中获取消息进行消费&#xff0c;为避免消息丢失可采用消费确认机制 消费者 拉模式拉模式的实…