SpringBoot整合RabbitMQ的快速使用教程

     

目录

一、引入依赖

二、配置rabbitmq的连接信息等

1、生产者配置

2、消费者配置 

三、设置消息转换器

四、生产者代码示例

 1、配置交换机和队列信息

2、生产消息代码

五、消费者代码示例

1、消费层代码

2、业务层代码 


        在分布式系统中,消息队列是一种重要的通信方式,它能够有效地将消息从一个应用程序传递到另一个应用程序。RabbitMQ是一款流行的开源消息队列系统,简单易用且功能强大。本文将介绍如何使用SpringBoot快速整合RabbitMQ,实现消息的发送和接收。

 

交换机: 主要负责接收生产者发送的消息,并根据特定的规则将这些消息路由到一个或多个队列中。交换机的类型有:

  •  Fanout Exchange(扇出交换机)

        Fanout交换机会将接收到的所有消息广播到它知道的所有队列中。这种类型的交换机不考虑路由键,只是简单地将消息复制到所有绑定的队列中。适用于不需要选择性地发送消息给特定队列的情况,例如,广播系统通知或有多个服务需要消费同一份数据的场景。

  • Direct Exchange(直连交换机)

       Direct交换机根据消息的路由键将消息发送到与之匹配的队列中。只有当路由键与绑定关键字完全匹配时,消息才会被路由到相应的队列。适合于精确控制消息投递的场景,如特定的服务或功能模块只关心特定类型的消息。

  • Topic Exchange(主题交换机)

       Topic交换机允许更复杂的匹配规则,通过模式匹配的方式将消息路由到一个或多个队列。路由键和绑定键都使用点分隔的字符串,可以包含特殊字符如“#”和“*”来实现模糊匹配。"*"用于匹配一个单词,而“#”则用于匹配零个或多个单词。适合于需要按内容分类消息的系统,如日志处理系统,可以根据日志等级或来源将日志消息分发到不同的队列。

  • Headers Exchange(头交换机)

        Headers交换机使用消息头的一组键值对来决定消息应该被路由到哪个队列。这种交换机允许更细粒度的路由控制,但配置和使用较为复杂。适合需要基于消息多个属性来动态决定路由的场景,例如某些高级的路由策略或复杂的事件处理系统。

队列:主要用于存储消息,实现先进先出(FIFO)的特性。

一、引入依赖

这里引入了两个依赖。一个是rabbitmq的依赖,另一个是配置json转换器所需要的依赖。生产者和消费者服务都需要引入这两个依赖。

<dependency>
     <groupId>org.springframework.boot</groupId>
     <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
      <groupId>com.fasterxml.jackson.dataformat</groupId>
       <artifactId>jackson-dataformat-xml</artifactId>
 </dependency>

二、配置rabbitmq的连接信息等

1、生产者配置

  rabbitmq:
    host: 170.40.20.16
    port: 5672
    username: zhuoye
    password: zy521
    virtual-host: /

2、消费者配置 

   rabbitmq:
    host: 170.40.20.16
  port: 5672
    username: zhuoye
    password: zy521
    virtual-host: /
    listener:
      simple:
        prefetch: 1 #每次只能处理一个,处理完成才能获取下一个消息

三、设置消息转换器

        默认情况下Spring采用的序列化方式是JDK序列化,而JDK的序列化存在可读性性差、占用内存大、存在安全漏洞等问题。所以,这里我们一般使用Jackson的序列化代替JDk的序列化。

在生产者和消费者的启动类上加上如下代码:  

@SpringBootApplication
@EnableRabbit //开启rabbitmq的使用
public class ConsumerApp {public static void main( String[] args ) {SpringApplication.run(ConsumerApp.class, args);}//使用的是Jackson库中的Jackson2JsonMessageConverter类,代替使用jdk自带的序列化@Beanpublic MessageConverter jacksonMessageConvertor(){Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter();jackson2JsonMessageConverter.setCreateMessageIds(true);//开启消息id的自动生成功能return jackson2JsonMessageConverter;}
}

四、生产者代码示例

 1、配置交换机和队列信息
