RabbitMQ工作模式-路由模式

官方文档参考:https://www.rabbitmq.com/tutorials/tutorial-four-python.html
在这里插入图片描述

使用direct类型的Exchange,发N条消息并使用不同的routingKey,消费者定义队列并将队列routingKey、Exchange绑定。此时使用direct模式Exchange必须要routingKey完成匹配的情况下消息才会转发到对应的队列中被消费。

样例使用日志分发为样例。即按日志不同的级别,分发到不同的队列。每个队列只处理自己的对应的级别日志。

创建生产者

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;import java.nio.charset.StandardCharsets;
import java.util.concurrent.ThreadLocalRandom;public class Product {private static final String[] LOG_LEVEL = {"ERROR", "WARN", "INFO"};public static void main(String[] args) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setUri("amqp://root:123456@node1:5672/%2f");Connection connection = factory.newConnection();Channel channel = connection.createChannel();// 声明交换机,交换器和消息队列的绑定不需要在这里处理。channel.exchangeDeclare("ex.routing",BuiltinExchangeType.DIRECT,// 持久的标识false,// 自动删除的标识false,// 属性null);for (int i = 0; i < 30; i++) {String level = LOG_LEVEL[ThreadLocalRandom.current().nextInt(0, LOG_LEVEL.length)];String dataMsg = "[" + level + "] 消息发送 :" + i;// 发送消息channel.basicPublish("ex.routing", level, null, dataMsg.getBytes(StandardCharsets.UTF_8));}}
}

创建ERROR的消费者

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;import java.nio.charset.StandardCharsets;public class ErrorConsumer {public static void main(String[] args) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setUri("amqp://root:123456@node1:5672/%2f");Connection connection = factory.newConnection();Channel channel = connection.createChannel();// 声明队列并绑定channel.exchangeDeclare("ex.routing",BuiltinExchangeType.DIRECT,// 持久的标识false,// 自动删除的标识false,// 属性null);// 此也可以声明为临时队列,但是如果消息很重要,不要声明临时队列。channel.queueDeclare("log.error",// 永久false,// 排他false,// 自动删除true,// 属性null);//消费者享有绑定到交换器的权力。channel.queueBind("log.error", "ex.routing", "ERROR");// 通过chanel消费消息channel.basicConsume("log.error",(consumerTag, message) -> {System.out.println("ERROR收到的消息:" + new String(message.getBody(), StandardCharsets.UTF_8));},consumerTag -> {});}
}

创建INFO级的消费者

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.nio.charset.StandardCharsets;public class InfoConsumer {public static void main(String[] args) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setUri("amqp://root:123456@node1:5672/%2f");Connection connection = factory.newConnection();Channel channel = connection.createChannel();// 声明队列并绑定channel.exchangeDeclare("ex.routing",BuiltinExchangeType.DIRECT,// 持久的标识false,// 自动删除的标识true,// 属性null);// 此也可以声明为临时队列,但是如果消息很重要,不要声明临时队列。channel.queueDeclare("log.info",// 永久false,// 排他false,// 自动删除false,// 属性null);//消费者享有绑定到交换器的权力。channel.queueBind("log.info", "ex.routing", "INFO");// 通过chanel消费消息channel.basicConsume("log.info",(consumerTag, message) -> {System.out.println("INFO收到的消息:" + new String(message.getBody(), StandardCharsets.UTF_8));},consumerTag -> {});}
}

创建WARN级别的消息者

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.nio.charset.StandardCharsets;public class WarnConsumer {public static void main(String[] args) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setUri("amqp://root:123456@node1:5672/%2f");Connection connection = factory.newConnection();Channel channel = connection.createChannel();// 声明队列并绑定channel.exchangeDeclare("ex.routing",BuiltinExchangeType.DIRECT,// 持久的标识false,// 自动删除的标识false,// 属性null);// 此也可以声明为临时队列,但是如果消息很重要,不要声明临时队列。channel.queueDeclare("log.warn",// 永久false,// 排他false,// 自动删除true,// 属性null);//消费者享有绑定到交换器的权力。channel.queueBind("log.warn", "ex.routing", "WARN");// 通过chanel消费消息channel.basicConsume("log.warn",(consumerTag, message) -> {System.out.println("warn收到的消息:" + new String(message.getBody(), StandardCharsets.UTF_8));},consumerTag -> {});}
}

