203、RabbitMQ 之 使用 direct 类型的 Exchange 实现 消息路由 (RoutingKey)

目录

  • ★ 使用direct实现消息路由
  • 代码演示这个情况二
    • ConstantUtil 常量工具类
    • ConnectionUtil 连接RabbitMQ的工具类
    • Publisher 消息生产者
      • 测试消息生产者
    • Consumer01 消息消费者01
      • 测试消费者结果:
    • Consumer02 消息消费者02
      • 测试消费者结果:
  • 完整代码:
    • ConstantUtil
    • ConnectionUtil
    • Publisher
    • Consumer01
    • Consumer02
    • pom.xml

★ 使用direct实现消息路由

direct类型的Exchange: 生产者发送给Exchange 消息的路由key 要和 Exchange 绑定 Queue 消息队列的路由key一致,这个生产者发送的消息才会被Exchange 分发到那个消息队列里面去。

在这里插入图片描述

direct类型的Exchange会根据消息的路由key将消息分发给指定的Queue。

▲ 一个队列能与一个Exchange绑定多个路由key,
Q1队列与Exchange就绑定的路由key:orange
比如Q2队列与Exchange就绑定了两个路由key:black和green。

若消息生产者发送路由key为orange的消息到Exchange时,该消息将会被分发到Q1队列;

若发送路由key为black或green的消息到Exchange时,该消息都将被分发到Q2队列。

▲ 情况一

RabbitMQ也允许多个队列绑定相同的路由key,此时又变成了Pub-Sub模型。

比如C1、C2两个队列都绑定了black作为路由key,这意味着若消息生产者发送路由key为black的消息时,该消息将会被分发Q1和Q2两个队列,Q1、Q2两个队列将会收到各自不同副本——这就是Pub-Sub模型。
在这里插入图片描述

▲ 情况二

C1队列绑定了error作为路由key,而Q2则绑定了info、error和warning作为路由key,

这意味着若消息生产者发送路由key为error的消息时,该消息将会被分发Q1和Q2两个队列,Q1、Q2两个队列将会同时收到各自的副本;

若消息生产者发送路由key为info或warning的消息时,该消息只会被分发给Q2队列,Q1队列不会收到任何消息。

在这里插入图片描述

代码演示这个情况二

需求:就是演示这张图
在这里插入图片描述
过程:
1、创建一个消息生产者,2个消息消费者,
2、生产者声明一个Exchange,2个消息队列,
然后如图,Exchange绑定Q1这个消息队列用1个路由key,Exchange绑定Q2这个消息队列用3个路由key,
3、生产者发送30条消息给Exchange,每个路由key发送10条。
结果应该是:Q1消息队列得到10条消息,Q2消息队列得到30条消息。
4、创建2个消费者,分别消费Q1和Q2.

ConstantUtil 常量工具类

在这里插入图片描述

ConnectionUtil 连接RabbitMQ的工具类

在这里插入图片描述

Publisher 消息生产者

在这里插入图片描述
在这里插入图片描述

在这里插入图片描述

测试消息生产者

如图:结果正确。
消息队列queue_01,因为只绑定一个路由key–error,所以exchange分发10条路由key为error的消息给它。
消息队列queue_01,因为只绑定3个路由key–error、info、warning,所以exchange分发30条消息给它。
在这里插入图片描述

在这里插入图片描述

Consumer01 消息消费者01

消息消费者就没啥好说的,
Consumer01 消费 queue_01 消息队列,Consumer02 消费 queue_02 消息队列。
代码差不多。

在这里插入图片描述
在这里插入图片描述

测试消费者结果:

正常成功,符合期望
在这里插入图片描述

Consumer02 消息消费者02

在这里插入图片描述
在这里插入图片描述

测试消费者结果:

正常成功,符合期望
在这里插入图片描述




完整代码:

ConstantUtil

