4 路由模式

路由模式

逻辑图

image-20210811143722455

如果我们将生产环境的日志进行处理,而日志是分等级的,我们就按照 error waring info三个等级来讲解

一个消费者是处理【所有】(info,error,warning)的日志,用于做数据仓库,数据挖掘的

一个消费者是处理【错误】(error)日志,用以检测生产环境哪里有bug的

如果有一条 error 的日志,它应当既发送给【所有】,又发送给【错误】

如果有一条 info 的日志,它应当只发送给【所有】

如果有一条 warning 的日志,它应当只发送给【所有】

如果使用发布订阅,将不太好处理以上情形,所有使用路由模式,根据 routingKey 指定规则

<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

生产者

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.TimeoutException;public class RoutingProducer {/*** 生产者 → 消息队列* 创建连接工厂,并设置参数* 创建连接 Connection* 创建通道 Channel* ----------------* 创建交换机* 创建队列* 交换机绑定到队列* <p>* 发送消息*///定义交换机名称private static final String ROUTING_EXCHANGE_NAME = "my_routing_exchange";//定义一个 error 队列,仅有 error 的日志到这个队列private static final String ERROR_QUEUE_NAME = "my_error_queue";//定义一个 all 队列, error info warning 级别的日志都到这个队列private static final String ALL_QUEUE_NAME = "my_all_queue";public static void main(String[] args) throws IOException, TimeoutException {ConnectionFactory factory = new ConnectionFactory();if (true) {factory.setHost("127.0.0.1");factory.setPort(5672);factory.setUsername("guest");factory.setPassword("guest");factory.setVirtualHost("/");}Connection connection = factory.newConnection();Channel channel = connection.createChannel();//创建交换机,使用路由模式的交换机channel.exchangeDeclare(ROUTING_EXCHANGE_NAME, BuiltinExchangeType.DIRECT, true, false, false, null);//创建队列channel.queueDeclare(ERROR_QUEUE_NAME, true, false, false, null);channel.queueDeclare(ALL_QUEUE_NAME, true, false, false, null);//绑定交换机/*** String queue                 :队列名称* String exchange              :交换机名称* String routingKey            :路由键,fanout 广播模式不需要路由键* Map<String, Object> arguments:参数*/channel.queueBind(ERROR_QUEUE_NAME, ROUTING_EXCHANGE_NAME, "error");channel.queueBind(ALL_QUEUE_NAME, ROUTING_EXCHANGE_NAME, "error");channel.queueBind(ALL_QUEUE_NAME, ROUTING_EXCHANGE_NAME, "info");channel.queueBind(ALL_QUEUE_NAME, ROUTING_EXCHANGE_NAME, "warning");//发送短信String[] keys = {"error", "info", "warning"};int errorCount = 0;int infoCount = 0;int warningCount = 0;for (int i = 0; i < 30; i++) {int random = (int) (Math.random() * (3 - 1 + 1)) + 0;   //生成0,1,2随机数String logLevel = keys[random];String str = "我是 " + logLevel + "\t消息\t" + new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date());System.out.println("发送消息:\t" + str);channel.basicPublish(ROUTING_EXCHANGE_NAME, logLevel, null, str.getBytes());if (random == 0) {errorCount++;} else if (random == 1) {infoCount++;} else if (random == 2) {warningCount++;}}System.out.println("error\t共计: " + errorCount + "条");System.out.println("info\t共计: " + infoCount + "条");System.out.println("warning\t共计: " + warningCount + "条");// 关闭资源channel.close();connection.close();}
}

消费者

error

  • 该消费者只订阅 error 的队列
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class ErrorRoutingConsumer {/*** 消息队列 ← 消费者* 创建连接工厂,并设置参数* 创建连接 Connection* 创建通道 Channel* 订阅队列* 接收消息*///定义一个 error 队列,仅有 error 的日志到这个队列private static final String ERROR_QUEUE_NAME = "my_error_queue";public static void main(String[] args) throws IOException, TimeoutException {//创建连接工厂,并设置参数ConnectionFactory factory = new ConnectionFactory();if (true) {factory.setHost("localhost");factory.setPort(5672);factory.setUsername("guest");factory.setPassword("guest");factory.setVirtualHost("/");}//创建连接 ConnectionConnection connection = factory.newConnection();//创建通道 ChannelChannel channel = connection.createChannel();/*** consumerTag  消费信息标签* delivery     回执*/DeliverCallback deliverCallback = (consumerTag, delivery) -> {byte[] body = delivery.getBody();System.out.println("【error消费者】消费消息:\t" + new String(body));};/*** basicConsume(String queue, boolean autoAck, DeliverCallback deliverCallback, CancelCallback cancelCallback)* String queue                         :   队列名称* boolean autoAck                      :   是否自动确认,如果true,消费者接收到消息会自动发送一个回执给消息队列* DeliverCallback deliverCallback      :   回调函数* CancelCallback cancelCallback        :   消费者取消订阅时的回调函数*/channel.basicConsume(ERROR_QUEUE_NAME, true, deliverCallback, consumerTag -> {});}
}

