Rabbitmq入门与应用(五)-延迟队列的设计与实现

延迟队列设计

在开发过程中涉及到延迟队列的应用,例如订单生成后有30分钟的付款时间,注册是有60秒的邮件或者短信的发送读取时间等。
常规使用rabbitmq设计延迟队列有两种方式

  1. 使用创建一个延迟队列阻塞消息
  2. 使用延迟队列插件

Dead Letter Exchanges — RabbitMQ

image-20231119180143512

image-20230619235935374

配置

  1. To set the DLX for a queue, specify the optional x-dead-letter-exchange argument when declaring the queue. The value must be an exchange name in the same virtual host:
  2. You may also specify a routing key to use when the messages are being dead-lettered. If the routing key is not set, the message’s own routing keys are used. args.put("x-dead-letter-routing-key", “some-routing-key”);
package com.wnhz.mq.common.config;import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import java.util.HashMap;
import java.util.Map;@Configuration
public class DlxConfig {@Beanpublic Queue dlxQueue(){return new Queue("dlx_queue_test");}@Beanpublic DirectExchange dlxExchange(){return new DirectExchange("dlx_exchange_test");}@Beanpublic Binding dlxBinding(){return BindingBuilder.bind(dlxQueue()).to(dlxExchange()).with("dlx_routing_key");}@Beanpublic Queue normalQueue(){Map<String,Object> map = new HashMap<>();map.put("x-dead-letter-exchange", "dlx_exchange_test");map.put("x-dead-letter-routing-key","dlx_routing_key");return new Queue("normal_queue_test",true,false,false,map);}@Beanpublic DirectExchange normalExchange(){return new DirectExchange("normal_exchange_test");}@Beanpublic Binding normalBinding(){return BindingBuilder.bind(normalQueue()).to(normalExchange()).with("normal_routing_test");}}
server:port: 10005spring:application:name: book-consumerautoconfigure:exclude: com.alibaba.druid.spring.boot.autoconfigure.DruidDataSourceAutoConfigure, org.springframework.boot.autoconfigure.jdbc.DataSourceAutoConfigurationrabbitmq:host: 192.168.198.130port: 5672username: adminpassword: 123publisher-confirm-type: correlatedpublisher-returns: truelistener:simple:prefetch: 1acknowledge-mode: auto
logging:level:com.wnhz.mq.consumer: debug

生产者发送信息

image-20230723161609896

    @Overridepublic void delaySendMessage() {String uuid = UUID.randomUUID().toString();CorrelationData data = new CorrelationData(uuid);String msg = "hello delay";int delayTime =5000;rabbitTemplate.convertAndSend("normal_exchange_test", "normal_routing_test", msg,p -> {p.getMessageProperties().setExpiration(String.valueOf(delayTime ));return p;});log.debug("发送一条消息{},当前时间:{},延迟{}秒", msg, new Date(), delayTime / 1000);}
}

消费者消费

   @RabbitListener(queues = "dlx_queue_test")public void delayConsume(Message message){log.debug("消费者消费信息:{},当前时间:{}",message.getBody(),new Date());}

延迟队列插件安装

访问官网

Community Plugins — RabbitMQ

image-20230619214424612

image-20230619214539126

进入rabbitmq docker容器

[root@localhost ~]# docker exec -it rabbitmq bash

查询插件列表是否存在延迟插件

