目录
需求分析
直接交换机(Direct )
主题交换机(Topic )
扇出交换机(Fanout )
Topic 交换机转发规则
routingKey 组成
bindingKey 组成
匹配规则
情况一
情况二
情况三
实现 Router 类
校验 bindingKey 和 routingKey
消息匹配机制
Topic 交换机匹配规则
针对 Router 单元测试
需求分析
直接交换机(Direct )
- 通过设定 消息的 routingKey = 队列名,以此指定该消息需传给哪个队列
实例理解
- 如下图所示,此时可直接无视绑定关系,直接从内存中拿到对应队列名的队列
- 然后再将消息传给该队列即可
主题交换机(Topic )
- 依据 Topic 交换机的转发规则,判定消息需传给哪些队列
扇出交换机(Fanout )
- 给该交换机中所有绑定好的队列均传入消息
Topic 交换机转发规则
- bindingKey:创建绑定时,给绑定指定的特殊字符串(相当于出题)
- routingKey:发布消息时,给消息上指定的特殊字符串(相当于做答案)
- 当 routingKey 的答案能够与 bindingKey 相对应时,便可将消息转发给该队列
routingKey 组成
- 数字、字母、下划线
- 使用 . 将整个 routingKey 分成多个部分
实例理解
- aaa.bbb.ccc 合法
- aaa.110.bbb 合法
- aaa 合法
bindingKey 组成
- 数字、字母、下划线
- 使用 . 将整个 bindingKey 分成多个部分
- 支持两种特殊的符号作为通配符( * 和 # 必须是作为被 . 分割出来的独立的部分)
- * ——> 可以匹配任何一个 独立的部分
- # ——> 可以匹配任何 0 个或者多个独立的部分
实例理解
- aaa.*.bbb 合法
- aaa.*bb.ccc 非法
匹配规则
情况一
- 当 bindingKey 中没有 * 或 # 这两个特殊符号时,必须要求 routingKey 和 bindingKey 一模一样,才能算匹配成功!
实例理解
- bindingKey = aaa.bbb.ccc
- routingKey = aaa.bbb.ccc (匹配成功)
- routingKey = aaa.bbb.ccd (匹配失败)
注意:
- 情况一非常类似于 Direct 交换机的转发
- 尤其是将 bindingKey 设置成和队列名字相同,此时就完全等价于 Direct 交换机了!
情况二
- 当 bindingKey 中有特殊符号 * 时
实例理解
- bindingKey = aaa.*.ccc
- routingKey = aaa.bbb.ccc (匹配成功)
- routingKey = aaa.b.ccc (匹配成功)
- routingKey = aaa.b.b.ccc(匹配失败)
情况三
- 当 bindingKey 中有特殊符号 # 时
实例理解
- bindingKey = aaa.#.ccc
- routingKey = aaa.bbb.ccc (匹配成功)
- routingKey = aaa.b.b.ccc (匹配成功)
- routingKey = aaa.ccc(匹配成功)
- routingKey = aaa.b.b.b(匹配失败)
问题:
- 将交换机中每个队列的 bindingKey 设置成一个 # 时,会有啥效果呢?
回答:
- 此时,该交换机中的 全部队列 都能匹配所有的 routingKey
- 即该交换机就相当于 Fanout 交换机了!
注意点一:
- Direct 交换机 和 Fanout 交换机,均属于 Topic 交换机的 特例
注意点二:
- 上述规则都是 AMQP 协议所约定的!RabbitMQ 仅仅只是实现了该规则而已!
实现 Router 类
校验 bindingKey 和 routingKey
// bindingKey 的构造规则: // 1、数字,字母,下划线 // 2、使用 . 分割成若干部分 // 3、允许使用 * 和 # 作为通配符,但是通配符只能作为独立的分段public boolean checkBindingKey(String bindingKey) {if(bindingKey.length() == 0) { // 空字符串也是合法情况,比如在使用 direct / fanout 交换机的时候,bindingKey 是用不上的 // 因为我们在使用 direct 交换机时,是直接将 routingKey 作为 消息队列的名字,直接根据名字来进行匹配 // 在使用 fanout 交换机时,无需匹配,直接将该消息转给交换机中绑定的所有消息队列return true;} // 检查字符串中不能存在非法字符for (int i = 0;i < bindingKey.length();i++) {char ch = bindingKey.charAt(i);if(ch >= 'A' && ch <= 'Z') {continue;}if(ch >= 'a' && ch <= 'z') {continue;}if(ch >= '0' && ch <= '9') {continue;}if(ch == '_' || ch == '.' || ch == '*' || ch == '#') {continue;}return false;} // 检查 * 或者 # 是否是独立的部分 // aaa.*.bbb 合法情况; aaa.a*.bbb 非法情况String[] words = bindingKey.split("\\.");for (String word : words) { // 检查 word 长度 > 1 并且包含了 * 或者 #,就是非法的格式了if(word.length() > 1 && (word.contains("*") || word.contains("#")) ) {return false;}} // 约定一下,通配符之间的相邻关系 (人为约定的) // 为啥这么约定?因为前三种相邻的时候,实现匹配的逻辑会非常繁琐,同时功能性提升不大 // 1、 aaa.#.#.bbb 非法情况 // 2、 aaa.#.*.bbb 非法情况 // 3、 aaa.*.#.bbb 非法情况 // 4、 aaa.*.*.bbb 合法情况for (int i = 0;i < words.length;i++) { // 连续两个 ##if(words[i].equals("#") && words[i+1].equals("#")) {return false;} // # *if(words[i].equals("#") && words[i+1].equals("*")) {return false;} // * #if(words[i].equals("*") && words[i+1].equals("#")) {return false;}}return true;}// routingKey 的构造规则 // 1、数字,字母,下划线 // 2、使用 . 分割成若干部分public boolean checkRoutingKey(String routingKey) {if(routingKey.length() == 0) { // 空字符串,合法的情况,比如在使用 fanout 交换机的时候,routingKey 用不上,就可以设定为 ""return true;}for (int i = 0;i<routingKey.length();i++) {char ch = routingKey.charAt(i); // 判定该字符是否是大写字母if(ch >= 'A' && ch <= 'Z') {continue;} // 判定该字符是否是小写字母if(ch >= 'a' && ch <= 'z') {continue;} // 判定该字符是否是阿拉伯数字if(ch >= '0' && ch <= '9') {continue;} // 判定是否是 _ 或者 .if(ch == '_' || ch == '.') {continue;} // 该字符,不是上述任何一种合法情况,就直接返回 falsereturn false;} // 把每个字符都检查过,没有遇到非法情况,此时直接返回 truereturn true;}
问题:
- 观察下图,为啥 split 方法中的参数 "." 需要加两个反斜杠呢?
回答:
- 首先 "." 在正则表达式中,是一个特殊的符号,此处是将 . 作为原始文本来进行匹配
- 要想使用 . 的原始文本,就需要进行转义,即 在正则中使用 "\." 的方式来表示
- 又因为在 Java 的字符串中,"\" 是一个特殊字符
- 所以要想写入 "\." 这样的文本,又得在其前面加上一个反斜杠来进行转义,即 "\\."
消息匹配机制
// 这个方法用来判定该消息是否可以转发给这个绑定对应的队列public boolean route(ExchangeType exchangeType,Binding binding,Message message) throws MqException { // 根据不同的 exhcangeType 使用不同的判定转发规则if(exchangeType == ExchangeType.FANOUT) { // 如果是 fanout 类型,该交换机上绑定的所有队列都需要转发return true;}else if(exchangeType == ExchangeType.TOPIC) { // 如果是 topic 主题交换机,规则就要更复杂一些return routeTopic(binding,message);}else { // 其他情况是不应该存在的throw new MqException("[Router] 交换机类型非法! exchangeType = " + exchangeType);}}
- 当为 Fanout 交换机时,无需匹配 bindingKey 和 routingKey,直接返回 true
- 让该消息转发给所以绑定了 Fanout 交换机的队列
- 当为 Topic 交换机时,则需要进行 bindingKey 和 routingKey 的匹配
Topic 交换机匹配规则
处理思路
- 此处我们采用 双指针算法 进行匹配
- 针对 bindingKey 的下标,判定当前下标指向部分的具体情况:
- 指向的是普通的字符串,此时要求和 routingKey 对应的下标指向的内容得完全一致!
- 指向的是 * ,此时无论 routingKey 这边指向的是啥,双方均同时下标前进
- 遇到了 # ,且 # 后面没有其他的内容了,直接返回 true,匹配成功!
- 遇到了 # ,但 # 后面还有其他的内容,拿着 # 后面的部分,去 routingKey 中查找,找到后面的部分,在 routingKey 中出现的位置,如果后面的部分,在 routingKey 中不存在,直接认为匹配失败,返回 false!如果后面的部分,在 routingKey 中存在,就将 routingKey 的箭头指向这个位置之后,然后继续往后匹配
- 两个箭头移动过程中,如果同时到达双方的末尾,则返回 true,如果一个箭头先到了末尾,另一个箭头还没到,则返回 false!
代码实现:
private boolean routeTopic(Binding binding,Message message) { // 先把这两个 key 进行切分String[] bindingTokens = binding.getBindingKey().split("\\.");String[] routingTokens = message.getRoutingKey().split("\\.");// 引入两个下标,指向上述两个数字,初始情况下都为 0int bindingIndex = 0;int routingIndex = 0; // 此处使用 while 更合适,每次循环,下标不一定就是 +1,不适合使用 forwhile (bindingIndex < bindingTokens.length && routingIndex < routingTokens.length) {if(bindingTokens[bindingIndex].equals("*")) { // 【情况二】如果遇到 * 号直接进入下一轮,* 可以匹配到任意一个部分!bindingIndex++;routingIndex++;continue;}else if(bindingTokens[bindingIndex].equals("#")) { // 如果遇到 # 先要看看有没有下一个位置bindingIndex++;if(bindingIndex == bindingTokens.length) { // 【情况三】该 # 后面没有东西了,说明此时一定能匹配成功了!return true;} // # 【情况四】后面还有东西,拿着这个内容,去 routingKey 中往后找,找到对应的位置 // findNextMatch 这个方法用来查找该部分在 routingKey 的位置,返回该下标,没找到,就返回 -1routingIndex = findNextMatch(routingTokens,routingIndex,bindingTokens[bindingIndex]);if(routingIndex == -1) { // 没找到匹配的结果,匹配失败return false;} // 找到匹配的情况,继续往后匹配bindingIndex++;routingIndex++;}else { // 【情况一】如果遇到普通字符串,要求两边的内容是一样的if(!bindingTokens[bindingIndex].equals(routingTokens[routingIndex])) {return false;}bindingIndex++;routingIndex++;}} // 【情况五】判断是否是双方同时到达末尾 // 比如 aaa.bbb.ccc 和 aaa.bbb 是要匹配失败的if(bindingIndex == bindingTokens.length && routingIndex == routingTokens.length) {return true;}return false;}private int findNextMatch(String[] routingTokens, int routingIndex, String bindingToken) {for (int i = routingIndex; i < routingTokens.length; i++) {if(routingTokens[i].equals(bindingToken)) {return i;}}return -1;}
针对 Router 单元测试
- 编写测试用例代码是十分重要的!
package com.example.demo;import com.example.demo.common.MqException; import com.example.demo.mqserver.core.Binding; import com.example.demo.mqserver.core.ExchangeType; import com.example.demo.mqserver.core.Message; import com.example.demo.mqserver.core.Router; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.springframework.boot.test.context.SpringBootTest;@SpringBootTest public class RouterTests {private Router router = new Router();private Binding binding = null;private Message message = null;@BeforeEachpublic void setUp() {binding = new Binding();message = new Message();}@AfterEachprivate void tearDown() {binding = null;message = null;}// 【测试用例】 // bindingKey routingKey result // aaa aaa true // aaa.bbb aaa.bbb true // aaa.bbb aaa.bbb.ccc false // aaa.bbb aaa.ccc false // aaa.bbb.ccc aaa.bbb.ccc true // aaa.* aaa.bbb true // aaa.*.bbb aaa.bbb.ccc false // *.aaa.bbb aaa.bbb false // # aaa.bbb.ccc true // aaa.# aaa.bbb true // aaa.# aaa.bbb.ccc true // aaa.#.ccc aaa.ccc true // aaa.#.ccc aaa.bbb.ccc true // aaa.#.ccc aaa.aaa.bbb.ccc true // #.ccc ccc true // #.ccc aaa.bbb.ccc true@Testpublic void test1() throws MqException {binding.setBindingKey("aaa");message.setRoutingKey("aaa");Assertions.assertTrue(router.route(ExchangeType.TOPIC,binding,message));}@Testpublic void test2() throws MqException {binding.setBindingKey("aaa.bbb");message.setRoutingKey("aaa.bbb");Assertions.assertTrue(router.route(ExchangeType.TOPIC,binding,message));}@Testpublic void test3() throws MqException {binding.setBindingKey("aaa.bbb");message.setRoutingKey("aaa.bbb.ccc");Assertions.assertFalse(router.route(ExchangeType.TOPIC,binding,message));}@Testpublic void test4() throws MqException {binding.setBindingKey("aaa.bbb");message.setRoutingKey("aaa.ccc");Assertions.assertFalse(router.route(ExchangeType.TOPIC,binding,message));}@Testpublic void test5() throws MqException {binding.setBindingKey("aaa.bbb.ccc");message.setRoutingKey("aaa.bbb.ccc");Assertions.assertTrue(router.route(ExchangeType.TOPIC,binding,message));}@Testpublic void test6() throws MqException {binding.setBindingKey("aaa.*");message.setRoutingKey("aaa.bbb");Assertions.assertTrue(router.route(ExchangeType.TOPIC,binding,message));}@Testpublic void test7() throws MqException {binding.setBindingKey("aaa.*.bbb");message.setRoutingKey("aaa.bbb.ccc");Assertions.assertFalse(router.route(ExchangeType.TOPIC,binding,message));}@Testpublic void test8() throws MqException {binding.setBindingKey("*.aaa.bbb");message.setRoutingKey("aaa.bbb");Assertions.assertFalse(router.route(ExchangeType.TOPIC,binding,message));}@Testpublic void test9() throws MqException {binding.setBindingKey("#");message.setRoutingKey("aaa.bbb.ccc");Assertions.assertTrue(router.route(ExchangeType.TOPIC,binding,message));}@Testpublic void test10() throws MqException {binding.setBindingKey("aaa.#");message.setRoutingKey("aaa.bbb");Assertions.assertTrue(router.route(ExchangeType.TOPIC,binding,message));}@Testpublic void test11() throws MqException {binding.setBindingKey("aaa.#");message.setRoutingKey("aaa.bbb.ccc");Assertions.assertTrue(router.route(ExchangeType.TOPIC,binding,message));}@Testpublic void test12() throws MqException {binding.setBindingKey("aaa.#.ccc");message.setRoutingKey("aaa.ccc");Assertions.assertTrue(router.route(ExchangeType.TOPIC,binding,message));}@Testpublic void test13() throws MqException {binding.setBindingKey("aaa.#.ccc");message.setRoutingKey("aaa.bbb.ccc");Assertions.assertTrue(router.route(ExchangeType.TOPIC,binding,message));}@Testpublic void test14() throws MqException {binding.setBindingKey("aaa.#.ccc");message.setRoutingKey("aaa.aaa.bbb.ccc");Assertions.assertTrue(router.route(ExchangeType.TOPIC,binding,message));}@Testpublic void test15() throws MqException {binding.setBindingKey("#.ccc");message.setRoutingKey("ccc");Assertions.assertTrue(router.route(ExchangeType.TOPIC,binding,message));}@Testpublic void test16() throws MqException {binding.setBindingKey("#.ccc");message.setRoutingKey("aaa.bbb.ccc");Assertions.assertTrue(router.route(ExchangeType.TOPIC,binding,message));} }