RabbitMQ入门指南(三):Java入门示例

专栏导航

RabbitMQ入门指南

从零开始了解大数据


目录

专栏导航

前言

一、AMQP协议

1.AMQP

2.Spring AMQP

二、使用Spring AMQP实现对RabbitMQ的消息收发

1.案例准备阶段

2.入门案例(无交换机)

3.任务模型案例(Work Queues)

总结


前言

RabbitMQ是一个高效、可靠的开源消息队列系统,广泛用于软件开发、数据传输、微服务等领域。本文主要介绍了AMQP、Spring AMQP和使用SpringAMQP实现对RabbitMQ的消息收发等内容。


一、AMQP协议

1.AMQP

全称为Advanced Message Queuing Protocol,是一种用于在应用程序之间传递业务消息的开放标准。该协议与语言和平台无关,更符合微服务中独立性的要求。通过AMQP,不同的应用程序可以在不改变各自实现方式的情况下进行跨平台、跨语言的消息通信。

AMQP协议定义了消息的传输方式和消息的元数据,例如消息的发送者、接收者、消息体、消息类型等。这些元数据可以帮助应用程序对消息进行正确的处理。

2.Spring AMQP

在Spring框架中,有一个Spring AMQP的项目,它基于AMQP协议定义了一套API规范,提供了模板来发送和接收消息。这个项目包含两部分,其中spring-amqp是基础抽象,而spring-rabbit是底层的默认实现。

Spring AMQP通过提供模板和抽象层,简化了应用程序与RabbitMQ的交互。它提供了一组易于使用的API,用于发送和接收消息。这些API可以帮助开发人员更专注于业务逻辑,而不是消息的发送和接收细节。

spring-rabbit是Spring AMQP的一部分,它基于RabbitMQ实现了AMQP协议。spring-rabbit提供了对RabbitMQ的封装,使开发人员可以通过简单的配置和API调用与RabbitMQ进行交互。

Spring AMQP 主要功能:

  • 自动声明和配置队列、交换机及其绑定关系:通过简化队列和交换器的创建和管理过程,Spring AMQP 帮助开发人员专注于实现业务逻辑,而不是手动配置消息中间件。
  • 基于注解的监听器模式,实现异步消息接收:通过注解,Spring AMQP 可以自动将方法与特定的队列或交换机绑定,从而实现异步接收和处理消息。这种模式提高了应用程序的响应性能和吞吐量。
  • 封装了 RabbitTemplate 工具,用于发送消息:RabbitTemplate 是 RabbitMQ 的核心类之一,用于发送和接收消息。Spring AMQP 提供了对这个工具的封装,使得开发人员可以方便地使用它来发送消息。

官方文档:

Spring AMQPicon-default.png?t=N7T8https://spring.io/projects/spring-amqp

二、使用Spring AMQP实现对RabbitMQ的消息收发

1.案例准备阶段

项目结构如下:

项目结构介绍:

  • mq-demo:父工程,管理项目依赖

  • publisher:消息的发送者

  • consumer:消息的消费者

在父工程引入spring-amqp依赖:

<!--AMQP依赖-->
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

项目完整依赖如下:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>cn.rye.demo</groupId><artifactId>mq-demo</artifactId><version>1.0-SNAPSHOT</version><modules><module>publisher</module><module>consumer</module></modules><packaging>pom</packaging><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.5.15</version><relativePath/></parent><properties><maven.compiler.source>8</maven.compiler.source><maven.compiler.target>8</maven.compiler.target></properties><dependencies><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId></dependency><!--AMQP依赖--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><!--单元测试--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId></dependency><!--Jackson--><dependency><groupId>com.fasterxml.jackson.dataformat</groupId><artifactId>jackson-dataformat-xml</artifactId></dependency></dependencies>
</project>

在application.yml中配置RabbitMQ服务端信息(每个微服务都需要配置):

