rabbitmq交换机,死信队列的简单例子

       假设我们有一个场景,生产者有消息发到某个直连交换机,这个交换机上有两个队列分别存储两种类型的消息,但是与这两个队列相连的消费者太不争气了,处理消息有点慢,我们想5秒钟这个消息在队列中还没有被消费的话,就给它丢进死信队列里得了(我们平时听到的延时队列其实就可按此方法实现,故意让它过期然后延时处理),后续再处理,但是这俩队列明显存储的消息不一样,我们又不好意思将它都扔到同一个死信队列里去,如果我们想要俩死信队列分别装这两个消费者漏掉的消息,那我们怎么做呢?

        下面就是一个简单的例子,如果用spring boot之类的去做也是类似,原理差不多,感兴趣的可以自己改造。

        预处理:我们先创建一个工具类用来连接rabbitmq,注意你需要去创建对应的虚拟主机,以及对应的登录账号和密码。

工具类如下: 

import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;public class ConnectionUtil {public static Connection getConnection() throws Exception {//定义连接工厂ConnectionFactory factory = new ConnectionFactory();//设置服务地址factory.setHost("localhost");//端口factory.setPort(5672);//设置账号信息,用户名、密码、vhost//VirtualHost(虚拟主机)是一个逻辑上独立的RabbitMQ服务实例。每个VirtualHost都有自己的队列、交换机、绑定等对象,并且它们之间是相互隔离的,即exchange、queue、message不能互通。factory.setVirtualHost("myVirtualHost");factory.setUsername("mytest");factory.setPassword("mytest");// 通过工程获取连接Connection connection = factory.newConnection();return connection;}
}

        

现在我们有一个直连交换机test_exchange_direct(直连交换机即根据设置的固定键直接路由到对应的队列,注意与主题topic队列的区分),我们往这个交换机里每300毫秒分别发送键为good和bad的数据各30条。

import com.dubbo.study.dubbostudy.utils.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;public class SendToExchange {private final static String EXCHANGE_NAME = "test_exchange_direct";public static void main(String[] argv) throws Exception {// 获取到连接以及mq通道Connection connection = ConnectionUtil.getConnection();Channel channel = connection.createChannel();// 声明exchangechannel.exchangeDeclare(EXCHANGE_NAME, "direct");//这里要注意,如果你没有响应的队列的话即交换机还没有绑定队列,发送消息到交换机这些消息会丢失。for (int i = 0; i < 30; i++) {// 消息内容String message = "good " + i;//会路由到good对应的队列上channel.basicPublish(EXCHANGE_NAME, "good", null, message.getBytes());System.out.println(" [x] Sent '" + message + "'");Thread.sleep(300);}for (int i = 0; i < 30; i++) {// 消息内容String message = "bad " + i;//会路由到bad对应的队列上channel.basicPublish(EXCHANGE_NAME, "bad", null, message.getBytes());System.out.println(" [x] Sent '" + message + "'");Thread.sleep(300);}channel.close();connection.close();}
}

        我们再创建一个直连死信交换机dead_exchange_direct,和连接到此私信交换机上的两个队列dead_queue,dead_queue1,对应的键分别为dead-good和dead-bad。

import com.dubbo.study.dubbostudy.utils.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;public class DeadExchange {private final static String EXCHANGE_NAME = "dead_exchange_direct";private final static String QUEUE_NAME = "dead_queue";private final static String QUEUE_NAME1 = "dead_queue1";public static void main(String[] argv) throws Exception {channel1();channel2();}public static void channel1() throws Exception{// 获取到连接以及mq通道Connection connection = ConnectionUtil.getConnection();Channel channel = connection.createChannel();// 声明exchangechannel.exchangeDeclare(EXCHANGE_NAME, "direct");// 声明队列 注意:一个消费者队列可以有多个消费者实例,只有其中一个消费者实例会消费// 队列名称,是否持久化,是否排他,是否自动删除,自定义属性channel.queueDeclare(QUEUE_NAME, true, false, false, null);// 绑定队列到交换机  死信路由键为deadchannel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "dead-good");channel.close();connection.close();}public static void channel2() throws Exception{// 获取到连接以及mq通道Connection connection = ConnectionUtil.getConnection();Channel channel = connection.createChannel();// 声明exchangechannel.exchangeDeclare(EXCHANGE_NAME, "direct");// 声明队列 注意:一个消费者队列可以有多个消费者实例,只有其中一个消费者实例会消费// 队列名称,是否持久化,是否排他,是否自动删除,自定义属性channel.queueDeclare(QUEUE_NAME1, true, false, false, null);// 绑定队列到交换机  死信路由键为deadchannel.queueBind(QUEUE_NAME1, EXCHANGE_NAME, "dead-bad");channel.close();connection.close();}

第一个不争气消费者RecvFromExchange,这个消费者对应的队列是good_queue队列,它800毫秒能处理一条消息,给他设置读取队列消息过期时间为5秒,绑定dead_exchange_direct死信交换机,死信队列路由键为dead-good

import com.dubbo.study.dubbostudy.utils.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.QueueingConsumer;import java.util.HashMap;
import java.util.Map;public class RecvFromExchange {private final static String QUEUE_NAME = "good_queue";private final static String EXCHANGE_NAME = "test_exchange_direct";public static void main(String[] argv) throws Exception {// 获取到连接以及mq通道Connection connection = ConnectionUtil.getConnection();Channel channel = connection.createChannel();Map<String,Object> args = new HashMap<>();args.put("x-message-ttl",5000);args.put("x-dead-letter-exchange","dead_exchange_direct");args.put("x-dead-letter-routing-key","dead-good"); // 死信路由键dead 路由到键为dead的死信队列// 声明队列 注意:一个消费者队列可以有多个消费者实例,只有其中一个消费者实例会消费// 队列名称,是否持久化,是否排他,是否自动删除,自定义属性channel.queueDeclare(QUEUE_NAME, true, false, false, args);// 绑定队列到交换机channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "good");// 同一时刻服务器只会发一条消息给消费者channel.basicQos(1);// 定义队列的消费者QueueingConsumer consumer = new QueueingConsumer(channel);// 监听队列,手动返回完成channel.basicConsume(QUEUE_NAME, false, consumer);// 获取消息while (true) {QueueingConsumer.Delivery delivery = consumer.nextDelivery();String message = new String(delivery.getBody());System.out.println(" [Recv] Received '" + message + "'");Thread.sleep(800);channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);}}}

第二个不争气消费者RecvFromExchange2,这个消费者对应的队列是bad_queue队列,它1秒能处理一条消息,它虽然慢一些但是我就是一视同仁给他设置读取队列消息过期时间也为5秒,绑定dead_exchange_direct死信交换机,死信队列路由键为dead-bad

import com.dubbo.study.dubbostudy.utils.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.QueueingConsumer;import java.util.HashMap;
import java.util.Map;public class RecvFromExchange2 {private final static String QUEUE_NAME = "bad_queue";private final static String EXCHANGE_NAME = "test_exchange_direct";public static void main(String[] argv) throws Exception {// 获取到连接以及mq通道Connection connection = ConnectionUtil.getConnection();Channel channel = connection.createChannel();Map<String,Object> args = new HashMap<>();args.put("x-message-ttl",5000);args.put("x-dead-letter-exchange","dead_exchange_direct");args.put("x-dead-letter-routing-key","dead-bad"); // 死信路由键dead 路由到键为dead的死信队列// 声明队列 注意:一个消费者队列可以有多个消费者实例,只有其中一个消费者实例会消费// 队列名称,是否持久化,是否排他,是否自动删除,自定义属性channel.queueDeclare(QUEUE_NAME, true, false, false, args);// 绑定队列到交换机channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "bad");// 同一时刻服务器只会发一条消息给消费者channel.basicQos(1);// 定义队列的消费者QueueingConsumer consumer = new QueueingConsumer(channel);// 监听队列,手动返回完成channel.basicConsume(QUEUE_NAME, false, consumer);// 获取消息while (true) {QueueingConsumer.Delivery delivery = consumer.nextDelivery();String message = new String(delivery.getBody());System.out.println(" [Recv] Received '" + message + "'");Thread.sleep(1000);channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);}}}

按顺序先启动DeadExchange,SendToExchange,RecvFromExchange,RecvFromExchange2。然后再次启动SendToExchange,重新发数据观察发现,这两个不争气的消费者漏掉的数据最后被死信队列接收了。

 

接下来我们对我们喜欢的绑定键dead-good的好队列给它兜底擦屁股。

import com.dubbo.study.dubbostudy.utils.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.QueueingConsumer;public class FuckDeadQueue {private final static String EXCHANGE_NAME = "dead_exchange_direct";private final static String QUEUE_NAME = "dead_queue";public static void main(String[] argv) throws Exception {// 获取到连接以及mq通道Connection connection = ConnectionUtil.getConnection();Channel channel = connection.createChannel();// 绑定队列到交换机  死信路由键为deadchannel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "dead-good");// 同一时刻服务器只会发一条消息给消费者channel.basicQos(1);// 定义队列的消费者QueueingConsumer consumer = new QueueingConsumer(channel);// 监听队列,手动返回完成channel.basicConsume(QUEUE_NAME, false, consumer);// 获取消息while (true) {QueueingConsumer.Delivery delivery = consumer.nextDelivery();String message = new String(delivery.getBody());System.out.println(" [Recv] Received '" + message + "'");Thread.sleep(1000);channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);}}}

 执行后发现,死信队列里的消息被我们消费掉了。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/diannao/12104.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【MIT6.S081】Lab7: Multithreading(详细解答版)

实验内容网址:https://xv6.dgs.zone/labs/requirements/lab7.html 本实验的代码分支:https://gitee.com/dragonlalala/xv6-labs-2020/tree/thread2/ Uthread: switching between threads 关键点:线程切换、swtch 思路: 本实验完成的任务为用户级线程系统设计上下文切换机制…

obb iou计算,旋转框iou,python和c++版本

python版本 import math#包围盒转化为角点 def rbbox_to_corners(rbbox):# generate clockwise corners and rotate it clockwise# 顺时针方向返回角点位置cx, cy, x_d, y_d, angle = rbboxa_cos = math.cos(angle)a_sin = math.sin(angle)corners_x = [-x_d / 2, -x_d / 2, x_…

轻量Backbone论文汇总

持续更新 Rewrite the Stars PDF | Code

iOS 键盘相关

1.键盘出现消失时&#xff0c;参考的view变化的代码&#xff1a; (void)viewDidLoad { [superviewDidLoad]; // Do any additional setup after loading the view. __weaktypeof(self) weakSelf self; _textField [UITextField new]; _textField.backgroundColor [UICol…

python基础语法的数据类型

数据类型 Python中分为六种基本数据类型 不可变类型&#xff08;又叫静态数据类型&#xff0c;没有增删改操作&#xff09;&#xff1a;Number&#xff08;int、float&#xff09;-数值、Boolean-布 尔、String-字符串、Tuple-元组、Bytes-字节可变类型&#xff08;又叫动态…

SGPM02陀螺仪模块通过惯性导航助力AGV小车的发展

之前我们介绍过SGPM01系列陀螺仪模块在智能泳池清洁机器人导航的方案(SGPM01)。这款惯性导航模块收到了许多企业的欢迎。由此&#xff0c;爱普生推出了SGPM02系列陀螺仪模块通过惯性导航&#xff0c;助力AGV小车的发展。 AGV是一种用于运输材料的无人驾驶车辆&#xff0c;并且A…

ICode国际青少年编程竞赛- Python-5级训练场-带参数函数

ICode国际青少年编程竞赛- Python-5级训练场-带参数函数 1、 def get_item(a):Dev.step(a)Dev.step(-a) get_item(4) Spaceship.step(2) get_item(2) Spaceship.step(3) get_item(5) Spaceship.step(2) get_item(3) Spaceship.step(3) get_item(4)2、 def get_item(a): D…

老杨说运维 | 金融业数据中心的发展趋势

【这是老杨在2023.10乌镇大会上的演讲&#xff08;一&#xff09;。接下来&#xff0c;6月初老杨又要在成都开讲了。到时候再发新的】 最近几年&#xff0c;“企业数字化转型”是行业内最热的一个词。当然&#xff0c;“新质生产力”又成了这个月最热的词。虽然新词热词层出不…

基于JAVA8的lambda递归的treeNode树形遍历

1.TreeNode类 import lombok.AllArgsConstructor; import lombok.Data; import lombok.NoArgsConstructor; import java.util.List;Data NoArgsConstructor AllArgsConstructor public class TreeNode {Integer id;String value;/** 子节点信息 */List<TreeNode> childr…

【calcite】calcite实现SQL列级数据血缘 data lineage 查询

一、背景 大数据数据血缘&#xff0c;内部实现十分复杂一般需要依赖框架。calcite作为apache顶级项目&#xff0c;且为java体系成员&#xff0c;被多个项目所使用&#xff0c;如flink&#xff0c;spark&#xff0c;kafka等。calcite 对mysql&#xff0c;oracle&#xff0c;pos…

SHELL-双重循环习题练习

1.99乘法表 #!/bin/bash #99乘法表for ((second1; second<9; second)) dofor ((first1; first<second; first))do echo -n -e "${first}*${second}$[first*second]\t" done echo done ######### 首先定义了一个外循环变量second&#xff0c;初始值为1&am…

AI 情感聊天机器人工作之旅 —— 与复读机问题的相遇与别离

前言&#xff1a;先前在杭州的一家大模型公司从事海外闲聊机器人产品&#xff0c;目前已经离职&#xff0c;文章主要讨论在闲聊场景下遇到的“复读机”问题以及一些我个人的思考和解决方案。文章内部已经对相关公司和人员信息做了去敏&#xff0c;如仍涉及到机密等情况&#xf…

linux学习:多媒体开发库SDL+视频、音频、事件子系统+处理yuv视频源

目录 编译和移植 视频子系统 视频子系统产生图像的步骤 api 初始化 SDL 的相关子系统 使用指定的宽、高和色深来创建一个视窗 surface 使用 fmt 指定的格式创建一个像素点​编辑 将 dst 上的矩形 dstrect 填充为单色 color​编辑 将 src 快速叠加到 dst 上​编辑 更新…

连锁收银系统源代码有哪些功能,进销存+收银+会员+门店补货+线上商城

在现代零售行业&#xff0c;高效的管理系统是保持连锁店运营顺畅的关键。而开源连锁收银系统作为一款功能丰富的管理软件&#xff0c;为零售企业提供了全面的解决方案&#xff0c;涵盖了进销存管理、收银、会员、门店补货以及线上商城等多个方面&#xff0c;帮助企业实现精细化…

C语言判断字符旋转

前言 今天我们使用c语言来写代码来实现字符串选择的判断&#xff0c;我们来看题目 题目描述 写一个函数&#xff0c;判断一个字符串是否为另外一个字符串旋转之后的字符串。 例如&#xff1a;给定s1 AABCD和s2 BCDAA&#xff0c;返回1 给定s1abcd和s2ACBD&#xff0c;返回0. A…

想白嫖?音视频的文本提取和总结?NoteGPT满足你

NoteGPT实现了音频、录音以及视频的AI总结 NoteGPT最近做了一个功能&#xff1a;Audio Summary&#xff08;Audio Summary with AI - NoteGPT&#xff09; 1&#xff09;完全免费&#xff1b; 2&#xff09;支持mp3、mp4&#xff1b; 3&#xff09;支持URL和本地上传&…

数据特征降维 | 线性判别分析(LDA)附Python代码

线性判别分析(Linear Discriminant Analysis,LDA)是一种经典的监督学习方法,主要用于降维和模式识别任务。与主成分分析(PCA)不同,LDA考虑了类别信息,旨在找到一个投影,使得同类样本尽可能接近,不同类样本尽可能分开。 以下是LDA的基本步骤: 数据准备:收集带有类…

【UE Niagara】在UI上生成粒子

效果 步骤 1. 在虚幻商城中将“Niagara UI Render”插件安装到引擎 2. 打开虚幻编辑器&#xff0c;勾选插件“Niagara UI Renderer”&#xff0c;然后重启编辑器 3. 先创建一个控件蓝图&#xff0c;该控件蓝图只包含一个按钮 这里设置尺寸框尺寸为200*50 4. 显示该控件 5. 新…

Excel——项目管理,设置时间到期自动提醒及颜色高亮

效果图 第一步、自动获取合同到期日期 1、首先合同【签约日期】和【到期日期】下面的数据必须是日期格式&#xff0c;不能是其它的格式否则无法计算&#xff0c;如果是其它格式需要转换成标准的日期格式&#xff0c;如下图所示。 2、在“到期日期”下面的第一个单元格中输入公…

MySQL深入理解事务(详解)

事务概述 事务是数据库区别于文件系统的重要特性之一&#xff0c;当我们有了事务就会让数据库始终保持一致性&#xff0c;同时我们还能通过事务机制恢复到某个时间点&#xff0c;这样可以保证已提交到数据库的修改不会因为系统崩溃而丢失。 1、基本概念 事务&#xff1a;一组…