路由模式
逻辑图
如果我们将生产环境的日志进行处理,而日志是分等级的,我们就按照 error waring info三个等级来讲解
一个消费者是处理【所有】(info,error,warning)的日志,用于做数据仓库,数据挖掘的
一个消费者是处理【错误】(error)日志,用以检测生产环境哪里有bug的
如果有一条 error 的日志,它应当既发送给【所有】,又发送给【错误】
如果有一条 info 的日志,它应当只发送给【所有】
如果有一条 warning 的日志,它应当只发送给【所有】
如果使用发布订阅,将不太好处理以上情形,所有使用路由模式,根据 routingKey 指定规则
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
生产者
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.TimeoutException;public class RoutingProducer {/*** 生产者 → 消息队列* 创建连接工厂,并设置参数* 创建连接 Connection* 创建通道 Channel* ----------------* 创建交换机* 创建队列* 交换机绑定到队列* <p>* 发送消息*///定义交换机名称private static final String ROUTING_EXCHANGE_NAME = "my_routing_exchange";//定义一个 error 队列,仅有 error 的日志到这个队列private static final String ERROR_QUEUE_NAME = "my_error_queue";//定义一个 all 队列, error info warning 级别的日志都到这个队列private static final String ALL_QUEUE_NAME = "my_all_queue";public static void main(String[] args) throws IOException, TimeoutException {ConnectionFactory factory = new ConnectionFactory();if (true) {factory.setHost("127.0.0.1");factory.setPort(5672);factory.setUsername("guest");factory.setPassword("guest");factory.setVirtualHost("/");}Connection connection = factory.newConnection();Channel channel = connection.createChannel();//创建交换机,使用路由模式的交换机channel.exchangeDeclare(ROUTING_EXCHANGE_NAME, BuiltinExchangeType.DIRECT, true, false, false, null);//创建队列channel.queueDeclare(ERROR_QUEUE_NAME, true, false, false, null);channel.queueDeclare(ALL_QUEUE_NAME, true, false, false, null);//绑定交换机/*** String queue :队列名称* String exchange :交换机名称* String routingKey :路由键,fanout 广播模式不需要路由键* Map<String, Object> arguments:参数*/channel.queueBind(ERROR_QUEUE_NAME, ROUTING_EXCHANGE_NAME, "error");channel.queueBind(ALL_QUEUE_NAME, ROUTING_EXCHANGE_NAME, "error");channel.queueBind(ALL_QUEUE_NAME, ROUTING_EXCHANGE_NAME, "info");channel.queueBind(ALL_QUEUE_NAME, ROUTING_EXCHANGE_NAME, "warning");//发送短信String[] keys = {"error", "info", "warning"};int errorCount = 0;int infoCount = 0;int warningCount = 0;for (int i = 0; i < 30; i++) {int random = (int) (Math.random() * (3 - 1 + 1)) + 0; //生成0,1,2随机数String logLevel = keys[random];String str = "我是 " + logLevel + "\t消息\t" + new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date());System.out.println("发送消息:\t" + str);channel.basicPublish(ROUTING_EXCHANGE_NAME, logLevel, null, str.getBytes());if (random == 0) {errorCount++;} else if (random == 1) {infoCount++;} else if (random == 2) {warningCount++;}}System.out.println("error\t共计: " + errorCount + "条");System.out.println("info\t共计: " + infoCount + "条");System.out.println("warning\t共计: " + warningCount + "条");// 关闭资源channel.close();connection.close();}
}
消费者
error
- 该消费者只订阅 error 的队列
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class ErrorRoutingConsumer {/*** 消息队列 ← 消费者* 创建连接工厂,并设置参数* 创建连接 Connection* 创建通道 Channel* 订阅队列* 接收消息*///定义一个 error 队列,仅有 error 的日志到这个队列private static final String ERROR_QUEUE_NAME = "my_error_queue";public static void main(String[] args) throws IOException, TimeoutException {//创建连接工厂,并设置参数ConnectionFactory factory = new ConnectionFactory();if (true) {factory.setHost("localhost");factory.setPort(5672);factory.setUsername("guest");factory.setPassword("guest");factory.setVirtualHost("/");}//创建连接 ConnectionConnection connection = factory.newConnection();//创建通道 ChannelChannel channel = connection.createChannel();/*** consumerTag 消费信息标签* delivery 回执*/DeliverCallback deliverCallback = (consumerTag, delivery) -> {byte[] body = delivery.getBody();System.out.println("【error消费者】消费消息:\t" + new String(body));};/*** basicConsume(String queue, boolean autoAck, DeliverCallback deliverCallback, CancelCallback cancelCallback)* String queue : 队列名称* boolean autoAck : 是否自动确认,如果true,消费者接收到消息会自动发送一个回执给消息队列* DeliverCallback deliverCallback : 回调函数* CancelCallback cancelCallback : 消费者取消订阅时的回调函数*/channel.basicConsume(ERROR_QUEUE_NAME, true, deliverCallback, consumerTag -> {});}
}
error info warning
- 该消费者订阅 all 的队列
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class AllRoutingConsumer {/*** 消息队列 ← 消费者* 创建连接工厂,并设置参数* 创建连接 Connection* 创建通道 Channel* 订阅队列* 接收消息*///定义一个 all 队列, error info warning 级别的日志都到这个队列private static final String ALL_QUEUE_NAME = "my_all_queue";public static void main(String[] args) throws IOException, TimeoutException {//创建连接工厂,并设置参数ConnectionFactory factory = new ConnectionFactory();if (true) {factory.setHost("localhost");factory.setPort(5672);factory.setUsername("guest");factory.setPassword("guest");factory.setVirtualHost("/");}//创建连接 ConnectionConnection connection = factory.newConnection();//创建通道 ChannelChannel channel = connection.createChannel();/*** consumerTag 消费信息标签* delivery 回执*/DeliverCallback deliverCallback = (consumerTag, delivery) -> {byte[] body = delivery.getBody();System.out.println("【all 消费者】消费消息:\t" + new String(body));};/*** basicConsume(String queue, boolean autoAck, DeliverCallback deliverCallback, CancelCallback cancelCallback)* String queue : 队列名称* boolean autoAck : 是否自动确认,如果true,消费者接收到消息会自动发送一个回执给消息队列* DeliverCallback deliverCallback : 回调函数* CancelCallback cancelCallback : 消费者取消订阅时的回调函数*/channel.basicConsume(ALL_QUEUE_NAME, true, deliverCallback, consumerTag -> {});}
}
测试
-
启动生产者,查看 RabbitMQ 网页控制条
-
启动 error 消费者
-
启动 all 消费者
-
再次启动生产者