通过rabbitmq生成延时消息,并生成rabbitmq镜像

通过rabbitmq生成延时消息队列,并生成rabbitmq镜像

  • 整体描述
    • 1. 使用场景
    • 2. 目前问题
    • 3. 前期准备
  • 具体步骤
    • 1. 拉取镜像
    • 2. 运行镜像
    • 3. 安装插件
    • 4. 代码支持
      • 4.1 config文件
      • 4.2 消费监听
      • 4.2 消息生产
    • 5. 功能测试
  • 镜像操作
    • 1. 镜像制作
    • 2. 镜像导入
  • 总结

整体描述

1. 使用场景

在使用消息队列时,我们有时候需要生成一些延时消息,比如判断一个任务的开始时间,我在创建任务的时候计算出此时距离任务开始的时间,然后往消息队列里发送一个延时消息,我们希望等到任务开始的时候,再消费此消息,此时任务开始,可以进行一些业务上的操作。

2. 目前问题

之前写过一篇创建rabbitmq镜像的文章,链接: 在centos搭建rabbitmq并制作docker镜像,使用的rabbitmq的版本是3.6.8,只能通过过期时间expiration来设置消息的过期时间,在消息过期的时候,会进入死信队列中,也能达到上述要求。但是,但是,这个过期时间expiration,rabbitmq在处理的时候有个坑,前面消息如果没有过期,后面的消息就算过期了,也不会触发,就是先发的消息没有到期,之后再发的消息就算到期了,也不会触发回调。这显然不行。

3. 前期准备

需要准备的主要就是docker环境,这个可以自行搜一下怎么安装docker环境,由于和本文主要讲的内容关系不大,略…

具体步骤

为了解决此问题,我们可以用延时队列插件来实现,这个插件时一个开发者写的,在github上但是已经被rabbitmq官方接受了,所以可以放心用。

1. 拉取镜像

首先我们先拉取一个rabbitmq的官方镜像进行操作,这个需要注意一下拉取的版本,由于延时队列的插件支持的版本是3.7之后的rabbitmq,所以需要拉取3.7之后的,我这拉取的是3.8.17版本。在命令行输入:

docker pull rabbitmq:3.8.17-management

注:这个如果报错,看下自己的docker环境有没有问题。带management是带管理页面的镜像,我们选用的带management的镜像,后期使用的时候好操作和定位问题。

2. 运行镜像

拉取成功之后,使用命令:

docker images

查看镜像是否拉取成功,如下就是成功了:
rabbitmq镜像
之前用的3.6.8的,不支持延时消息队列的插件…
然后运行镜像,创建容器并启动:

docker run --name rabbitmq-server -p 5672:5672 -p 15672:15672 -d rabbitmq:3.8.17-management

此时用:

docker ps -a

查看容器:
rabbitmq容器
容器已经创建并启动,我们通过web页面可以访问rabbitmq的管理页面,在浏览器输入:http://localhost:15672/
默认账号:guest,密码:guest
rabbitmq登录页面

3. 安装插件

此时rabbitmq已经运行,我们需要安装插件来支持延时消息队列,插件下载地址
选择相应的rabbitmq版本进行下载,注意版本不要选错了。下载完是一个rabbitmq_delayed_message_exchange-3.8.0.ez的文件,我们需要把这个文件上传到docker的/opt/rabbitmq/plugins目录下。
上传之后,进入/opt/rabbitmq/sbin目录执行如下命令让插件生效:

rabbitmq-plugins enable rabbitmq_delayed_message_exchange

执行之后看到如下就成功了:
插件启动成功
成功之后刷新一下管理页面,在新建交换机那里,type能多一个x-delayed-message的选项:
添加延时交换机
此时,我们的rabbitmq就配置完成了。

4. 代码支持

rabbitmq目前已经可以接收延时消息了,在代码端我们也需要进行相应的修改,以达到发送延时消息的目的。

4.1 config文件

