虚拟主机存在的意义
核心 API 如下:
虚拟主机参数的设计
为了方便后期的可扩展,我们给每个虚拟主机设置一个主机名,所以有了第一个参数 : virturalHostName
由于虚拟主机要操作内存中的对象,所以需要提供 以下两个类的实例化对象: MemoryDataCenter DiskDataCenter
我们先前提到过对于虚拟机,我们设置了三种交换机,每种交换机对应着不同的转发规则: Router 实例化对象,这也是个核心类,等会进行一个补充。
此外,我们还需要记录以下消费者的信息,毕竟虚拟主机设计出来是要被调用的。
至于其他的聊到再加。
虚拟主机方法的设计
创建交换机(exchangeDelcare)
一个虚拟主机需要管理多个交换机,在不同的虚拟主机中可以存在同名的交换机,我们在这里给他们进行一个逻辑上的隔离设计。这个设计有多种方案,比如:
- ⽅案⼀:参考数据库设计,“⼀对多”⽅案,⽐如给交换机表,添加个属性,虚拟主机 id/name
- ⽅案⼆:交换机的名字 = 虚拟主机名字 + 交换机的真实名字
这里采用的是方案二,采用二有啥好处呢?
我们利用同样的方式给队列进行隔离,这样就进一步对绑定进行了隔离;再进一步说就是,消息和队列是强相关的,队列名区分开了,消息自然也就区分开了。
创建交换机大致流程:
- 把交换机名字加上虚拟主机名字作为前缀(只要是操作交换、队列、绑定都需要这一步操作,我就不在每一个流程中都说明一次了)
- 判断交换机是否存在,直接通过内存查询
- 真正去构造交换机对象(把参数都赋值上去)
- 当参数为 durable的时候,将交换机对象写⼊硬盘
- 将交换机写⼊内存
此外,除了第一步赋值名字不需要加锁外,其余的都需要加一个锁(如果不加锁,可以自己思考以下后果),交换机不加锁不会影响。因为这里的交换机锁会用到好几次,不如就把他设为一个属性:
// 操作交换机的锁对象
private final Object exchangeLocker = new Object();
上述逻辑, 先写硬盘, 后写内存. 目的就是因为硬盘更容易写失败. 如果硬盘写失败了, 内存就不写了;要是先写内存, 内存写成功了, 硬盘写失败了, 还需要把内存的数据给再删掉. 就比较麻烦了
这就不演示代码了,这一篇的代码非常多,在文章末尾,我会存放相关代码,根据我给出的方法名,可以查看到相关代码。
删除交换机(exchangeDelete)
删除交换机,顾名思义就是将虚拟主机上的交换机给删除掉;大致流程如下:
- 根据交换机的名字找到对应的交换机
- 删除硬盘数据(查看是否持久化,持久化了就删,没有就没必要)
- 删除内存中数据
三个步骤都需要给他们加锁,用的也是 exchangLocker 这个锁。
创建队列(queueDelcare)
- 判断队列是否存在
- 不存在则创建队列,设定参数
- 队列参数 durable 为 true的时候存⼊硬盘
- 将队列写⼊到内存
同样的,这里是操作队列,也需要给它进行一个加锁操作,这个队列又不同于交换机,所以给它设置另外一个锁,同样的,不只是这一步需要这个锁,于是我将其设为一个属性:
// 操作队列的锁对象
private final Object queueLocker = new Object();
删除队列(queueDelete)
- 判断队列是否存在
- 存在则删除,先在硬盘删除
- 在内存中删除
同样是需要加队列锁。
创建绑定(queueBind)
- 判断当前绑定在不在
- 验证当前的 routingKey 合不合法
- 如果合法,就创建绑定,设置参数
- 从内存中获取下绑定关系的队列和交换机是否存在
- 都存在,再次判定队列和交换机的durable是否都为 true
- 都为 true 则存⼊硬盘
- 再写⼊内存
关于绑定呢,我们这里既操作了队列,有需要交换机,所以需要使用到上述两个锁:
删除绑定(queueUnbind)
关于删除绑定,这有个依赖关系问题,就是 线程 A 先删除了队列,而此时,线程B 再去删除绑定消息时就会失败,关于此问题 我们有两种方法:
- ⽅案⼀:参考类似于 MySQL 的外键⼀样,删除交换机/队列的时候,判定⼀下当前队列/交换机是 否存在对应的绑定,如果存在,则禁⽌删除,要求先解除绑定,再尝试删除
- ⽅案⼆:直接删除,不判断 交换机和队列是否存在
我们采用第二种方式,直接删(暴力删除),无论绑定是否持久化了, 都尝试从硬盘删一下. 就算不存在, 这个删除也无副作用
大致流程就是:
- 获取绑定是否存在
- 删除硬盘上的数据,需要判断该绑定 durable 是否为 true
- 从内存中删除绑定
同样的,这里也需要加两个锁。
发送消息(basicPublish)
发送消息就是指:将消息发送到指定的 交换机/队列 中。
转发消息的大致逻辑如下图:
发送消息的时候,就会往 ConsumerManager 类中的阻塞队列种
BlockingQueue<String> tokenQueue
存在该队列,表示该队列存在消息。
关于 Consumer 相关的现在先不讲解,后面放在消费者管理模块细讲。
大致思路就是:
- 获取交换机名字
- 确认 routingKey 是否合法
- 查找交换机对象
- 判定交换机类型
- 根据不同的交换机作出不同的消息转发
如果是直接交换机,那么 routingKey 就是队列的名字,直接把消息写入指定的队列中;随后构造消息对象,查找该队列名对应的对象;(判断队列是否存在)最后给队列写入消息。
至于其他交换机:
- 需要找到该交换机关联的所有绑定,并且遍历这些绑定对象
- 获取绑定对象,判定对应的队列是否存在(不存在不需要抛异常,因为可能有多个队列,不能因为一个队列的失败,影响到其他队列的消息的传输)
- 构造对象,再判断交换机类型
如果是扇出交换机,那么就是直接给所有的绑定的队列都要转发消息。
如果是主题交换机,需要 匹配 bindingKey 和 routingKey 。
Topic 交换机转发规则
bingdingKey(创建绑定的时候,给绑定指定的特殊字符串) 我们约定:
- 只存在 数字、字母、下划线
- 使用 “ . ” 把整个 routingKey 分为若干个部分 形如: aaa.vvv.eewe
- 存在两种特殊符号,做为通配符
- 一个是 * 形如 : aaa.*.bbb(只能作为被 . 分割单独的存在)
- 一个是 # 形如 : aaa.#.bbb
上述规则,是根据 AMQP 协议规定的
验证 bindingKey 是否合法(checkBindingKey)
大致逻辑:
这一段代码其实也挺简单的,看源代码就好。
验证 routingKey 是否合法(checkRoutingKey)
大致逻辑:
匹配规则
约定,啥叫匹配成功:
大致逻辑图:
订阅消息(basicComsume)
什么是函数式接口
由于 Java的函数不能脱离类的存在,为了实现 lambda 表达式, Java 引入了函数式接口
如何实现?
- interface
- 只能有⼀个⽅法
- 还需要加 @FunctionalInterface 注解
一个虚拟主机种,有很多队列,每个队列上都有很多消息。那么针对是哪个消费者订阅了哪条队列的消息需要进行一个管理。
订阅消息
添加一个队列的订阅者,当队列收到消息之后,就需要把消息对送给他的订阅者;
根据传入的 autoAck 的值,判断是否要手动删除消息。
这一步涉及到了消费者等等,这一章就先到这。