spring boot rabbitmq_Spring Boot+RabbitMQ 实现延迟消息实现完整版,实用!

本文同步Java知音社区,专注于Java

作者:Sam哥哥http://blog.csdn.net/linsongbin1/article/details/80178122

概述

曾经去网易面试的时候,面试官问了我一个问题,说

下完订单后,如果用户未支付,需要取消订单,可以怎么做

我当时的回答是,用定时任务扫描DB表即可。面试官不是很满意,提出:

用定时任务无法做到准实时通知,有没有其他办法?

我当时的回答是:

可以用队列,订单下完后,发送一个消息到队列里,并指定过期时间,时间一到,执行回调接口。

面试官听完后,就不再问了。其实我当时的思路是对的,只不过讲的不是很专业而已。专业说法是利用延迟消息。

其实用定时任务,确实有点问题,原本业务系统希望10分钟后,如果订单未支付,就马上取消订单,并释放商品库存。但是一旦数据量大的话,就会加长获取未支付订单数据的时间,部分订单就做不到10分钟后取消了,可能是15分钟,20分钟之类的。这样的话,库存就无法及时得到释放,也就会影响成单数。而利用延迟消息,则理论上是可以做到按照设定的时间,进行订单取消操作的。

目前网上关于使用RabbitMQ实现延迟消息的文章,大多都是讲如何利用RabbitMQ的死信队列来实现,实现方案看起来都很繁琐复杂,并且还是使用原始的RabbitMQ Client API来实现的,更加显得啰嗦。更多springboot整合实战内容,可以在Java知音公众号回复“springboot内容聚合”

Spring Boot 已经对RabbitMQ Client API进行了包装,使用起来简洁很多,下面详细介绍一下如何利用rabbitmq_delayed_message_exchange 插件和Spring Boot来实现延迟消息。

软件准备

erlang

请参考Win10下安装erlang

https://blog.csdn.net/linsongbin1/article/details/80170487

本文使用的版本是:

  • Erlang 20.3

RabbitMQ

请参考win10下安装rabbitmq

https://blog.csdn.net/linsongbin1/article/details/80170567

本文使用的是window版本的RabbitMQ,版本号是:

  • 3.7.4

rabbitmq_delayed_message_exchange插件

插件下载地址:

http://www.rabbitmq.com/community-plugins.html

打开网址后,ctrl + f,搜索rabbitmq_delayed_message_exchange。

v2-30e8d48e5657842ff83d33faa7ea76b3_b.jpg

千万记住,一定选好版本号,由于我使用的是RabbitMQ 3.7.4,因此对应的rabbitmq_delayed_message_exchange插件也必须选择3.7.x的。

如果没有选对版本,在使用延迟消息的时候,会遇到各种各样的奇葩问题,而且网上还找不到解决方案。我因为这个问题,折腾了整整一个晚上。请牢记,要选对插件版本。

下载完插件后,将其放置到RabbitMQ安装目录下的plugins目录下,并使用如下命令启动这个插件:

rabbitmq-plugins enable rabbitmq_delayed_message_exchange

如果启动成功会出现如下信息:

The following plugins have been enabled: rabbitmq_delayed_message_exchange

启动插件成功后,记得重启一下RabbitMQ,让其生效。

集成RabbitMQ

这个就非常简单了,直接在maven工程的pom.xml文件中加入

<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

Spring Boot的版本我使用的是2.0.1.RELEASE.

接下来在application.properties文件中加入redis配置:

spring.rabbitmq.host=127.0.0.1
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest

定义ConnectionFactory和RabbitTemplate

也很简单,代码如下:

