【RabbitMQ实战】 03 SpringBoot RabbitMQ生产者和消费者示例

上一节我们写了一段原生API来进行生产和消费的例子。实际上SpringBoot对原生RabbitMQ客户端做了二次封装,让我们使用API的代价更低。

一、配置文件

依赖引入

<dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>com.google.code.gson</groupId><artifactId>gson</artifactId><version>2.8.6</version></dependency>
</dependencies>

RabbitMQ的配置如下

spring:rabbitmq:host: 192.168.56.201port: 5672username: hellopassword: world#虚拟hostvirtual-host: virtual01template:mandatory: true #当mandatory设置为true时,交换器无法根据自身的类型和路由键找到一个符合条件的队列,那么RabbitMQ会调用Basic.Return命令将消息返回给生产者。当为false时,则直接丢弃消息publisher-confirm-type: correlated #生产者回调确认机制,由回调来确定消息是否发布成功publisher-returns: true #是否开启生产者returnslistener:simple:acknowledge-mode: manual #手动回复方式,一般建议手动回复,即需要我们自己调用对应的ACK方法prefetch: 10 #每个消费者可拉取的,还未ack的消息数量concurrency: 3 #消费端(每个Listener)的最小线程数max-concurrency: 10 #消费端(每个Listener)的最大线程数

每个配置的具体含义,详见配置

二、生产者代码

@Slf4j
@RestController
@RequestMapping("/rabbit")
public class RabbitSendController implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback {private static final String EXCHANGE_NAME = "my_exchange";private static final String ROUTING_KEY = "my_routing";@Autowiredprivate RabbitTemplate rabbitTemplate;/*** 正常发送并被broker接收* @return*/@RequestMapping("send")public String send() {for (int i = 0; i < 10; i++) {OrderInfo orderInfo = new OrderInfo();orderInfo.setAddress("成都市高新区");orderInfo.setOrderId(String.valueOf(i));orderInfo.setProductName("华为P60:" + i);//设置回调关联的一个idString messageId = UUID.randomUUID().toString();log.info("开始发送消息,当前消息关联id为:{}", messageId);CorrelationData correlationData = new CorrelationData(messageId);MessageProperties messageProperties = new MessageProperties();messageProperties.setDeliveryMode(MessageDeliveryMode.PERSISTENT);Message message = MessageBuilder.withBody(new Gson().toJson(orderInfo).getBytes(StandardCharsets.UTF_8)).andProperties(messageProperties).build();//设置ack回调rabbitTemplate.setConfirmCallback(this);//退回消息的回调rabbitTemplate.setReturnCallback(this);rabbitTemplate.convertAndSend(EXCHANGE_NAME, ROUTING_KEY, message, correlationData);}return "ok";}/*** 设置一个非法的路由键,模拟消息被broker退回的情况,前提是* spring.rabbitmq.template.mandatory=true 当mandatory设置为true时,交换器无法根据自身的类型和路由键找到一个符合条件的队列,那么RabbitMQ会调用Basic.Return命令将消息返回给生产者。当为false时,则直接丢弃消息* <p>* spring.rabbitmq.publisher-returns=true 生产者回调确认机制,由回调来确定消息是否发布成功** @return*/@RequestMapping("send-return")public String sendAndReturn() {OrderInfo orderInfo = new OrderInfo();orderInfo.setAddress("成都市高新区");orderInfo.setOrderId("111");orderInfo.setProductName("小米13");//设置回调关联的一个idString messageId = UUID.randomUUID().toString();log.info("开始发送消息,当前消息关联id为:{}", messageId);CorrelationData correlationData = new CorrelationData(messageId);MessageProperties messageProperties = new MessageProperties();messageProperties.setDeliveryMode(MessageDeliveryMode.PERSISTENT);Message message = MessageBuilder.withBody(new Gson().toJson(orderInfo).getBytes(StandardCharsets.UTF_8)).andProperties(messageProperties).build();//设置ack回调rabbitTemplate.setConfirmCallback(this);//退回消息的回调rabbitTemplate.setReturnCallback(this);//下面这个RoutingKey是没有绑定的,所以发不出去rabbitTemplate.convertAndSend(EXCHANGE_NAME, "error.routing", message, correlationData);return "ok";}@Overridepublic void confirm(CorrelationData correlationData, boolean ack, String cause) {if (correlationData == null) {return;}String messageId = correlationData.getId();if (ack) {log.info("【confirm回调方法】,消息发布成功,messageId={}", messageId);} else {log.info("【confirm回调方法】,消息发布失败,messageId={}", messageId);}}@Overridepublic void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {log.info("【returnedMessage回调方法】,消息被退回,message={},replyCode:{},replyText:{},exchange:{},routingKey:{}",new String(message.getBody()), replyCode, replyText, exchange, routingKey);}
}

代码说明

