Rabbitmq入门与应用(五)-延迟队列的设计与实现

延迟队列设计

在开发过程中涉及到延迟队列的应用,例如订单生成后有30分钟的付款时间,注册是有60秒的邮件或者短信的发送读取时间等。
常规使用rabbitmq设计延迟队列有两种方式

  1. 使用创建一个延迟队列阻塞消息
  2. 使用延迟队列插件

Dead Letter Exchanges — RabbitMQ

image-20231119180143512

image-20230619235935374

配置

  1. To set the DLX for a queue, specify the optional x-dead-letter-exchange argument when declaring the queue. The value must be an exchange name in the same virtual host:
  2. You may also specify a routing key to use when the messages are being dead-lettered. If the routing key is not set, the message’s own routing keys are used. args.put("x-dead-letter-routing-key", “some-routing-key”);
package com.wnhz.mq.common.config;import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import java.util.HashMap;
import java.util.Map;@Configuration
public class DlxConfig {@Beanpublic Queue dlxQueue(){return new Queue("dlx_queue_test");}@Beanpublic DirectExchange dlxExchange(){return new DirectExchange("dlx_exchange_test");}@Beanpublic Binding dlxBinding(){return BindingBuilder.bind(dlxQueue()).to(dlxExchange()).with("dlx_routing_key");}@Beanpublic Queue normalQueue(){Map<String,Object> map = new HashMap<>();map.put("x-dead-letter-exchange", "dlx_exchange_test");map.put("x-dead-letter-routing-key","dlx_routing_key");return new Queue("normal_queue_test",true,false,false,map);}@Beanpublic DirectExchange normalExchange(){return new DirectExchange("normal_exchange_test");}@Beanpublic Binding normalBinding(){return BindingBuilder.bind(normalQueue()).to(normalExchange()).with("normal_routing_test");}}
server:port: 10005spring:application:name: book-consumerautoconfigure:exclude: com.alibaba.druid.spring.boot.autoconfigure.DruidDataSourceAutoConfigure, org.springframework.boot.autoconfigure.jdbc.DataSourceAutoConfigurationrabbitmq:host: 192.168.198.130port: 5672username: adminpassword: 123publisher-confirm-type: correlatedpublisher-returns: truelistener:simple:prefetch: 1acknowledge-mode: auto
logging:level:com.wnhz.mq.consumer: debug

生产者发送信息

image-20230723161609896

    @Overridepublic void delaySendMessage() {String uuid = UUID.randomUUID().toString();CorrelationData data = new CorrelationData(uuid);String msg = "hello delay";int delayTime =5000;rabbitTemplate.convertAndSend("normal_exchange_test", "normal_routing_test", msg,p -> {p.getMessageProperties().setExpiration(String.valueOf(delayTime ));return p;});log.debug("发送一条消息{},当前时间:{},延迟{}秒", msg, new Date(), delayTime / 1000);}
}

消费者消费

   @RabbitListener(queues = "dlx_queue_test")public void delayConsume(Message message){log.debug("消费者消费信息:{},当前时间:{}",message.getBody(),new Date());}

延迟队列插件安装

访问官网

Community Plugins — RabbitMQ

image-20230619214424612

image-20230619214539126

进入rabbitmq docker容器

[root@localhost ~]# docker exec -it rabbitmq bash

查询插件列表是否存在延迟插件