@Configuration
public class RabbitMqConfig {private static String EXCHANGE_NAME="amq.topic";private static String QUEUE_NAME="alarm.data.topic.queue";private static String CONFIRM_ALARM_QUEUE_NAME="alarm.confirm.data.topic.queue";/*** 声明交换机*/@Beanpublic TopicExchange exchange(){// durable:是否持久化,默认是false// autoDelete:是否自动删除,当没有生产者或者消费者使用此交换机,该交换机会自动删除。return new TopicExchange(EXCHANGE_NAME,true,false);}/*** 声明告警队列* @return*/@Bean("alarmQueue")public Queue alarmQueue(){// durable:是否持久化,默认是false,持久化队列:会被存储在磁盘上,当消息代理重启时仍然存在// exclusive:默认也是false,只能被当前创建的连接使用,而且当连接关闭后队列即被删除。此参考优先级高于durable// autoDelete:是否自动删除,当没有生产者或者消费者使用此队列,该队列会自动删除。return new Queue(QUEUE_NAME,true,false,false);}/*** 声明确认告警队列* @return*/@Bean("confirmAlarmQueue")public Queue confirmAlarmQueue(){return new Queue(CONFIRM_ALARM_QUEUE_NAME,true,false,false);}/*** 声明告警队列绑定关系* @param queue* @param topicExchange* @return*/@Beanpublic Binding alarmBinding(@Qualifier("alarmQueue") Queue queue, TopicExchange topicExchange){return BindingBuilder.bind(queue).to(topicExchange).with("server.event.#");}/*** 声明确认告警队列绑定关系* @param queue* @param topicExchange* @return*/@Beanpublic Binding confirmAlarmBinding(@Qualifier("confirmAlarmQueue") Queue queue, TopicExchange topicExchange){return BindingBuilder.bind(queue).to(topicExchange).with("server.event_confirm.#");}
2、生产消息代码
    @Autowiredprivate RabbitTemplate rabbitTemplate;private static String EXCHANGE_NAME="amq.topic";private static String CONFIRM_ALARM_QUEUE_NAME="alarm.confirm.data.topic.queue";@Testvoid producerAlarmMsg() {String msg = "发送一条告警消息";rabbitTemplate.convertAndSend(EXCHANGE_NAME, "server.event.#",msg);System.out.println("msg = " + msg);}@Testvoid producerConfirmAlarmMsg() {String msg = "发送一条确认告警消息";rabbitTemplate.convertAndSend(CONFIRM_ALARM_QUEUE_NAME, "server.event_confirm.#",msg);System.out.println("msg = " + msg);}

五、消费者代码示例

1、消费层代码
@Component
public class AlarmConsumer {@Autowiredprivate IAlarmService alarmService;@RabbitListener(queues ="alarm.data.topic.queue",concurrency = "5")public void getAlarmInfo(String data){alarmService.dealAlarmData(data);}@RabbitListener(queues ="alarm.confirm.data.topic.queue",concurrency = "5")public void getConfirmAlarmInfo(String data){alarmService.dealConfirmAlarmData(data);}
}
2、业务层代码 
@Service
public class IAlarmServiceImpl implements IAlarmService {@Overridepublic void dealAlarmData(String data) {EquipAlarmResp equipAlarmResp= JSON.parseObject(result,EquipAlarmResp.class);List<String> alarmIdsOld = dceEquipAlarmMapper.queryAllAlarmIds();DceEquipAlarmDto dceEquipAlarmDto = CopyBeanUtils.copyProperties(equipAlarmResp, DceEquipAlarmDto.class);dceEquipAlarmDto.setCreateTime(new Date());dceEquipAlarmDto.setAlarmTime(dceEquipAlarmDto.getAlarmTime()/1000);//查询出需要新增或者更新的数据Boolean flag=alarmIdsOld.stream().filter(a->a.equals(dceEquipAlarmDto.getAlarmId())).findFirst().isPresent();//开启事务,保证新增、更新、删除的原子性TransactionStatus transaction = transactionManager.getTransaction(transactionDefinition);List<DceEquipAlarmDto> list=new ArrayList<>();list.add(dceEquipAlarmDto);try {//新增if (!flag) {dceEquipAlarmMapper.insertBatch(list);}//更新if (flag) {dceEquipAlarmMapper.updateBatch(list);}//提交事务transactionManager.commit(transaction);} catch (Exception e) {//回滚transactionManager.rollback(transaction);log.error("DynamicEnvironmentServiceImpl.getAlarmInfoByRabbitMq 新华报业动环设备告警信息更新失败!", e);}}@Overridepublic void dealConfirmAlarmData(String data) {EquipConfirmAlarmResp alarmResp = JSON.parseObject(data,EquipConfirmAlarmResp.class);Integer confirmTime = Integer.parseInt(String.valueOf(System.currentTimeMillis() / 1000));alarmResp.setConfirmTime(confirmTime);dceEquipAlarmMapper.updateConfirmAlarmBatch(alarmResp,alarmResp.getAlarmIds());}}

注:以上代码为对接告警信息和对接告警确认消息的示例。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/bicheng/18115.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

00Java准备工作

目录 JDK的安装目录 JAVA环境变量的配置 JAVA小知识 JDK的安装目录 目录名称说明bin该路径下存放了JDK的各种工具命令,javac和java就放在这个目录conf该路径下存放了JDK的相关配置文件include该路径下存放了一些平台特定的头文件jmods该路径下存放了JDK的各种模块legal该路…

简单随机数据算法

文章目录 一&#xff0c;需求概述二&#xff0c;实现代码三、测试代码四、测试结果五、源码传送六、效果演示 一&#xff0c;需求概述 系统启动时&#xff0c;读取一组图片数据&#xff0c;通过接口返回给前台&#xff0c;要求&#xff1a; 图片随机相邻图片不重复 二&#…

进程互斥经典问题(读写者问题、理发店问题)

目录 读写者问题 问题描述 问题分析 进程互斥问题三部曲 读者写者算法实现 一、找进程——确定进程关系 二、找主营业务 三、找同步约束 a.互斥 b.资源 c.配额 理发店问题 问题描述 问题分析 进程互斥问题三部曲 理发店问题算法实现 一、找进程——确定进程…

SB-OSC,最新的 MySQL Schema 在线变更方案

目前主流的 MySQL 在线变更方案有两个&#xff1a; 基于 trigger 的 pt-online-schema-change基于 binlog 的 gh-ost 上周 Sendbird 刚开源了他们的 MySQL Schema 在线变更方案 SB-OSC: Sendbird Online Schema Change。 GitHub 上刚刚 25 颗星星&#xff0c;绝对新鲜出炉。 …

Qt Creator(2)【如何在Qt Creator中创建新工程】

阅读导航 引言一、Qt Creator开始界面介绍二、如何在Qt Creator中创建新工程1. 新建项目2. 选择项目模板3. 选择项目路径4. 选择构建系统5. 填写类信息设置界面6. 选择语言和翻译文件7. 选择Qt套件8. 选择版本控制系统9. 最终效果 三、认识Qt Creator项目内容界面1. 基本界面2.…

go语言初识别(五)

本博客内容涉及到&#xff1a;切片 切片 1. 切片的概念 首先先对数组进行一下回顾&#xff1a; 数组定义完&#xff0c;长度是固定的&#xff0c;例如&#xff1a; var num [5]int [5]int{1,2,3,4,5}定义的num数组长度是5&#xff0c;表示只能存储5个整形数字&#xff0c…

检索模型预训练方法:RetroMAE

论文title&#xff1a;https://arxiv.org/pdf/2205.12035RetroMAE: Pre-Training Retrieval-oriented Language Models Via Masked Auto-Encoder 论文链接&#xff1a;https://arxiv.org/pdf/2205.12035 摘要 1.一种新的MAE工作流&#xff0c;编码器和解器输入进行了不同的掩…

华为OD机试【计算最接近的数】(java)(100分)

1、题目描述 给定一个数组X和正整数K&#xff0c;请找出使表达式X[i] - X[i1] … - X[i K 1]&#xff0c;结果最接近于数组中位数的下标i&#xff0c;如果有多个i满足条件&#xff0c;请返回最大的i。 其中&#xff0c;数组中位数&#xff1a;长度为N的数组&#xff0c;按照元…

软件性能测试有哪些测试类型和方法?

软件性能测试是一种通过模拟真实用户使用情况&#xff0c;评估软件系统在各种压力和负载下的表现的测试方法。在今天这个讲究效率的时代&#xff0c;软件性能测试是不可或缺的一环。它能帮助开发人员和企业发现潜在的性能问题&#xff0c;提前优化改进&#xff0c;保证软件系统…

动态内存管理—C语言通讯录

目录 一&#xff0c;动态内存函数的介绍 1.1 malloc和free 1.2 calloc 1.3 realloc 1.4C/C程序的内存开辟 二&#xff0c;通讯录管理系统 动态内存函数的介绍 malloc free calloc realloc 一&#xff0c;动态内存函数的介绍 1.1 malloc和free void* malloc (…

回文链表(快慢指针解法之在推进过程中反转)

归纳编程学习的感悟&#xff0c; 记录奋斗路上的点滴&#xff0c; 希望能帮到一样刻苦的你&#xff01; 如有不足欢迎指正&#xff01; 共同学习交流&#xff01; &#x1f30e;欢迎各位→点赞 &#x1f44d; 收藏⭐ 留言​&#x1f4dd;抱怨深处黑暗&#xff0c;不如提灯前行…

代码随想录算法训练营day14|二叉树的递归遍历、二叉树的迭代遍历、二叉树的统一迭代法

二叉树的递归遍历 首先需要明确的一点是&#xff0c;前序中序和后序在二叉树的递归遍历中的区别仅在于递归函数中操作的顺序&#xff0c;前序是在遍历一个节点的左右子树前进行操作&#xff0c;中序是在遍历一个节点的左子树后进行操作再遍历右子树&#xff0c;而后序是在遍历…

C++算术运算和自增自减运算

一 引言 表示运算的符号称为运算符。 算术运算&#xff1b; 比较运算&#xff1b; 逻辑运算&#xff1b; 位运算&#xff1b; 1 算术运算 算术运算包括加、减、乘、除、乘方、指数、对数、三角函数、求余函数&#xff0c;这些都是算术运算。 C中用、-、*、/、%分别表示加、减…

《当微服务遇上Ribbon:一场负载均衡的华丽舞会》

在微服务的厨房里&#xff0c;如何确保每一道服务都恰到好处&#xff1f;揭秘Spring Cloud Ribbon如何像大厨一样精心调配资源&#xff0c;让负载均衡变得像烹饪艺术一样简单&#xff01; 文章目录 Spring Cloud Ribbon 详解1. 引言微服务架构中的负载均衡需求Spring Cloud Rib…

【算法实战】每日一题:设计一个算法,用最少数量的矩形覆盖一系列宽度为d、高度为w的矩形,且使用矩形不能超出边界

题目 设计一个算法&#xff0c;用最少数量的矩形覆盖一系列宽度为d、高度为w的矩形建筑物侧墙&#xff0c;且矩形不能超出边界。 核心思路 考虑这种结构 前面递增后面一个与前面的某个高度一致&#xff0c;这时候考虑最下面的覆盖&#xff08;即都是从最下面向上覆盖&#…

redis数据类型set,zset

华子目录 Set结构图相关命令sdiff key1 [key2]sdiffstore destination key1 [key2...]sinter key1 [key2...]sinterstore destination key1 [key2...]sunion key1 [key2...]sunionstore destination key1 [key2...]smove source destination memberspop key [count]sscan key c…

Java GC问题排查的一些个人总结和问题复盘

个人博客 Java GC问题排查的一些个人总结和问题复盘 | iwts’s blog 是否存在GC问题判断指标 有的比较明显&#xff0c;比如发布上线后内存直接就起飞了&#xff0c;这种也是比较好排查的&#xff0c;也是最多的。如果单纯从优化角度&#xff0c;看当前应用是否需要优化&…

Unity实现首行缩进两个字符

效果 在Unity中如果想实现首行缩进两个字符&#xff0c;你会发现按空格是没法实现的。 实现原理&#xff1a;用空白的透明的字替代原来的位置。 代码&#xff1a; <color#FFFFFF00>XXX</color> 赶紧去试试吧&#xff01;

备战秋招—模拟版图面试题来了

随着暑期的脚步逐渐临近&#xff0c;电子工程和集成电路设计领域的毕业生们&#xff0c;也将迎来了另一个求职的黄金期——秋招。我们总说机会是留给有准备的人。对于有志于投身于模拟版图设计的学子们来说&#xff0c;为了在众多求职者中脱颖而出&#xff0c;充分备战模拟版图…

C# 工商银行缺少infosecapiLib.infosec

搜索Tlbimp.exe 这里使用4.8.1下的处理&#xff0c;以管理员身份打开powershell cd "C:\Program Files (x86)\Microsoft SDKs\Windows\v10.0A\bin\NETFX 4.8.1 Tools".\TlbImp.exe "G:\CSharp\icbc-api-sdk-cop-c#\sdk-cop\sdk-cop\dll\infosecapi.dll" …