package com.mq.rabbitmq;import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
@ConfigurationProperties(prefix = "spring.rabbitmq")
public class RabbitMqConfig {private String host;private int port;private String userName;private String password;@Beanpublic ConnectionFactory connectionFactory() {CachingConnectionFactory cachingConnectionFactory = new CachingConnectionFactory(host,port);cachingConnectionFactory.setUsername(userName);cachingConnectionFactory.setPassword(password);cachingConnectionFactory.setVirtualHost("/");cachingConnectionFactory.setPublisherConfirms(true);return cachingConnectionFactory;}@Beanpublic RabbitTemplate rabbitTemplate() {RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory());return rabbitTemplate;}public String getHost() {return host;}public void setHost(String host) {this.host = host;}public int getPort() {return port;}public void setPort(int port) {this.port = port;}public String getUserName() {return userName;}public void setUserName(String userName) {this.userName = userName;}public String getPassword() {return password;}public void setPassword(String password) {this.password = password;}
}

Exchange和Queue配置

package com.mq.rabbitmq;import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import java.util.HashMap;
import java.util.Map;@Configuration
public class QueueConfig {@Beanpublic CustomExchange delayExchange() {Map<String, Object> args = new HashMap<>();args.put("x-delayed-type", "direct");return new CustomExchange("test_exchange", "x-delayed-message",true, false,args);}@Beanpublic Queue queue() {Queue queue = new Queue("test_queue_1", true);return queue;}@Beanpublic Binding binding() {return BindingBuilder.bind(queue()).to(delayExchange()).with("test_queue_1").noargs();}
}

这里要特别注意的是,使用的是CustomExchange,不是DirectExchange,另外CustomExchange的类型必须是x-delayed-message。

实现消息发送

package com.mq.rabbitmq;import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;import java.text.SimpleDateFormat;
import java.util.Date;@Service
public class MessageServiceImpl {@Autowiredprivate RabbitTemplate rabbitTemplate;public void sendMsg(String queueName,String msg) {SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");System.out.println("消息发送时间:"+sdf.format(new Date()));rabbitTemplate.convertAndSend("test_exchange", queueName, msg, new MessagePostProcessor() {@Overridepublic Message postProcessMessage(Message message) throws AmqpException {message.getMessageProperties().setHeader("x-delay",3000);return message;}});}
}

注意在发送的时候,必须加上一个header

