4 路由模式

路由模式

逻辑图

image-20210811143722455

如果我们将生产环境的日志进行处理,而日志是分等级的,我们就按照 error waring info三个等级来讲解

一个消费者是处理【所有】(info,error,warning)的日志,用于做数据仓库,数据挖掘的

一个消费者是处理【错误】(error)日志,用以检测生产环境哪里有bug的

如果有一条 error 的日志,它应当既发送给【所有】,又发送给【错误】

如果有一条 info 的日志,它应当只发送给【所有】

如果有一条 warning 的日志,它应当只发送给【所有】

如果使用发布订阅,将不太好处理以上情形,所有使用路由模式,根据 routingKey 指定规则

<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

生产者

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.TimeoutException;public class RoutingProducer {/*** 生产者 → 消息队列* 创建连接工厂,并设置参数* 创建连接 Connection* 创建通道 Channel* ----------------* 创建交换机* 创建队列* 交换机绑定到队列* <p>* 发送消息*///定义交换机名称private static final String ROUTING_EXCHANGE_NAME = "my_routing_exchange";//定义一个 error 队列,仅有 error 的日志到这个队列private static final String ERROR_QUEUE_NAME = "my_error_queue";//定义一个 all 队列, error info warning 级别的日志都到这个队列private static final String ALL_QUEUE_NAME = "my_all_queue";public static void main(String[] args) throws IOException, TimeoutException {ConnectionFactory factory = new ConnectionFactory();if (true) {factory.setHost("127.0.0.1");factory.setPort(5672);factory.setUsername("guest");factory.setPassword("guest");factory.setVirtualHost("/");}Connection connection = factory.newConnection();Channel channel = connection.createChannel();//创建交换机,使用路由模式的交换机channel.exchangeDeclare(ROUTING_EXCHANGE_NAME, BuiltinExchangeType.DIRECT, true, false, false, null);//创建队列channel.queueDeclare(ERROR_QUEUE_NAME, true, false, false, null);channel.queueDeclare(ALL_QUEUE_NAME, true, false, false, null);//绑定交换机/*** String queue                 :队列名称* String exchange              :交换机名称* String routingKey            :路由键,fanout 广播模式不需要路由键* Map<String, Object> arguments:参数*/channel.queueBind(ERROR_QUEUE_NAME, ROUTING_EXCHANGE_NAME, "error");channel.queueBind(ALL_QUEUE_NAME, ROUTING_EXCHANGE_NAME, "error");channel.queueBind(ALL_QUEUE_NAME, ROUTING_EXCHANGE_NAME, "info");channel.queueBind(ALL_QUEUE_NAME, ROUTING_EXCHANGE_NAME, "warning");//发送短信String[] keys = {"error", "info", "warning"};int errorCount = 0;int infoCount = 0;int warningCount = 0;for (int i = 0; i < 30; i++) {int random = (int) (Math.random() * (3 - 1 + 1)) + 0;   //生成0,1,2随机数String logLevel = keys[random];String str = "我是 " + logLevel + "\t消息\t" + new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date());System.out.println("发送消息:\t" + str);channel.basicPublish(ROUTING_EXCHANGE_NAME, logLevel, null, str.getBytes());if (random == 0) {errorCount++;} else if (random == 1) {infoCount++;} else if (random == 2) {warningCount++;}}System.out.println("error\t共计: " + errorCount + "条");System.out.println("info\t共计: " + infoCount + "条");System.out.println("warning\t共计: " + warningCount + "条");// 关闭资源channel.close();connection.close();}
}

消费者

error

  • 该消费者只订阅 error 的队列
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class ErrorRoutingConsumer {/*** 消息队列 ← 消费者* 创建连接工厂,并设置参数* 创建连接 Connection* 创建通道 Channel* 订阅队列* 接收消息*///定义一个 error 队列,仅有 error 的日志到这个队列private static final String ERROR_QUEUE_NAME = "my_error_queue";public static void main(String[] args) throws IOException, TimeoutException {//创建连接工厂,并设置参数ConnectionFactory factory = new ConnectionFactory();if (true) {factory.setHost("localhost");factory.setPort(5672);factory.setUsername("guest");factory.setPassword("guest");factory.setVirtualHost("/");}//创建连接 ConnectionConnection connection = factory.newConnection();//创建通道 ChannelChannel channel = connection.createChannel();/*** consumerTag  消费信息标签* delivery     回执*/DeliverCallback deliverCallback = (consumerTag, delivery) -> {byte[] body = delivery.getBody();System.out.println("【error消费者】消费消息:\t" + new String(body));};/*** basicConsume(String queue, boolean autoAck, DeliverCallback deliverCallback, CancelCallback cancelCallback)* String queue                         :   队列名称* boolean autoAck                      :   是否自动确认,如果true,消费者接收到消息会自动发送一个回执给消息队列* DeliverCallback deliverCallback      :   回调函数* CancelCallback cancelCallback        :   消费者取消订阅时的回调函数*/channel.basicConsume(ERROR_QUEUE_NAME, true, deliverCallback, consumerTag -> {});}
}