首先启动三个消费者:

查看队列及交换机情况

[root@nullnull-os ~]# rabbitmqctl list_exchanges --formatter pretty_table
Listing exchanges for vhost / ...
┌────────────────────┬─────────┐
│ name               │ type    │
├────────────────────┼─────────┤
│ amq.fanout         │ fanout  │
├────────────────────┼─────────┤
│ amq.rabbitmq.trace │ topic   │
├────────────────────┼─────────┤
│ amq.headers        │ headers │
├────────────────────┼─────────┤
│ amq.topic          │ topic   │
├────────────────────┼─────────┤
│ amq.direct         │ direct  │
├────────────────────┼─────────┤
│                    │ direct  │
├────────────────────┼─────────┤
│ ex.routing         │ direct  │
├────────────────────┼─────────┤
│ amq.match          │ headers │
└────────────────────┴─────────┘
[root@nullnull-os ~]# rabbitmqctl list_bindings --formatter pretty_table
Listing bindings for vhost /...
┌─────────────┬─────────────┬──────────────────┬──────────────────┬─────────────┬───────────┐
│ source_name │ source_kind │ destination_name │ destination_kind │ routing_key │ arguments │
├─────────────┼─────────────┼──────────────────┼──────────────────┼─────────────┼───────────┤
│             │ exchange    │ log.info         │ queue            │ log.info    │           │
├─────────────┼─────────────┼──────────────────┼──────────────────┼─────────────┼───────────┤
│             │ exchange    │ log.warn         │ queue            │ log.warn    │           │
├─────────────┼─────────────┼──────────────────┼──────────────────┼─────────────┼───────────┤
│             │ exchange    │ log.error        │ queue            │ log.error   │           │
├─────────────┼─────────────┼──────────────────┼──────────────────┼─────────────┼───────────┤
│ ex.routing  │ exchange    │ log.error        │ queue            │ ERROR       │           │
├─────────────┼─────────────┼──────────────────┼──────────────────┼─────────────┼───────────┤
│ ex.routing  │ exchange    │ log.info         │ queue            │ INFO        │           │
├─────────────┼─────────────┼──────────────────┼──────────────────┼─────────────┼───────────┤
│ ex.routing  │ exchange    │ log.warn         │ queue            │ WARN        │           │
└─────────────┴─────────────┴──────────────────┴──────────────────┴─────────────┴───────────┘
[root@nullnull-os ~]# 

可以发现,交换器ex.routing 绑定了三个队列log.errorlog.info log.warn并指定了路由键。

启动消费者,查看消息通否被正常消费。

ERROR的消费者控制台输出

ERROR收到的消息:[ERROR] 消息发送 :1
ERROR收到的消息:[ERROR] 消息发送 :2
ERROR收到的消息:[ERROR] 消息发送 :6
ERROR收到的消息:[ERROR] 消息发送 :8
ERROR收到的消息:[ERROR] 消息发送 :9
ERROR收到的消息:[ERROR] 消息发送 :11
ERROR收到的消息:[ERROR] 消息发送 :15
ERROR收到的消息:[ERROR] 消息发送 :16
ERROR收到的消息:[ERROR] 消息发送 :19
ERROR收到的消息:[ERROR] 消息发送 :20
ERROR收到的消息:[ERROR] 消息发送 :21
ERROR收到的消息:[ERROR] 消息发送 :23
ERROR收到的消息:[ERROR] 消息发送 :24
ERROR收到的消息:[ERROR] 消息发送 :27
ERROR收到的消息:[ERROR] 消息发送 :28

INFO的消费者控制台输出:

INFO收到的消息:[INFO] 消息发送 :0
INFO收到的消息:[INFO] 消息发送 :3
INFO收到的消息:[INFO] 消息发送 :4
INFO收到的消息:[INFO] 消息发送 :13
INFO收到的消息:[INFO] 消息发送 :14
INFO收到的消息:[INFO] 消息发送 :22
INFO收到的消息:[INFO] 消息发送 :25