error info warning

  • 该消费者订阅 all 的队列
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class AllRoutingConsumer {/*** 消息队列 ← 消费者* 创建连接工厂,并设置参数* 创建连接 Connection* 创建通道 Channel* 订阅队列* 接收消息*///定义一个 all 队列, error info warning 级别的日志都到这个队列private static final String ALL_QUEUE_NAME = "my_all_queue";public static void main(String[] args) throws IOException, TimeoutException {//创建连接工厂,并设置参数ConnectionFactory factory = new ConnectionFactory();if (true) {factory.setHost("localhost");factory.setPort(5672);factory.setUsername("guest");factory.setPassword("guest");factory.setVirtualHost("/");}//创建连接 ConnectionConnection connection = factory.newConnection();//创建通道 ChannelChannel channel = connection.createChannel();/*** consumerTag  消费信息标签* delivery     回执*/DeliverCallback deliverCallback = (consumerTag, delivery) -> {byte[] body = delivery.getBody();System.out.println("【all 消费者】消费消息:\t" + new String(body));};/*** basicConsume(String queue, boolean autoAck, DeliverCallback deliverCallback, CancelCallback cancelCallback)* String queue                         :   队列名称* boolean autoAck                      :   是否自动确认,如果true,消费者接收到消息会自动发送一个回执给消息队列* DeliverCallback deliverCallback      :   回调函数* CancelCallback cancelCallback        :   消费者取消订阅时的回调函数*/channel.basicConsume(ALL_QUEUE_NAME, true, deliverCallback, consumerTag -> {});}
}

测试

  • 启动生产者,查看 RabbitMQ 网页控制条

  • 启动 error 消费者

  • 启动 all 消费者

  • 再次启动生产者

image-20210811155433733

image-20210811155448009image-20210811155457070

SpringBoot 整合

小结

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/web/52423.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Redis搭建集成

图示 正常来讲配置一主两从需要三台服务器,博主内存告急,就使用一台进行操作了,使用多台跟一台操作没有区别,只是多台不需要新建太多配置文件 一. 准备配置文件 如果你跟我一样是在一台服务器里面进行配置主从服务的,跟我一起操作即可 找到redis目录 在bin目录同位置创建一…

Linux驱动.之驱动开发思维,设备,驱动,总线分析思想,驱动的分类(字符设备,块设备,网络设备)

在stm32&#xff0c;裸机开发时&#xff0c;偏底层&#xff0c;跟寄存器打交道&#xff0c;有些MCU提供了库&#xff0c;库也还是操作寄存器的&#xff0c;通过配置寄存器&#xff0c; 配置各种工作模式&#xff0c;时钟&#xff0c;等等&#xff0c;交换数据等等。 Linux下驱…

Unity笔记之静态/动态合批

借用博主链接 一、静态合批 1、首先项目设置里面需要勾选静态合批 2、添加静态合批选项 3、至此就完成了&#xff0c;至于成功没有就要去分析器里面看了。 静态合批注意问题&#xff1a; 二、动态合批 1、首先项目设置里面需要勾选动态合批 2、调用 StaticBatchingUtilit…

给大家推荐好用的AI网站

地址&#xff1a;https://ai.ashuiai.com/auth/register?inviteCode8E8DIC1QCR 个人觉得挺好用的&#xff0c;可以免费&#xff0c;免费有限制次数&#xff0c;也可以会员升级200永久免费&#xff0c;我用的200永久免费。 可以在国内使用以下ai模型 回答问题更智能&#xff…

IBM中国研发部裁员:全球化背景下的IT产业变局与应对之道

裁员风波中的思考与机遇 前言了解霍尼韦尔的“东方服务东方”施耐德电气的“中国中心”战略对比与分析 中国信息技术(IT)行业展现出蓬勃发展的前景**政府支持与政策导向****技术创新与应用****市场规模与需求****人才培养与就业**国际化与开放合作总结 前言 如何看待IBM中国研发…

“冰山之下”:谁在成为车企的真正智能助手?

“其实我们一直扮演的角色就是数字化助手&#xff0c;也就是别人可以去挖金&#xff0c;我们给大家提供铲子&#xff0c;这是我们扮演的角色&#xff0c;而现在我们希望给大家提供最好的铲子。” 作者| 皮爷 出品|产业家 如果说AI发展的最鲜明印痕是什么&#xff1f;有人…

【原创教程】自动化工程案例01:8工位插针装配机03-程序解读

在前面两篇文章中&#xff0c;我们介绍了8工位设备每个工位的情况&#xff0c;然后我们介绍了触摸屏的情况&#xff0c;接着我们来看一下程序。关于一些实物照片不宜公开发表&#xff0c;需要的可以私信。 程序系统块设置 系统块中的模块实际上是我们所使用的的硬件设施 符号…

本地Linux服务器使用docker搭建DashDot并实现公网实时监测服务器信息

