typescript 实现RabbitMQ死信队列和延迟队列 订单10分钟未付归还库存

Manjaro安装RabbitMQ

安装

sudo pacman -S rabbitmq rabbitmqadmin

启动管理模块

sudo rabbitmq-plugins enable rabbitmq_managementsudo rabbitmq-server

管理界面
http://127.0.0.1:15672/
默认用户名和密码都是guest。
要使用 rabbitmqctl 命令添加用户并分配权限,您可以按照以下步骤进行操作:

  1. 添加用户
rabbitmqctl add_user mingcai password

请将 password 替换为您想要设置的实际密码。

  1. 分配权限
rabbitmqctl set_permissions -p / mingcai ".*" ".*" ".*"

这个命令将用户 mingcai 授予对所有虚拟主机的所有资源的读、写和管理权限。如果您只想给予特定权限,请适当调整正则表达式 ".*",以授予适当的权限。例如,如果您只想给予读取权限,可以使用 "^amq\."

  1. 可选步骤:设置用户角色

您可以将用户分配给不同的角色,以便更好地管理权限。例如,您可以将用户添加到 administrator 角色以获取管理员权限:

rabbitmqctl set_user_tags mingcai administrator

这样,用户 mingcai 就被赋予了管理员权限。

请确保您具有适当的权限来执行这些操作,并确保替换示例中的用户名和密码为您自己的实际值。

死信队列

在这里插入图片描述

在这里插入图片描述
标题:利用RabbitMQ死信队列处理消息的三种情况

在消息队列的应用中,处理异常情况和消息的延迟成为了一项重要的任务。RabbitMQ作为一款流行的消息队列服务,提供了死信队列(Dead Letter Exchange)功能,能够有效地处理消息被拒绝、消息过期以及队列达到最大长度等情况。本文将介绍如何利用RabbitMQ的死信队列来处理这三种情况,并提供了TypeScript示例代码。

1. 消息被拒绝

当消费者无法处理某条消息时,可以选择将其标记为“被拒绝”。这种情况下,我们可以配置RabbitMQ,将被拒绝的消息发送到一个死信队列,以后再处理。

// 引入amqplib库
import * as amqp from 'amqplib';// 连接到RabbitMQ服务器
const connection = await amqp.connect('amqp://localhost');// 创建Channel
const channel = await connection.createChannel();// 定义队列
const queueName = 'my_queue';
await channel.assertQueue(queueName, {// 设置死信交换机deadLetterExchange: 'my_dead_letter_exchange'
});// 消费消息
channel.consume(queueName, (msg) => {// 处理消息if (msg) {// 处理失败,拒绝消息并将其重新放回队列// channel.reject(msg, true); // 第二个参数设为 true 表示将消息重新放回队列// 处理失败,拒绝消息channel.reject(msg, false); // 第二个参数设为 false 表示将消息投递到死信队列// or 处理失败,拒绝消息并将其重新放回死信队列channel.nack(msg, false, false); // 第二个参数设为 false 表示不将消息重新放回原队列,第三个参数设为 false 表示不拒绝当前和之前所有未确认的消息}
});

2. 消息过期

有时候我们希望消息在一定时间内被处理,如果超过了这个时间,就认为消息已经过期。RabbitMQ允许我们设置消息的过期时间,并在消息过期后将其发送到死信队列。

// 发布消息
await channel.sendToQueue(queueName, Buffer.from('Hello'), {expiration: '60000' // 设置过期时间为60秒
});

3. 队列达到最大长度

为了避免队列过载,我们可以限制队列的最大长度。当队列达到最大长度时,新的消息将被拒绝,并发送到死信队列。

// 定义队列
await channel.assertQueue(queueName, {maxLength: 100, // 设置最大队列长度为100deadLetterExchange: 'my_dead_letter_exchange'
});

通过以上配置,我们可以利用RabbitMQ的死信队列来处理消息被拒绝、消息过期以及队列达到最大长度等情况,保证消息系统的稳定性和可靠性。

以上是利用TypeScript示例代码演示了如何在RabbitMQ中使用死信队列。希望这篇文章对你有所帮助!

延时队列

什么是延时队列?顾名思义:首先它要具有队列的特性,再给它附加一个延迟消费队列消息的功能,也就是说可以指定队列中的消息在哪个时间点被消费。
延时队列在项目中的应用还是比较多的,尤其像电商类平台:
1、订单成功后,在30分钟内没有支付,自动取消订单
2、外卖平台发送订餐通知,下单成功后60s给用户推送短信。
3、如果订单一直处于某一个未完结状态时,及时处理关单,并退还库存
4、淘宝新建商户一个月内还没上传商品信息,将冻结商铺等

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

npm install amqplib --save
npm install @types/amqplib --save-dev

在这里插入图片描述

总结