error info warning

  • 该消费者订阅 all 的队列
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class AllRoutingConsumer {/*** 消息队列 ← 消费者* 创建连接工厂,并设置参数* 创建连接 Connection* 创建通道 Channel* 订阅队列* 接收消息*///定义一个 all 队列, error info warning 级别的日志都到这个队列private static final String ALL_QUEUE_NAME = "my_all_queue";public static void main(String[] args) throws IOException, TimeoutException {//创建连接工厂,并设置参数ConnectionFactory factory = new ConnectionFactory();if (true) {factory.setHost("localhost");factory.setPort(5672);factory.setUsername("guest");factory.setPassword("guest");factory.setVirtualHost("/");}//创建连接 ConnectionConnection connection = factory.newConnection();//创建通道 ChannelChannel channel = connection.createChannel();/*** consumerTag  消费信息标签* delivery     回执*/DeliverCallback deliverCallback = (consumerTag, delivery) -> {byte[] body = delivery.getBody();System.out.println("【all 消费者】消费消息:\t" + new String(body));};/*** basicConsume(String queue, boolean autoAck, DeliverCallback deliverCallback, CancelCallback cancelCallback)* String queue                         :   队列名称* boolean autoAck                      :   是否自动确认,如果true,消费者接收到消息会自动发送一个回执给消息队列* DeliverCallback deliverCallback      :   回调函数* CancelCallback cancelCallback        :   消费者取消订阅时的回调函数*/channel.basicConsume(ALL_QUEUE_NAME, true, deliverCallback, consumerTag -> {});}
}

测试

  • 启动生产者,查看 RabbitMQ 网页控制条

  • 启动 error 消费者

  • 启动 all 消费者

  • 再次启动生产者

image-20210811155433733

image-20210811155448009image-20210811155457070

SpringBoot 整合

小结

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/web/52423.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Redis搭建集成

图示 正常来讲配置一主两从需要三台服务器,博主内存告急,就使用一台进行操作了,使用多台跟一台操作没有区别,只是多台不需要新建太多配置文件 一. 准备配置文件 如果你跟我一样是在一台服务器里面进行配置主从服务的,跟我一起操作即可 找到redis目录 在bin目录同位置创建一…

Linux驱动.之驱动开发思维,设备,驱动,总线分析思想,驱动的分类(字符设备,块设备,网络设备)

在stm32&#xff0c;裸机开发时&#xff0c;偏底层&#xff0c;跟寄存器打交道&#xff0c;有些MCU提供了库&#xff0c;库也还是操作寄存器的&#xff0c;通过配置寄存器&#xff0c; 配置各种工作模式&#xff0c;时钟&#xff0c;等等&#xff0c;交换数据等等。 Linux下驱…

SpringBoot开发——整合Spring Data JPA

文章目录 1、创建项目,添加Spring Data JPA依赖2、创建数据表student3、创建实体类Student4、创建接口文件5、创建Controller6、配置application.yml7、整合完成8、其它一些说明Spring Data JPA(Java Persistence API) 是 Spring 基于 ORM 框架、JPA 规范的基础上封装的一套JP…

Unity笔记之静态/动态合批

借用博主链接 一、静态合批 1、首先项目设置里面需要勾选静态合批 2、添加静态合批选项 3、至此就完成了&#xff0c;至于成功没有就要去分析器里面看了。 静态合批注意问题&#xff1a; 二、动态合批 1、首先项目设置里面需要勾选动态合批 2、调用 StaticBatchingUtilit…

给大家推荐好用的AI网站

地址&#xff1a;https://ai.ashuiai.com/auth/register?inviteCode8E8DIC1QCR 个人觉得挺好用的&#xff0c;可以免费&#xff0c;免费有限制次数&#xff0c;也可以会员升级200永久免费&#xff0c;我用的200永久免费。 可以在国内使用以下ai模型 回答问题更智能&#xff…

IBM中国研发部裁员:全球化背景下的IT产业变局与应对之道

裁员风波中的思考与机遇 前言了解霍尼韦尔的“东方服务东方”施耐德电气的“中国中心”战略对比与分析 中国信息技术(IT)行业展现出蓬勃发展的前景**政府支持与政策导向****技术创新与应用****市场规模与需求****人才培养与就业**国际化与开放合作总结 前言 如何看待IBM中国研发…

“冰山之下”:谁在成为车企的真正智能助手?

“其实我们一直扮演的角色就是数字化助手&#xff0c;也就是别人可以去挖金&#xff0c;我们给大家提供铲子&#xff0c;这是我们扮演的角色&#xff0c;而现在我们希望给大家提供最好的铲子。” 作者| 皮爷 出品|产业家 如果说AI发展的最鲜明印痕是什么&#xff1f;有人…

【原创教程】自动化工程案例01:8工位插针装配机03-程序解读

在前面两篇文章中&#xff0c;我们介绍了8工位设备每个工位的情况&#xff0c;然后我们介绍了触摸屏的情况&#xff0c;接着我们来看一下程序。关于一些实物照片不宜公开发表&#xff0c;需要的可以私信。 程序系统块设置 系统块中的模块实际上是我们所使用的的硬件设施 符号…

