Spring Boot(十四)RabbitMQ延迟队列

一、前言

延迟队列的使用场景:1.未按时支付的订单,30分钟过期之后取消订单;2.给活跃度比较低的用户间隔N天之后推送消息,提高活跃度;3.过1分钟给新注册会员的用户,发送注册邮件等。

实现延迟队列的方式有两种:

  1. 通过消息过期后进入死信交换器,再由交换器转发到延迟消费队列,实现延迟功能;
  2. 使用rabbitmq-delayed-message-exchange插件实现延迟功能;

注意: 延迟插件rabbitmq-delayed-message-exchange是在RabbitMQ 3.5.7及以上的版本才支持的,依赖Erlang/OPT 18.0及以上运行环境。

由于使用死信交换器相对曲折,本文重点介绍第二种方式,使用rabbitmq-delayed-message-exchange插件完成延迟队列的功能。

二、安装延迟插件

1.1 下载插件

打开官网下载:http://www.rabbitmq.com/community-plugins.html

选择相应的对应的版本“3.7.x”点击下载。

注意: 下载的是.zip的安装包,下载完之后需要手动解压。

1.2 安装插件

拷贝插件到Docker:

docker cp D:\rabbitmq_delayed_message_exchange-20171201-3.7.x.ez rabbit:/plugins

RabbitMQ在Docker的安装,请参照本系列的上一篇文章:http://www.apigo.cn/2018/09/11/springboot13/

1.3 启动插件

进入docker内部:

docker exec -it rabbit /bin/bash

开启插件:

rabbitmq-plugins enable rabbitmq_delayed_message_exchange

查询安装的所有插件:

rabbitmq-plugins list

安装正常,效果如下图:

重启RabbitMQ,使插件生效

docker restart rabbit

三、代码实现

3.1 配置队列

import com.example.rabbitmq.mq.DirectConfig;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;@Configuration
public class DelayedConfig {final static String QUEUE_NAME = "delayed.goods.order";final static String EXCHANGE_NAME = "delayedec";@Beanpublic Queue queue() {return new Queue(DelayedConfig.QUEUE_NAME);}// 配置默认的交换机@BeanCustomExchange customExchange() {Map<String, Object> args = new HashMap<>();args.put("x-delayed-type", "direct");//参数二为类型:必须是x-delayed-messagereturn new CustomExchange(DelayedConfig.EXCHANGE_NAME, "x-delayed-message", true, false, args);}// 绑定队列到交换器@BeanBinding binding(Queue queue, CustomExchange exchange) {return BindingBuilder.bind(queue).to(exchange).with(DelayedConfig.QUEUE_NAME).noargs();}
}

3.2 发送消息