spring:rabbitmq:host: 10.0.0.100port: 5672virtual-host: /demousername: userpassword: 123456

2.入门案例(无交换机)

案例模型:

在RabbitMQ管理控制台新建队列:

查看新建结果:

在publisher服务中编写测试类,并利用RabbitTemplate实现消息发送

    @Autowiredprivate RabbitTemplate rabbitTemplate;@Testvoid testSendMessage2Queue() {// 队列名称String queueName = "demo.queue";// 消息String msg = "First demo";// 发送消息rabbitTemplate.convertAndSend(queueName, msg);}

运行测试用例,查看结果:

在consumer服务中新建一个类实现消息接收

@Component
public class MqListener {@RabbitListener(queues = "demo.queue")public void listenSimpleQueue(String msg) {System.out.println("消息:" + msg);}
}

启动consumer服务,查看消息(一旦监听的队列中有了消息,就会推送给当前服务):

3.任务模型案例(Work Queues)

让多个消费者绑定一个队列,共同消费队列中的消息。

案例模型:

在RabbitMQ管理控制台新建队列:

查看新建结果:

在publisher服务中的测试类添加一个测试方法(通过循环发送,模拟大量消息堆积现象 ):

    @Testvoid testWorkQueue() throws InterruptedException {String queueName = "work.queue";for (int i = 1; i <= 50; i++) {String msg = "Work Queues " + i;rabbitTemplate.convertAndSend(queueName, msg);Thread.sleep(20);}}

在consumer服务的类中添加2个新的方法,模拟多个消费者绑定同一个队列 :

    @RabbitListener(queues = "work.queue")public void listenWorkQueue1(String msg) throws InterruptedException {System.out.println("消费者1接收work.queue消息:" + msg);}@RabbitListener(queues = "work.queue")public void listenWorkQueue2(String msg) throws InterruptedException {System.err.println("消费者2接收work.queue消息:" + msg);}

运行结果:

修改consumer服务类中的方法:

  • 消费者1 sleep了20毫秒,相当于每秒钟处理50个消息

  • 消费者2 sleep了200毫秒,相当于每秒处理5个消息

    @RabbitListener(queues = "work.queue")public void listenWorkQueue1(String msg) throws InterruptedException {System.out.println("消费者1接收work.queue消息:" + msg);Thread.sleep(20);}@RabbitListener(queues = "work.queue")public void listenWorkQueue2(String msg) throws InterruptedException {System.err.println("消费者2接收work.queue消息:" + msg);Thread.sleep(200);}

重启后查看运行结果:

以上结果表明:默认情况下,RabbitMQ会将消息依次轮询投递给绑定在队列上的每一个消费者,并没有考虑到消费者是否已经处理完消息,可能出现消息堆积。

修改consumer服务的application.yml,设置preFetch值为1,确保同一时刻最多投递给消费者1条消息(每次只能获取一条消息,处理完成才能获取下一个消息):

spring:rabbitmq:host: 10.0.0.100port: 5672virtual-host: /demousername: userpassword: 123456listener:simple:prefetch: 1

重启后查看运行结果:


总结

RabbitMQ是一个开源的消息队列软件,旨在提供可靠的消息传递和消息队列功能。本文主要介绍了AMQP、Spring AMQP和使用Spring AMQP实现对RabbitMQ的消息收发等内容,希望对大家有所帮助。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/234364.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

九.数据处理之增删改

数据处理之增删改 1.插入数据1.1实际问题1.2方式1&#xff1a;VALUES的方式添加1.3方式2&#xff1a;将查询结果插入到表中 2.更新数据3.删除数据4.MySQL8新特性&#xff1a;计算列5.综合案例 1.插入数据 1.1实际问题 解决方式&#xff1a;使用INSERT语句向表中插入数据 1.2方…

vue - 详解配置反向代理后,项目出现白屏的情况,js、css文件404问题解决方案(Vue解决项目白屏以及反向代理)

问题说明 添加反向代理代码后,前端运行白屏。 // 设置baseURL,8888是后端端口号,前端请求默认发送到baseURL的地址 var axios = require(axios) axios.defaults.baseURL = http://localhost:8888/api // 全局注册,之后可在其他组件中通过 this.$axios 发送数据 Vue.proto…

【jupyter notebook】jupyter notebook 调用另一个jupyter notebook 的函数

总结 使用 %run 魔法命令将 Notebook 转换为py文件使用 nbimporter 库手动复制代码优点notebook最前面加上即可最基本方法就跟导入py文件一样&#xff0c;不会被执行一遍快缺点所有的代码都会执行一遍修改原文件就要重新转换&#xff0c;且 从自定义的 .py 文件中导入函数时&a…

文献速递:生成对抗网络医学影像中的应用——用于生成前列腺MR-only影像治疗剂量规划的合成CT的深度学习模型:多中心研究

文献速递&#xff1a;生成对抗网络医学影像中的应用——用于生成前列腺MR-only影像治疗剂量规划的合成CT的深度学习模型&#xff1a;多中心研究 本周给大家分享文献的主题是生成对抗网络&#xff08;Generative adversarial networks, GANs&#xff09;在医学影像中的应用。文…

Codeforces Round 916 (Div. 3)(E:贪心 F贪心dfs G tarjan+topsort )

A&#xff1a;直接暴力统计每个字符的次数是否达标即可 #include<bits/stdc.h> using namespace std; const int N 3e510,mod998244353; #define int long long typedef long long LL; typedef pair<int, int> PII; typedef unsigned long long ULL;const long l…

微信小程序更新机制

1/同步更新 1、定期检查更新时&#xff1b; 2、长期未使用&#xff0c;首次进入会同步更新&#xff0c;但在弱网或者下载新版本失败的情况下&#xff0c;还会使用旧版本。 2/异步更新&#xff1a; 启动时异步更新 3/开发者手动触发更新 在启动时异步更新的情况下&#xff0c;…

电源模块测试方法 | 怎么测试电源负载瞬态响应?

负载瞬态响应测试是检测电源稳定性和质量的重要方法之一&#xff0c;而电源稳定性是设备正常运行的基础。通过负载瞬态响应测试来检测电源的响应速度和稳定性&#xff0c;从而优化电源设计&#xff0c;提升性能&#xff0c;确保电子设备可以稳定工作。 什么是负载瞬态响应测试?…

js知识点1:防抖节流

js知识点1&#xff1a;防抖节流 防抖节流 防抖节流&#xff0c;本质上是优化高频率执行代码的一种手段 定义&#xff1a; 防抖: n 秒后再执行该事件&#xff0c;若在 n 秒内被重复触发&#xff0c;则重新计时 节流: n 秒内只运行一次&#xff0c;若在 n 秒内重复触发&#xff0…

【C语言】SCU安全项目2-BufBomb

目录 关键代码解读&#xff1a; getxs() getbuf() test() 核心思路 具体操作1 具体操作2 前段时间忙于强网杯、英语4级和一些其他支线&#xff0c;有点摸不清头绪了&#xff0c;特别是qwb只有一个输出&#xff0c;太过坐牢&#xff0c;决定这个安全项目做完后就继续投身…

Spring MVC框架支持RESTful,设计URL时可以使用{自定义名称}的占位符@Get(“/{id:[0-9]+}/delete“)

背景&#xff1a;在开发实践中&#xff0c;如果没有明确的规定URL&#xff0c;可以参考&#xff1a; 传统接口 获取数据列表,固定接口路径&#xff1a;/数据类型的复数 例如&#xff1a;/albums/select RESTful接口 - 根据ID获取某条数据&#xff1a;/数据类型的复数/{id} - 例…

在Linux安装的Docker中开启IPv6

先在Linux中安装docker&#xff0c;然后在docker中开启IPv6。 安装docker 第一步&#xff0c;卸载旧版本docker。 若系统中已安装旧版本docker&#xff0c;则需要卸载旧版本docker以及与旧版本docker相关的依赖项。 命令&#xff1a;yum -y remove docker docker-client do…

ADC芯片CS1237在电子秤方案的优势

​随着科技的不断发展&#xff0c;电子秤已经成为我们日常生活中不可或缺的测量工具。为了满足用户对于高精度、高稳定性的需求&#xff0c;芯海ADC芯片CS1237应运而生&#xff0c;为电子秤方案带来了革命性的变革。 一、芯海ADC芯片CS1237介绍 芯海ADC芯片CS1237是一款高性能…

【力扣100】104.二叉树的最大深度

添加链接描述 递归 # Definition for a binary tree node. # class TreeNode: # def __init__(self, val0, leftNone, rightNone): # self.val val # self.left left # self.right right class Solution:def maxDepth(self, root: Optional[T…

CountDownLatch和Semaphore的区别?

CountDownLatch和Semaphore都是在Java中用于多线程协同的工具&#xff0c;但它们有一些重要的区别。 CountDownLatch&#xff1a; 用途&#xff1a; 主要用于等待一个或多个线程完成操作&#xff0c;它的计数器只能被减少&#xff0c;不能被增加。计数&#xff1a; 初始化时需…

栈-二叉树的中序遍历/easy

二叉树的中序遍历 1、题目2、解题思路3、复杂度最优解示例4、抽象与扩展 1、题目 给定一个二叉树的根节点 root &#xff0c;返回 它的 中序 遍历 。 示例 1&#xff1a; 输入&#xff1a;root [1,null,2,3] 输出&#xff1a;[1,3,2]示例 2&#xff1a; 输入&#xff1a;roo…

科聪控制系统典型应用车型 —— 料箱机器人

料箱机器人即料箱AGV是一种智能化物流搬运设备&#xff0c;它可以代替人力完成出库入库和搬运工作&#xff0c;可根据出入库生产出货需求&#xff0c;将货物从起点运送到终点&#xff0c;自动柔性完成货到人货到点的操作。 提升仓储和物流效率的自动化利器 料箱机器人的投用能…

STM32_通过Ymodem协议进行蓝牙OTA升级固件教程

目录标题 前言1、OTA升级的重要性和应用场景2、理论基础2.1、单片机的启动流程2.2、什么是IAP&#xff1f;2.3、什么是OTA&#xff1f;2.4、什么是BootLoader&#xff1f;2.5、Ymodem协议是什么&#xff1f;2.6、IAP是如何实现的&#xff1f; 3、具体操作3.1、软硬件工具准备3.…

数据可视化---直方图

内容导航 类别内容导航机器学习机器学习算法应用场景与评价指标机器学习算法—分类机器学习算法—回归机器学习算法—聚类机器学习算法—异常检测机器学习算法—时间序列数据可视化数据可视化—折线图数据可视化—箱线图数据可视化—柱状图数据可视化—饼图、环形图、雷达图统…

filebeat8版本支持文件move

1、[rootzz log]# mv elink.log elink.log.001 2、[rootzz ~]# ps -ef | grep -i filebeat root 7090 3121 0 20:39 pts/0 00:00:00 ./filebeat -e -c /root/filebeat-8.8.0-linux-x86_64/prospectors/elink.yaml root 7128 3297 0 20:40 pts/3 00:00…

短剧分销平台搭建,助力普通人进入短剧市场

当下短剧抢占了各大用户的碎片化时间&#xff0c;成为了当代年轻人的“电子榨菜”&#xff0c;目前我国的短剧用户已达到了10.12亿人&#xff0c;拥有庞大的用户体量。 以小程序为主的付费短剧&#xff0c;今年充值流水达到了180多亿元。以当前短剧市场规模计算&#xff0c;今年…