学习Spring Boot:(二十六)使用 RabbitMQ 消息队列

前言

前面学习了 RabbitMQ 基础,现在主要记录下学习 Spring Boot 整合 RabbitMQ ,调用它的 API ,以及中间使用的相关功能的记录。

相关的可以去[我的博客/RabbitMQ]

正文

我这里测试都是使用的是 topic 交换器,Spring Boot 2.0.0, jdk 1.8

配置

Spring Boot 版本 2.0.0
pom.xml 文件中引入 AMQP 的依赖

<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

在系统配置文件中加入连接属性

spring:application:name: RabbitMQ-Demorabbitmq:host: k.wuwii.comport: 5672username: kronchanpassword: 123456#virtual-host: testpublisher-confirms: true # 开启确认消息是否到达交换器,需要设置 truepublisher-returns: true # 开启确认消息是否到达队列,需要设置 true

基本的使用

消费者

新增一个消费者类:

@Log
public class MessageReceiver implements ChannelAwareMessageListener {@Overridepublic void onMessage(Message message, Channel channel) throws Exception {try {byte[] body = message.getBody();log.info(">>>>>>> receive: " + new String(body));} finally {// 确认成功消费,否则消息会转发给其他的消费者,或者进行重试channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);}}
}
配置类

新增 RabbitMQ 的配置类,主要是对消费者的队列,交换器,路由键的一些设置:

@Configuration
public class RabbitMQConfig {public final static String QUEUE_NAME = "springboot.demo.test1";public final static String ROUTING_KEY = "route-key";public final static String EXCHANGES_NAME = "demo-exchanges";@Beanpublic Queue queue() {// 是否持久化boolean durable = true;// 仅创建者可以使用的私有队列,断开后自动删除boolean exclusive = false;// 当所有消费客户端连接断开后,是否自动删除队列boolean autoDelete = false;return new Queue(QUEUE_NAME, durable, exclusive, autoDelete);}/*** 设置交换器,这里我使用的是 topic exchange*/@Beanpublic TopicExchange exchange() {// 是否持久化boolean durable = true;// 当所有消费客户端连接断开后,是否自动删除队列boolean autoDelete = false;return new TopicExchange(EXCHANGES_NAME, durable, autoDelete);}/*** 绑定路由*/@Beanpublic Binding binding(Queue queue, TopicExchange exchange) {return BindingBuilder.bind(queue).to(exchange).with(ROUTING_KEY);}@Beanpublic SimpleMessageListenerContainer container(ConnectionFactory connectionFactory) {SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();container.setConnectionFactory(connectionFactory);container.setQueueNames(QUEUE_NAME);container.setMessageListener(receiver());//container.setMaxConcurrentConsumers(1);//container.setConcurrentConsumers(1); 默认为1//container.setExposeListenerChannel(true);container.setAcknowledgeMode(AcknowledgeMode.MANUAL); // 设置为手动,默认为 AUTO,如果设置了手动应答 basicack,就要设置manualreturn container;}@Beanpublic MessageReceiver receiver() {return new MessageReceiver();}}
生产者
@Component
public class MessageSender {@Autowiredprivate RabbitTemplate rabbitTemplate;/*** logger*/private static final Logger log = LoggerFactory.getLogger(MessageSender.class);public void send() {// public void convertAndSend(String exchange, String routingKey, final Object object, CorrelationData correlationData)// exchange:    交换机名称// routingKey:  路由关键字// object:      发送的消息内容// correlationData:消息IDCorrelationData correlationId = new CorrelationData(UUID.randomUUID().toString());// ConfirmListener是当消息无法发送到Exchange被触发,此时Ack为False,这时cause包含发送失败的原因,例如exchange不存在时// 需要在系统配置文件中设置 publisher-confirms: trueif (!rabbitTemplate.isConfirmListener()) {rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {if (ack) {log.info(">>>>>>> 消息id:{} 发送成功", correlationData.getId());} else {log.info(">>>>>>> 消息id:{} 发送失败", correlationData.getId());}});}// ReturnCallback 是在交换器无法将路由键路由到任何一个队列中,会触发这个方法。// 需要在系统配置文件中设置 publisher-returns: truerabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {log.info("消息id:{} 发送失败", message.getMessageProperties().getCorrelationId());});rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGES_NAME, RabbitMQConfig.ROUTING_KEY, ">>>>> Hello World", correlationId);log.info("Already sent message.");}}
测试发送消息

先启动系统启动类,消费者开始订阅,启动测试类发送消息。

@RunWith(SpringRunner.class)
@SpringBootTest
public class SpringbootRabbitmqApplicationTests {@Autowiredprivate MessageSender sender;@Testpublic void testReceiver() {sender.send();}
}

可以在消费者接收到信息,并且发送端将打出日志 成功发送消息的记录,也可以测试下 Publisher Confirms and Returns机制 主要是测试 ConfirmCallbackReturnCallback 这两个方法。

