假设我们有一个场景,生产者有消息发到某个直连交换机,这个交换机上有两个队列分别存储两种类型的消息,但是与这两个队列相连的消费者太不争气了,处理消息有点慢,我们想5秒钟这个消息在队列中还没有被消费的话,就给它丢进死信队列里得了(我们平时听到的延时队列其实就可按此方法实现,故意让它过期然后延时处理),后续再处理,但是这俩队列明显存储的消息不一样,我们又不好意思将它都扔到同一个死信队列里去,如果我们想要俩死信队列分别装这两个消费者漏掉的消息,那我们怎么做呢?
下面就是一个简单的例子,如果用spring boot之类的去做也是类似,原理差不多,感兴趣的可以自己改造。
预处理:我们先创建一个工具类用来连接rabbitmq,注意你需要去创建对应的虚拟主机,以及对应的登录账号和密码。
工具类如下:
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;public class ConnectionUtil {public static Connection getConnection() throws Exception {//定义连接工厂ConnectionFactory factory = new ConnectionFactory();//设置服务地址factory.setHost("localhost");//端口factory.setPort(5672);//设置账号信息,用户名、密码、vhost//VirtualHost(虚拟主机)是一个逻辑上独立的RabbitMQ服务实例。每个VirtualHost都有自己的队列、交换机、绑定等对象,并且它们之间是相互隔离的,即exchange、queue、message不能互通。factory.setVirtualHost("myVirtualHost");factory.setUsername("mytest");factory.setPassword("mytest");// 通过工程获取连接Connection connection = factory.newConnection();return connection;}
}
现在我们有一个直连交换机test_exchange_direct(直连交换机即根据设置的固定键直接路由到对应的队列,注意与主题topic队列的区分),我们往这个交换机里每300毫秒分别发送键为good和bad的数据各30条。
import com.dubbo.study.dubbostudy.utils.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;public class SendToExchange {private final static String EXCHANGE_NAME = "test_exchange_direct";public static void main(String[] argv) throws Exception {// 获取到连接以及mq通道Connection connection = ConnectionUtil.getConnection();Channel channel = connection.createChannel();// 声明exchangechannel.exchangeDeclare(EXCHANGE_NAME, "direct");//这里要注意,如果你没有响应的队列的话即交换机还没有绑定队列,发送消息到交换机这些消息会丢失。for (int i = 0; i < 30; i++) {// 消息内容String message = "good " + i;//会路由到good对应的队列上channel.basicPublish(EXCHANGE_NAME, "good", null, message.getBytes());System.out.println(" [x] Sent '" + message + "'");Thread.sleep(300);}for (int i = 0; i < 30; i++) {// 消息内容String message = "bad " + i;//会路由到bad对应的队列上channel.basicPublish(EXCHANGE_NAME, "bad", null, message.getBytes());System.out.println(" [x] Sent '" + message + "'");Thread.sleep(300);}channel.close();connection.close();}
}
我们再创建一个直连死信交换机dead_exchange_direct,和连接到此私信交换机上的两个队列dead_queue,dead_queue1,对应的键分别为dead-good和dead-bad。
import com.dubbo.study.dubbostudy.utils.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;public class DeadExchange {private final static String EXCHANGE_NAME = "dead_exchange_direct";private final static String QUEUE_NAME = "dead_queue";private final static String QUEUE_NAME1 = "dead_queue1";public static void main(String[] argv) throws Exception {channel1();channel2();}public static void channel1() throws Exception{// 获取到连接以及mq通道Connection connection = ConnectionUtil.getConnection();Channel channel = connection.createChannel();// 声明exchangechannel.exchangeDeclare(EXCHANGE_NAME, "direct");// 声明队列 注意:一个消费者队列可以有多个消费者实例,只有其中一个消费者实例会消费// 队列名称,是否持久化,是否排他,是否自动删除,自定义属性channel.queueDeclare(QUEUE_NAME, true, false, false, null);// 绑定队列到交换机 死信路由键为deadchannel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "dead-good");channel.close();connection.close();}public static void channel2() throws Exception{// 获取到连接以及mq通道Connection connection = ConnectionUtil.getConnection();Channel channel = connection.createChannel();// 声明exchangechannel.exchangeDeclare(EXCHANGE_NAME, "direct");// 声明队列 注意:一个消费者队列可以有多个消费者实例,只有其中一个消费者实例会消费// 队列名称,是否持久化,是否排他,是否自动删除,自定义属性channel.queueDeclare(QUEUE_NAME1, true, false, false, null);// 绑定队列到交换机 死信路由键为deadchannel.queueBind(QUEUE_NAME1, EXCHANGE_NAME, "dead-bad");channel.close();connection.close();}
第一个不争气消费者RecvFromExchange,这个消费者对应的队列是good_queue队列,它800毫秒能处理一条消息,给他设置读取队列消息过期时间为5秒,绑定dead_exchange_direct死信交换机,死信队列路由键为dead-good。
import com.dubbo.study.dubbostudy.utils.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.QueueingConsumer;import java.util.HashMap;
import java.util.Map;public class RecvFromExchange {private final static String QUEUE_NAME = "good_queue";private final static String EXCHANGE_NAME = "test_exchange_direct";public static void main(String[] argv) throws Exception {// 获取到连接以及mq通道Connection connection = ConnectionUtil.getConnection();Channel channel = connection.createChannel();Map<String,Object> args = new HashMap<>();args.put("x-message-ttl",5000);args.put("x-dead-letter-exchange","dead_exchange_direct");args.put("x-dead-letter-routing-key","dead-good"); // 死信路由键dead 路由到键为dead的死信队列// 声明队列 注意:一个消费者队列可以有多个消费者实例,只有其中一个消费者实例会消费// 队列名称,是否持久化,是否排他,是否自动删除,自定义属性channel.queueDeclare(QUEUE_NAME, true, false, false, args);// 绑定队列到交换机channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "good");// 同一时刻服务器只会发一条消息给消费者channel.basicQos(1);// 定义队列的消费者QueueingConsumer consumer = new QueueingConsumer(channel);// 监听队列,手动返回完成channel.basicConsume(QUEUE_NAME, false, consumer);// 获取消息while (true) {QueueingConsumer.Delivery delivery = consumer.nextDelivery();String message = new String(delivery.getBody());System.out.println(" [Recv] Received '" + message + "'");Thread.sleep(800);channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);}}}
第二个不争气消费者RecvFromExchange2,这个消费者对应的队列是bad_queue队列,它1秒能处理一条消息,它虽然慢一些但是我就是一视同仁给他设置读取队列消息过期时间也为5秒,绑定dead_exchange_direct死信交换机,死信队列路由键为dead-bad。
import com.dubbo.study.dubbostudy.utils.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.QueueingConsumer;import java.util.HashMap;
import java.util.Map;public class RecvFromExchange2 {private final static String QUEUE_NAME = "bad_queue";private final static String EXCHANGE_NAME = "test_exchange_direct";public static void main(String[] argv) throws Exception {// 获取到连接以及mq通道Connection connection = ConnectionUtil.getConnection();Channel channel = connection.createChannel();Map<String,Object> args = new HashMap<>();args.put("x-message-ttl",5000);args.put("x-dead-letter-exchange","dead_exchange_direct");args.put("x-dead-letter-routing-key","dead-bad"); // 死信路由键dead 路由到键为dead的死信队列// 声明队列 注意:一个消费者队列可以有多个消费者实例,只有其中一个消费者实例会消费// 队列名称,是否持久化,是否排他,是否自动删除,自定义属性channel.queueDeclare(QUEUE_NAME, true, false, false, args);// 绑定队列到交换机channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "bad");// 同一时刻服务器只会发一条消息给消费者channel.basicQos(1);// 定义队列的消费者QueueingConsumer consumer = new QueueingConsumer(channel);// 监听队列,手动返回完成channel.basicConsume(QUEUE_NAME, false, consumer);// 获取消息while (true) {QueueingConsumer.Delivery delivery = consumer.nextDelivery();String message = new String(delivery.getBody());System.out.println(" [Recv] Received '" + message + "'");Thread.sleep(1000);channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);}}}
按顺序先启动DeadExchange,SendToExchange,RecvFromExchange,RecvFromExchange2。然后再次启动SendToExchange,重新发数据观察发现,这两个不争气的消费者漏掉的数据最后被死信队列接收了。
接下来我们对我们喜欢的绑定键dead-good的好队列给它兜底擦屁股。
import com.dubbo.study.dubbostudy.utils.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.QueueingConsumer;public class FuckDeadQueue {private final static String EXCHANGE_NAME = "dead_exchange_direct";private final static String QUEUE_NAME = "dead_queue";public static void main(String[] argv) throws Exception {// 获取到连接以及mq通道Connection connection = ConnectionUtil.getConnection();Channel channel = connection.createChannel();// 绑定队列到交换机 死信路由键为deadchannel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "dead-good");// 同一时刻服务器只会发一条消息给消费者channel.basicQos(1);// 定义队列的消费者QueueingConsumer consumer = new QueueingConsumer(channel);// 监听队列,手动返回完成channel.basicConsume(QUEUE_NAME, false, consumer);// 获取消息while (true) {QueueingConsumer.Delivery delivery = consumer.nextDelivery();String message = new String(delivery.getBody());System.out.println(" [Recv] Received '" + message + "'");Thread.sleep(1000);channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);}}}
执行后发现,死信队列里的消息被我们消费掉了。