  • 使用RabbitTemplate可以发送消息
  • 这个Controller定义了一个发送的接口,调用RabbitTemplate将消息发送出去
  • 实现了ConfirmCallback接口,对应着我们配置publisher-confirm-type: correlated #生产者回调确认机制,由回调来确定消息是否发布成功。发送成功时,会回调confirm方法。
  • 实现了ReturnCallback接口,对应着我们的配置
    • publisher-returns: true #是否开启生产者returns。开启生产者returns机制,被回退回来的消费,会调用returnedMessage方法
    • mandatory: true #当mandatory设置为true时,交换器无法根据自身的类型和路由键找到一个符合条件的队
  • 注意上面两个实现接口,要调用下面两行代码,才能生效。
    rabbitTemplate.setConfirmCallback(this);
    rabbitTemplate.setReturnCallback(this);

访问:http://localhost:8080/rabbit/send
输出日志如下

开始发送消息,当前消息关联id为:b60196e7-4ff2-4926-8a1f-bd0872b236f8
开始发送消息,当前消息关联id为:03232b2c-b755-4b46-9a8c-6b2bbf3b2bd6
【confirm回调方法】,消息发布成功,messageId=b60196e7-4ff2-4926-8a1f-bd0872b236f8
【confirm回调方法】,消息发布成功,messageId=03232b2c-b755-4b46-9a8c-6b2bbf3b2bd6
...

模拟一个发送失败,消息被Broker退回的情况
访问:http://localhost:8080/rabbit/send-return
输出日志如下
可以发现,即使被退回的情况,confirm方法也会被成功执行。这个要注意。

开始发送消息,当前消息关联id为:f89eb7c2-340b-42ef-9454-7dca03d895a9
【returnedMessage回调方法】,消息被退回,message={"orderId":"111","productName":"小米13","address":"成都市高新区"},replyCode:312,replyText:NO_ROUTE,exchange:my_exchange,routingKey:error.routing
【confirm回调方法】,消息发布成功,messageId=f89eb7c2-340b-42ef-9454-7dca03d895a9

三、消费者代码

@Slf4j
@Component
public class RabbitOrderConsumer {private static final String EXCHANGE_NAME = "my_exchange";private static final String QUEUE_NAME = "my_queue";private static final String ROUTING_KEY = "my_routing";@RabbitListener(bindings = {@QueueBinding(value = @Queue(value = QUEUE_NAME, durable = "true"),exchange = @Exchange(value = EXCHANGE_NAME, type = "topic", durable = "true"), key = ROUTING_KEY)})public void handleMessage(Message message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws IOException {log.info("接收到消息:{},deliveryTag:{}", new String(message.getBody(), StandardCharsets.UTF_8), tag);channel.basicAck(tag, false);}
}

代码说明