  • x-delay

在这里我设置的延迟时间是3秒。

消息消费者

package com.mq.rabbitmq;import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;import java.text.SimpleDateFormat;
import java.util.Date;@Component
public class MessageReceiver {@RabbitListener(queues = "test_queue_1")public void receive(String msg) {SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");System.out.println("消息接收时间:"+sdf.format(new Date()));System.out.println("接收到的消息:"+msg);}
}

运行Spring Boot程序和发送消息

直接在main方法里运行Spring Boot程序,Spring Boot会自动解析MessageReceiver类的。

接下来只需要用Junit运行一下发送消息的接口即可。

package com.mq.rabbitmq;import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;@RunWith(SpringRunner.class)
@SpringBootTest
public class RabbitmqApplicationTests {@Autowiredprivate MessageServiceImpl messageService;@Testpublic void send() {messageService.sendMsg("test_queue_1","hello i am delay msg");}}

运行完后,可以看到如下信息:

消息发送时间:2018-05-03 12:44:53

3秒钟后,Spring Boot控制台会输出:

消息接收时间:2018-05-03 12:44:56
接收到的消息:hello i am delay msg

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/335329.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

jax-rs jax-ws_信守承诺:针对JAX-RS API的基于合同的测试

jax-rs jax-ws自从我们谈论测试和应用有效的TDD做法以来&#xff0c;已经有一段时间了&#xff0c;特别是与REST&#xff08;ful&#xff09; Web服务和API有关的做法。 但是&#xff0c;这个主题永远都不应忘记&#xff0c;特别是在每个人都在做微服务的世界中&#xff0c;无论…

手机流量共享 linux,linux – 通过HTB共享带宽和优先处理实时流量,哪种方案更好?...

我想在我们的互联网线路上添加一些流量管理.在阅读了大量文档之后,我认为HFSC对我来说太复杂了(我不了解所有曲线的东西,我担心我永远不会把它弄好),CBQ不推荐,基本上HTB就是通往适合大多数人.我们的内部网络有三个“段”,我想在这些段之间或多或少地分享带宽(至少在开始时).此…

负载均衡解决方案

某网站随着知名度的提高&#xff0c;用户访问量日渐增多&#xff0c;近段时间以来&#xff0c;由于访问量的激增&#xff0c;服务不可用的情况时有发生。针对这种状况&#xff0c;结合实际情况&#xff0c;设计了一套解决方案。 技术实现 1、负载均衡。2台同样配置的linux服务…

pcf8523_PCF上的Spring Cloud合同和Spring Cloud Services

pcf8523最近&#xff0c;我们有一个客户&#xff0c;对于使用Spring Cloud Contract &#xff08;SCC&#xff09;来防止微服务团队之间的API“漂移”&#xff0c;微型开发团队会照顾个体的API&#xff08;构成企业应用程序的一部分&#xff09;&#xff0c;这些客户非常感兴趣…

python二分法查找时间点_python有序查找算法:二分法

二分法是一种快速查找的方法&#xff0c;时间复杂度低&#xff0c;逻辑简单易懂&#xff0c;总的来说就是不断的除以2除以2... 但是需要注意&#xff1a;待查找的序列区间单调有序 例如需要查找有序数组arr里面的某个关键字key的位置&#xff0c;那么首先确认arr的中位数或者中…

linux shell ls -l,linux之ls -l|grep ^-|wc -l命令 Shell 中常見的日志統計方法

轉&#xff1a;http://www.cnblogs.com/senior-engineer/p/6203268.htmlShell 中常見的日志統計方法https://my.oschina.net/waterbear/blog/371845Linux Shell工具grep awk cut sort uniq sort 使用小結http://www.linuxidc.com/Linux/2012-05/61126.htm查看某文件夾下文件的個…

怎么运行aws的示例程序_使Spring Boot应用程序在AWS上无服务器运行

怎么运行aws的示例程序在之前的 几篇 文章中&#xff0c;我描述了如何设置Spring Boot应用程序并在AWS Elastic Beanstalk上运行它。 尽管这是从物理服务器到云服务器的重要一步&#xff0c;但还有更好的可能&#xff01; 走向无服务器 。 这意味着无需花费任何服务器费用&…

自己写的 ORACLE 函数的解读

DECLARE MAXWRITNO INTEGER ; <!--声明了2个变量&#xff0c;变量类型是INTEGER-->LINETY INTEGER; BEGIN <!--表示进入方法体-->----------获取要循环的结果集&#xff0c;APPLINPER班线许可申请表FOR REC IN (select * from $PRDLINE.APPLINPER a where (A.APPP…

python调用linux命令输出结果,Python-运行shell命令并捕获输出

小编典典这个问题的答案取决于你使用的Python版本。最简单的方法是使用以下subprocess.check_output功能&#xff1a;>>> subprocess.check_output([ls, -l])btotal 0\n-rw-r--r-- 1 memyself staff 0 Mar 14 11:04 files\n该check_output功能适用于仍在广泛使用的几乎…

python cls_关于python中的self,ins , cls的解释

关于python中的self,ins,cls的解释参考下面两篇博文self比较好理解&#xff0c;self指的是类实例对象本身(注意&#xff1a;不是类本身) class Person: def _init_(self,name): self.namename def sayhello(self): print My name is:,self.name pPerson(Tom) print p 为什么不是…

脚本语言和工程语言_语言工程中有趣的事情

脚本语言和工程语言如果您阅读此博客&#xff0c;您将知道我坚信语言的力量。 所以&#xff0c;我当然有很大的偏见&#xff0c;但是我感觉语言工程社区正在增长&#xff0c;并且越来越有趣的东西正在涌现。 为此&#xff0c;我认为通过查看社区中正在发生的事情并列出一些我发…

linux useradd 数字,详解linux useradd用户组合权限管理等

1&#xff0c;权限相关概念Rwx任何一个文件都应该由两部分组成&#xff0c;这两部分其实基于文件系统来组织&#xff0c;磁盘分区创建完成后&#xff0c;在高级格式化的时候&#xff0c;就把整个磁盘分区分成两部分&#xff0c;其中一部分是源数据&#xff0c;一部分是来放数据…

windows上的python能否在unix上使用_怎么用python在Windows系统下,生成UNIX格式文件

在Windows下换行时&#xff0c;有两个字符&#xff1a;回车(/r)和换行(/n)。但在Linux下&#xff0c;只有一个换行(/n)可使用unix2dos和dos2unix命令进行格式的转换&#xff1a; 参数&#xff1a; -k 保持输出文件和输入文件的日期时间戳不变 -o file 默认模式 . 将file转换&am…

.dmp数据文件的解读

通过PL/SQL Developer导出表数据&#xff0c;生成后缀名为.dmp的文件&#xff0c;如果你以某个用户例如&#xff1a;HYT2LINEHN访问数据库&#xff0c;那么你可以将这个用户下的所有表导成dmp文件&#xff0c;那么这个dmp文件里就包含了这个用户下的所有表的数据&#xff0c;当…

junit jndi_使用Spring创建用于JUnit测试的JNDI资源

junit jndi直到最近&#xff0c;我还使用静态方法来设置内存数据库&#xff08;HSQLDB&#xff09;。 我在JUnit测试的setUp / tearDown中调用了这些方法。 当我使用Spring时&#xff0c;这对我来说总是有点不自然&#xff0c;并且所有内容都应在其应用程序上下文中运行。 创建…

c语言程序经过编译以后生成的文件名的后缀为,c语言源文件经过编译后生成文件的后缀是什么...

c语言源文件经过编译后生成文件的后缀是什么c语言源文件经过编译后生成文件的后缀是“.obj”。C语言源程序经过编译程序编译之后&#xff0c;生成一个后缀为“.obj”的文件&#xff0c;最后由称为“连接程序”的软件&#xff0c;把此“.obj”文件与各种库函数连接在一起&#x…

Java中获取系统日期时间/系统时间

int y,m,d,h,mi,s;Calendar calCalendar.getInstance();ycal.get(Calendar.YEAR);mcal.get(Calendar.MONTH) 1;dcal.get(Calendar.DATE);hcal.get(Calendar.HOUR_OF_DAY);mical.get(Calendar.MINUTE);scal.get(Calendar.SECOND);System.out.println("现在时刻是"y&q…

python如何读取csv文件列表页_每25行读取一个csv文件,并使用python传递到列表

我想读取一个文件,并将该文件的每25行转换为一个列表,也就是说,它应该有4个列表,每个列表中包含25个项目(对于一个文件的100行)。我无法获得这个问题的代码。 输入文件看起来像这样,实际上它有100行: {PutRequest: {Item: {id: {S: E1DBEAE3}, value: {M: {result: {N: u0.0015…

Infinispan版本已映射到最低Java版本

我最近一直在与Infinispan交流 &#xff0c;我注意到这种“分布式内存键/值数据存储”的特征之一是它相对积极地被采用&#xff0c;甚至要求更高版本的Java。 根据参考的Infinispan文档 &#xff0c;以下内容将Infinispan发行版映射到最低Java SE版本。 Infinispan版本及其最…

通过PL/SQL developer工具访问远程的Oracle数据库_访问数据库_连接数据库_登录数据库

文章目录工具简介电脑没有安装 Oracle 数据库电脑安装了 Oracle 数据库工具简介 PL/SQL Developer 是 Oracle 数据库开发工具&#xff0c;PL/SQL Developer 功能很强大&#xff0c;可以做为集成调试器&#xff0c;有 SQL 窗口&#xff0c;命令窗口&#xff0c;对象浏览器和性能…