在这里插入图片描述

rabbitmqadmin 使用入门

rabbitmqadmin 是 RabbitMQ 的命令行管理工具,可以用于执行各种管理任务,如创建队列、交换机,查看队列状态等。以下是一些基本的用法示例:

export RABBITMQ_SERVER=127.0.0.1
export RABBITMQ_PORT=5672   
export RABBITMQ_USER=mingcai     
export RABBITMQ_PASSWORD=passwordrabbitmqadmin list exchanges
  1. 查看 RabbitMQ 服务器信息
rabbitmqadmin status
  1. 列出所有交换机
rabbitmqadmin list exchanges
  1. 列出所有队列
rabbitmqadmin list queues
  1. 创建一个交换机
rabbitmqadmin declare exchange name=my_exchange type=direct
  1. 创建一个队列
rabbitmqadmin declare queue name=my_queue
  1. 绑定队列到交换机
rabbitmqadmin declare binding source=my_exchange destination=my_queue routing_key=my_routing_key
  1. 发送消息到指定交换机
rabbitmqadmin publish exchange=my_exchange routing_key=my_routing_key payload="Hello, RabbitMQ!"
  1. 获取队列消息
rabbitmqadmin get queue=my_queue

这些命令只是一些基本用法示例,rabbitmqadmin 工具支持更多功能和选项。你可以通过运行 rabbitmqadmin help 命令来获取更详细的帮助信息,或者查看官方文档以了解更多选项和使用方法。

延时3秒和8秒全部代码