root@6d2342d51b11:/plugins# rabbitmq-plugins list
root@6d2342d51b11:/plugins# rabbitmq-plugins list
Listing plugins with pattern ".*" ...Configured: E = explicitly enabled; e = implicitly enabled| Status: * = running on rabbit@6d2342d51b11|/
[  ] rabbitmq_amqp1_0                  3.9.11
[  ] rabbitmq_auth_backend_cache       3.9.11
[  ] rabbitmq_auth_backend_http        3.9.11
[  ] rabbitmq_auth_backend_ldap        3.9.11
[  ] rabbitmq_auth_backend_oauth2      3.9.11
[  ] rabbitmq_auth_mechanism_ssl       3.9.11
[  ] rabbitmq_consistent_hash_exchange 3.9.11
[  ] rabbitmq_event_exchange           3.9.11
[  ] rabbitmq_federation               3.9.11
[  ] rabbitmq_federation_management    3.9.11
[  ] rabbitmq_jms_topic_exchange       3.9.11
[E*] rabbitmq_management               3.9.11
[e*] rabbitmq_management_agent         3.9.11
[  ] rabbitmq_mqtt                     3.9.11
[  ] rabbitmq_peer_discovery_aws       3.9.11
[  ] rabbitmq_peer_discovery_common    3.9.11
[  ] rabbitmq_peer_discovery_consul    3.9.11
[  ] rabbitmq_peer_discovery_etcd      3.9.11
[  ] rabbitmq_peer_discovery_k8s       3.9.11
[E*] rabbitmq_prometheus               3.9.11
[  ] rabbitmq_random_exchange          3.9.11
[  ] rabbitmq_recent_history_exchange  3.9.11
[  ] rabbitmq_sharding                 3.9.11
[  ] rabbitmq_shovel                   3.9.11
[  ] rabbitmq_shovel_management        3.9.11
[  ] rabbitmq_stomp                    3.9.11
[  ] rabbitmq_stream                   3.9.11
[  ] rabbitmq_stream_management        3.9.11
[  ] rabbitmq_top                      3.9.11
[  ] rabbitmq_tracing                  3.9.11
[  ] rabbitmq_trust_store              3.9.11
[e*] rabbitmq_web_dispatch             3.9.11
[  ] rabbitmq_web_mqtt                 3.9.11
[  ] rabbitmq_web_mqtt_examples        3.9.11
[  ] rabbitmq_web_stomp                3.9.11
[  ] rabbitmq_web_stomp_examples       3.9.11

下载支持3.9.x的插件

image-20230619215207816

退出容器:

root@6d2342d51b11:/plugins# exit
exit

上传到linux服务器

在/usr/local/software/下创建文件夹rabbitmq/plugins

[root@localhost software]# mkdir -p rabbitmq/plugins
image-20230619215427865

拷贝插件到容器中

[root@localhost plugins]# docker cp ./rabbitmq_delayed_message_exchange-3.9.0.ez rabbitmq:/plugins

进入容器安装插件

[root@localhost plugins]# docker  exec -it rabbitmq bash
root@6d2342d51b11:/# rabbitmq-plugins enable rabbitmq_delayed_message_exchange

打开管理页面

进入Exchange页面,下拉Type看是否已经安装成功。

image-20230619220041631

代码实现