root@6d2342d51b11:/plugins# rabbitmq-plugins list
root@6d2342d51b11:/plugins# rabbitmq-plugins list
Listing plugins with pattern ".*" ...Configured: E = explicitly enabled; e = implicitly enabled| Status: * = running on rabbit@6d2342d51b11|/
[  ] rabbitmq_amqp1_0                  3.9.11
[  ] rabbitmq_auth_backend_cache       3.9.11
[  ] rabbitmq_auth_backend_http        3.9.11
[  ] rabbitmq_auth_backend_ldap        3.9.11
[  ] rabbitmq_auth_backend_oauth2      3.9.11
[  ] rabbitmq_auth_mechanism_ssl       3.9.11
[  ] rabbitmq_consistent_hash_exchange 3.9.11
[  ] rabbitmq_event_exchange           3.9.11
[  ] rabbitmq_federation               3.9.11
[  ] rabbitmq_federation_management    3.9.11
[  ] rabbitmq_jms_topic_exchange       3.9.11
[E*] rabbitmq_management               3.9.11
[e*] rabbitmq_management_agent         3.9.11
[  ] rabbitmq_mqtt                     3.9.11
[  ] rabbitmq_peer_discovery_aws       3.9.11
[  ] rabbitmq_peer_discovery_common    3.9.11
[  ] rabbitmq_peer_discovery_consul    3.9.11
[  ] rabbitmq_peer_discovery_etcd      3.9.11
[  ] rabbitmq_peer_discovery_k8s       3.9.11
[E*] rabbitmq_prometheus               3.9.11
[  ] rabbitmq_random_exchange          3.9.11
[  ] rabbitmq_recent_history_exchange  3.9.11
[  ] rabbitmq_sharding                 3.9.11
[  ] rabbitmq_shovel                   3.9.11
[  ] rabbitmq_shovel_management        3.9.11
[  ] rabbitmq_stomp                    3.9.11
[  ] rabbitmq_stream                   3.9.11
[  ] rabbitmq_stream_management        3.9.11
[  ] rabbitmq_top                      3.9.11
[  ] rabbitmq_tracing                  3.9.11
[  ] rabbitmq_trust_store              3.9.11
[e*] rabbitmq_web_dispatch             3.9.11
[  ] rabbitmq_web_mqtt                 3.9.11
[  ] rabbitmq_web_mqtt_examples        3.9.11
[  ] rabbitmq_web_stomp                3.9.11
[  ] rabbitmq_web_stomp_examples       3.9.11

下载支持3.9.x的插件

image-20230619215207816

退出容器:

root@6d2342d51b11:/plugins# exit
exit

上传到linux服务器

在/usr/local/software/下创建文件夹rabbitmq/plugins

[root@localhost software]# mkdir -p rabbitmq/plugins
image-20230619215427865

拷贝插件到容器中

[root@localhost plugins]# docker cp ./rabbitmq_delayed_message_exchange-3.9.0.ez rabbitmq:/plugins

进入容器安装插件

[root@localhost plugins]# docker  exec -it rabbitmq bash
root@6d2342d51b11:/# rabbitmq-plugins enable rabbitmq_delayed_message_exchange

打开管理页面

进入Exchange页面,下拉Type看是否已经安装成功。

image-20230619220041631

代码实现