WARN的消费都控制台输出:

warn收到的消息:[WARN] 消息发送 :5
warn收到的消息:[WARN] 消息发送 :7
warn收到的消息:[WARN] 消息发送 :10
warn收到的消息:[WARN] 消息发送 :12
warn收到的消息:[WARN] 消息发送 :17
warn收到的消息:[WARN] 消息发送 :18
warn收到的消息:[WARN] 消息发送 :26
warn收到的消息:[WARN] 消息发送 :29

至此,验证已经完成。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/61163.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

模块化与组件化:开发中的双剑合璧

引言&#xff1a;模块化与组件化的重要性 在现代软件开发中&#xff0c;随着项目规模的增长和技术的复杂性增加&#xff0c;如何有效地组织和管理代码变得越来越重要。模块化与组件化作为两种主要的代码组织方法&#xff0c;为开发者提供了有效的工具&#xff0c;帮助他们创建…

C语言练习7(巩固提升)

C语言练习7 编程题 前言 “芳林新叶催陈叶&#xff0c;流水前波让后波。”改革开放40年来&#xff0c;我们以敢闯敢干的勇气和自我革新的担当&#xff0c;闯出了一条新路、好路&#xff0c;实现了从“赶上时代”到“引领时代”的伟大跨越。今天&#xff0c;我们要不忘初心、牢记…

MyBatisPlus实现多租户功能

前言&#xff1a;多租户是一种软件架构技术&#xff0c;在多用户的环境下&#xff0c;共有同一套系统&#xff0c;并且要注意数据之间的隔离性。 一、SaaS多租户简介 1.1、SaaS多租户 SaaS&#xff0c;是Software-as-a-Service的缩写名称&#xff0c;意思为软件即服务&#x…

CotEditor for mac 4.0.1 中文版(开源文本编辑器)

coteditorformac是一款简单实用的基于Cocoa的macOS纯文本编辑器&#xff0c;coteditormac版本可以用来编辑网页、结构化文本、程序源代码等文本文件&#xff0c;使用起来非常方便。 CotEditor for Mac具有正则表达式搜索和替换、语法高亮、编码等实用功能&#xff0c;而CotEdi…

qt信号与槽

输入账户密码成功则跳转界面 widget.cpp #include "widget.h" //自己的头文件Widget::Widget(QWidget *parent) //构造函数的定义: QWidget(parent) …

django自动创建model数据

目前使用的环境&#xff1a;django4.2.3&#xff0c;python3.10 django通过一些第三方库&#xff0c;可以轻易的自动生成一系列的后台数据。 首先先创建一个数据库&#xff1a; 然后&#xff0c;在setting.py中就可以指定我们新创建的数据库了。 DATABASES {default: {ENGI…

ChatGPT 与前端技术实现制作大屏可视化

像这样的综合案例实分析,我们可以提供案例,维度与指标数据,让ChatGPT与AIGC 帮写出完整代码,并进行一个2行2列的布局设置。 数据与指令如下: 商品名称 销量 目标 完成率 可乐 479 600 79.83% 雪碧 324 600 54.00% 红茶 379 600 63.…

【C语言】循环语句详解

✨个人主页&#xff1a; Anmia.&#x1f389;所属专栏&#xff1a; C Language &#x1f383;操作环境&#xff1a; Visual Studio 2019 版本 目录 1.什么是循环结构&#xff1f; 2.while循环 while流程图 while语句中的break和continue break continue 3.for循环 for流…

头歌MYSQL——课后作业1 数据库和数据表的建立、修改和删除

第1关&#xff1a;建立数据库 任务描述 本关任务&#xff1a;建立数据库 为了完成本关任务&#xff0c;你需要掌握&#xff1a; 如何创建数据库&#xff0c;显示已经建立的数据库 相关知识 创建数据库 创建数据库是在系统磁盘上划分一块区域用于数据的存储和管理。 命令格…