package cn.ljh.rabbitmq.util;//常量
public class ConstantUtil
{// 消息队列的名称public final static String QUEUE01 = "queue_01";public final static String QUEUE02 = "queue_02";// Exchange的名称public static final String EXCHANGE_NAME = "myex02.direct";// 三个路由key定义成一个数组的名称public static final String[] ROUTING_KEYS = {"info", "error", "warning"};}

ConnectionUtil

package cn.ljh.rabbitmq.util;import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.util.concurrent.TimeoutException;//连接工具
public class ConnectionUtil
{//获取连接的方法public static Connection getConnection() throws IOException, TimeoutException{//创建连接工厂----这个ConnectionFactory源码可以看出有构造器,所以直接new一个出来ConnectionFactory connectionFactory =  new ConnectionFactory();//设置连接信息connectionFactory.setHost("localhost");connectionFactory.setPort(5672);connectionFactory.setUsername("ljh");connectionFactory.setPassword("123456");connectionFactory.setVirtualHost("/"); //连接虚拟主机//从连接工厂获取连接Connection connection = connectionFactory.newConnection();//返回连接return connection;}
}

Publisher

package cn.ljh.rabbitmq.producer;import cn.ljh.rabbitmq.consumer.Consumer01;
import cn.ljh.rabbitmq.consumer.Consumer02;
import cn.ljh.rabbitmq.util.ConnectionUtil;
import cn.ljh.rabbitmq.util.ConstantUtil;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;//消息生产者--使用 direct 类型的exchange------就是广播模式
public class Publisher
{public static void main(String[] args) throws IOException, TimeoutException{//1、创建连接Connection conn = ConnectionUtil.getConnection();//2、通过Connection获取Channel。Channel channel = conn.createChannel();//3、调用exchangeDeclare()方法声明Exchange,--------调用queueDeclare()方法声明队列,并完成队列与Exchange的绑定channel.exchangeDeclare(ConstantUtil.EXCHANGE_NAME,/* Exchange名字 */BuiltinExchangeType.DIRECT,/* Exchange 类型 */true,/* 是否持久化 */false,/* 是否自动栅除 */false,/* 是否为内部的 Exchange */null /* 指定 Exchange 的额外属性 */);//调用queueDeclare()方法声明队列----声明多个消息队列------声明第1个消息队列---------路由key是 errorchannel.queueDeclare(ConstantUtil.QUEUE01, true, false, false, null);//把 Exchange 和 Queue 绑定起来,绑定第一个消息队列channel.queueBind(ConstantUtil.QUEUE01, ConstantUtil.EXCHANGE_NAME,ConstantUtil.ROUTING_KEYS[1] /* exchange类型是direct,这里指定路由key */,null /* 指定 Exchange 的额外属性 */);//声明第2个消息队列--------这个exchange绑定这个queue,使用了三个路由key---info,error,warningchannel.queueDeclare(ConstantUtil.QUEUE02, true, false, false, null);//用循环为第2个消息队列绑定三个路由keyfor (int i = 0; i < ConstantUtil.ROUTING_KEYS.length; i++){//把 Exchange 和 Queue 绑定起来,绑定第2个消息队列channel.queueBind(ConstantUtil.QUEUE02,ConstantUtil.EXCHANGE_NAME,ConstantUtil.ROUTING_KEYS[i]  /* 循环绑定路由key */,null  /* 指定 Exchange 的额外属性 */);}//生产者发送30条消息for (int i = 1; i <= 30; i++){//用嵌套的三元运算符,动态的让消息的发送使用到不同的路由keyString routingKey = i <= 10 ? ConstantUtil.ROUTING_KEYS[0] :(i <= 20 ? ConstantUtil.ROUTING_KEYS[1] : ConstantUtil.ROUTING_KEYS[2]);//要发送的消息String message = "生产者发送的第【 " + i + " 】条消息的内容";//4、调用Channel 的 basicPublish() 方法发送消息channel.basicPublish(ConstantUtil.EXCHANGE_NAME /* 指定向这个Exchange发送消息 */,routingKey /* 动态指定路由key */,null /*指定额外的消息的属性*/,message.getBytes(StandardCharsets.UTF_8)/*消息体必须是字节数组类型-->byte[]*/);System.out.println("生产者发送【 " + i + " 】条消息完成,发送到Exchange的路由key是:"+routingKey);}//5、关闭资源//关闭通道channel.close();//关闭连接conn.close();}
}

Consumer01

package cn.ljh.rabbitmq.consumer;import cn.ljh.rabbitmq.util.ConnectionUtil;
import cn.ljh.rabbitmq.util.ConstantUtil;
import com.rabbitmq.client.*;import java.io.IOException;
import java.util.concurrent.TimeoutException;
// 使用 RabbitMQ Java Client 开发 消息消费者 的大致步骤如下:
//(1)创建ConnectionFactory连接工厂,设置连接信息,再通过ConnectionFactory获取Connection连接。
//(2)通过Connection获取Channel。
//(3)根据需要、调用Channel的queueDeclare()方法声明队列,  Declare:声明、宣布
//    如果声明的队列已存在,该方法直接获取已有的队列;如果声明的队列还不存在,该方法将会创建新的队列。
//(4)调用Channel 的 basicConsume()方法开始处理消息,调用该方法时需要传入一个Consumer参数,该参数相当于JMS中的消息监听器。//消息消费者1
public class Consumer01
{public static void main(String[] args) throws IOException, TimeoutException{//1、创建连接工厂,设置连接信息,然后再通过连接工厂获取连接Connection conn = ConnectionUtil.getConnection();//2、通过Connection获取Channel 消息通道Channel channel = conn.createChannel();//3、调用 Channel 的 queueDeclare() 方法声明队列,//   如果声明的队列已存在,该方法直接获取已有的队列;如果声明的队列还不存在,该方法将会创建新的队列channel.queueDeclare(ConstantUtil.QUEUE01, /* 声明的队列名 */true,    /* 消息队列是否持久化 */false,  /* 是否只允许该消息消费者消费该队列的消息,独占 */false, /* 是否自动删除 */null   /* 指定消息队列额外的属性 */);//4、调用Channel 的 basicConsume()方法开始处理消费消息channel.basicConsume(ConstantUtil.QUEUE01 /*消费这个消费队列里面的消息*/,true /*消息的确认模式:是否自动确认该消息已经被消费完成并返回确认消息给消息队列*/,new DefaultConsumer(channel){//处理消息:当这个消息队列收到消息的时候,这个方法就会被触发。重写这个方法:@Overridepublic void handleDelivery(String consumerTag,Envelope envelope /*消息所在的信封,存放消息的exchange、路由key这些*/,AMQP.BasicProperties properties /*消息的那些属性*/,byte[] body /*body:消息的消息体*/) throws IOException{//把消息体中的消息拿出来String message = new String(body, "UTF-8");//printf:格式化输出函数   %s:输出字符串  %n:换行System.err.printf("P2PConsumer收到来自Exchange为【%s】、路由key为【%s】的消息,消息内容为%s%n",envelope.getExchange(),envelope.getRoutingKey(),message);}});}
}

Consumer02

package cn.ljh.rabbitmq.consumer;import cn.ljh.rabbitmq.util.ConnectionUtil;
import cn.ljh.rabbitmq.util.ConstantUtil;
import com.rabbitmq.client.*;import java.io.IOException;
import java.util.concurrent.TimeoutException;//消息消费者2
public class Consumer02
{public static void main(String[] args) throws IOException, TimeoutException{//1、创建连接工厂,设置连接信息,然后再通过连接工厂获取连接Connection conn = ConnectionUtil.getConnection();//2、通过Connection获取Channel 消息通道Channel channel = conn.createChannel();//3、调用 Channel 的 queueDeclare() 方法声明队列,//   如果声明的队列已存在,该方法直接获取已有的队列;如果声明的队列还不存在,该方法将会创建新的队列channel.queueDeclare(ConstantUtil.QUEUE02, /* 声明的队列名 */true,    /* 消息队列是否持久化 */false,  /* 是否只允许该消息消费者消费该队列的消息,独占 */false, /* 是否自动删除 */null   /* 指定消息队列额外的属性 */);//4、调用Channel 的 basicConsume()方法开始处理消费消息channel.basicConsume(ConstantUtil.QUEUE02 /*消费这个名字的消费队列里面的消息*/,true/*消息的确认模式:是否自动确认该消息已经被消费完成并返回确认消息给消息队列*/,new DefaultConsumer(channel){//处理消息:当这个消息队列收到消息的时候,这个方法就会被触发。重写这个方法:@Overridepublic void handleDelivery(String consumerTag,Envelope envelope /*消息所在的信封,存放消息的exchange、路由key这些*/,AMQP.BasicProperties properties /*消息的那些属性*/,byte[] body /*body:消息的消息体*/) throws IOException{//把消息体中的消息拿出来String message = new String(body, "UTF-8");//printf:格式化输出函数   %s:输出字符串  %n:换行System.err.printf("P2PConsumer收到来自Exchange为【%s】、路由key为【%s】的消息,消息内容为%s%n",envelope.getExchange(),envelope.getRoutingKey(),message);}});}}

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>cn.ljh</groupId><artifactId>direct</artifactId><version>1.0.0</version><name>direct</name><!--  属性  --><properties><maven.compiler.source>11</maven.compiler.source><maven.compiler.target>11</maven.compiler.target><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding><java.version>11</java.version></properties><!--  依赖  --><dependencies><!-- RabbitMQ 的依赖库 --><dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>5.13.0</version></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><version>1.18.16</version><scope>compile</scope></dependency></dependencies></project>

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/107310.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

微信小程序框架---详细教程

&#x1f3ac; 艳艳耶✌️&#xff1a;个人主页 &#x1f525; 个人专栏 &#xff1a;《Spring与Mybatis集成整合》《Vue.js使用》 ⛺️ 越努力 &#xff0c;越幸运。 目录 1.框架 1.1响应的数据绑定 1.2.页面管理 1.3.基础组件 1.4.丰富的 API 2.视图层 View 2.1.介绍 …

Jmeter脚本参数化和正则匹配

我们在做接口测试过程中&#xff0c;往往会遇到以下几种情况 每次发送请求&#xff0c;都需要更改参数值为未使用的参数值&#xff0c;比如手机号注册、动态时间等 上一个接口的请求体参数用于下一个接口的请求体参数 上一个接口的响应体参数用于下一个接口的请求体参数&#…

【09】基础知识:React组件的生命周期

组件从创建到死亡它会经历一些特定的阶段。 React 组件中包含一系列勾子函数&#xff08;生命周期回调函数 <> 生命周期钩子函数 <> 生命周期函数 <> 生命周期钩子&#xff09;&#xff0c;会在特定的时刻调用。 我们在定义组件时&#xff0c;会在特定的生…

一文理解登录鉴权(Cookie、Session、Jwt、CAS、SSO)

1 前言 登录鉴权是任何一个网站都无法绕开的部分&#xff0c;当系统要正式上线前都会要求接入统一登陆系统&#xff0c;一方面能够让网站只允许合法的用户访问&#xff0c;另一方面&#xff0c;当用户在网站上进行操作时也需要识别操作的用户&#xff0c;用作后期的操作审计。…

灾害与环境遥感团队本科生在IEEE TGRS 发表高水平论文

2023年9月27日&#xff0c;地球科学和遥感领域顶级期刊《IEEE Transactions on Geoscience and Remote Sensing》&#xff08;IEEE TGRS&#xff09;在线预刊发了灾害与环境遥感团队的最新研究成果“A novel spectral index for rapid dust-proof net mapping based on Sentine…

想做WMS仓库管理系统,找了好久才找到云表

公司内部仓库管理原方式均基于人工电子表格管理方式来实现收发存管理&#xff0c;没有流程化管理&#xff0c;无法保证数据的准确性和及时性&#xff0c;同时现场操作和数据核对会出现不同步的情况&#xff0c;无法提高仓库的运作效率&#xff0c;因此&#xff0c;我们基于云表…

【设计模式】单例模式、“多例模式”的实现以及对单例的一些思考

文章目录 1.概述2.单例模式实现代码2.1.饿汉式单例2.2.懒汉式单例2.3.双检锁单例2.4.静态内部类单例2.5.枚举单例 3.对单例的一些思考3.1.是否需要严格的禁止单例被破坏&#xff1f;3.2.懒汉式真的比饿汉式更佳吗&#xff1f;3.3.单例存在的问题 4.其他作用范围的单例模式4.1.线…

关闭VS Code中的鼠标悬停时的提示框(MDN Reference)

在使用VS Code编辑器写html文件时&#xff0c;鼠标悬停在写的某些内容时会弹出一个提示框&#xff0c;如下图&#xff1a; 这个提示是比较烦人的&#xff0c;接下来分享关闭它的教程&#xff1a; 这里是以Win10版的Visual Studio Code为例 1.打开VS Code 的设置界面 2.在扩展…

《开箱元宇宙》:《福布斯》如何通过 Web3 改进讲故事的方式

你们是否想知道 The Sandbox 如何融入世界上最具标志性的品牌和名人的战略&#xff1f;在本期《开箱元宇宙》系列中&#xff0c;我们与《福布斯》一起探讨了他们为何决定在 The Sandbox 中尝试 Web3&#xff0c;以及他们如何改变讲故事的方式&#xff0c;以便在一次体验中吸引超…

字节跳动2023测试开发岗 3+1 面经+经验分享(收到offer,入职月薪27K)

现在&#xff0c;招聘黄金时间已经来临&#xff0c;在网上看了很多大佬的面经&#xff0c;也加了很多交流群&#xff0c;受到了很多朋友的提点&#xff0c;今天终于轮到我来分享面经啦&#xff0c;之前面试了几家公司&#xff0c;最后在十月初拿到了字节跳动测试岗的 offer&…

口袋参谋:如何写出高权重标题?用对这招很重要!

​如何写出高权重标题&#xff1f;这是99.99%的卖家都存在的疑虑&#xff01; 以前写高权重标题&#xff0c;很多卖家往往会复制同行竞品爆款标题到淘宝首页搜索框&#xff0c;然后在全标题后面加上几个字母&#xff0c;就可以拆分爆款标题。 这个问题我之前也说过&#xff0…

【GO入门】环境配置及Vscode配置

1 GO环境配置 欢迎来到Go的世界&#xff0c;让我们开始探索吧&#xff01; Go是一种新的语言&#xff0c;一种并发的、带垃圾回收的、快速编译的语言。它具有以下特点&#xff1a; 它可以在一台计算机上用几秒钟的时间编译一个大型的Go程序。Go为软件构造提供了一种模型&…

【C++】:string用法详解

朋友们、伙计们&#xff0c;我们又见面了&#xff0c;本期来给大家解读一下有关Linux的基础知识点&#xff0c;如果看完之后对你有一定的启发&#xff0c;那么请留下你的三连&#xff0c;祝大家心想事成&#xff01; C 语 言 专 栏&#xff1a;C语言&#xff1a;从入门到精通 数…

使用PyTorch解决多分类问题:构建、训练和评估深度学习模型

&#x1f497;&#x1f497;&#x1f497;欢迎来到我的博客&#xff0c;你将找到有关如何使用技术解决问题的文章&#xff0c;也会找到某个技术的学习路线。无论你是何种职业&#xff0c;我都希望我的博客对你有所帮助。最后不要忘记订阅我的博客以获取最新文章&#xff0c;也欢…

Vue配置全局变量config.js

Vue配置全局变量config.js 若config.js在public目录下 在index.html中引入 这样配置是为了防止路由前缀&#xff0c;如果直接“/config.js”&#xff0c;若路由没有前缀还好&#xff0c;要是有就需要配置为“<% BASE_URL %>config.js”

Xilinx IP 10G Ethernet PCS/PMA IP Core

Vivado 10G Ethernet PCS/PMA介绍 1介绍 完整的10G以太网接口如下图,分为10G PHY和10G MAC两部分。 这篇文章重点讲 10G Ethernet PCS/PMA。 2 IP的基本介绍 10G以太网物理编码子层/物理介质连接(PCS/PMA)核心在Xilinx 10G以太网介质访问控制器(MAC)核心和具有10Gb/s…

Linux网络编程系列之UDP组播

Linux网络编程系列 &#xff08;够吃&#xff0c;管饱&#xff09; 1、Linux网络编程系列之网络编程基础 2、Linux网络编程系列之TCP协议编程 3、Linux网络编程系列之UDP协议编程 4、Linux网络编程系列之UDP广播 5、Linux网络编程系列之UDP组播 6、Linux网络编程系列之服务器编…

尚硅谷Flink(一)

目录 ☄️前置工作 fenfa脚本 &#x1f30b;概述 ☄️Flink是什么 ☄️特点&#xff08;多nb&#xff09; ☄️应用场景&#xff08;不用看&#xff09; ☄️分层API &#x1f30b;配环境 ☄️wordcount ☄️WcDemoUnboundStreaming &#x1f30b;集群部署 ☄️集…

xml的语法

<!-- 1、每一个xml,有且只有一个根标签&#xff0c;所有xml标签必须写在根标签中 2、标签必须要有合闭 3、xml格式是否正确&#xff0c;可以通过浏览器打开xml。来校验xml格式是否正确 4、xml是区别大小写的 5、xml书写标签名时&#xff0c;不要出现空格等特…

微信小程序的框架

目录 一、视图层 1. WXML 数据绑定 列表渲染 条件渲染 模板 2. WXSS 尺寸单位 样式导入 内联样式 选择器 3. WXS事件 二、逻辑层 1. 页面生命周期 2.跳转 1. 一级跳一级 2. 一级跳二级 3. 二级跳二级 4. 二级跳一级 总结 带给我们的收获 一、视图层 1. …