import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.text.SimpleDateFormat;
import java.util.Date;@Component
public class DelayedSender {@Autowiredprivate AmqpTemplate rabbitTemplate;public void send(String msg) {SimpleDateFormat sf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");System.out.println("发送时间:" + sf.format(new Date()));rabbitTemplate.convertAndSend(DelayedConfig.EXCHANGE_NAME, DelayedConfig.QUEUE_NAME, msg, new MessagePostProcessor() {@Overridepublic Message postProcessMessage(Message message) throws AmqpException {message.getMessageProperties().setHeader("x-delay", 3000);return message;}});}
}

3.3 消费消息

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.text.SimpleDateFormat;
import java.util.Date;@Component
@RabbitListener(queues = "delayed.goods.order")
public class DelayedReceiver {@RabbitHandlerpublic void process(String msg) {SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");System.out.println("接收时间:" + sdf.format(new Date()));System.out.println("消息内容:" + msg);}
}

3.4 测试队列

import com.example.rabbitmq.RabbitmqApplication;
import com.example.rabbitmq.mq.delayed.DelayedSender;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;import java.text.SimpleDateFormat;
import java.util.Date;@RunWith(SpringRunner.class)
@SpringBootTest
public class DelayedTest {@Autowiredprivate DelayedSender sender;@Testpublic void Test() throws InterruptedException {SimpleDateFormat sf = new SimpleDateFormat("yyyy-MM-dd");sender.send("Hi Admin.");Thread.sleep(5 * 1000); //等待接收程序执行之后,再退出测试}
}

执行结果如下:

发送时间:2018-09-11 20:47:51
接收时间:2018-09-11 20:47:54
消息内容:Hi Admin.

完整代码访问我的GitHub:https://github.com/vipstone/springboot-example/tree/master/springboot-rabbitmq

四、总结

到此为止我们已经使用“rabbitmq-delayed-message-exchange”插件实现了延迟功能,但是需要注意的一点是,如果使用命令“rabbitmq-plugins disable rabbitmq_delayed_message_exchange”禁用了延迟插件,那么所有未发送的延迟消息都将丢失。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/546842.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

bzoj 2121 DP

首先如果我们能处理出来i,j段能不能消掉&#xff0c;这样就可以直接dp转移了&#xff0c;设w[i]为前i为最少剩下多少&#xff0c;那么w[i]w[j-1] (flag[j][i])。 现在我们来求flag[i][j]&#xff0c;首先我们可以把字符串组建立trie然后处理在串L中从left位置开始的所有的flag&…

三、Win10 64位PyCharm下打包.py程序为可执行exe文件且兼容32位和64位

WIN10 64位下Pycharm打包.py程序为可执行文件exe 上面衔接WIN10 64位下Pycharm打包.py程序为可执行文件exe,存在不兼容32位和64位的情况。 下面Win10 64位PyCharm下打包.py程序为可执行exe文件且兼容32位和64位说明: 前提条件 python3.8.2 32 位;注意:原来有 64 位 Pyth…

Java核心(一)线程Thread详解

一、概述 在开始学习Thread之前&#xff0c;我们先来了解一下 线程和进程之间的关系&#xff1a; 线程(Thread)是进程的一个实体&#xff0c;是CPU调度和分派的基本单位。 线程不能够独立执行&#xff0c;必须依存在应用程序中&#xff0c;由应用程序提供多个线程执行控制。 线…

Jetson Nano配置与使用(5)cuda测试及tensorflow gpu安装

Jetson Nano利用官方镜像进行安装后&#xff0c;系统已经安装好了JetPack&#xff0c;cuda&#xff0c;cudaa&#xff0c;OpenCV等组件&#xff0c;不过需要修改下环境变量才可以使用。 1.修改环境变量 利用vim打开 ~ 路径下.bashrc文件&#xff1a; sudo vi ~./bashrc文件的…

工作方法

刚入职场的年轻人&#xff0c;总不喜欢写工作汇报&#xff0c;想来有如下原因:觉得每天都在做同样的事情&#xff0c;没有多少有趣新鲜的素材好写觉得这是领导对自己的监控&#xff0c;写了会有不少工作疏漏落在领导手上不太知道如何将本日工作进行总结其实每日工作汇报是非常重…

Python弹窗提示警告框MessageBox

需要安装pywin32模块&#xff0c;pip install pywin32 # pip install pywin32 import win32api import win32con# 提醒OK消息框 win32api.MessageBox(0, "这是一个测试提醒OK消息框", "提醒",win32con.MB_OK)# 是否信息框 win32api.MessageBox(0, "这…

一次失败的蛋疼的设计

需求&#xff1a;当一个用户上传一条记录之后&#xff0c;通知某一个组或者某几个组的用户查看。用户可以属于多个组。 分析&#xff1a;当用户登录之后&#xff0c;判断自己所在的组是否属于通知组&#xff0c;是&#xff0c;则提醒。 SQL&#xff1a; select * from newsGro…

Java核心(二)深入理解线程池ThreadPool

本文你将获得以下信息&#xff1a; 线程池源码解读线程池执行流程分析带返回值的线程池实现延迟线程池实现 为了方便读者理解&#xff0c;本文会由浅入深&#xff0c;先从线程池的使用开始再延伸到源码解读和源码分析等高级内容&#xff0c;读者可根据自己的情况自主选择阅读…

XCOPY不是内部或外部命令,也不是可运行程序 修复

系统常用命令无法识别 解决 1.进入系统安装目录的system32中&#xff0c;一般目录为C:\Windows\System32&#xff0c;找一下可执行文件是否存在&#xff0c;是否可以运行&#xff08;如ipconfig&#xff0c;直接点击会出现一个命令行窗口&#xff0c;一闪而逝&#xff09;&…

Jetson Nano安装pytorch 基于torch1.6和torchvision0.7

需要注意的是&#xff0c;博主使用的是win10主机&#xff0c;通过局域网连接的jetson nano&#xff0c; 其中jetson nano的预制CUDA版本为10.2 Jetpack 4.1.1 分别执行以下命令&#xff0c;即可查看自己的jetson nano 预搭载的CUDA版本 sudo pip3 install jetson-stats sudo …

SEO之基础篇(一)

1、关键字的竞争你能应对吗&#xff1f;那些每索20W次的关键字你能做到吗&#xff1f;因此我们选择关键字要适合自己的。你是单枪匹马还是团队&#xff0c;要考虑清楚。 2、找到适合自己的关键字。新站长最好选择1~2个关键字&#xff0c;切忌不要太多。等网站流量上去了后再…

Python获取硬件信息(硬盘序列号,CPU序列号)

原文衔接 https://www.cnblogs.com/blog-rui/p/12108072.html pip install wmi pip install pywin32import wmic wmi.WMI()# # 硬盘序列号 for physical_disk in c.Win32_DiskDrive():print(physical_disk.SerialNumber)# CPU序列号 for cpu in c.Win32_Processor():print(cp…

Java核心(三)并发中的线程同步与锁

乐观锁、悲观锁、公平锁、自旋锁、偏向锁、轻量级锁、重量级锁、锁膨胀…难理解&#xff1f;不存的&#xff01;来&#xff0c;话不多说&#xff0c;带你飙车。 上一篇介绍了线程池的使用&#xff0c;在享受线程池带给我们的性能优势之外&#xff0c;似乎也带来了另一个问题&a…

【Jetson-Nano】2.Tensorflow和Pytorch的安装

文章目录 1、Tensorflow多版本安装 1.1 Protobuf 安装1.2 安装依赖包及tensorflow1.151.3 安装其它常用库1.4 测试python包是否安装成功1.5 TensorRT和Opencv的安装1.6 pycuda和onnx安装1.7 Tensorflow2.3安装2、Pytorch安装 2.1 安装pytroch和torchvision2.2 安装环境验证参考…

Spring Boot 终极清单

一、Spring Boot 终极清单诞生原因我上学那会主要学的是 Java 和 .Net 两种语言&#xff0c;当时对于语言分类这事儿没什么概念&#xff0c;恰好在2009年毕业那会阴差阳错的先找到了 .Net 的工作&#xff0c;此后就开始了漫长的 .Net 编程之旅&#xff0c;说实话最初的“编程思…

Python SHA1加密算法

SHA1 加密 SHA1的全称是Secure Hash Algorithm(安全哈希算法) 。SHA1基于MD5&#xff0c;加密后的数据长度更长&#xff0c; 它对长度小于264的输入&#xff0c;产生长度为160bit的散列值。比MD5多32位。 因此&#xff0c;比MD5更加安全&#xff0c;但SHA1的运算速度就比MD5…

简单的喷淋实验--嵌入式实训

目录 喷淋实验--嵌入式实训 1.MQTT通信原理 2.MQTT库的移植 3.代码流程 运行视频如下: 喷淋实验--嵌入式实训 1.MQTT通信原理 MQTT&#xff08;Message Queuing Telemetry Transport&#xff09;是一种轻量级的发布/订阅消息传输协议&#xff0c;旨在提供可靠、高效的通信…

Yolov5系列AI常见数据集(1)车辆,行人,自动驾驶,人脸,烟雾

下述所有数据可在下方二维码公众号回复&#xff1a; 数据大礼包 获得&#xff01;&#xff01;&#xff01; Fashion-MNIST图像数据集&#xff08;200.4MB&#xff09; 每个训练和测试样本都按照以下类别进行了标注&#xff1a; 标注编号描述0T-shirt/top&#xff08;T恤&…

Java核心(四)你不知道的数据集合

导读&#xff1a;Map竟然不属于Java集合框架的子集&#xff1f;队列也和List一样属于集合的三大子集之一&#xff1f;更有队列的正确使用姿势&#xff0c;一起来看吧&#xff01; Java中的集合通常指的是Collection下的三个集合框架List、Set、Queue和Map集合&#xff0c;Map并…

[PyQt5]点击主窗口弹出另一个窗口

[PyQt5]点击主窗口弹出另一个窗口