HTML 播放器效果

效果图 实现代码 <!DOCTYPE HTML> <html><head><title>爱看动漫社区 | 首页 </title><link href"css/bootstrap.css" relstylesheet typetext/css /><!-- jQuery --><script src"js/jquery-1.11.0.min.js"…

可拖拽编辑的流程图X6

先上图 //index.html&#xff0c;有时候可能加载失败&#xff0c;那就再找一个别的cdn 或者npm下载&#xff0c;如果npm下载&#xff0c; //那么需要全局引入或者局部引入&#xff0c;代码里面写法也会不同&#xff0c;详细的可以看示例<script src"https://cdn.jsdeli…

白嫖idea

白嫖idea 地址 https://www.jetbrains.com/toolbox-app/

每日一题:leetcode 1267 统计参与通信的服务器

这里有一幅服务器分布图&#xff0c;服务器的位置标识在 m * n 的整数矩阵网格 grid 中&#xff0c;1 表示单元格上有服务器&#xff0c;0 表示没有。 如果两台服务器位于同一行或者同一列&#xff0c;我们就认为它们之间可以进行通信。 请你统计并返回能够与至少一台其他服务…

jmeter性能测试步骤实战教程

1. Jmeter是什么&#xff1f; 2. Jmeter安装 2.1 JDK安装 由于Jmeter是基于java开发&#xff0c;首先需要下载安装JDK &#xff08;目前JMeter只支持到Java 8&#xff0c;尚不支持 Java 9&#xff09; 1. 官网下载地址&#xff1a; http://www.oracle.com/technetwork/java/…

【java安全】FastJson反序列化漏洞浅析

文章目录 【java安全】FastJson反序列化漏洞浅析0x00.前言0x01.FastJson概述0x02.FastJson使用序列化与反序列化 0x03.反序列化漏洞0x04.漏洞触发条件0x05.漏洞攻击方式JdbcRowSetImpl利用链TemplatesImpl利用链**漏洞版本**POC漏洞分析 【java安全】FastJson反序列化漏洞浅析 …

Ubuntu20.04安装ROS

Ubuntu20.04安装ROS Excerpt ubuntu安装方式有两种&#xff0c;一种是安装ubuntu系统&#xff0c;另一种是在windows下安装虚拟机&#xff0c;在虚拟机里安装ubuntu。下面为双系统安装ubuntu&#xff08;用虚拟机装ubuntu会很卡&#xff0c;bug很多&#xff0c;除非电脑配置极好…

java八股文面试[多线程]——Happens-Before规则

TODO 知识来源&#xff1a; 【23版面试突击】你知道什么是 happens-before 原则吗&#xff1f;_哔哩哔哩_bilibili 【2023年面试】Happens-Before规则是什么_哔哩哔哩_bilibili

八、MySQL(DML)如何修改表中的数据?

1、修改表数据 &#xff08;1&#xff09;基础语法&#xff1a; update 表名 SET 字段名1数值1,字段名2数值2&#xff0c;…… [where 条件]; &#xff08;2&#xff09; 操作实例&#xff1a; 第一步&#xff1a; 先准备一张表 insert into things values (10086,18,0x12…

本地电脑搭建Plex私人影音云盘教程,内网穿透实现远程访问

文章目录 1.前言2. Plex网站搭建2.1 Plex下载和安装2.2 Plex网页测试2.3 cpolar的安装和注册 3. 本地网页发布3.1 Cpolar云端设置3.2 Cpolar本地设置 4. 公网访问测试5. 结语6 总结 1.前言 用手机或者平板电脑看视频&#xff0c;已经算是生活中稀松平常的场景了&#xff0c;特…

三组学联合→HiC+Meta+Virome

病毒在微生物死亡率、多样性和生物地球化学循环中发挥着重要作用。地下水是全球最大的淡水&#xff0c;也是地球上最贫营养的水生系统之一&#xff0c;但在这个特殊的栖息地中微生物和病毒群落是如何形成的尚未被探索。本次经典文献分享给大家带来&#xff0c;宏基因组宏病毒组…