// delayProducer.ts
import * as amqp from 'amqplib';async function setupRouting() {const connection = await amqp.connect('amqp://mingcai:password@127.0.0.1');const channel = await connection.createChannel();const exchange = 'routing_exchange';// 定义 dlx-exchangeconst dlxExchangeName = 'dlx-exchange';// 声明交换机await channel.assertExchange(exchange, 'direct', {durable: true});await channel.assertExchange(dlxExchangeName, 'direct', { durable: true });//消息防止丢失const dlxqueueBindings= [{dlxQueueName: 'dlx-3_second_queue', routingKey: 'fast',},{dlxQueueName: 'dlx-8_second_queue', routingKey: 'slow'}];for (const binding of dlxqueueBindings) {// 绑定延迟死信队列await channel.assertQueue(binding.dlxQueueName );//死信交换机和死信队列绑定 Routing key fast 的消息await channel.bindQueue(binding.dlxQueueName, dlxExchangeName, binding.routingKey); // 将 dlx-queue 绑定到死信交换机}// 定义队列和路由键的映射const queueBindings = [{queue: '3_second_queue', routingKey: 'fast', arguments: {'x-message-ttl': 3000, // TTL 设置为 3 秒 消息被拒绝或过期时将重新发布到的交换器的可选名称。'x-dead-letter-exchange': 'dlx-exchange'//消息被拒绝或过期时将重新发布到的交换器的可选名称}},{queue: '8_second_queue', routingKey: 'slow', arguments: {'x-message-ttl': 8000, // TTL 设置为 8 秒 消息被拒绝或过期时将重新发布到的交换器的可选名称。'x-dead-letter-exchange': 'dlx-exchange'//消息被拒绝或过期时将重新发布到的交换器的可选名称}}];// 声明队列,并将队列绑定到交换机上for (const binding of queueBindings) {await channel.assertQueue(binding.queue, {durable: true, arguments: binding.arguments});await channel.bindQueue(binding.queue, exchange, binding.routingKey);}for (let i = 0; i < 10; i++) {await new Promise((resolve) => {setTimeout(() => {resolve(1)}, 1000)})const chinaTime = new Date().toLocaleString('zh-CN', { timeZone: 'Asia/Shanghai' });console.log('当前中国时间:', chinaTime);// 发送消息到交换机,并设置不同的路由键await sendMessage(channel, exchange, 'fast', `[${i}] ${chinaTime} Message for the fast queue`);await sendMessage(channel, exchange, 'slow', `[${i}] ${chinaTime} Message for the slow queue`);}// 关闭连接setTimeout(async () => {await channel.close();await connection.close();}, 10000); //10 秒后关闭连接
}async function sendMessage(channel: amqp.Channel, exchange: string, routingKey: string, message: string) {channel.publish(exchange, routingKey, Buffer.from(message));console.log(`Sent message '${message}' with routing key '${routingKey}'`);
}setupRouting().catch(console.error);
//消费者 dlx-3_second_queue.ts
import * as amqp from 'amqplib';async function setupRouting() {const connection = await amqp.connect('amqp://mingcai:password@127.0.0.1');const channel = await connection.createChannel();let queue = 'dlx-3_second_queue'// 定义队列和路由键的映射await channel.consume(queue, (msg) => {if (msg !== null) {const chinaTime = new Date().toLocaleString('zh-CN', { timeZone: 'Asia/Shanghai' });console.log(`Received message ${chinaTime}'${msg.content.toString()}' from queue '${queue}'`);channel.ack(msg); // 确认消息已被处理}});}setupRouting().catch(console.error);
//dlx-8_second_queue.ts
import * as amqp from 'amqplib';async function setupRouting() {const connection = await amqp.connect('amqp://mingcai:password@127.0.0.1');const channel = await connection.createChannel();let queue = 'dlx-8_second_queue'// 定义队列和路由键的映射await channel.consume(queue, (msg) => {if (msg !== null) {const chinaTime = new Date().toLocaleString('zh-CN', { timeZone: 'Asia/Shanghai' });console.log(`Received message ${chinaTime}'${msg.content.toString()}' from queue '${queue}'`);channel.ack(msg); // 确认消息已被处理}});}setupRouting().catch(console.error);

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/777748.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

怎样去保证 Redis 缓存与数据库双写一致性?

解决方案 那么我们这里列出来所有策略&#xff0c;并且讨论他们优劣性。 先更新数据库&#xff0c;后更新缓存先更新数据库&#xff0c;后删除缓存先更新缓存&#xff0c;后更新数据库先删除缓存&#xff0c;后更新数据库 先更新数据库&#xff0c;后更新缓存 这种方法是不推…

在scroll-view中使用input,input键盘弹出时,滚动页面,输入框内容会出现错位问题?

解决办法 <view classpages><view><scroll-view scroll-y"{{sysScroll}}" scroll-top"{{scrollTop}}" class"scroll-hei-2 bg-def">...<input bindfocus"onfocus" bindblur"onblur" placeholder&quo…

RestTemplate 请求响应数据出现乱码问题,RestTemplate 如何解压缩 gzip 数据

文章目录 1.问题描述2.问题分析3.问题解决3.1 Apache HttpClient 依赖3.2 RestTemplate 配置类3.3 测试 1.问题描述 直接通过浏览器访问请求没有问题&#xff0c;但是通过 RestTemplate 访问请求却会出现乱码问题。 2.问题分析 首先我认为是 SpringBoot 版本、JDK 版本、项目结…

HTTP——Cookie

HTTP——Cookie 什么是Cookie通过Cookie访问网站 我们之前了解了HTTP协议&#xff0c;如果还有小伙伴还不清楚HTTP协议&#xff0c;可以点击这里&#xff1a; https://blog.csdn.net/qq_67693066/article/details/136895597 我们今天来稍微了解一下HTTP里面一个很小的部分&…

OpenHarmony中的LLDB高性能调试器

概述 LLDB&#xff08;Low Lever Debugger&#xff09;是新一代高性能调试器。详细说明参考 LLDB官方文档 。 当前OpenHarmony中的LLDB工具是在 llvm15.0.4 基础上适配演进出来的工具&#xff0c;是HUAWEI DevEco Studio工具中默认的调试器&#xff0c;支持调试C和C应用。 工…

【直播课】2024年PostgreSQL CM认证实战培训课程于4月27日开课!

课程介绍 了解关注开源技术&#xff0c;学习PG以点带面 Linux/Andriod&#xff08;操作系统&#xff09;、Apache/Tomcat&#xff08;应用服务器&#xff09;、OpenStack/KVM&#xff08;虚拟化&#xff09;、Docker/K8S&#xff08;容器化&#xff09;、Hadoop&#xff08;大…

IoTeX(IOTX) 推出首个DEPIN数据平台,蓝筹项目合作进入新时代。

首先来了解一下什么是IoTeX(IOTX) 2024年1月25日&#xff0c;作为由IoTeX驱动的首个DEPIN类别优先数据平台&#xff0c;与蓝筹DePIN项目Helium、Akash、Theta、DIMO、Pocket、Drife、WiFi Map和Streamr合作推出。这一官方发布标志着DePIN&#xff08;去中心化物理基础设施网络&…

西门子 S7-200 SMART 系列十三:实例详解用s7-200 smart 向导配置PID回路参数设定

原文链接&#xff1a;西门子 S7-200 SMART 系列十三&#xff1a;实例详解用s7-200 smart 向导配置PID回路&参数设定 在往期文章中介绍了s7-200 smart的通讯应用&#xff0c;Modbus RTU&#xff0c;S7通信&#xff0c;Profinet通信&#xff0c;TCP通信等&#xff0c;基本包含…

代码学习记录29----贪心最后一天

随想录日记part29 t i m e &#xff1a; time&#xff1a; time&#xff1a; 2024.03.28 主要内容&#xff1a;今天是学习贪心算法最后一天&#xff0c;接下来是针对题目的讲解&#xff1a;1.单调递增的数字;2.监控二叉树; 3. 总结 738.单调递增的数字 968.监控二叉树 总结 To…

通知中心架构:打造高效沟通平台,提升信息传递效率

随着信息技术的快速发展&#xff0c;通知中心架构作为一种关键的沟通工具&#xff0c;正逐渐成为各类应用和系统中必不可少的组成部分。本文将深入探讨通知中心架构的意义、设计原则以及在实际场景中的应用。 ### 什么是通知中心架构&#xff1f; 通知中心架构是指通过集中管…

Git_常用命令+代码冲突解决方案

文章目录 基本命令的使用查看git的当前版本初始化配置设置用户名及邮箱设置仓库的认证方式查看当前配置 创建仓库从远程服务器克隆仓库创建本地仓库 添加和提交文件工作区域和文件状态工作区域文件状态 查看文件状态及分支信息查看暂存区的内容添加文件提交文件查看提交日志回退…

verilog设计-cdc:多比特信号跨时钟域(DMUX)

一、前言 多比特一般为数据&#xff0c;其在跨时钟域传输的过程中有多种处理方式&#xff0c;比如DMUX&#xff0c;异步FIFO&#xff0c;双口RAM&#xff0c;握手处理。本文介绍通过DMUX的方式传输多比特信号。 二、DMUX同步跨时钟域数据 dmux表示数据分配器&#xff0c;该方…

ChatGLM3:AttributeError_ can‘t set attribute ‘eos_token‘

最近在微调 ChatGLM3-6b 时&#xff0c;训练好模型之后&#xff0c;调用inference_hf.py函数验证模型的时候报了如下错误&#xff0c;下面是解决方案。 我在训练时使用的是ptuning_v2.yaml配置文件&#xff0c;训练运行代码如下&#xff1a; CUDA_VISIBLE_DEVICES1 python fi…

3.Labview字符串与路径精讲(下) — 字符串及路径的使用

本章讲解labview中的字符串和路径具体实践用例&#xff0c;从前面板字符串属性到后面板字符串函数应用做出详细概述&#xff0c;通过本文的学习希望大家了解到字符串及路径在labview编程中的重要地位。 本系列文章为labview 从基础到强化到精通的学习文章&#xff0c;大家可以随…

Unity3d使用Jenkins自动化打包(Windows)(二)

文章目录 前言一、Unity工程准备二、Unity调取命令行实战一实战二实战三实战四实战五 总结 前言 自动化打包的价值在于让程序员更轻松地创建和管理构建工具链&#xff0c;提高编程效率&#xff0c;将繁杂的工作碎片化&#xff0c;变成人人&#xff08;游戏行业特指策划&#x…

RegSeg 学习笔记(待完善)

论文阅读 解决的问题 引用别的论文的内容 可以用 controlf 寻找想要的内容 PPM 空间金字塔池化改进 SPP / SPPF / SimSPPF / ASPP / RFB / SPPCSPC / SPPFCSPC / SPPELAN &#xfffc; ASPP STDC&#xff1a;short-term dense concatenate module 和 DDRNet SE-ResNeXt …

如何为企业策划一场XR虚拟直播?

活动年年办&#xff0c;都是老一套&#xff0c;想玩点新花样&#xff1f; 预算有限&#xff0c;但还是想把活动办的逼格高一点&#xff1f; 想通过活动&#xff0c;让更多的人知道自己企业的品牌&#xff1f; 随着AIGC技术的不断演变&#xff0c;企业活动的形式和内容也在不…

Linux中的vim/vi编辑器

VI 是 Unix 操作系统和类 Unix 操作系统中最通用的文本编辑器。 VIM 编辑器是从 VI 发展出来的一个性能更强大的文本编辑器&#xff0c;可以说是&#xff1a;编辑器之神。可以主动的以字体颜 色辨别语法的正确性&#xff0c;方便程序设计。VIM 与 VI 编辑器完全兼容。 一:三种…

MoonBit MeetUp回顾——张正、宗喆:编程语言在云原生与区块链领域的技术探索

宗喆和张正分别给我们带了 KCL 相关的最新进展&#xff0c;由蚂蚁集团开发的 Rust 编写的开源 DSL&#xff0c;目标是优化云原生策略配置和用户体验。它通过引入动态配置管理、配置校验和基础设施抽象等核心概念&#xff0c;解决开发者认知负担、配置膨胀和标准化工具缺乏的问题…

Unity AI Navigation自动寻路

目录 前言一、Unity中AI Navigation是什么&#xff1f;二、使用步骤1.安装AI Navigation2.创建模型和材质3.编写向目标移动的脚本4.NavMeshLink桥接组件5.NavMeshObstacle组件6.NavMeshModifler组件 三、效果总结 前言 Unity是一款强大的游戏开发引擎&#xff0c;而人工智能&a…