配置类
package com.wnhz.rabbitmq.mq.config;public interface RabbitmqConstants {String DELAYX_QUEUE = "mq_delayx__queue";String DELAYX_ROUTING_KEY = "mq_delayx_routing_key";String DELAYX_EXCHANGE = "mq_delayx__exchange";String DELAYX_EXCHANGE_TYPE = "x-delayed-message";
}
package com.wnhz.rabbitmq.mq.config;import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.amqp.support.converter.MessageConverter;import java.util.HashMap;@Configuration
@Slf4j
public class RabbitmqConfig {@Beanpublic Queue delayxQueue() {return new Queue(RabbitmqConstants.DELAYX_QUEUE);}@Beanpublic CustomExchange delayRoutingExchange() {return new CustomExchange(RabbitmqConstants.DELAYX_EXCHANGE,RabbitmqConstants.DELAYX_EXCHANGE_TYPE,true,false,new HashMap<String, Object>() {{put("x-delayed-type","direct");}});}@Beanpublic Binding delayxBinding() {return BindingBuilder.bind(delayxQueue()).to(delayRoutingExchange()).with(RabbitmqConstants.DELAYX_ROUTING_KEY).noargs();}@Beanpublic MessageConverter messageConverter() {return new Jackson2JsonMessageConverter();}@Beanpublic RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);rabbitTemplate.setMessageConverter(messageConverter());log.debug("rabbitmq配置:{}完成", rabbitTemplate);return rabbitTemplate;}
}
生产者
@Service
@Slf4j
public class ProduceServiceImpl implements IProduceService {@Autowiredprivate RabbitTemplate rabbitTemplate;@Overridepublic void sendDelayxUser(User user) {int delayTime = 10000;rabbitTemplate.convertAndSend(RabbitmqConstants.DELAYX_EXCHANGE,RabbitmqConstants.DELAYX_ROUTING_KEY,user, mpp -> {mpp.getMessageProperties().setDelay(delayTime);return mpp;});log.debug("发送消息:{},发送时间:{},延迟:{}秒", user,new Date(),delayTime/1000);}
}
消费者
@Slf4j
@Service
public class ConsumeServiceImpl implements IConsumeService {@RabbitListener(queues = RabbitmqConstants.DELAYX_QUEUE)@Overridepublic void receiveDelayxUser(User user) {log.debug("消费者:接收到消息-->{},接收时间:{}",user,new Date());}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/689577.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Linux——信号(2)

在上一张博客我们介绍了Linux中信号的概念和信号是如何产生的&#xff0c;虽然信号 有多种产生方式&#xff0c;但是最终只能由操作系统给对应进程发送特定信号。现在 我将更加规范的介绍Linux中的信号。上一章的遗留问题 我们上一章中在观察信号的默认处理的时候发现终止信号…

canal监听binlog记录业务数据的变更;canalAdmin对instance做web配置

概述 平时在开发中会通过logback打印一些开发日志&#xff0c;有时也会需要记录一些业务日志&#xff0c;简单的就直接用log记录一下&#xff0c;但是系统中需要记录日志的地方越来越多时&#xff0c;不能每个地方都写一套log记录&#xff1b; 由于平常用的大多都是mysql&…

【论文阅读笔记】Contrastive Learning with Stronger Augmentations

Contrastive Learning with Stronger Augmentations 摘要 基于提供的摘要&#xff0c;该论文的核心焦点是在对比学习领域提出的一个新框架——利用强数据增强的对比学习&#xff08;Contrastive Learning with Stronger Augmentations&#xff0c;简称CLSA&#xff09;。以下…

【Jvm】性能调优(上)线上问题排查工具汇总

文章目录 一.互联网概念1.产品闭环和业务闭环2.软件设计中的上游和下游3.JDK运行时常量池 二.CPU相关概念1.查询CPU信息2.CPU利用率&#xff08;CPU utilization&#xff09;和 CPU负载&#xff08;CPU load&#xff09;2.1.如何理解CPU负载2.2.top命令查看CPU负载均值2.3.CPU负…

Pytorch 配置 GPU 环境

1、Pytorch 深度学习跑代码的时候&#xff0c;因为简单的操作不适合cpu运行&#xff0c;我们更习惯用GPU加速代码。 本章将介绍怎么安装pytorch的gpu环境&#xff0c;以及常见的问题 关于conda的安装&#xff0c;参考之前文章&#xff1a;深度学习环境配置&#xff1a;Anaco…

初始树莓派 + VMware17 安装树莓派(Raspberry Pi 4B/5)

文章目录 树莓派入门 VMware17 安装树莓派(Raspberry Pi 4/5B)前言一、树莓派入门指南&#xff1a;从零开始探索树莓派树莓派4B和5对比 二、在VMware Workstation 17上安装树莓派4B/5操作系统&#xff1a;实现强大性能与便捷模拟工具准备开始安装树莓派1.创建一个虚拟机2. 选择…

PyCharm 取消所有断点

PyCharm 取消所有断点 1. Run -> View Breakpoints...2. Python Line Breakpoint3. Remove - DoneReferences 1. Run -> View Breakpoints… 2. Python Line Breakpoint ​​​ 3. Remove - Done References [1] Yongqiang Cheng, https://yongqiang.blog.csdn.net/

【web安全】渗透测试实战思路

步骤一&#xff1a;选目标 1. 不建议太小的公司&#xff08;可能都是请别人来开发的&#xff0c;用现成成熟的框架&#xff09; 2. 不建议一线大厂&#xff1a;腾讯&#xff0c;字节&#xff0c;阿里等&#xff0c;你懂的 3. 不建议政府部门&#xff0c;安全设备多&#xff…

Spring MVC(基于 Spring4.x)基础学习

一、SpringMVC概述 二、SpringMVC的HelloWorld 三、使用RequestMapping映射请求 四、映射请求参数&请求头 五、处理模型数据 六、视图和视图解析器 七、RESTful CRUD 八、SpringMVC表单标签&处理静态资源 九、数据转换&数据格式化&数据校验 十、处理JSON:使用…

前端win10如何设置固定ip(简单明了)

1、右击这个 2、点击属性 3、双击协议版本4设置成以下就ok

原生微信小程序开发记录

1. 拿到项目 先构建 2.小程序与普通网页开发的区别 网页开发渲染线程和脚本线程是互斥的&#xff0c;这也是为什么长时间的脚本运行可能会导致页面失去响应&#xff0c;而在小程序中&#xff0c;二者是分开的&#xff0c;分别运行在不同的线程中。网页开发者可以使用到各种浏览…

【HarmonyOS】鸿蒙开发之Slider组件——第3.5章

组件应用场景: 设备音量大小&#xff0c;调节屏幕亮度等需求 slider组件内options属性简介 value&#xff1a;滑动条当前进度值。 min&#xff1a;设置滑动条设置最小值。 max&#xff1a;设置滑动条设置最大值&#xff0c;默认为 100 。 step&#xff1a;设置滑动条滑动跳动…

Python从进阶到高级—通俗易懂版

Python从进阶到高级—通俗易懂版 # # Author : Mikigo # Time : 2021/12/23 # 一、简介 Python 进阶是我一直很想写的&#xff0c;作为自己学习的记录&#xff0c;过去自己在看一些代码的时候经常会困惑&#xff0c;看不懂&#xff0c;然后自己去查资料、看书籍&#xff0…

JAVA之HashMap详解

HashMap 1. 设计原理 HashMap 基于哈希表的 Map 接口实现&#xff0c;是以 key-value 存储形式存在&#xff0c;即主要用来存放键值对。HashMap 的实现不是同步的&#xff0c;这意味着它不是线程安全的。它的 key、value 都可以为 null&#xff0c;此外&#xff0c;HashMap 中…

appium实现自动化测试原理

目录 1、Appium原理 1.1、Android Appium原理图文解析 1.1.2、原理详解 1.1.2.1、脚本端 1.1.2.2、appium-server 1.1.2.3、中间件bootstrap.jar 1.1.2.4、驱动引擎uiautomator 1.2、 IOS Appium原理 1、Appium原理 1.1、Android Appium原理图文解析 执行测试脚本全过…

C#,二进制数的按位交换(Bits swap)的算法与源代码

数字在指定位置指定位数的交换是常见算法。 1 源程序 using System; using System.Text; using System.Collections; using System.Collections.Generic; namespace Legalsoft.Truffer.Algorithm { public static partial class Algorithm_Gallery { /// <…

专业140+总分420+南京信息工程大学811信号与系统考研经验南信大电子信息与通信工程,真题,大纲,参考书

今年顺利被南信大电子信息录取&#xff0c;初试420&#xff0c;专业811信号与系统140&#xff08;Jenny老师辅导班上140很多&#xff0c;真是大佬云集&#xff09;&#xff0c;今年应该是南信大电子信息最卷的一年&#xff0c;复试线比往年提高了很多&#xff0c;录取平均分380…

扭蛋机小程序开发:发展优势

商场中精美的扭蛋机一直都是年轻人的心头好&#xff0c;目前&#xff0c;扭蛋机商品也不在局限于各种小型玩具&#xff0c;也逐渐与各类热门IP合作&#xff0c;打造出了各类手办、周边等&#xff0c;深受各个年龄层的喜爱。 如今&#xff0c;扭蛋机在互联网的推动下&#xff0…

算法的基本概念

设么是算法&#xff1f; 什么是好的算法/ 什么是算法&#xff1a; 量水的问题&#xff1a; 方案如下&#xff1a;&#xff08;核心思路就是两个桶差值为2&#xff0c;两次差值为4&#xff0c;7-(5-4) 6&#xff09; 算法&#xff1a;准确描述的 “操作步骤 (问题求解步骤)”&…

数据结构中图的概念以及遍历算法的实现

在数据结构中&#xff0c;图&#xff08;Graph&#xff09;是由节点&#xff08;Vertex&#xff09;和连接节点的边&#xff08;Edge&#xff09;组成的一种非线性数据结构。图可以用来表示各种实际问题中的关系和连接&#xff0c;如社交网络、道路网络、电路等。 图由两个主要…