  • ConfirmCallback ,确认消息是否到达交换器,例如我们发送一个消息到一个你没有创建过的 交换器上面去,看看情况,
  • ReturnCallback,确认消息是否到达队列,我们可以这样测试,定义一个路由键,不会被任何队列订阅到,最后查看结果就可以了。

使用注解的方式

引入依赖和连接参数

跟文章第一步的配置一样的。

消费者
@Component
@Log
public class MessageReceiver {/*** 无返回消息的** @param message*/@RabbitListener(bindings = @QueueBinding(value = @Queue(value = Constant.QUEUE_NAME, durable = "true", exclusive = "false", autoDelete = "false"),exchange = @Exchange(value = Constant.EXCHANGES_NAME, ignoreDeclarationExceptions = "true", type = ExchangeTypes.TOPIC, autoDelete = "false"),key = Constant.ROUTING_KEY))public void receive(byte[] message) {log.info(">>>>>>>>>>> receive:" + new String(message));}/*** 设置有返回消息的* 需要注意的是,* 1. 在消息的在生产者(发送消息端)一定要使用 SendAndReceive(……) 这种带有 receive 的方法,否则会抛异常,不捕获会死循环。* 2. 该方法调用时会锁定当前线程,并且有可能会造成MQ的性能下降或者服务端/客户端出现死循环现象,请谨慎使用。** @param message* @return*/@RabbitListener(bindings = @QueueBinding(value = @Queue(value = Constant.QUEUE_NAME, durable = "true", exclusive = "false", autoDelete = "false"),exchange = @Exchange(value = Constant.EXCHANGES_NAME, ignoreDeclarationExceptions = "true", type = ExchangeTypes.TOPIC, autoDelete = "false"),key = Constant.ROUTING_REPLY_KEY))public String receiveAndReply(byte[] message) {log.info(">>>>>>>>>>> receive:" + new String(message));return ">>>>>>>> I got the message";}}

主要是使用到 @RabbitListener,虽然看起来参数很多,仔细的你会发现这个和写配置类里面的基本属性是一摸一样的,没有任何区别。

需要注意的是我在这里多做了个有返回值的消息,这个使用异常的话,会不断重试消息,从而阻塞了线程。而且使用它的时候只能使用带有 receive 的方法给它发送消息。

生产者

生产者没什么变化。

@Component
public class MessageSender implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback {/*** logger*/private static final Logger log = LoggerFactory.getLogger(MessageSender.class);private RabbitTemplate rabbitTemplate;/*** 注入 RabbitTemplate*/@Autowiredpublic MessageSender(RabbitTemplate rabbitTemplate) {this.rabbitTemplate = rabbitTemplate;this.rabbitTemplate.setConfirmCallback(this);this.rabbitTemplate.setReturnCallback(this);}/*** 测试无返回消息的*/public void send() {CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());rabbitTemplate.convertAndSend(Constant.EXCHANGES_NAME, Constant.ROUTING_KEY, ">>>>>> Hello World".getBytes(), correlationData);log.info(">>>>>>>>>> Already sent message");}/*** 测试有返回消息的,需要注意一些问题*/public void sendAndReceive() {CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());Object o = rabbitTemplate.convertSendAndReceive(Constant.EXCHANGES_NAME, Constant.ROUTING_REPLY_KEY, ">>>>>>>> Hello World Second".getBytes(), correlationData);log.info(">>>>>>>>>>> {}", Objects.toString(o));}/*** Confirmation callback.** @param correlationData correlation data for the callback.* @param ack             true for ack, false for nack* @param cause           An optional cause, for nack, when available, otherwise null.*/@Overridepublic void confirm(CorrelationData correlationData, boolean ack, String cause) {if (ack) {log.info(">>>>>>> 消息id:{} 发送成功", correlationData.getId());} else {log.info(">>>>>>> 消息id:{} 发送失败", correlationData.getId());}}/*** Returned message callback.** @param message    the returned message.* @param replyCode  the reply code.* @param replyText  the reply text.* @param exchange   the exchange.* @param routingKey the routing key.*/@Overridepublic void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {log.info("消息id:{} 发送失败", message.getMessageProperties().getCorrelationId());}
}
测试
@RunWith(SpringRunner.class)
@SpringBootTest
public class SpringbootAnnotationApplicationTests {@Autowiredprivate MessageSender sender;@Testpublic void send() {sender.send();}@Testpublic void sendAndReceive() {sender.sendAndReceive();}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/556803.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

appnode php,环境软件路径参考

[TOC]## Nginx* 主程序路径&#xff1a;/usr/sbin/nginx* 配置文件路径&#xff1a;/etc/nginx.conf* 服务控制命令&#xff1a;* CentOS 6&#xff1a;service nginx start|stop|reload|restart* CentOS 7&#xff1a;systemctl start|stop|reload|restart nginx.service## PH…

学习Spring Boot:(二十七)Spring Boot 2.0 中使用 Actuator

前言 主要是完成微服务的监控&#xff0c;完成监控治理。可以查看微服务间的数据处理和调用&#xff0c;当它们之间出现了异常&#xff0c;就可以快速定位到出现问题的地方。 springboot - version: 2.0 正文 依赖 maven 项目 在 pom.xml 文件中加入 actuator 的依赖&…

php实现注销功能,laravel 实现用户登录注销并限制功能

在项目根目录输入&#xff1a; php artisan make:controller Admin/LoginControllerphp artisan make:model Model/Admin -m运行之后 项目中会新增两个PHP文件新创建了admins用户表&#xff0c;此用户表默认新建中只有主键&#xff0c;创建时间&#xff0c;编辑时间。我们接下来…

将ipynb文件转为py的简单方法(图文并茂)

打开可以使用jupyter命令的命令窗口&#xff08;如果没有jupyter则需要先安装jupyter&#xff09;&#xff0c;cd 命令进入到 ipynb 文件所在的文件夹&#xff0c;执行 jupyter nbconvert --to script xxx.ipynb 即可完成 ipynb 文件到 py 文件的转化&#xff0c;执行 jupyter …

学习Spring Boot:(二十八)Spring Security 权限认证

前言 主要实现 Spring Security 的安全认证&#xff0c;结合 RESTful API 的风格&#xff0c;使用无状态的环境。 主要实现是通过请求的 URL &#xff0c;通过过滤器来做不同的授权策略操作&#xff0c;为该请求提供某个认证的方法&#xff0c;然后进行认证&#xff0c;授权成…

在PHP中如何要json中的数据,如何在不知道键值的情况下在php中读取JSON数据

我需要在php中读取firebase JSON URL然后显示它.我的firebase得到了以下.json数据&#xff1a;{"dDsdE4AlB7P5YYd4fWbYTQKCLPh1":{"email":"abhigmail.com","name":"abhishek"},"z1ceiLhdh9YVu7lGnVvqDWoWHFH3":{…

oracle 分区字符转换,Oracle 普通表与分区表转换

oracle 9i提供了dbms_redefinition包来实现数据库的表的在线重定义功能。在实际的应用上&#xff0c;我们可以利用这个包来进行&#xff1a;(1)堆表与分区之间进行转换。(2)重建表以减少HWM。10g能shrink&#xff0c;9i如果用move tablespace and rebuild index在move的时候会锁…

Java中使用有返回值的线程

在创建多线程程序的时候&#xff0c;我们常实现Runnable接口&#xff0c;Runnable没有返回值&#xff0c;要想获得返回值&#xff0c;Java5提供了一个新的接口Callable&#xff0c;可以获取线程中的返回值&#xff0c;但是获取线程的返回值的时候&#xff0c;需要注意&#xff…

Mybatis 插入时获取主键的方式

mybatis 作为一个主流的 ORM 框架&#xff0c;深受广大开发者的喜爱。有人的地方就有江湖&#xff0c;有代码的地方自然有坑&#xff0c;下面来说说获取 mybatis 的插入后返回的主键。 我们可以想一下自动增长的主键特性&#xff0c;在数据库里面肯定有某个地方管理 ID 的自增…

linux脚本怎么把文件地址变成动态地址,Linux脚本程序自动修改网卡配置文件中的MAC地址...

在玩Linux虚拟机的时候&#xff0c;一个安装好linux系统的virtual HDD会用于创建多个虚拟机&#xff0c;这样就不需要在创建每个虚拟机都安装一遍系统了。virtual HDD加载到虚拟机后&#xff0c;新的虚拟机的MAC地址就会和virtual系统中ifcfg-eth[0&#xff0d;9]中的MAC地址不…

MySQL + MyBatis 批量插入时存在则忽略或更新记录

一、存在时则忽略 为什么在发现重复时会忽略&#xff1f;这里面涉及到两个地方。 1、重复则忽略。那么首先是需要判断是否重复&#xff0c;这里是通过唯一索引判断是否重复的。如果表中唯一索引的字段已经存在与将要插入的记录行中唯一索引的字段值相同&#xff0c;则标识为重…

Linux 目录所属组设置,Linux系统用户与组管理命令及配置文件总结

一、Linux系统用户及组分类1、用户类别Linux系统中的用户大致可分为三类&#xff1a;root用户、系统用户、普通用户。每一个用户都拥有一个唯一的身份标识UID。2、组分类与用户信息对应的&#xff0c;Linux系统中的组也可分为三类&#xff1a;root组、系统组、普通组。每一个组…

linux 字符串 空,linux – bash空字符串/命令

你似乎把bash与其他一些编程语言混淆了.变量被替换,然后左边的内容被执行."$a"这是引号之间的a的内容. a是空的,所以这相当于&#xff1a;""那不是命令. “没有找到指令.”由于存在错误,执行不成功(shell返回代码不为0),因此命令的后半部分 – && …

@GetMapping和@PostMapping详解

首先要了解一下RequestMapping注解。 RequestMapping用于映射url到控制器类的一个特定处理程序方法。可用于方法或者类上面。也就是可以通过url找到对应的方法。 RequestMapping有8个属性。 value&#xff1a;指定请求的实际地址。 method&#xff1a;指定请求的method类型&…

实验楼 linux内核原理与分析,《Linux内核原理与分析》第一周作业 20189210

实验一 Linux系统简介这一节主要学习了Linux的历史&#xff0c;Linux有关的重要人物以及学习Linux的方法&#xff0c;Linux和Windows的区别。其中学到了LInux中的应用程序大都为开源自由的软件&#xff0c;用户可以修改定制再发布&#xff1b;内核是实现多任务运行和硬件管理的…

MySQL的INSERT INTO··· ON DUPLICATE KEY UPDATE使用的几种情况

保存或更新 在MySQL数据库中&#xff0c;如果在insert语句后面带上ON DUPLICATE KEY UPDATE 子句&#xff0c;而要插入的行与表中现有记录的惟一索引或主键中产生重复值&#xff0c;那么就会发生旧行的更新&#xff1b;如果插入的行数据与现有表中记录的唯一索引或者主键不重复…

linux svn 指定端口号,linux(Ubuntu)搭建Subversion服务器+修改svn端口号

一、搭建 Subversion 服务器1、首先需要安装 subversion 这个软件&#xff1a;sudo apt-get install subversion注&#xff1a;使用apt-get安装软件&#xff0c;ubuntu默认将软件下载到 /etc/bash_completion.d/ 目录下&#xff0c;可使用 sudo apt-get source packagename 下载…

java8 Stream API详解

文章目录一、Stream流概述二、创建Stream的方式相关API三、Stream的中间操作筛选与切片映射排序四、终止操作第一大类API&#xff08;太过简单&#xff09;第二大类AP归约收集一、Stream流概述 1、java8中有两大最为重要的改变&#xff0c;第一就是Lambda表达式&#xff0c;另…

linux实验3编写内核模块,实验2.3_内核模块_实验报告

实验报告题目: 内核模块实验1、实验目的模块是Linux系统的一种特有机制&#xff0c;可用以动态扩展操作系统内核功能。编写实现某些特定功能的模块&#xff0c;将其作为内核的一部分在管态下运行。本实验通过内核模块编程在/porc文件系统中实现系统时钟的读操作接口。2、实验内…

Java 8 Stream Api 中的 peek、map、foreach区别

#1. 前言 我在Java8 Stream中讲述了 Java 8 Stream API 的一些内容。今天再看一下peek、map、foreach区别。 2. peek peek 操作接收的是一个 Consumer 函数。顾名思义 peek 操作会按照 Consumer 函数提供的逻辑去消费流中的每一个元素&#xff0c;同时有可能改变元素内部的一…