配置类
package com.wnhz.rabbitmq.mq.config;public interface RabbitmqConstants {String DELAYX_QUEUE = "mq_delayx__queue";String DELAYX_ROUTING_KEY = "mq_delayx_routing_key";String DELAYX_EXCHANGE = "mq_delayx__exchange";String DELAYX_EXCHANGE_TYPE = "x-delayed-message";
}
package com.wnhz.rabbitmq.mq.config;import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.amqp.support.converter.MessageConverter;import java.util.HashMap;@Configuration
@Slf4j
public class RabbitmqConfig {@Beanpublic Queue delayxQueue() {return new Queue(RabbitmqConstants.DELAYX_QUEUE);}@Beanpublic CustomExchange delayRoutingExchange() {return new CustomExchange(RabbitmqConstants.DELAYX_EXCHANGE,RabbitmqConstants.DELAYX_EXCHANGE_TYPE,true,false,new HashMap<String, Object>() {{put("x-delayed-type","direct");}});}@Beanpublic Binding delayxBinding() {return BindingBuilder.bind(delayxQueue()).to(delayRoutingExchange()).with(RabbitmqConstants.DELAYX_ROUTING_KEY).noargs();}@Beanpublic MessageConverter messageConverter() {return new Jackson2JsonMessageConverter();}@Beanpublic RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);rabbitTemplate.setMessageConverter(messageConverter());log.debug("rabbitmq配置:{}完成", rabbitTemplate);return rabbitTemplate;}
}
生产者
@Service
@Slf4j
public class ProduceServiceImpl implements IProduceService {@Autowiredprivate RabbitTemplate rabbitTemplate;@Overridepublic void sendDelayxUser(User user) {int delayTime = 10000;rabbitTemplate.convertAndSend(RabbitmqConstants.DELAYX_EXCHANGE,RabbitmqConstants.DELAYX_ROUTING_KEY,user, mpp -> {mpp.getMessageProperties().setDelay(delayTime);return mpp;});log.debug("发送消息:{},发送时间:{},延迟:{}秒", user,new Date(),delayTime/1000);}
}
消费者
@Slf4j
@Service
public class ConsumeServiceImpl implements IConsumeService {@RabbitListener(queues = RabbitmqConstants.DELAYX_QUEUE)@Overridepublic void receiveDelayxUser(User user) {log.debug("消费者:接收到消息-->{},接收时间:{}",user,new Date());}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/689577.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

chatgpt的大致技术原理

当然可以&#xff0c;让我们从技术层面更详细地探讨一下ChatGPT的工作原理。 数据收集与预处理&#xff1a; 数据收集&#xff1a;ChatGPT首先会从各种来源&#xff08;如网页、新闻、书籍等&#xff09;收集大量的文本数据。这些数据为模型提供了丰富的语言模式和表达方式&a…

Linux——信号(2)

在上一张博客我们介绍了Linux中信号的概念和信号是如何产生的&#xff0c;虽然信号 有多种产生方式&#xff0c;但是最终只能由操作系统给对应进程发送特定信号。现在 我将更加规范的介绍Linux中的信号。上一章的遗留问题 我们上一章中在观察信号的默认处理的时候发现终止信号…

《数学建模》专栏导读

文章分类 相关概念入门快速建模相关混合整数线性规划&#xff08;MILP&#xff09;加速技巧数值问题探讨相关问题解决技巧 相关概念入门 文章相关概念离散优化模型的松弛模型线性松弛问题混合整数线性规划MILP问题中增添约束的影响约束的影响 快速建模相关 文章求解器涉及步…

canal监听binlog记录业务数据的变更;canalAdmin对instance做web配置

概述 平时在开发中会通过logback打印一些开发日志&#xff0c;有时也会需要记录一些业务日志&#xff0c;简单的就直接用log记录一下&#xff0c;但是系统中需要记录日志的地方越来越多时&#xff0c;不能每个地方都写一套log记录&#xff1b; 由于平常用的大多都是mysql&…

JavaScript:JSON、三种包装类

JOSN: 我们希望可以将一个对象在不同的语言中进行传递&#xff0c; 以达到通信的目的&#xff0c;最佳方式就是将一个对象转换为字符串的形式 JSON&#xff08;JavaScript Object Notation&#xff09; - JS的对象表示法 - JSON实际上就是一个字符串&#xff0c;它的语法格…

【论文阅读笔记】Contrastive Learning with Stronger Augmentations

Contrastive Learning with Stronger Augmentations 摘要 基于提供的摘要&#xff0c;该论文的核心焦点是在对比学习领域提出的一个新框架——利用强数据增强的对比学习&#xff08;Contrastive Learning with Stronger Augmentations&#xff0c;简称CLSA&#xff09;。以下…

【Jvm】性能调优(上)线上问题排查工具汇总

文章目录 一.互联网概念1.产品闭环和业务闭环2.软件设计中的上游和下游3.JDK运行时常量池 二.CPU相关概念1.查询CPU信息2.CPU利用率&#xff08;CPU utilization&#xff09;和 CPU负载&#xff08;CPU load&#xff09;2.1.如何理解CPU负载2.2.top命令查看CPU负载均值2.3.CPU负…

Pytorch 配置 GPU 环境

1、Pytorch 深度学习跑代码的时候&#xff0c;因为简单的操作不适合cpu运行&#xff0c;我们更习惯用GPU加速代码。 本章将介绍怎么安装pytorch的gpu环境&#xff0c;以及常见的问题 关于conda的安装&#xff0c;参考之前文章&#xff1a;深度学习环境配置&#xff1a;Anaco…

初始树莓派 + VMware17 安装树莓派(Raspberry Pi 4B/5)

文章目录 树莓派入门 VMware17 安装树莓派(Raspberry Pi 4/5B)前言一、树莓派入门指南&#xff1a;从零开始探索树莓派树莓派4B和5对比 二、在VMware Workstation 17上安装树莓派4B/5操作系统&#xff1a;实现强大性能与便捷模拟工具准备开始安装树莓派1.创建一个虚拟机2. 选择…

PyCharm 取消所有断点

PyCharm 取消所有断点 1. Run -> View Breakpoints...2. Python Line Breakpoint3. Remove - DoneReferences 1. Run -> View Breakpoints… 2. Python Line Breakpoint ​​​ 3. Remove - Done References [1] Yongqiang Cheng, https://yongqiang.blog.csdn.net/

【web安全】渗透测试实战思路

步骤一&#xff1a;选目标 1. 不建议太小的公司&#xff08;可能都是请别人来开发的&#xff0c;用现成成熟的框架&#xff09; 2. 不建议一线大厂&#xff1a;腾讯&#xff0c;字节&#xff0c;阿里等&#xff0c;你懂的 3. 不建议政府部门&#xff0c;安全设备多&#xff…

Spring MVC(基于 Spring4.x)基础学习

一、SpringMVC概述 二、SpringMVC的HelloWorld 三、使用RequestMapping映射请求 四、映射请求参数&请求头 五、处理模型数据 六、视图和视图解析器 七、RESTful CRUD 八、SpringMVC表单标签&处理静态资源 九、数据转换&数据格式化&数据校验 十、处理JSON:使用…

如何在1Panel上偷渡HTTP/3

本文 首发于 Anyeの小站&#xff0c;转载请取得作者同意。 前言 简介 HTTP/3 的基础即谷歌多年探索的基于 UDP 的 QUIC 协议。与 TCP 相比&#xff0c;使用 UDP 可以提供更大的灵活性&#xff0c;并且可以使 QUIC 完全于用户空间中实现——对协议实现的更新不像 TCP 那样需要绑…

前端win10如何设置固定ip(简单明了)

1、右击这个 2、点击属性 3、双击协议版本4设置成以下就ok

原生微信小程序开发记录

1. 拿到项目 先构建 2.小程序与普通网页开发的区别 网页开发渲染线程和脚本线程是互斥的&#xff0c;这也是为什么长时间的脚本运行可能会导致页面失去响应&#xff0c;而在小程序中&#xff0c;二者是分开的&#xff0c;分别运行在不同的线程中。网页开发者可以使用到各种浏览…

【HarmonyOS】鸿蒙开发之Slider组件——第3.5章

组件应用场景: 设备音量大小&#xff0c;调节屏幕亮度等需求 slider组件内options属性简介 value&#xff1a;滑动条当前进度值。 min&#xff1a;设置滑动条设置最小值。 max&#xff1a;设置滑动条设置最大值&#xff0c;默认为 100 。 step&#xff1a;设置滑动条滑动跳动…

【npm】npm镜像源及命令

淘宝镜像源 npm config set registry https://registry.npm.taobao.org &#xff08;旧版&#xff0c;已到期&#xff09; 淘宝中国镜像源 npm config set registry https://registry.npmmirror.com &#xff08;新版&#xff09; 腾讯云镜像源 npm config set registry h…

Linux命令-bzmore命令(查看bzip2压缩过的文本文件的内容)

说明 bzmore命令 用于查看bzip2压缩过的文本文件的内容&#xff0c;当下一屏显示不下时可以实现分屏显示。 语法 bzmore(参数)参数 文件&#xff1a;指定要分屏显示的.bz2压缩包。

Python从进阶到高级—通俗易懂版

Python从进阶到高级—通俗易懂版 # # Author : Mikigo # Time : 2021/12/23 # 一、简介 Python 进阶是我一直很想写的&#xff0c;作为自己学习的记录&#xff0c;过去自己在看一些代码的时候经常会困惑&#xff0c;看不懂&#xff0c;然后自己去查资料、看书籍&#xff0…

JAVA之HashMap详解

HashMap 1. 设计原理 HashMap 基于哈希表的 Map 接口实现&#xff0c;是以 key-value 存储形式存在&#xff0c;即主要用来存放键值对。HashMap 的实现不是同步的&#xff0c;这意味着它不是线程安全的。它的 key、value 都可以为 null&#xff0c;此外&#xff0c;HashMap 中…