rabbitmq交换机,死信队列的简单例子

       假设我们有一个场景,生产者有消息发到某个直连交换机,这个交换机上有两个队列分别存储两种类型的消息,但是与这两个队列相连的消费者太不争气了,处理消息有点慢,我们想5秒钟这个消息在队列中还没有被消费的话,就给它丢进死信队列里得了(我们平时听到的延时队列其实就可按此方法实现,故意让它过期然后延时处理),后续再处理,但是这俩队列明显存储的消息不一样,我们又不好意思将它都扔到同一个死信队列里去,如果我们想要俩死信队列分别装这两个消费者漏掉的消息,那我们怎么做呢?

        下面就是一个简单的例子,如果用spring boot之类的去做也是类似,原理差不多,感兴趣的可以自己改造。

        预处理:我们先创建一个工具类用来连接rabbitmq,注意你需要去创建对应的虚拟主机,以及对应的登录账号和密码。

工具类如下: 

import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;public class ConnectionUtil {public static Connection getConnection() throws Exception {//定义连接工厂ConnectionFactory factory = new ConnectionFactory();//设置服务地址factory.setHost("localhost");//端口factory.setPort(5672);//设置账号信息,用户名、密码、vhost//VirtualHost(虚拟主机)是一个逻辑上独立的RabbitMQ服务实例。每个VirtualHost都有自己的队列、交换机、绑定等对象,并且它们之间是相互隔离的,即exchange、queue、message不能互通。factory.setVirtualHost("myVirtualHost");factory.setUsername("mytest");factory.setPassword("mytest");// 通过工程获取连接Connection connection = factory.newConnection();return connection;}
}

        

现在我们有一个直连交换机test_exchange_direct(直连交换机即根据设置的固定键直接路由到对应的队列,注意与主题topic队列的区分),我们往这个交换机里每300毫秒分别发送键为good和bad的数据各30条。

import com.dubbo.study.dubbostudy.utils.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;public class SendToExchange {private final static String EXCHANGE_NAME = "test_exchange_direct";public static void main(String[] argv) throws Exception {// 获取到连接以及mq通道Connection connection = ConnectionUtil.getConnection();Channel channel = connection.createChannel();// 声明exchangechannel.exchangeDeclare(EXCHANGE_NAME, "direct");//这里要注意,如果你没有响应的队列的话即交换机还没有绑定队列,发送消息到交换机这些消息会丢失。for (int i = 0; i < 30; i++) {// 消息内容String message = "good " + i;//会路由到good对应的队列上channel.basicPublish(EXCHANGE_NAME, "good", null, message.getBytes());System.out.println(" [x] Sent '" + message + "'");Thread.sleep(300);}for (int i = 0; i < 30; i++) {// 消息内容String message = "bad " + i;//会路由到bad对应的队列上channel.basicPublish(EXCHANGE_NAME, "bad", null, message.getBytes());System.out.println(" [x] Sent '" + message + "'");Thread.sleep(300);}channel.close();connection.close();}
}

        我们再创建一个直连死信交换机dead_exchange_direct,和连接到此私信交换机上的两个队列dead_queue,dead_queue1,对应的键分别为dead-good和dead-bad。

import com.dubbo.study.dubbostudy.utils.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;public class DeadExchange {private final static String EXCHANGE_NAME = "dead_exchange_direct";private final static String QUEUE_NAME = "dead_queue";private final static String QUEUE_NAME1 = "dead_queue1";public static void main(String[] argv) throws Exception {channel1();channel2();}public static void channel1() throws Exception{// 获取到连接以及mq通道Connection connection = ConnectionUtil.getConnection();Channel channel = connection.createChannel();// 声明exchangechannel.exchangeDeclare(EXCHANGE_NAME, "direct");// 声明队列 注意:一个消费者队列可以有多个消费者实例,只有其中一个消费者实例会消费// 队列名称,是否持久化,是否排他,是否自动删除,自定义属性channel.queueDeclare(QUEUE_NAME, true, false, false, null);// 绑定队列到交换机  死信路由键为deadchannel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "dead-good");channel.close();connection.close();}public static void channel2() throws Exception{// 获取到连接以及mq通道Connection connection = ConnectionUtil.getConnection();Channel channel = connection.createChannel();// 声明exchangechannel.exchangeDeclare(EXCHANGE_NAME, "direct");// 声明队列 注意:一个消费者队列可以有多个消费者实例,只有其中一个消费者实例会消费// 队列名称,是否持久化,是否排他,是否自动删除,自定义属性channel.queueDeclare(QUEUE_NAME1, true, false, false, null);// 绑定队列到交换机  死信路由键为deadchannel.queueBind(QUEUE_NAME1, EXCHANGE_NAME, "dead-bad");channel.close();connection.close();}

第一个不争气消费者RecvFromExchange,这个消费者对应的队列是good_queue队列,它800毫秒能处理一条消息,给他设置读取队列消息过期时间为5秒,绑定dead_exchange_direct死信交换机,死信队列路由键为dead-good

import com.dubbo.study.dubbostudy.utils.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.QueueingConsumer;import java.util.HashMap;
import java.util.Map;public class RecvFromExchange {private final static String QUEUE_NAME = "good_queue";private final static String EXCHANGE_NAME = "test_exchange_direct";public static void main(String[] argv) throws Exception {// 获取到连接以及mq通道Connection connection = ConnectionUtil.getConnection();Channel channel = connection.createChannel();Map<String,Object> args = new HashMap<>();args.put("x-message-ttl",5000);args.put("x-dead-letter-exchange","dead_exchange_direct");args.put("x-dead-letter-routing-key","dead-good"); // 死信路由键dead 路由到键为dead的死信队列// 声明队列 注意:一个消费者队列可以有多个消费者实例,只有其中一个消费者实例会消费// 队列名称,是否持久化,是否排他,是否自动删除,自定义属性channel.queueDeclare(QUEUE_NAME, true, false, false, args);// 绑定队列到交换机channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "good");// 同一时刻服务器只会发一条消息给消费者channel.basicQos(1);// 定义队列的消费者QueueingConsumer consumer = new QueueingConsumer(channel);// 监听队列,手动返回完成channel.basicConsume(QUEUE_NAME, false, consumer);// 获取消息while (true) {QueueingConsumer.Delivery delivery = consumer.nextDelivery();String message = new String(delivery.getBody());System.out.println(" [Recv] Received '" + message + "'");Thread.sleep(800);channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);}}}

第二个不争气消费者RecvFromExchange2,这个消费者对应的队列是bad_queue队列,它1秒能处理一条消息,它虽然慢一些但是我就是一视同仁给他设置读取队列消息过期时间也为5秒,绑定dead_exchange_direct死信交换机,死信队列路由键为dead-bad

import com.dubbo.study.dubbostudy.utils.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.QueueingConsumer;import java.util.HashMap;
import java.util.Map;public class RecvFromExchange2 {private final static String QUEUE_NAME = "bad_queue";private final static String EXCHANGE_NAME = "test_exchange_direct";public static void main(String[] argv) throws Exception {// 获取到连接以及mq通道Connection connection = ConnectionUtil.getConnection();Channel channel = connection.createChannel();Map<String,Object> args = new HashMap<>();args.put("x-message-ttl",5000);args.put("x-dead-letter-exchange","dead_exchange_direct");args.put("x-dead-letter-routing-key","dead-bad"); // 死信路由键dead 路由到键为dead的死信队列// 声明队列 注意:一个消费者队列可以有多个消费者实例,只有其中一个消费者实例会消费// 队列名称,是否持久化,是否排他,是否自动删除,自定义属性channel.queueDeclare(QUEUE_NAME, true, false, false, args);// 绑定队列到交换机channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "bad");// 同一时刻服务器只会发一条消息给消费者channel.basicQos(1);// 定义队列的消费者QueueingConsumer consumer = new QueueingConsumer(channel);// 监听队列,手动返回完成channel.basicConsume(QUEUE_NAME, false, consumer);// 获取消息while (true) {QueueingConsumer.Delivery delivery = consumer.nextDelivery();String message = new String(delivery.getBody());System.out.println(" [Recv] Received '" + message + "'");Thread.sleep(1000);channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);}}}

按顺序先启动DeadExchange,SendToExchange,RecvFromExchange,RecvFromExchange2。然后再次启动SendToExchange,重新发数据观察发现,这两个不争气的消费者漏掉的数据最后被死信队列接收了。

 

接下来我们对我们喜欢的绑定键dead-good的好队列给它兜底擦屁股。

import com.dubbo.study.dubbostudy.utils.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.QueueingConsumer;public class FuckDeadQueue {private final static String EXCHANGE_NAME = "dead_exchange_direct";private final static String QUEUE_NAME = "dead_queue";public static void main(String[] argv) throws Exception {// 获取到连接以及mq通道Connection connection = ConnectionUtil.getConnection();Channel channel = connection.createChannel();// 绑定队列到交换机  死信路由键为deadchannel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "dead-good");// 同一时刻服务器只会发一条消息给消费者channel.basicQos(1);// 定义队列的消费者QueueingConsumer consumer = new QueueingConsumer(channel);// 监听队列,手动返回完成channel.basicConsume(QUEUE_NAME, false, consumer);// 获取消息while (true) {QueueingConsumer.Delivery delivery = consumer.nextDelivery();String message = new String(delivery.getBody());System.out.println(" [Recv] Received '" + message + "'");Thread.sleep(1000);channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);}}}

 执行后发现,死信队列里的消息被我们消费掉了。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/diannao/12104.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【MIT6.S081】Lab7: Multithreading(详细解答版)

实验内容网址:https://xv6.dgs.zone/labs/requirements/lab7.html 本实验的代码分支:https://gitee.com/dragonlalala/xv6-labs-2020/tree/thread2/ Uthread: switching between threads 关键点:线程切换、swtch 思路: 本实验完成的任务为用户级线程系统设计上下文切换机制…

SGPM02陀螺仪模块通过惯性导航助力AGV小车的发展

之前我们介绍过SGPM01系列陀螺仪模块在智能泳池清洁机器人导航的方案(SGPM01)。这款惯性导航模块收到了许多企业的欢迎。由此&#xff0c;爱普生推出了SGPM02系列陀螺仪模块通过惯性导航&#xff0c;助力AGV小车的发展。 AGV是一种用于运输材料的无人驾驶车辆&#xff0c;并且A…

ICode国际青少年编程竞赛- Python-5级训练场-带参数函数

ICode国际青少年编程竞赛- Python-5级训练场-带参数函数 1、 def get_item(a):Dev.step(a)Dev.step(-a) get_item(4) Spaceship.step(2) get_item(2) Spaceship.step(3) get_item(5) Spaceship.step(2) get_item(3) Spaceship.step(3) get_item(4)2、 def get_item(a): D…

老杨说运维 | 金融业数据中心的发展趋势

【这是老杨在2023.10乌镇大会上的演讲&#xff08;一&#xff09;。接下来&#xff0c;6月初老杨又要在成都开讲了。到时候再发新的】 最近几年&#xff0c;“企业数字化转型”是行业内最热的一个词。当然&#xff0c;“新质生产力”又成了这个月最热的词。虽然新词热词层出不…

基于JAVA8的lambda递归的treeNode树形遍历

1.TreeNode类 import lombok.AllArgsConstructor; import lombok.Data; import lombok.NoArgsConstructor; import java.util.List;Data NoArgsConstructor AllArgsConstructor public class TreeNode {Integer id;String value;/** 子节点信息 */List<TreeNode> childr…

SHELL-双重循环习题练习

1.99乘法表 #!/bin/bash #99乘法表for ((second1; second<9; second)) dofor ((first1; first<second; first))do echo -n -e "${first}*${second}$[first*second]\t" done echo done ######### 首先定义了一个外循环变量second&#xff0c;初始值为1&am…

AI 情感聊天机器人工作之旅 —— 与复读机问题的相遇与别离

前言&#xff1a;先前在杭州的一家大模型公司从事海外闲聊机器人产品&#xff0c;目前已经离职&#xff0c;文章主要讨论在闲聊场景下遇到的“复读机”问题以及一些我个人的思考和解决方案。文章内部已经对相关公司和人员信息做了去敏&#xff0c;如仍涉及到机密等情况&#xf…

linux学习:多媒体开发库SDL+视频、音频、事件子系统+处理yuv视频源

目录 编译和移植 视频子系统 视频子系统产生图像的步骤 api 初始化 SDL 的相关子系统 使用指定的宽、高和色深来创建一个视窗 surface 使用 fmt 指定的格式创建一个像素点​编辑 将 dst 上的矩形 dstrect 填充为单色 color​编辑 将 src 快速叠加到 dst 上​编辑 更新…

连锁收银系统源代码有哪些功能,进销存+收银+会员+门店补货+线上商城

在现代零售行业&#xff0c;高效的管理系统是保持连锁店运营顺畅的关键。而开源连锁收银系统作为一款功能丰富的管理软件&#xff0c;为零售企业提供了全面的解决方案&#xff0c;涵盖了进销存管理、收银、会员、门店补货以及线上商城等多个方面&#xff0c;帮助企业实现精细化…

C语言判断字符旋转

前言 今天我们使用c语言来写代码来实现字符串选择的判断&#xff0c;我们来看题目 题目描述 写一个函数&#xff0c;判断一个字符串是否为另外一个字符串旋转之后的字符串。 例如&#xff1a;给定s1 AABCD和s2 BCDAA&#xff0c;返回1 给定s1abcd和s2ACBD&#xff0c;返回0. A…

想白嫖?音视频的文本提取和总结?NoteGPT满足你

NoteGPT实现了音频、录音以及视频的AI总结 NoteGPT最近做了一个功能&#xff1a;Audio Summary&#xff08;Audio Summary with AI - NoteGPT&#xff09; 1&#xff09;完全免费&#xff1b; 2&#xff09;支持mp3、mp4&#xff1b; 3&#xff09;支持URL和本地上传&…

【UE Niagara】在UI上生成粒子

效果 步骤 1. 在虚幻商城中将“Niagara UI Render”插件安装到引擎 2. 打开虚幻编辑器&#xff0c;勾选插件“Niagara UI Renderer”&#xff0c;然后重启编辑器 3. 先创建一个控件蓝图&#xff0c;该控件蓝图只包含一个按钮 这里设置尺寸框尺寸为200*50 4. 显示该控件 5. 新…

Excel——项目管理,设置时间到期自动提醒及颜色高亮

效果图 第一步、自动获取合同到期日期 1、首先合同【签约日期】和【到期日期】下面的数据必须是日期格式&#xff0c;不能是其它的格式否则无法计算&#xff0c;如果是其它格式需要转换成标准的日期格式&#xff0c;如下图所示。 2、在“到期日期”下面的第一个单元格中输入公…

如何让机器理解人类语言?Embedding技术详解

如何让机器理解人类语言&#xff1f;Embedding技术详解 文章目录 如何让机器理解人类语言&#xff1f;Embedding技术详解介绍什么是词嵌入&#xff1f;什么是句子嵌入&#xff1f;句子嵌入模型实现句子嵌入的方法值得尝试的句子嵌入模型 句子嵌入库实践Step 1Step 2Step 3 Doc2…

GBJ3510-ASEMI室内空调机GBJ3510

编辑&#xff1a;ll GBJ3510-ASEMI室内空调机GBJ3510 型号&#xff1a;GBJ3510 品牌&#xff1a;ASEMI 封装&#xff1a;GBJ-4 最大重复峰值反向电压&#xff1a;1000V 最大正向平均整流电流(Vdss)&#xff1a;35A 功率(Pd)&#xff1a;中小功率 芯片个数&#xff1a;4…

股东那些事儿:解锁企业背后的权力玩家与盈利秘籍

Hello&#xff0c;大家好啊&#xff0c;今天咱们要聊的主角&#xff0c;是每个企业背后不可或缺的隐形巨擘——股东。他们是谁&#xff1f;他们怎样从公司的经营中分一杯羹&#xff1f;又如何在商业棋盘上运筹帷幄&#xff1f;搬好小板凳&#xff0c;咱们这就开启股东世界的探秘…

Node.js 学习笔记 express框架

express express 使用express下载express 初体验 express 路由什么是路由1路由的使用验证的方法 2获取请求报文参数3获取路由参数4响应设置响应报文 express 中间件5中间件全局中间件路由中间件 6静态资源中间件注意事项案例 7请求体数据8防盗链实现防盗链 9路由模块化router E…

Java——类和对象第二节——封装

1.什么是封装 封装是面向对象程序的三大特性之一&#xff0c;面向对象程序的三大特性分别是封装&#xff0c;继承&#xff0c;多态 而封装简单来说就是套壳隐藏细节 打个比方&#xff1a; 在一些电脑厂商生产电脑时&#xff0c;仅对用户提供开关机键&#xff0c;键盘输入&a…

瑞友科技质量改进服务事业部总经理张力受邀为第十三届中国PMO大会演讲嘉宾

全国PMO专业人士年度盛会 北京瑞友科技股份有限公司质量改进服务事业部总经理张力先生受邀为PMO评论主办的2024第十三届中国PMO大会演讲嘉宾&#xff0c;演讲议题为“PMO如何对接战略成为企业IT投资成功的有效保障”。大会将于6月29-30日在北京举办&#xff0c;敬请关注&#x…

一个不知名的开源项目可以带来多少收入

起源 2020 年新冠疫情开始蔓延&#xff0c;当时我在同时经营 3 个不同的公司。除了其中的体育赛事平台因为疫情关门大吉之外&#xff0c;另外两个公司并没有受影响&#xff0c;营收和利润反而都持续增加。但是连续几个月不能出远门&#xff0c;也不能随便见朋友和客户&#xff…