文章目录 前言1. 本地环境检查1.1 安装docker1.2 下载Dashdot镜像 2. 部署DashDot应用3. 本地访问DashDot服务4. 安装cpolar内网穿透5. 固定DashDot公网地址 前言 本篇文章我们将使用Docker在本地部署DashDot服务器仪表盘&#xff0c;并且结合cpolar内网穿透工具可以实现公网实…

HC-SR501人体红外传感器详解(STM32)

目录 一、介绍 二、传感器原理 1.原理图 2.引脚描述 3.工作原理介绍 三、程序设计 main.c文件 body_hw.h文件 body_hw.c文件 四、实验效果 五、资料获取 项目分享 一、介绍 HC-SR501人体红外模块是基于红外线技术的自动控制模块&#xff0c;采用德国原装进口LHI77…

USB总线开关量DIO采集卡,24路数字量输入输出及32位计数器卡USB5801

阿尔泰科技 型号&#xff1a;USB5801 概述&#xff1a; 产品应用&#xff1a; 指标参数&#xff1a; 数字量 通道数 24路&#xff0c;每8路可配置成输入或输出 电气标准 TTL兼容 数字量输入 高电平的低电压&#xff1a;2V 低电平的高电压&#xff1a;0.8V 数字量输出 …

数据结构基础讲解(七)——数组和广义表专项练习

本文数据结构讲解参考书目&#xff1a; 通过网盘分享的文件&#xff1a;数据结构 C语言版.pdf 链接: https://pan.baidu.com/s/159y_QTbXqpMhNCNP_Fls9g?pwdze8e 提取码: ze8e 数据结构基础讲解&#xff08;六&#xff09;——串的专项练习-CSDN博客 个人主页&#xff1a;樱娆…

替代区块链

随着比特币的成功&#xff0c;人们逐渐意识到区块链技术的潜力&#xff0c;并随之出现了迅速的发展&#xff0c;各种区块链协议、应用程序和平台相应产生。 需要指出的是&#xff0c;在这种多元的局面下&#xff0c;很多项目迅速失去了它们的吸引力。事实上&#xff0c;有不少项…

ITK-高斯滤波

作者&#xff1a;翟天保Steven 版权声明&#xff1a;著作权归作者所有&#xff0c;商业转载请联系作者获得授权&#xff0c;非商业转载请注明出处 高斯滤波原理 高斯滤波&#xff08;Gaussian Blur&#xff09;是数字图像处理中常见的一种平滑滤波器&#xff0c;旨在通过模糊处…

Edge-Triggered模式:反应堆

Linux: Linux Code - Gitee.comhttps://gitee.com/RuofengMao/linux/tree/master/Reactor

yolov5-6.2 在 rk3399pro 上的移植

文章目录 一、搭建yolov5环境二、导出onnx模型三、安装 rknn-toolkit四、通过netron 查看 yolov5s.onnx 输出节点五、创建 onnx2rknn.py 文件六、通过toolkit将onnx转换为rknn模型七、在rk3399pro开发板上运行rknn模型 一、搭建yolov5环境 从yolov5官方仓库 ultralytics/yolov…

【DS18B20 简单开发】

DS18B20 是一种数字温度传感器&#xff0c;由 Maxim Integrated 生产。它提供了一个非常简单的方式来将温度测量集成到微控制器系统中。以下是关于 DS18B20 的一些关键特性&#xff1a; 单总线协议&#xff1a;DS18B20 使用单总线&#xff08;1-Wire&#xff09;数字通信协议&…

HTML添加文字

一、创建HTML5文档基本标签 <!DOCTYPE html> //定义文档类型 <html> //定义HTML文档<head> //定义关于文档的信息<title>文档标题</title> //定义文档的标题<meta charset"utf-8" /> //定义文档的字符编码</head&…

远程连接Hiveserver2服务

目录 1.修改 core-site.xml 和 hive-site.xml 的配置文件 2.启动HiveServer2服务 3.启动Beeline工具连接Hiveserver2服务 4.利用IDEA工具连接Hiveserver2服务 完成Hive本地模式安装后&#xff0c;可以启动hiveserver2服务进行远程连接和操作Hive。 1.修改 core-site.xml …

rancher upgrade 【rancher 升级】

文章目录 1. 背景2. 下载3. 安装4. 检查5. 测试5.1 创建项目5.2 创建应用5.3 删除集群5.4 注册集群 1. 背景 rancher v2.8.2 升级 v2.9.1 2. 下载 下载charts helm repo add rancher-latest https://releases.rancher.com/server-charts/latest helm repo update helm fetc…

常见概念 -- 光回波损耗

什么是回波损耗 回波损耗&#xff0c;又称为反射损耗&#xff0c;当高速信号进入或退出光纤的某个部分&#xff08;例如光纤连接器&#xff09;&#xff0c;不连续和阻抗不匹配会引起反射&#xff0c;这就是光纤回波损耗。器件的回波损耗Return Loss(RL)是光信号的输入端口的反…