package com.thcb.rabbitmq.config;import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import java.util.HashMap;
import java.util.Map;/*** RabbitMQ的配置类** @author thcb* @date 2023-09-05*/
@Configuration
public class RabbitMqConfig {// 交换机private static final String DELAYED_EXCHANGE = "delayed.exchange";// 队列private static final String DELAYED_QUEUE = "delayed.queue";// 路由private static final String DELAYED_ROUTING_KEY = "delayed.routingKey";/*** 队列*/@Beanpublic Queue delayedQueue() {return new Queue(DELAYED_QUEUE);}/*** 交换机*/@Beanpublic CustomExchange delayedExchange() {Map<String, Object> args = new HashMap<>();args.put("x-delayed-type", "direct");return new CustomExchange(DELAYED_EXCHANGE, "x-delayed-message", false, false, args);}/*** 绑定延迟队列和交换机*/@Beanpublic Binding delayQueueBindingDelayExchange() {return BindingBuilder.bind(delayedQueue()).to(delayedExchange()).with(DELAYED_ROUTING_KEY).noargs();}}

4.2 消费监听

package com.thcb.rabbitmq.recevier;import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;import java.util.Date;/*** 消费监听** @author thcb* @date 2023-09-05*/
@Slf4j
@Component
public class DelayQueueReceiver {@RabbitListener(queues = "delayed.queue")public void receiveDelayedQueue(Message message) {String msg = new String(message.getBody());log.info("当前时间:{},收到DelayedQueue消息:{}", new Date().toString(), msg);}}

4.2 消息生产

这里创建一个controller来生产消息,里面有两个接口,一个生产消息的延时时间是5秒,另一个是30秒,用来测试延时时间。

package com.thcb.rabbitmq.controller;import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;
import org.springframework.web.bind.annotation.RestController;import java.util.Date;/*** 消息生产controller** @author thcb* @date 2023-09-05*/
@RestController
@RequestMapping("/HelloController")
public class HelloController {private static final Logger log = LoggerFactory.getLogger(HelloController.class);@Autowiredprivate AmqpTemplate rabbitTemplate;@RequestMapping("/sendXDLMessage1")@ResponseBodypublic String sendXDLMessage1() {int time = 5000;String message = "{\"type\":\"sendXDLMessage1\"}";log.info("当前时间:{},发送一条延迟{}毫秒的信息给延迟队列:{}", new Date().toString(), time, message);rabbitTemplate.convertAndSend("delayed.exchange", "delayed.routingKey", message, msg -> {msg.getMessageProperties().setDelay(time);return msg;});return "sendXDLMessage1 success";}@RequestMapping("/sendXDLMessage2")@ResponseBodypublic String sendXDLMessage2() {int time = 30000;String message = "{\"type\":\"sendXDLMessage2\"}";log.info("当前时间:{},发送一条延迟{}毫秒的信息给延迟队列:{}", new Date().toString(), time, message);rabbitTemplate.convertAndSend("delayed.exchange", "delayed.routingKey", message, msg -> {msg.getMessageProperties().setDelay(time);return msg;});return "sendXDLMessage2 success";}
}

5. 功能测试

代码修改完,就可以测试了,启动工程之后,在rabbitmq管理页面能看到自动创建了如下交换机和队列:
创建的交换机
创建的队列
可以看到交换机的类型是x-delayed-message。
接下来就可以调用测试接口,生产2条消息看看了。先调用sendXDLMessage2接口,生产一个延时30秒的消息,过一会再调用sendXDLMessage1的接口,生产一个延时5秒的消息。log结果如下:
运行结果
结果符合我们的预期,先发的30秒延时消息消息2,之后发的5秒延时消息1,然后过了5秒消息1先回调,之后30秒消息2回调。

镜像操作

使用docker主要就是要制作镜像,之后直接就可以用了要不每次还得配置。提示制作之前,把现在的队列和交换机都删除,队列和交换机是通过代码创建的,账号可以换一个,默认的guest不太安全。
一切都准备就绪,就可以制作镜像了。

1. 镜像制作

将镜像打包成tar文件。

docker commit 【镜像id】 rabbitmq:3.8.17
docker save -o rabbitmq-3.8.17.tar rabbitmq:3.8.17

2. 镜像导入

制作完镜像进行导入

docker load <rabbitmq-3.8.17.tar
docker run -d -p 5672:5672 -p 15672:15672 --privileged --restart=always --name rabbitmq rabbitmq:3.8.17

总结

以上就是rabbitmq延时消息的相关内容,另外这个延时消息在消息很多的情况下可能会有一些性能问题,使用的时候需要注意一下。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/73712.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

蠕虫病毒流量分析案例

背景 某供排水集团的网络管理员对其网络的健康状况持认可态度&#xff0c;表示网络运行正常&#xff0c;没有发现异常行为。然而&#xff0c;由于网络环境变得越来越复杂&#xff0c;仅凭借传统的网络经验已经不能全面了解网络情况。因此&#xff0c;我们为供排水集团安装了Ne…

Golang复习

golang的特点 Golang 针对并发进行了优化&#xff0c;并且在规模上运行良好 自动垃圾收集明显比 Java 或 Python 更有效&#xff0c;因为它与程序同时执行 golang数据类型 基本数据类型&#xff08;值类型&#xff09; 布尔类型 数字类型 整型 根据有符号分为&#xff1a;…

[NLP]LLM---FineTune自己的Llama2模型

一 数据集准备 Let’s talk a bit about the parameters we can tune here. First, we want to load a llama-2-7b-hf model and train it on the mlabonne/guanaco-llama2-1k (1,000 samples), which will produce our fine-tuned model llama-2-7b-miniguanaco. If you’re …

华为云API对话机器人CBS的魅力—实现简单的对话操作

云服务、API、SDK&#xff0c;调试&#xff0c;查看&#xff0c;我都行 阅读短文您可以学习到&#xff1a;人工智能AI智能的问答管理、全面的对话管理、高效训练部署 1.IntelliJ IDEA 之API插件介绍 API插件支持 VS Code IDE、IntelliJ IDEA等平台、以及华为云自研 CodeArts …

每日刷题-3

目录 一、选择题 二、编程题 1、计算糖果 2、进制转换 一、选择题 1、 解析&#xff1a;在C语言中&#xff0c;以0开头的整数常量是八进制的&#xff0c;而不是十进制的。所以&#xff0c;0123的八进制表示相当于83的十进制表示&#xff0c;而123的十进制表示不变。printf函数…

ASP.NET Core IOC容器

//IOC容器支持依赖注入{ServiceCollection serviceDescriptors new ServiceCollection();serviceDescriptors.AddTransient<IMicrophone, Microphone>();serviceDescriptors.AddTransient<IPower, Power>();serviceDescriptors.AddTransient<IHeadphone, Headp…

【SQL应知应会】索引 • Oracle版:B-树索引;位图索引;函数索引;单列与复合索引;分区索引

欢迎来到爱书不爱输的程序猿的博客, 本博客致力于知识分享,与更多的人进行学习交流 本文免费学习,自发文起3天后,会收录于SQL应知应会专栏,本专栏主要用于记录对于数据库的一些学习,有基础也有进阶,有MySQL也有Oracle 索引 • MySQL版 前言一、Oracle索引1.索引概述及分类…

upload-labs1-17思路

1 直接写一个php文件测试一下&#xff0c;发现弹窗不让上传 原理很简单&#xff0c;就是把后缀名拿出来过滤一遍&#xff0c;而白名单就是弹窗的这三个 解决方法&#xff1a; 因为这是在前端防御的一个手段&#xff0c;所以直接在浏览器设置上禁用js就行了&#xff1a; 也可…

微服务-OpenFeign基本使用

一、前言 二、OpenFeign基本使用 1、OpenFeign简介 OpenFeign是一种声明式、模板化的HTTP客户端&#xff0c;它使得调用RESTful网络服务变得简单。在Spring Cloud中使用OpenFeign&#xff0c;可以做到像调用本地方法一样使用HTTP请求访问远程服务&#xff0c;开发者无需关注…

stm32 学习笔记:GPIO输出

一、GPIO简介 引脚电平 0-3.3V,部分可容忍5V&#xff0c;对输出而言最大只能输出3.3V, 只要可以用高低电平来控制的地方&#xff0c;都可以用GPIO来完成&#xff0c;如果控制的功率比较大的设备&#xff0c;只需加入驱动电路即可 GPIO 通用输入输出口&#xff0c;可配置为 8种 …

2023国赛数学建模E题思路代码 - 黄河水沙监测数据分析

# 1 赛题 E 题 黄河水沙监测数据分析 黄河是中华民族的母亲河。研究黄河水沙通量的变化规律对沿黄流域的环境治理、气候变 化和人民生活的影响&#xff0c; 以及对优化黄河流域水资源分配、协调人地关系、调水调沙、防洪减灾 等方面都具有重要的理论指导意义。 附件 1 给出了位…

【实训】“宅急送”订餐管理系统(程序设计综合能力实训)

&#x1f440;樊梓慕&#xff1a;个人主页 &#x1f3a5;个人专栏&#xff1a;《C语言》《数据结构》《蓝桥杯试题》《LeetCode刷题笔记》《实训项目》 &#x1f31d;每一个不曾起舞的日子&#xff0c;都是对生命的辜负 前言 大一小学期&#xff0c;我迎来了人生中的第一次实训…

2023/9/8 -- C++/QT

作业 1> 自行封装一个栈的类&#xff0c;包含私有成员属性&#xff1a;栈的数组、记录栈顶的变量 成员函数完成&#xff1a;构造函数、析构函数、拷贝构造函数、入栈、出栈、清空栈、判空、判满、获取栈顶元素、求栈的大小 02stack.h: #ifndef __02STACK_H__ #define __…

无涯教程-JavaScript - IMSIN函数

描述 IMSIN函数以x yi或x yj文本格式返回复数的正弦。复数的正弦为- $$\sin(x yi) \sin(x)\cosh(y) \cos(x)\sin(y)i $$ 语法 IMSIN (inumber)争论 Argument描述Required/OptionalInumberA Complex Number for which you want the sine.Required Notes Excel中的复数仅…

活动预告 | 龙智、紫龙游戏与JFrog专家将出席龙智DevSecOps研讨会,探讨企业大规模开发创新

2023年9月8日&#xff08;周五&#xff09;下午13:30-19:45&#xff0c;龙智即将携手Atlassian与JFrog在上海共同举办主题为“大规模开发创新&#xff1a;如何提升企业级开发效率与质量”的线下研讨会。 在此次研讨会上&#xff0c;龙智高级咨询顾问、Atlassian认证专家叶燕秀…

【List篇】使用Arrays.asList生成的List集合,操作add方法报错

早上到公司&#xff0c;刚到工位&#xff0c;测试同事就跑来说"功能不行了&#xff0c;报服务器异常了&#xff0c;咋回事";我一脸蒙&#xff0c;早饭都顾不上吃&#xff0c;要来了测试账号复现了一下&#xff0c;然后仔细观察测试服务器日志&#xff0c;发现报了一个…

K8S 基础概念学习

1.K8S 通过Deployment 实现滚动发布&#xff0c;比如左边的ReplicatSet 的 pod 中 是V1版本的镜像&#xff0c;Deployment通过 再启动一个 ReplicatSet 中启动 pod中 镜像就是V2 2.每个pod 中都有一个pause 容器&#xff0c;他会连接本pod中的其他容器&#xff0c;实现互通。p…

【计算机网络】HTTP(上)

文章目录 1.HTTP概念2. URLurlencode 和 urldecode转义规则 3. HTTP的宏观理解HTTP的请求HTTP的响应 4. 见一见HTTP请求和响应请求报头 1. 模拟一个简单的响应response响应报头 2. 从路径中获取内容ReadFile函数的实现 3.不同资源进行区分反序列化的实现ReadOneLine函数的实现P…

2.11 PE结构:添加新的节区

在可执行PE文件中&#xff0c;节&#xff08;section&#xff09;是文件的组成部分之一&#xff0c;用于存储特定类型的数据。每个节都具有特定的作用和属性&#xff0c;通常来说一个正常的程序在被编译器创建后会生成一些固定的节&#xff0c;通过将数据组织在不同的节中&…

数学建模B多波束测线问题B

数学建模多波束测线问题 1.问题重述&#xff1a; 单波束测深是一种利用声波在水中传播的技术来测量水深的方法。它通过测量从船上发送声波到声波返回所用的时间来计算水深。然而&#xff0c;由于它是在单一点上连续测量的&#xff0c;因此数据在航迹上非常密集&#xff0c;但…