  • 定义了@RabbitListener,将Exchange和Queue进行绑定
  • channel.basicAck(tag, false);是手动回复ack。注意手动回复ack表示该消息已被客户端成功消费,且在配置文件中要配置ack方式为手动,即上面配置文件中的acknowledge-mode: manual
    运行消费者代码,日志输出如下
接收到消息:{"orderId":"1","productName":"华为P60:1","address":"成都市高新区"},deliveryTag:1
接收到消息:{"orderId":"4","productName":"华为P60:4","address":"成都市高新区"},deliveryTag:2

示例代码git仓库:https://gitee.com/syk1234/mqdmo

四、更多关于@RabbitListener参数说明如下

@Queue注解为我们提供了队列相关的一些属性,具体如下:

  • name: 队列的名称;

  • durable: 是否持久化;

  • exclusive: 是否独享、排外的;

  • autoDelete: 是否自动删除;

  • arguments:队列的其他属性参数,有如下可选项,可参看图2的arguments:

  • x-message-ttl:消息的过期时间,单位:毫秒;

  • x-expires:队列过期时间,队列在多长时间未被访问将被删除,单位:毫秒;

  • x-max-length:队列最大长度,超过该最大值,则将从队列头部开始删除消息;

  • x-max-length-bytes:队列消息内容占用最大空间,受限于内存大小,超过该阈值则从队列头部开始删除消息;

  • x-overflow:设置队列溢出行为。这决定了当达到队列的最大长度时消息会发生什么。有效值是drop-head、reject-publish或reject-publish-dlx。仲裁队列类型仅支持drop-head;

  • x-dead-letter-exchange:死信交换器名称,过期或被删除(因队列长度超长或因空间超出阈值)的消息可指定发送到该交换器中;

  • x-dead-letter-routing-key:死信消息路由键,在消息发送到死信交换器时会使用该路由键,如果不设置,则使用消息的原来的路由键值

  • x-single-active-consumer:表示队列是否是单一活动消费者,true时,注册的消费组内只有一个消费者消费消息,其他被忽略,false时消息循环分发给所有消费者(默认false)

  • x-max-priority:队列要支持的最大优先级数;如果未设置,队列将不支持消息优先级;

  • x-queue-mode(Lazy mode):将队列设置为延迟模式,在磁盘上保留尽可能多的消息,以减少RAM的使用;如果未设置,队列将保留内存缓存以尽可能快地传递消息;

  • x-queue-master-locator:在集群模式下设置镜像队列的主节点信息。

@RabbitListener 提供消费者配置