本地Linux服务器使用docker搭建DashDot并实现公网实时监测服务器信息

文章目录 前言1. 本地环境检查1.1 安装docker1.2 下载Dashdot镜像 2. 部署DashDot应用3. 本地访问DashDot服务4. 安装cpolar内网穿透5. 固定DashDot公网地址 前言 本篇文章我们将使用Docker在本地部署DashDot服务器仪表盘&#xff0c;并且结合cpolar内网穿透工具可以实现公网实…

HC-SR501人体红外传感器详解(STM32)

目录 一、介绍 二、传感器原理 1.原理图 2.引脚描述 3.工作原理介绍 三、程序设计 main.c文件 body_hw.h文件 body_hw.c文件 四、实验效果 五、资料获取 项目分享 一、介绍 HC-SR501人体红外模块是基于红外线技术的自动控制模块&#xff0c;采用德国原装进口LHI77…

在 ArkTS 中,如何有效地进行内存管理和避免内存泄漏?

ArkTS 是鸿蒙生态的应用开发语言&#xff0c;它在 TypeScript 的基础上进行了优化和定制&#xff0c;以适应鸿蒙系统的需求。 以下是在 ArkTS 中进行有效的内存管理和避免内存泄漏&#xff1a; 1. 使用 const 和 let 合理声明变量&#xff1a; 使用 const 声明那些不会重新赋…

USB总线开关量DIO采集卡,24路数字量输入输出及32位计数器卡USB5801

阿尔泰科技 型号&#xff1a;USB5801 概述&#xff1a; 产品应用&#xff1a; 指标参数&#xff1a; 数字量 通道数 24路&#xff0c;每8路可配置成输入或输出 电气标准 TTL兼容 数字量输入 高电平的低电压&#xff1a;2V 低电平的高电压&#xff1a;0.8V 数字量输出 …

数据结构基础讲解(七)——数组和广义表专项练习

本文数据结构讲解参考书目&#xff1a; 通过网盘分享的文件&#xff1a;数据结构 C语言版.pdf 链接: https://pan.baidu.com/s/159y_QTbXqpMhNCNP_Fls9g?pwdze8e 提取码: ze8e 数据结构基础讲解&#xff08;六&#xff09;——串的专项练习-CSDN博客 个人主页&#xff1a;樱娆…

替代区块链

随着比特币的成功&#xff0c;人们逐渐意识到区块链技术的潜力&#xff0c;并随之出现了迅速的发展&#xff0c;各种区块链协议、应用程序和平台相应产生。 需要指出的是&#xff0c;在这种多元的局面下&#xff0c;很多项目迅速失去了它们的吸引力。事实上&#xff0c;有不少项…

ITK-高斯滤波

作者&#xff1a;翟天保Steven 版权声明&#xff1a;著作权归作者所有&#xff0c;商业转载请联系作者获得授权&#xff0c;非商业转载请注明出处 高斯滤波原理 高斯滤波&#xff08;Gaussian Blur&#xff09;是数字图像处理中常见的一种平滑滤波器&#xff0c;旨在通过模糊处…

捉虫笔记(四)-- 空格引发的悬案

空格引发的悬案 1、描述现象: 在代码中有一段利用rmdir指令删除目录代码&#xff0c;但是有用户反馈一直删除失败&#xff0c;但是有没有看到错误的日志信息&#xff0c;正好有同事能复现,所以今天好好探究一番。 2、思考过程 很好奇的一点就是为什么有的环境就是正常。 首…

Edge-Triggered模式:反应堆

Linux: Linux Code - Gitee.comhttps://gitee.com/RuofengMao/linux/tree/master/Reactor

yolov5-6.2 在 rk3399pro 上的移植

文章目录 一、搭建yolov5环境二、导出onnx模型三、安装 rknn-toolkit四、通过netron 查看 yolov5s.onnx 输出节点五、创建 onnx2rknn.py 文件六、通过toolkit将onnx转换为rknn模型七、在rk3399pro开发板上运行rknn模型 一、搭建yolov5环境 从yolov5官方仓库 ultralytics/yolov…

[论文笔记] LLM大模型剪枝篇——2、剪枝总体方案

https://github.com/sramshetty/ShortGPT/tree/main My剪枝方案(暂定): 剪枝目标:1.5B —> 100~600M 剪枝方法: 层粒度剪枝 1、基于BI分数选择P%的冗余层,P=60~80 2、对前N%冗余层,直接删除full layer。N=20(N:剪枝崩溃临界点,LLaMA2在45%,Mistral-7B在35%,Qw…

【DS18B20 简单开发】

DS18B20 是一种数字温度传感器&#xff0c;由 Maxim Integrated 生产。它提供了一个非常简单的方式来将温度测量集成到微控制器系统中。以下是关于 DS18B20 的一些关键特性&#xff1a; 单总线协议&#xff1a;DS18B20 使用单总线&#xff08;1-Wire&#xff09;数字通信协议&…