  • ackMode:覆盖容器工厂 AcknowledgeMode属性。
  • admin:参考AmqpAdmin.
  • autoStartup:设置为 true 或 false,以覆盖容器工厂中的默认设置。
  • QueueBinding[] bindings:QueueBinding提供监听器队列名称以及交换和可选绑定信息的数组。
  • concurrency:消费并发数。
  • containerFactory:RabbitListenerContainerFactory的bean名称 ,没有则使用默认工厂。
  • converterWinsContentType:设置为“false”以使用“replyContentType”属性的值覆盖由消息转换器设置的任何内容类型标头。
  • errorHandler:消息异常时调用的方法名。
  • exclusive:当为true时,容器中的单个消费者将独占使用 queues(),从而阻止其他消费者从队列接收消息。
  • executor:线程池bean的名称
  • group:如果提供,则此侦听器的侦听器容器将添加到以该值作为其名称的类型为 的 bean 中Collection。
  • id:为此端点管理的容器的唯一标识符。
  • messageConverter:消息转换器。
  • priority:此端点的优先级。
  • String[] queues:监听的队列名称
  • Queue[] queuesToDeclare:监听的队列Queue注解对象,与bindings()、queues()互斥。
  • replyContentType:用于设置回复消息的内容类型。
  • replyPostProcessor:在ReplyPostProcessor发送之前处理响应的 bean 名称 。
  • returnExceptions:设置为“true”以导致使用正常replyTo/@SendTo语义将侦听器抛出的异常发送给发送者。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/91333.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

2023年十大开源项目:革新技术创新

来源整理 : 小托 | 开源社翻译组PM 翻译 : 张锋 | 开源社翻译 Open-source projects have revolutionized the world of software development by fostering innovation, collaboration, and community-driven contributions. These projects are often the backbone of countl…

PHP8的继承和多态-PHP8知识详解

我们在前面的时候讲过《面向对象编程的特点》时&#xff0c;面向对象编程具有3大特点&#xff1a;封装性、继承性和多态性。 继承和多态的根本作用就是完成代码的重用。下面就来讲解php8的继承和多态。 1继承 子类可以继承父类的所有成员变量和成员方法&#xff0c;包括构造方…

玄子Share 设计模式 GOF 全23种 + 七大设计原则

玄子Share 设计模式 GOF 全23种 七大设计原则 前言&#xff1a; 此文主要内容为 面向对象七大设计原则&#xff08;OOD Principle&#xff09;GOF&#xff08;Gang Of Four&#xff09;23种设计模式拓展的两个设计模式 简单工厂模式&#xff08;Simple Factory Pattern&#x…

小白入门pytorch(二)----神经网络

本文为&#x1f517;[小白入门Pytorch]学习记录博客 文章目录 前言一、神经网络的组成部分1.神经元2.神经网络层3.损失函数4.优化器 二、Pytorch构建神经网络中的网络层全连接层2.卷积层3.池化层4.循环神经网络5.转置卷积层6.归一化层7.激活函数层 三、数据加载与预处理1.数据加…

【计算机网络】 TCP和UDP的区别

文章目录 区别相关问题 区别 UDP是无连接的&#xff0c;TCP面向连接。UDP是不可靠传输&#xff0c;不使用流量控制和拥塞控制。而TCP是可靠传输&#xff0c;使用流量控制和拥塞控制。UDP支持一对一&#xff0c;一对多&#xff0c;多对一和多对多交互通信&#xff0c;而TCP只能…

uvm白皮书练习_ch2_ch223_加入objection机制

UVM中通过objection机制来控制验证平台的关闭。 在每个phase中&#xff0c;UVM会检查是否有objection被提起&#xff08;raise_ objection&#xff09;&#xff0c;如果有&#xff0c;那么等待这个objection被撤销&#xff08;drop_objection&#xff09;后停止仿真&#xff1b…

Fake Maxpooling 二维滑动窗口

先对每一行求一遍滑动窗口&#xff0c;列数变为(列数-k1) 再对每一列求一遍滑动窗口&#xff0c;行数变为(行数-k1) 剩下的就是每一个窗口里的最大值啦 #include<bits/stdc.h> #define IOS ios::sync_with_stdio(0);cin.tie(0);cout.tie(0); #define endl \nusing nam…

关于Greenplum为什么基于PostgreSQL而不是MySQL?

Greenplum选择PostgreSQL而不是MySQL&#xff0c;其原因主要有以下几点&#xff1a; 更强的分析能力 PG有非常强大的SQL支持能力和非常丰富的统计函数和统计语法支持&#xff0c;除对ANSI SQL完全支持外&#xff0c;还支持比如分析函数&#xff08;SQL2003 OLAP window函数&a…

【强化学习】基础概念

1. Agent (智能体) 智能体是进行决策和学习的实体&#xff0c;它能感知环境的状态&#xff0c;并基于策略采取动作以影响环境。智能体的目标是通过与环境的交互获得最大化的累积奖励。 2. Environment (环境) 环境是智能体所处的外部系统&#xff0c;它与智能体交互。环境的…

【算法】莫队

这篇博客起源于本人把一道 p o w ( 2 , n ) pow(2,n) pow(2,n) 的问题考虑成求组合数前缀和的问题qwq&#xff0c;于是接触到了这个新算法来总结一下 参考自这篇文章&#xff0c;写得太好了 首先是一道模板题 题目意思是&#xff0c;给出一个数组a&#xff0c;再给出多个区…

无人直播间

失败&#xff01;&#xff01; 采用 ffmpeg 技术进行推流 推流代码&#xff1a; 【需要将rtmp替换为你的推流地址】 ffmpeg -re -stream_loop -1 -i "rain.mp4" -c copy -f flv ""推流地址获取 以哔哩哔哩为例 点击下方链接 开播设置 - 个人中心 - …

leetcode之打家劫舍

leetcode 198 打家劫舍 leetcode 213 打家劫舍 II leetcode 337. 打家劫舍 III 你是一个专业的小偷&#xff0c;计划偷窃沿街的房屋&#xff0c;每间房内都藏有一定的现金。这个地方所有的房屋都 围成一圈 &#xff0c;这意味着第一个房屋和最后一个房屋是紧挨着的。同时&#…

【MATLAB源码-第39期】基于m序列/gold序列的直接扩频通信仿真,编码方式采用卷积码,调制方式采用BPSK。

1、算法描述 直接序列扩频通信系统的仿真一般包括以下几个主要步骤&#xff1a;信号产生、扩频、卷积编码、BPSK调制、信道传输、BPSK解调、卷积码译码和解扩。 信号产生&#xff1a; 首先&#xff0c;产生一个二进制数据序列作为待发送的信息位。 扩频&#xff1a; 采用m序列…

React Promise 中断

需求&#xff1a; 上传文件&#xff0c;但是后端接口不支持多文件上传&#xff0c;但是一次性发出很多请求的话如果有100个文件那对后端的压力又太大了在上传的时候还需要有停止上传的按钮 进程&#xff1a; async await 只能做到第一步&#xff0c;但是无法在上传中的时候关…

HBASE集群主节点迁移割接手动操作步骤

HBASE集群主节点迁移割接手动操作步骤 HBASE集群主节点指的是包含zk、nn、HM和rm服务的节点&#xff0c;一般这类服务都是一起复用在同一批节点上&#xff0c;我把这一类节点统称为HBASE集群主节点。 本文中使用了rsync、pssh等工具&#xff0c;这类是开源的&#xff0c;自己…

JVM的主要组成及其作用

jvm主要组成部分有: 类加载器、运行时数据区 (内存结构)、执行引擎、本地接口库、垃圾回收机制 Java程序运行的时候&#xff0c;首先会通过类加载器把Java 代码转换成字节码。然后运行时数据区再将字节码加载到内存中&#xff0c;但字节码文件只是JVM 的一套指令集规范&#xf…

如何开始着手一篇Meta分析 | Meta分析的流程及方法

Meta分析是针对某一科研问题&#xff0c;根据明确的搜索策略、选择筛选文献标准、采用严格的评价方法&#xff0c;对来源不同的研究成果进行收集、合并及定量统计分析的方法&#xff0c;最早出现于“循证医学”&#xff0c;现已广泛应用于农林生态&#xff0c;资源环境等方面。…

十五、异常(3)

本章概要 捕获所有异常 多重捕获栈轨迹重新抛出异常精准的重新抛出异常异常链 捕获所有异常 可以只写一个异常处理程序来捕获所有类型的异常。通过捕获异常类型的基类 Exception&#xff0c;就可以做到这一点&#xff08;事实上还有其他的基类&#xff0c;但 Exception 是所…

鸿鹄工程项目管理系统 Spring Cloud+Spring Boot+Mybatis+Vue+ElementUI+前后端分离构建工程项目管理系统

项目背景 一、随着公司的快速发展&#xff0c;企业人员和经营规模不断壮大。为了提高工程管理效率、减轻劳动强度、提高信息处理速度和准确性&#xff0c;公司对内部工程管理的提升提出了更高的要求。 二、企业通过数字化转型&#xff0c;不仅有利于优化业务流程、提升经营管理…

HTML——列表,表格,表单内容的讲解

文章目录 一、列表1.1无序&#xff08;unorder&#xff09;列表1.2 有序&#xff08;order&#xff09;列表1.3 定义列表 二、表格**2.1 基本的表格标签2.2 演示 三、表单3.1 form元素3.2 input元素3.2.1 单选按钮 3.3 selcet元素 基础部分点击&#xff1a; web基础 一、列表 …