从0实现一个消息队列中间件
- 什么是消息队列
- 需求分析
- 核心概念
- 核心API
- 交换机类型
- 持久化
- 网络通信
- 网络通信API
- 消息应答
- 模块划分
- 项目创建
- 创建核心类
- 创建Exchange
- 创建MSGQueue
- 创建Binding
- 创建Message
- 数据库设计
- 配置sqlite
- 实现创建表和数据库基本操作
- 实现DataBaseManager
- 创建DataBaseManager类
- 初始化数据库
- 实现checkDBExists
- 实现createTable
- 实现createDefaultData
- 封装其他数据库操作
- 消息存储设计
- 文件格式
- queue_data.txt 文件格式
- queue_stat.txt文件格式
- 创建MessageFileManager类
- 实现统计文件读写
- 实现创建队列目录
- 实现删除队列目录
- 检查队列文件是否存在
- 实现消息对象序列化/反序列化
- 实现写入消息文件
- 实现删除消息
- 实现消息加载
- 实现垃圾回收
- 整合数据库和文件
- 创建DiskDataCenter
- 内存数据结构设计
- 创建MemoryDataCenter
- 虚拟主机设计
- 创建VirtualHost
- 订阅消息
- 添加一个订阅者
- 创建订阅者管理类
- 消息确认
- 网络通信协议设计
- 设计应用层协议
- 定义Request/Response
- 定义参数父类
- 定义返回值父类
- 定义其他参数类
- ExchangeDeclareArguments
- ExchangeDeleteArguments
- QueueDeclareArguments
- QueueDeleteArguments
- QueueBindArguments
- QueueUnbindArguments
- BasicPublishArguments
- BasicConsumeArguments
- SubScribeReturns
- 实现BrokerServer
- 启动/停止服务器
- 实现处理连接
- 实现readRequest
- 实现writeResponse
- 实现处理请求
- 实现clearClosedSessio
- 实现客户端
- 创建 ConnectionFactory
- Connection 和Channel的定义
- Connection的定义
- 封装请求响应读写操作
- 创建channel
- Channel的定义
- 创建channel
- 实现generateRid
- 实现waitResult
- 关闭channel
- 创建交换机
- 删除交换机
- 创建队列
- 创建绑定
- 删除绑定
- 发送消息
- 订阅消息
- 确认消息
- 处理响应
- 创建扫描线程
- 实现响应的分发
- 关闭Connection
什么是消息队列
曾经我们学习过阻塞队列(BlockingQueue),我们说,阻塞队列最⼤的⽤途,就是⽤来实现⽣产者消费者模型.
⽣产者消费者模型,存在诸多好处,是后端开发的常⽤编程⽅式.
在实际的后端开发中,尤其是分布式系统⾥,跨主机之间使⽤⽣产者消费者模型,也是⾮常普遍的需求.因此,我们通常会把阻塞队列,封装成⼀个独⽴的服务器程序,并且赋予其更丰富的功能.这样的程序我们就称为消息队列(MessageQueue,MQ)市⾯上成熟的消息队列⾮常多.
RabbitMQ Kafka RocketMQ ActiveMQ…
其中,RabbitMQ是⼀个⾮常知名,功能强⼤,⼴泛使⽤的消息队列.咱们就仿照RabbitMQ,模拟实现⼀个简单的消息队列
需求分析
核心概念
⽣产者(Producer) 消费者(Consumer) 中间⼈(Broker) 发布(Publish) 订阅(Subscribe
其中,Broker是最核⼼的部分.负责消息的存储和转发
在Broker中,⼜存在以下概念.
虚拟机(VirtualHost): 类似于MySQL的"database",是⼀个逻辑上的集合.⼀个BrokerServer上可以存在多个VirtualHost.
交换机(Exchange):⽣产者把消息先发送到Broker的Exchange上.再根据不同的规则,把消息转发 给不同的Queue.
队列(Queue):真正⽤来存储消息的部分.每个消费者决定⾃⼰从哪个Queue上读取消息.
绑定(Binding):Exchange和Queue之间的关联关系.Exchange和Queue可以理解成"多对多"关系.使⽤⼀个关联表就可以把这两个概念联系起来. 消息(Message):传递的内容
这些概念,既需要在内存中存储,也需要在硬盘上存储.
- 内存存储:⽅便使⽤.
- 硬盘存储:重启数据不丢失
核心API
对于Broker来说,要实现以下核⼼API.通过这些API来实现消息队列的基本功能.
- 创建队列(queueDeclare)
- 销毁队列(queueDelete)
- 创建交换机(exchangeDeclare)
- 销毁交换机(exchangeDelete)
- 创建绑定(queueBind)
- 解除绑定(queueUnbind)
- 发布消息(basicPublish)
- 订阅消息(basicConsume)
- 确认消息(basicAck)
另⼀⽅⾯,Producer和Consumer则通过⽹络的⽅式,远程调⽤这些API,实现⽣产者消费者模型.
交换机类型
对于RabbitMQ来说,主要⽀持四种交换机类型.
Direct
Fanout
Topic
Header
其中Header这种⽅式⽐较复杂,⽐较少⻅.常⽤的是前三种交换机类型.咱们此处也主要实现这三种.
- Direct: ⽣产者发送消息时,直接指定被该交换机绑定的队列名.
- Fanout: ⽣产者发送的消息会被复制到该交换机的所有队列中.
- Topic: 绑定队列到交换机上时,指定⼀个字符串为bindingKey.发送消息指定⼀个字符串为routingKey. 当 routingKey 和bindingKey满⾜⼀定的匹配条件的时候,则把消息投递到指定队列
持久化
Exchange, Queue, Binding, Message 都有持久化需求.当程序重启/主机重启,保证上述内容不丢失
网络通信
⽣产者和消费者都是客⼾端程序,broker则是作为服务器.通过⽹络进⾏通信.在⽹络通信的过程中,客⼾端部分要提供对应的api,来实现对服务器的操作
网络通信API
- 创建Connection
- 关闭Connection
- 创建Channel
- 关闭Channel
- 创建队列(queueDeclare)
- 销毁队列(queueDelete)
- 创建交换机(exchangeDeclare)
- 销毁交换机(exchangeDelete)
- 创建绑定(queueBind)
- 解除绑定(queueUnbind)
- 发布消息(basicPublish)
- 订阅消息(basicConsume)
- 确认消息(basicAck)
可以看到,在broker的基础上,客⼾端还要增加Connection操作和Channel操作.
Connection 对应⼀个TCP连接. Channel 则是Connection中的逻辑通道.⼀个Connection中可以包含多个Channel. Channel 和Channel之间的数据是独⽴的.不会相互⼲扰.
这样的设定主要是为了能够更好的复⽤TCP连接,达到⻓连接的效果,避免频繁的创建关闭TCP连接.
消息应答
被消费的消息,需要进⾏应答
应答模式分成两种:
⾃动应答:消费者只要消费了消息,就算应答完毕了.Broker直接删除这个消息.
⼿动应答:消费者⼿动调⽤应答接⼝,Broker收到应答请求之后,才真正删除这个消息.
⼿动应答的⽬的,是为了保证消息确实被消费者处理成功了.在⼀些对于数据可靠性要求⾼的场景,⽐较常⻅
模块划分
项目创建
创建SpringBoot项⽬.
使⽤SpringBoot2系列版本,Java8.
依赖引⼊SpringWeb和MyBatis
创建核心类
创建包mqserver.core
创建Exchange
public class Exchange {//使用name作为交换机的唯一身份标识private String name;//交换机类型DIRECT FANOUT TOPICprivate ExchangeType type = ExchangeType.DIRECT;//该交换机是否需要持久化存储private boolean durable = false;//Todo: 如果当前交换机没人使用 就自动删除private boolean autoDelete = false;//Todo: 额外参数选项//为了将这个数据存储到数据库中 要转换为json字符串存储private Map<String , Object> arguments = new HashMap<>();public String getArguments(){//把当前arguments参数从map转化为json字符串ObjectMapper objectMapper = new ObjectMapper();try {return objectMapper.writeValueAsString(arguments);} catch (JsonProcessingException e) {e.printStackTrace();}//如果异常 返回空json字符串return "{}";}public void setArguments(String argumentsJson){//吧argumentsJson转化为map对象ObjectMapper objectMapper = new ObjectMapper();try {objectMapper.readValue(argumentsJson,new TypeReference<HashMap<String,Object>>(){});} catch (JsonProcessingException e) {e.printStackTrace();}}//针对arguments 再提供一组getter setter方法 在代码内部测试使用public Object getArguments(String key){return arguments.get(key);}public void setArguments(String key, Object value){arguments.put(key,value);}public void setArguments(Map<String,Object> arguments){this.arguments = arguments;}}
交换机类型作为枚举类
public enum ExchangeType {DIRECT(0),FANOUT(1),TOPIC(2);private final int type;private ExchangeType(int type){this.type = type;}public int getType(){return type;}}
name :交换机的名字.相当于交换机的⾝份标识.
type :交换机的类型.三种取值,DIRECT,FANOUT,TOPIC.
durable :交换机是否要持久化存储.true为持久化,false不持久化.
autoDelete:使⽤完毕后是否⾃动删除.预留字段,暂时未使⽤.
arguments :交换机的其他参数属性.预留字段,暂时未使⽤
创建MSGQueue
@Data
public class MSGQueue {//队列的唯一身份标识private String name;//表示队列是否持久化private boolean durable;//Todo: 表示是否独占//true 表示只能为一个消费者使用 false 表示为大家都能使用private boolean exclusive;//todo:自动删除private boolean autoDelete = false;//todo:参数列表private Map<String, Object> arguments = new HashMap<>();//表示当前队列都有哪些消费者订阅了private List<ConsumerEnv> consumerEnvList = new ArrayList<>();//记录当前取到了第几个消费者 方便实现轮询策略private AtomicInteger consumerSeq = new AtomicInteger(0);//添加一个新的订阅者public void addConsumerEnv(ConsumerEnv consumerEnv) {consumerEnvList.add(consumerEnv);}//暂时不考虑订阅者删除//挑选一个订阅者 来处理当前的消息public ConsumerEnv chooseConsumerEnv() {//轮询的方式取if (consumerEnvList.size() == 0) {//无人订阅return null;}//计算当前下标int index = consumerSeq.get() % consumerEnvList.size();consumerSeq.getAndIncrement();return consumerEnvList.get(index);}public String getArguments() {ObjectMapper objectMapper = new ObjectMapper();try {return objectMapper.writeValueAsString(arguments);} catch (JsonProcessingException e) {e.printStackTrace();}return "{}";}public void setArguments(String argumentsJson) {ObjectMapper objectMapper = new ObjectMapper();try {objectMapper.readValue(argumentsJson, new TypeReference<HashMap<String, Object>>() {});} catch (JsonProcessingException e) {e.printStackTrace();}}public Object getArguments(String key) {return arguments.get(key);}public void setArguments(String key, Object value) {arguments.put(key, value);}public void setArguments(Map<String, Object> arguments) {this.arguments = arguments;}}
name :队列的名字.相当于队列的⾝份标识.
durable :交换机是否要持久化存储.true为持久化,false不持久化.
exclusive :独占(排他),队列只能被⼀个消费者使⽤.
autoDelete :使⽤完毕后是否⾃动删除.预留字段,暂时未使⽤.
arguments :交换机的其他参数属性.预留字段,暂时未使⽤
创建Binding
@Data
public class Binding {//交换机名private String exchangeName;//队列名private String queueName;//绑定关键字private String bindingKey;
}
exchangeName 交换机名字
queueName 队列名字
bindingKey 只在交换机类型为 TOPIC 时才有效.⽤于和消息中的 routingKey 进⾏匹配
创建Message
{private BasicProperties basicProperties = new BasicProperties();private byte[] body;//辅助属性//这两个属性不需要序列化//一个文件存储很多消息 找到某个消息使用下面两个偏移量来找到消息 前闭后开[)//文件开头到消息数据的位置偏移private transient long offsetBeg = 0;//文件结尾到消息数据的位置偏移private transient long offsetEnd = 0;//使用这个属性表示该消息在文件中是否为有效消息(逻辑删除)//0x1有效 0x0无效private byte isValid = 0x1;//创建工厂方法 让工厂方法帮我们封装一下message对象的过程//万一两个参数冲突 一外面的为主public static Message createMessageWithId(String routingKey, BasicProperties basicProperties, byte[] body){Message message = new Message();if(basicProperties != null){message.setBasicProperties(basicProperties);}//M-为前缀 和其他uuid做区分message.setMessageId("M-" + UUID.randomUUID().toString());message.basicProperties.setRoutingKey(routingKey);message.body = body;//此处是吧body和basicPro设置出来 其他属性暂时不设置return message;}//获取messageIdpublic String getMessageId(){return basicProperties.getMessageId();}//设置messageIdpublic void setMessageId(String messageId){basicProperties.setMessageId(messageId);}public String getRoutingKey(){return basicProperties.getRoutingKey();}public void setRoutingKey(String routingKey){basicProperties.setRoutingKey(routingKey);}public int getDeliverMode(){return basicProperties.getDeliverMode();}public void setDeliverMode(int mode){basicProperties.setDeliverMode(mode);}
}
BasicProperties参数类
@Data
public class BasicProperties implements Serializable {//消息的唯一身份标识 保证唯一性 使用UUID来创建private String messageId;//消息上带有的 和bindingKey做匹配private String routingKey;// 表示消息是否要持久化 1表示不持久化 2表示持久化private int deliverMode = 1;
}
Message 需要实现 Serializable 接⼝.后续需要把Message写⼊⽂件以及进⾏⽹络传输.
basicProperties 是消息的属性信息. body 是消息体.
offsetBeg 和 offsetEnd 表⽰消息在消息⽂件中所在的起始位置和结束位置.这⼀块具体的 设计后⾯再详细介绍.使⽤transient 关键字避免属性被序列化.
isValid ⽤来表⽰消息在⽂件中是否有效.这⼀块具体的设计后⾯再详细介绍.
createMessageWithId 相当于⼀个⼯⼚⽅法,⽤来创建⼀个Message实例.messageId通过 UUID的⽅式⽣成.
数据库设计
对于Exchange,MSGQueue,Binding,我们使⽤数据库进⾏持久化保存.
此处我们使⽤的数据库是SQLite,是⼀个更轻量的数据库. SQLite 只是⼀个动态库(当然,官⽅也提供了可执⾏程序exe),我们在Java中直接引⼊SQLite依赖,即可直接使⽤,不必安装其他的软件
配置sqlite
引入依赖
<dependency><groupId>org.xerial</groupId><artifactId>sqlite-jdbc</artifactId><version>3.41.0.1</version></dependency>
配置数据源
spring:datasource:url: jdbc:sqlite:./data/meta.dbusername:password:driver-class-name: org.sqlite.JDBCmybatis:mapper-locations: classpath:mapper/**Mapper.xml
此处我们约定,把数据库⽂件放到./data/meta.db 中. SQLite 只是把数据单纯的存储到⼀个⽂件中.⾮常简单⽅便
实现创建表和数据库基本操作
@Mapper
public interface MetaMapper {//提供三个核心建表方法void createExchangeTable();void createQueueTable();void createBindingTable();//针对上面三个基本概念进行插入删除void insertExchange(Exchange exchange);void deleteExchange(String exchange);void insertQueue(MSGQueue queue);void deleteQueue(String queueName);void insertBinding(Binding binding);void deleteBinding(Binding binding);List<Exchange> selectAllExchanges();List<MSGQueue> selectAllQueues();List<Binding> selectAllBindings();}
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="com.example.mq.mqserver.mapper.MetaMapper"><update id="createExchangeTable">create table if not exists exchange(name varchar(50) primary key,type int,durable boolean,autoDelete boolean,arguments varchar(1024));</update><update id="createQueueTable">create table if not exists queue(name varchar(50) primary key,durable boolean,exclusive boolean,autoDelete boolean,arguments varchar(1024));</update><update id="createBindingTable">create table if not exists binding(exchangeName varchar(50),queueName varchar(50),bindingKey varchar(256));</update>
实现DataBaseManager
管理数据库的
mqserver.datacenter.DataBaseManage
创建DataBaseManager类
通过这个类来封装针对数据库的操作
初始化数据库
public void init() {//手动获取到metaMappermetaMapper = MqApplication.context.getBean(MetaMapper.class);//建库建表 插入一些默认数据//如果数据库已经存在 不做任何操作 , 如果数据库不存在 则创建库创建表 构造默认数据//根据meta.db文件是否存在来做判断if (!checkDBExists()) {//数据库不存在 就进行建库建表操作//先创建一个data目录File dataDir = new File("./data");dataDir.mkdirs();//不存在 创建表createTable();createDefaultData();log.info("[DataBaseManager] 数据库初始化完成");} else {//数据库已经存在log.info("[DataBaseManager] 数据库已经存在");}}
针对MqApplication,需要新增⼀个context属性.并初始化.
@SpringBootApplication
public class MqApplication {public static ConfigurableApplicationContext context;public static void main(String[] args) throws IOException {context = SpringApplication.run(MqApplication.class, args);BrokerServer brokerServer = new BrokerServer(9090);brokerServer.start();}}
实现checkDBExists
private boolean checkDBExists() {//判断meta.db文件是否存在File file = new File("./data/meta.db");return file.exists();}
实现createTable
//建表操作 不需要建库//首次执行数据库造作 就会自动创建meta.db文件(Mybatis执行)private void createTable() {metaMapper.createExchangeTable();metaMapper.createQueueTable();metaMapper.createBindingTable();log.info("[DataBaseManager] 创建表完成");}
实现createDefaultData
//创建默认数据//主要是添加默认交换机private void createDefaultData() {//构造默认交换机Exchange exchange = new Exchange();exchange.setName("");exchange.setType(ExchangeType.DIRECT);exchange.setDurable(true);exchange.setAutoDelete(false);metaMapper.insertExchange(exchange);log.info("[DataBaseManager] 创建默认数据完成");}
封装其他数据库操作
//封装其他数据库的操作public void insertExchange(Exchange exchange) {metaMapper.insertExchange(exchange);}public void deleteExchange(String exchangeName) {metaMapper.deleteExchange(exchangeName);}public void insertQueue(MSGQueue queue) {metaMapper.insertQueue(queue);}public void deleteQueue(String queueName) {metaMapper.deleteQueue(queueName);}public void deleteBinding(Binding binding) {metaMapper.deleteBinding(binding);}public void insertBinding(Binding binding) {metaMapper.insertBinding(binding);}public List<Exchange> selectAllExchanges() {return metaMapper.selectAllExchanges();}public List<MSGQueue> selectAllQueues(){return metaMapper.selectAllQueues();}public List<Binding> selectAllBindings(){return metaMapper.selectAllBindings();}
消息存储设计
- 设计思路:
消息需要在硬盘上存储.但是并不直接放到数据库中,⽽是直接使⽤⽂件存储
原因如下:
- 对于消息的操作并不需要复杂的增删改查.
- 对于⽂件的操作效率⽐数据库会⾼很多
我们给每个队列分配⼀个⽬录.⽬录的名字为data+队列名.形如 ./data/testQueue 该⽬录中包含两个固定名字的⽂件.
- queue_data.txt 消息数据文件 用来保存消息内容
- queue_stat.txt消息统计文件 用来保存消息统计信息
文件格式
queue_data.txt 文件格式
使用二进制方式存储 每个消息分为四个部分:
- 前四个字节表示Message对象的长度
- 后面若干字节 表示Message 内容
- 消息和消息之间首尾相连
每个Message基于Java标准库的 ObjectInputStream/ObjectOutputStream序列化
Message 对象中的offsetBeg和offsetEnd正是⽤来描述每个消息体所在的位置.
queue_stat.txt文件格式
使用文本方式存储 文件中只包含一行 里面包含两列 使用\t分割
第一列表示当前的总消息数目 第二列表示有效消息数目
形如
2000\t1500
创建MessageFileManager类
创建 mqserver.database.MessageFileManager
public class MessageFileManager {//定义内部类表示该队列的统计信息static public class Stat {public int totalCount;//总数量public int validCount;//有效消息数量}public void init(){//暂时不需要初始化}//约定消息文件所在的目录和文件名//获取指定队列对应的消息文件所在路径private String getQueueDir(String queueName) {return "./data/" + queueName;}//这个方法用来获取该队列的消息数据文件路径private String getQueueDataPath(String queueName) {return getQueueDir(queueName) + "/queue_data.txt";}//这个方法用来获取该队列的消息统计文件路径private String getQueueStatPath(String queueName) {return getQueueDir(queueName) + "/queue_stat.txt";}
}
包含一个内部类stat 用来表示消息统计文件的内容
实现统计文件读写
private Stat readStat(String queueName) {Stat stat = new Stat();try (InputStream inputStream = new FileInputStream(getQueueStatPath(queueName))) {Scanner scanner = new Scanner(inputStream);stat.totalCount = scanner.nextInt();stat.validCount = scanner.nextInt();return stat;} catch (IOException e) {e.printStackTrace();}return null;}//写统计文件private void writeStat(String queueName, Stat stat) {//使用printWrite//outputstream打开文件默认情况下啊 会把源文件清空 此时相当于旧文件覆盖新文件try (OutputStream outputStream = new FileOutputStream(getQueueStatPath(queueName))) {PrintWriter printWriter = new PrintWriter(outputStream);printWriter.write(stat.totalCount + "\t" + stat.validCount);printWriter.flush();} catch (IOException e) {e.printStackTrace();}}
实现创建队列目录
每个队列都有⾃⼰的⽬录和配套的⽂件.通过下列⽅法把⽬录和⽂件先准备好.
public void createQueueFiles(String queueName) throws IOException {// 1.创建⽬录指定队列的⽬录File baseDir = new File(getQueueDir(queueName));if (!baseDir.exists()) {boolean ok = baseDir.mkdirs();if (!ok) {throw new IOException(" 创建⽬录失败 ! baseDir=" + baseDir.getAbsolutePath());}}// 2.创建队列数据⽂件File queueDataFile = new File(getQueueDataPath(queueName));if (!queueDataFile.exists()) {boolean ok = queueDataFile.createNewFile();if (!ok) {throw new IOException(" 创建⽂件失败 ! queueDataFile=" + queueDataFile.getAbsolutePath());}}// 3.创建队列统计⽂件File queueStatFile = new File(getQueueStatPath(queueName));if (!queueStatFile.exists()) {boolean ok = queueStatFile.createNewFile();if (!ok) {throw new IOException(" 创建⽂件失败 ! queueStatFile=" + queueStatFile.getAbsolutePath());}}// 4.给队列统计⽂件写⼊初始数据Stat stat = new Stat();stat.totalCount = 0;stat.validCount = 0;writeStat(queueName, stat);}
把上述约定的⽂件都创建出来,并对消息统计⽂件进⾏初始化
实现删除队列目录
如果队列需要删除,则队列对应的⽬录/⽂件也需要删除
//删除队列的目录和文件//队列也是可以删除的 当队列删除之后 对应的消息文件也会被删除public void destroyQueueFiles(String queueName) throws IOException {//先删除里面的文件 在删除目录File queueDataFile = new File(getQueueDataPath(queueName));boolean ok1 = queueDataFile.delete();File queueStatFile = new File(getQueueStatPath(queueName));boolean ok2 = queueStatFile.delete();File baseDir = new File(getQueueDir(queueName));boolean ok3 = baseDir.delete();if (!ok1 || !ok2 || !ok3) {//其中有任意一个失败 都算整体删除失败throw new IOException("删除队列目录和文件失败 baseDir=" + baseDir.getAbsolutePath());}}
注意:File类的delete⽅法只能删除空⽬录.因此需要先把内部的⽂件先删除掉.
检查队列文件是否存在
判定该队列的消息⽂件和统计⽂件是否存在.⼀旦出现缺失,则不能进⾏后续⼯作
//检查队列的目录和文件是否存在public boolean checkFilesExists(String queueName) {//判定队列的数据文件 和统计文件是否都存在File queueDataFile = new File(getQueueDataPath(queueName));if (!queueDataFile.exists()) {return false;}File queueStatFile = new File(getQueueStatPath(queueName));if (!queueStatFile.exists()) {return false;}return true;}
实现消息对象序列化/反序列化
Message对象需要转成二进制写入文件,并且也需要吧文件中的二进制读出来解析成Message对象 此处针对这里的逻辑进行封装
创建common.BinaryTool包
public class BinaryTool {//将一个对象 序列化成一个数组public static byte[] toBytes(Object object) throws IOException {//这个流对象相当于一个变长的字节数组//就可以把object序列化的数据给逐渐的写入到byteArrayOutputStream中 在同一转化成byte[]try( ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream()){try(ObjectOutputStream objectOutputStream = new ObjectOutputStream(byteArrayOutputStream)){//此处的writeObject就会把该对象进行序列化 生成的二进制数据就会写入到ObjectOutputStream中//由于objectOutputStream关联到byteArrayOutputStream 所以最终结果写入到byteArrayOutputStreamobjectOutputStream.writeObject(object);}return byteArrayOutputStream.toByteArray();}}//将一个字节数字 反序列化成一个对象public static Object fromBytes(byte[] data) throws IOException, ClassNotFoundException {Object object = null;try(ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(data)){try(ObjectInputStream objectInputStream = new ObjectInputStream(byteArrayInputStream)){//从data这个byte[]数组中获取数据并进行反序列化object = objectInputStream.readObject();}}return object;}
}
- 使⽤ByteArrayInputStream/ByteArrayOutputStream针对byte[]进⾏封装,⽅便后续操作.(这两个流对象是纯内存的,不需要进⾏close).
- 使⽤ObjectInputStream/ObjectOutputStream进⾏序列化/反序列化操作.通过内部的readObject/writeObject即可完成对应操作.
- 此处涉及到的序列化对象,需要实现Serializable接⼝.这⼀点咱们的Message对象已经实现过了.
实现写入消息文件
//把一个新的消息 放到队列对应的文件中public void sendMessage(MSGQueue queue, Message message) throws MqException, IOException {//检查档期那要写入的队列对应的文件是否存在if (!checkFilesExists(queue.getName())) {throw new MqException("[MessageFileManager] 队列对应文件不存在 queueName = " + queue.getName());}//把message对象进行序列化 转化成二进制字节数组byte[] messageBinary = BinaryTool.toBytes(message);synchronized (queue) {//获取到当前队列数据文件的长度 用来计算message的 offsetBeg和offsetEnd//offsetEnd 就是当前文件长度 + 4 + message自身长度File queueDataFile = new File(getQueueDataPath(queue.getName()));message.setOffsetBeg(queueDataFile.length() + 4);message.setOffsetEnd(queueDataFile.length() + 4 + messageBinary.length);//写消息到文件中 追加到数据文件末尾 而不是覆盖try (OutputStream outputStream = new FileOutputStream(queueDataFile, true)) {try (DataOutputStream dataOutputStream = new DataOutputStream(outputStream)) {//接下来先写当前消息的长度//这个操作就是写入四个字节了dataOutputStream.writeInt(messageBinary.length);//写入消息本体dataOutputStream.write(messageBinary);}}//更新消息统计文件Stat stat = readStat(queue.getName());stat.totalCount += 1;stat.validCount += 1;writeStat(queue.getName(), stat);}}
- 考虑线程安全 按照队列维度进行加锁
- 需要记录Message对象在⽂件中的偏移量.后续的删除操作依赖这个偏移量定位到消息.offsetBeg是原有⽂件⼤⼩的基础上,再+4.4个字节是存放消息⼤⼩的空间.(参考上⾯的图)
- 写完消息,要同时更新统计信息
创建common.MqException ,作为⾃定义异常类.后续业务上出现问题,都统⼀抛出这个异常..
/*** 自定义异常类*/
public class MqException extends Exception{public MqException(String reason) {super(reason);}
}
实现删除消息
此处删除消息只是逻辑删除 ",即把Message类中的isValid字段设置为0
//删除消息//逻辑删除 将isValid属性设置成0//先把文件中的数据还原为message对象 isValid改成0 在重新写回去//此处message必须包含有效的offsetBeg 和OffsetEndpublic void deleteMessage(MSGQueue queue, Message message) throws IOException, ClassNotFoundException {//随机访问 在文件的指定位置读写//seek方法移动光标synchronized (queue) {try (RandomAccessFile randomAccessFile = new RandomAccessFile(getQueueDataPath(queue.getName()), "rw")) {//先从文件中读取相应的message数据byte[] bufferSrc = new byte[(int) (message.getOffsetEnd() - message.getOffsetBeg())];randomAccessFile.seek(message.getOffsetBeg());randomAccessFile.read(bufferSrc);//把二进制数据转化为Message对象Message diskMessage = (Message) BinaryTool.fromBytes(bufferSrc);//设置isValiddiskMessage.setIsValid((byte) 0x0);//重新写入byte[] bufferDest = BinaryTool.toBytes(diskMessage);//此时重新 seek 虽然上面已经seek过 但是进行了读操作 导致文件光标向后移动到下一个消息的位置//因此需要重新seek 调整光标randomAccessFile.seek(message.getOffsetBeg());randomAccessFile.write(bufferDest);}//更新统计文件Stat stat = readStat(queue.getName());if ((stat.validCount > 0)) {stat.validCount -= 1;}writeStat(queue.getName(), stat);}
实现消息加载
把消息内容从⽂件加载到内存中.这个功能在服务器重启,和垃圾回收的时候都很关键
public LinkedList<Message> loadAllMessageFromQueue(String queueName) throws IOException, MqException, ClassNotFoundException {LinkedList<Message> messages = new LinkedList<>();try (InputStream inputStream = new FileInputStream(getQueueDataPath(queueName))) {try (DataInputStream dataInputStream = new DataInputStream(inputStream)) {//模拟光标位置long currentOffset = 0;//循环读取消息while (true) {//读取当前消息//这里可能会读到文件末尾 readInt方法到达末尾会抛出EOFException异常int messageSize = dataInputStream.readInt();//根据消息长度 读取消息byte[] buffer = new byte[messageSize];int actualSize = dataInputStream.read(buffer);if (messageSize != actualSize) {throw new MqException("[MessageFileManager] 文件格式错误 queueName = " + queueName);}//把读到的二进制数据 反序列化为Message对象Message message = (Message) BinaryTool.fromBytes(buffer);//判断这个小时是否为无效对象if (message.getIsValid() != 0x1) {//无效数据直接跳过currentOffset += (4 + messageSize);continue;}//有效数据 则需要把这个数据加入到链表中 加入前要填写offsetBeg offsetEnd//进行计算时 需要知道当前光标的位置//此时手动计算下标message.setOffsetBeg(currentOffset + 4);message.setOffsetEnd(currentOffset + 4 + messageSize);currentOffset += (4 + messageSize);//添加到链表中messages.add(message);}}catch (EOFException e){//此时不是真正处理异常 而是处理正常业务逻辑System.out.println("[MessageFileManager] 恢复Message数据完成");}}return messages;}
实现垃圾回收
上述删除操作,只是把消息在⽂件上标记成了⽆效.并没有腾出硬盘空间.最终⽂件⼤⼩可能会越积越多.因此需要定期的进⾏批量清除.此处使⽤类似于复制算法.当总消息数超过2000,并且有效消息数⽬少于50%的时候,就触发GC. GC的时候会把所有有效消息加载出来,写⼊到⼀个新的消息⽂件中,使⽤新⽂件,代替旧⽂件即可.
//检查当前是否要针对该队列的消息数据文件进行GCpublic boolean checkGC(String queueName){//判定是否要GC 是根据总消息数和有效消息数 这两个值 是在消息统计文件中的Stat stat = readStat(queueName);if(stat.totalCount > 2000 && (double)stat.validCount /(double) stat.totalCount < 0.5){return true;}return false;}
gc
//垃圾回收操作//使用复制算法//创建新文件 名字为queue_data_new.txt//把之前的消息数据文件中有效的消息读取出来 写到新文件中//删除旧的消息 把文件名改回queue_data.txt//同时要记得更新消息统计文件public void gc(MSGQueue queue) throws MqException, IOException, ClassNotFoundException {//进行gc时 其他线程不能对文件进行修改synchronized (queue){//由于gc操作比较耗时 此处统计消耗时间long gcBeg = System.currentTimeMillis();//创建一个新文件File queueDataNewFile = new File(getQueueDataNewPath(queue.getName()));if(queueDataNewFile.exists()){//正常情况下 这个文件不应该存在 说明上一次gc一半 程序意外结束throw new MqException("[MessageFileManager] gc 时发现该队列的queue_data_new 已经存在");}boolean ok = queueDataNewFile.createNewFile();if(!ok){throw new MqException("[MessageFileManager] 创建文件失败 queueDataNewFile =" + queueDataNewFile.getAbsolutePath() );}//从旧文件中获取出所有有效消息LinkedList<Message> messages = loadAllMessageFromQueue(queue.getName());//把有效消息 写入到新文件try(OutputStream outputStream = new FileOutputStream(queueDataNewFile,true)){try(DataOutputStream dataOutputStream = new DataOutputStream(outputStream)){for(Message message : messages){byte[] buffer = BinaryTool.toBytes(message);dataOutputStream.writeInt(buffer.length);dataOutputStream.write(buffer);}}}//删除旧文件 把新文件重命名File queueDataOldFile = new File(getQueueDataPath(queue.getName()));ok = queueDataOldFile.delete();if(!ok){throw new MqException("[MessageFileManager] 删除旧数据失败 queueDataOldFile" + queueDataOldFile.getAbsolutePath());}//重命名文件ok = queueDataNewFile.renameTo(queueDataOldFile);if(!ok){throw new MqException("[MessageFileManager] 文件重命名失败 queueDataNewFile" + queueDataNewFile.getAbsolutePath() +", queueDataOldFile = " + queueDataOldFile.getAbsolutePath());}//更新统计文件Stat stat = new Stat();stat.totalCount = messages.size();stat.validCount = messages.size();writeStat(queue.getName(),stat);long gcEnd = System.currentTimeMillis();System.out.println("[MessageFileManager] gc 执行完毕 queueName = " + queue.getName() +"time: " + (gcEnd - gcBeg) + " ms");}}
整合数据库和文件
上述代码中,使⽤数据库存储了Exchange,Queue,Binding,使⽤⽂本⽂件存储了Message.接下来我们把两个部分整合起来,统⼀进⾏管理
创建DiskDataCenter
使⽤DiskDataCenter来综合管理数据库和⽂本⽂件的内容.DiskDataCenter 会持有DataBaseManager和MessageFileManager对象
public class DiskDataCenter {private DataBaseManager dataBaseManager = new DataBaseManager();private MessageFileManager messageFileManager = new MessageFileManager();public void init(){//对上面两个类进行初始化dataBaseManager.init();messageFileManager.init();}//封装交换机操作public void insertExchange(Exchange exchange){dataBaseManager.insertExchange(exchange);}public void deleteExchange(String exchangeName){dataBaseManager.deleteExchange(exchangeName);}public List<Exchange> selectAllExchanges(){return dataBaseManager.selectAllExchanges();}//封装队列操作public void insertQueue(MSGQueue queue) throws IOException {//创建队列的同时 还要创建对应的目录dataBaseManager.insertQueue(queue);messageFileManager.createQueueFiles(queue.getName());}public void deleteQueue(String queueName) throws IOException {//删除队列也要删除对应的目录dataBaseManager.deleteQueue(queueName);messageFileManager.destroyQueueFiles(queueName);}public List<MSGQueue> selectAllQueues(){return dataBaseManager.selectAllQueues();}//封装绑定操作public void insertBinding(Binding binding){dataBaseManager.insertBinding(binding);}public void deleteBinding(Binding binding){dataBaseManager.deleteBinding(binding);}public List<Binding> selectAllBindings(){return dataBaseManager.selectAllBindings();}//封装消息操作public void sendMessage(MSGQueue queue, Message message) throws IOException, MqException {messageFileManager.sendMessage(queue,message);}public void deleteMessage(MSGQueue queue, Message message) throws IOException, ClassNotFoundException, MqException {messageFileManager.deleteMessage(queue,message);if(messageFileManager.checkGC(queue.getName())){messageFileManager.gc(queue);}}public LinkedList<Message> loadAllMessageFromQueue(String queueName) throws IOException, MqException, ClassNotFoundException {return messageFileManager.loadAllMessageFromQueue(queueName);}
}
⼩结
通过上述封装,把数据库和硬盘⽂件两部分合并成⼀个整体.上层代码在调⽤的时候则不再关⼼该数据 是存储在哪个部分的.
这个类的整体实现并不复杂,关键逻辑在之前都已经准备好了
内存数据结构设计
硬盘上存储数据,只是为了实现"持久化"这样的效果.但是实际的消息存储/转发,还是主要靠内存的结构.
对于MQ来说,内存部分是更关键的,内存速度更快,可以达成更⾼的并发
创建MemoryDataCenter
创建mqserver.datacenter.MemoryDataCenter
使用这个类来管理所有内存中的数据
- 使⽤四个哈希表,管理Exchange,Queue,Binding,Message.
- 使⽤⼀个哈希表+链表管理队列->消息之间的关系.
- 使⽤⼀个哈希表+哈希表管理所有的未被确认的消息.
为了保证消息被正确消费了,会使⽤两种⽅式进⾏确认.⾃动ACK和⼿动ACK.
其中⾃动ACK是指当消息被消费之后,就会⽴即被销毁释放.
其中⼿动ACK是指当消息被消费之后,由消费者主动调⽤⼀个basicAck⽅法,进⾏主动确认.服务器
收到这个确认之后,才能真正销毁消息.
此处的"未确认消息"就是指在⼿动ACK模式下,该消息还没有被调⽤basicAck.此时消息不能删除,
但是要和其他未消费的消息区分开.于是另搞了个结构.
当后续basicAck到了,就可以删除消息了
/*** 使用这个类来统一管理内存中的所有数据*/
public class MemoryDataCenter {//key是exchangeName, value是exchange对象private ConcurrentHashMap<String, Exchange> exchangeMap = new ConcurrentHashMap<>();//key是queueName value是MSGQueue对象private ConcurrentHashMap<String, MSGQueue> queueMap = new ConcurrentHashMap<>();//key是exchangeName 第二个key是queueNameprivate ConcurrentHashMap<String, ConcurrentHashMap<String, Binding>> bindingsMap = new ConcurrentHashMap<>();//key是messageId value是一个Message对象private ConcurrentHashMap<String, Message> messageMap = new ConcurrentHashMap<>();//key是queueName value是一个Message的链表private ConcurrentHashMap<String, LinkedList<Message>> queueMessageMap = new ConcurrentHashMap<>();private ConcurrentHashMap<String, ConcurrentHashMap<String, Message>> queueMessageWaitAckMap = new ConcurrentHashMap<>();public void insertExchange(Exchange exchange) {exchangeMap.put(exchange.getName(), exchange);System.out.println("[MemoryDataCenter] 新交换机添加成功 exchangeName = " + exchange.getName());}public Exchange getExchange(String exchangeName) {return exchangeMap.get(exchangeName);}public void deleteExchange(String exchangeName) {exchangeMap.remove(exchangeName);System.out.println("[MemoryDataCenter] 交换机删除成功 exchangeName = " + exchangeName);}public void insertQueue(MSGQueue queue) {queueMap.put(queue.getName(), queue);System.out.println("[MemoryDataCenter] 新队列添加成功 queueName = " + queue.getName());}public MSGQueue getQueue(String queueName) {return queueMap.get(queueName);}public void deleteQueue(String queueName) {queueMap.remove(queueName);System.out.println("[MemoryDataCenter] 队列删除成功 queueName = " + queueName);}public void insertBinding(Binding binding) throws MqException {
// //先使用exchangeName来查找哈希表是否有存在
// ConcurrentHashMap<String, Binding> bindingMap = this.bindingsMap.get(bingding.getExchangeName());
// if (bindingMap == null){
// bindingsMap.put(binding.getExchangeName(),bindingMap);
// }//现根据exchangeName查一下 对应的哈希表是否存在 不存在就创建一个ConcurrentHashMap<String, Binding> bindingMap = bindingsMap.computeIfAbsent(binding.getExchangeName(),k -> new ConcurrentHashMap<>());synchronized (bindingMap) {//再根据queueName查找一下 如果已经存在 就抛出异常 不存在才能插入if (bindingsMap.get(binding.getQueueName()) != null) {throw new MqException("[MemoryDataCenter] 绑定已经存在 exchangeName = " + binding.getExchangeName() +", queueName = " + binding.getQueueName());}bindingMap.put(binding.getQueueName(), binding);}System.out.println("[MemoryDataCenter] 新绑定添加成功 exchangeName = " + binding.getExchangeName() +",queueName" + binding.getQueueName());}//获取绑定//1. 根据exchangeName和queueName确定唯一一个Binding//2. 根据exchangeName获取到所有的bindingpublic Binding getBinding(String exchangeName, String queueName) {ConcurrentHashMap<String, Binding> bindingMap = bindingsMap.get(exchangeName);if (bindingMap == null) {return null;}return bindingMap.get(queueName);}public ConcurrentHashMap<String, Binding> getBindings(String exchangeName) {return bindingsMap.get(exchangeName);}//删除绑定public void deleteBinding(Binding binding) throws MqException {ConcurrentHashMap<String, Binding> bindingMap = bindingsMap.get(binding.getExchangeName());if (bindingMap == null) {//该交换机没有绑定任何队列throw new MqException("[MemoryDataCenter] 绑定不存在 exchangeName = " + binding.getExchangeName() +", queueName = " + binding.getQueueName());}bindingMap.remove(binding.getQueueName());System.out.println("[MemoryDataCenter] 删除绑定成功 exchangeName = " + binding.getExchangeName() +",queueName" + binding.getQueueName());}//添加消息public void addMessage(Message message) {messageMap.put(message.getMessageId(), message);System.out.println("[MemoryDataCenter] 新消息添加成功 messageId = " + message.getMessageId());}//根据id查消息public Message getMessage(String messageId) {return messageMap.get(messageId);}//根据id删消息public void removeMessage(String messageId) {messageMap.remove(messageId);System.out.println("[MemoryDataCenter] 消息被移除 messageId = " + messageId);}//发送消息到指定队列public void sendMessage(MSGQueue queue, Message message) {//先把消息放在指定数据结构中//现根据队列的名称 找到该队列对应的消息链表
// LinkedList<Message> messages = queueMessageMap.get(queue.getName());
// if (messages == null) {
// messages = new LinkedList<>();
// queueMessageMap.put(queue.getName(), messages);
// }LinkedList<Message> messages = queueMessageMap.computeIfAbsent(queue.getName(), k -> new LinkedList<>());//把新消息添加到message中synchronized (messages){messages.add(message);}//在这里把该消息向消息中心插入addMessage(message);System.out.println("[MemoryDataCenter] 消息被投递到队列中 messageId = " + message.getMessageId());}//从队列中取消息public Message pollMessage(String queueName){//根据队列名 查找对应的消息链表LinkedList<Message> messages = queueMessageMap.get(queueName);//如果没找到 则说明队列中没有任何消息if(messages == null){return null;}synchronized (messages){if(messages.size() == 0){return null;}//链表中有元素 就进行头删 取出该元素Message currentMessage = messages.remove(0);System.out.println("[MemoryDataCenter] 消息从队列中取出 messageId = " + currentMessage.getMessageId());return currentMessage;}}//获取指定队列中消息的个数public int getMessageCount(String queueName){LinkedList<Message> messages = queueMessageMap.get(queueName);if(messages == null){//队列中无消息return 0;}synchronized (messages){return messages.size();}}//未确认消息的操作//添加未确认的消息public void addMessageWaitAck(String queueName, Message message){ConcurrentMap<String,Message> messageMap = queueMessageWaitAckMap.computeIfAbsent(queueName,k -> new ConcurrentHashMap<>());messageMap.put(message.getMessageId(),message);System.out.println("[MemoryDataCenter] 消息进入到待确认队列 messageId=" + message.getMessageId());}//删除未确认的消息public void removeMessageWaitAck(String queueName, String messageId){ConcurrentHashMap<String, Message> messageMap = queueMessageWaitAckMap.get(queueName);if(messageMap == null){return;}messageMap.remove(messageId);System.out.println("[MemoryDataCenter] 消息从待确认队列删除 messageId=" + messageId);}//获取指定未确认的消息public Message getMessageWaitAck(String queueName, String messageId){ConcurrentHashMap<String, Message> messageMap = queueMessageWaitAckMap.get(queueName);if(messageMap == null){return null;}return messageMap.get(messageId);}//当服务器重启后 要从硬盘上读取数据 回复到内存中public void recovery(DiskDataCenter diskDataCenter) throws IOException, MqException, ClassNotFoundException {//清空所有旧数据exchangeMap.clear();queueMap.clear();bindingsMap.clear();messageMap.clear();queueMessageMap.clear();//恢复所有交换机List<Exchange> exchanges = diskDataCenter.selectAllExchanges();for (Exchange exchange : exchanges){exchangeMap.put(exchange.getName(),exchange);}//恢复所有队列List<MSGQueue> queueList = diskDataCenter.selectAllQueues();for(MSGQueue queue : queueList){queueMap.put(queue.getName(),queue);}//恢复所有绑定List<Binding> bindingList = diskDataCenter.selectAllBindings();for(Binding binding : bindingList){ConcurrentHashMap<String, Binding> bindingMap = bindingsMap.computeIfAbsent(binding.getExchangeName(), k -> new ConcurrentHashMap<>());bindingMap.put(binding.getQueueName(),binding);}//恢复所有消息//遍历所有的队列 根据每个队列的名字 获取到所有的信息for(MSGQueue queue : queueList){LinkedList<Message> messages = diskDataCenter.loadAllMessageFromQueue(queue.getName());//将消息加载到内存queueMessageMap.put(queue.getName(),messages);for(Message message : messages){messageMap.put(message.getMessageId(),message);}}//未确认的消息 不需要从硬盘获取//在等待ack的过程中 未被确认的消息就转变为未被取走的消息}
}
虚拟主机设计
⾄此,内存和硬盘的数据都已经组织完成.接下来使⽤"虚拟主机"这个概念,把这两部分的数据也串起来.并且实现⼀些MQ的关键API
注意:在RabbitMQ中,虚拟主机是可以随意创建/删除的.咱们此处为了实现简单,并没有实现虚拟主机的管理.因此我们默认就只有⼀个虚拟主机的存在.但是在数据结构的设计上我们预留了对于多虚拟主机的管理.保证不同虚拟主机中的Exchange,Queue,Binding,Message都是相互隔离的
创建VirtualHost
创建mqserver.VirtualHost
@Slf4j
@Getter
//通过这个类表示虚拟主机
//作为业务逻辑的整合 就需要对代码中抛出的异常进行处理了
public class VirtualHost {private String virtualHostName;private MemoryDataCenter memoryDataCenter = new MemoryDataCenter();private DiskDataCenter diskDataCenter = new DiskDataCenter();private Router router = new Router();private ConsumerManager consumerManager = new ConsumerManager(this);private final Object exchangeLocker = new Object();private final Object queueLocker = new Object();public VirtualHost(String virtualHostName) {this.virtualHostName = virtualHostName;//对于memoryDataCenter不需要初始化 在类内部已经初始化过了//对于diskDataCenter来说 要进行初始化操作diskDataCenter.init();try {memoryDataCenter.recovery(diskDataCenter);} catch (IOException | MqException | ClassNotFoundException e) {e.printStackTrace();log.info("恢复内存数据失败");}}//创建交换机//如果交换机不存在 则创建 如果存在 直接返回//返回值是boolean 创建成功true 失败 返回falsepublic boolean exchangeDeclare(String exchangeName, ExchangeType exchangeType, boolean durable, boolean autoDelete,Map<String, Object> arguments) {//把交换机的名字 加上虚拟主机作为前缀exchangeName = virtualHostName + exchangeName;try {synchronized (exchangeLocker) {Exchange existsExchange = memoryDataCenter.getExchange(exchangeName);if (existsExchange != null) {//该交换机已经存在log.info("交换机已经存在 exchangeName = " + exchangeName);return true;}//真正创建交换机Exchange exchange = new Exchange();exchange.setName(exchangeName);exchange.setType(exchangeType);exchange.setDurable(durable);exchange.setAutoDelete(autoDelete);exchange.setArguments(arguments);//把交换机对象写入硬盘if (durable) {diskDataCenter.insertExchange(exchange);}//4.把交换机对象写入内存memoryDataCenter.insertExchange(exchange);log.info("交换机创建完成 exchangeName = " + exchangeName);//先硬盘 后内存 因为内存容易失败 如果硬盘失败 就不向内存中存储}return true;} catch (Exception e) {log.info("交换机创建失败 exchangeName = " + exchangeName);e.printStackTrace();return false;}}public boolean exchangeDelete(String exchangeName) {exchangeName = virtualHostName + exchangeName;try {synchronized (exchangeLocker) {//先找到对应的交换机Exchange toDelete = memoryDataCenter.getExchange(exchangeName);if (toDelete == null) {throw new MqException("交换机不存在 无法删除");}//删除硬盘上的数据if (toDelete.isDurable()) {diskDataCenter.deleteExchange(exchangeName);}//删除内存中的交换机数据memoryDataCenter.deleteExchange(exchangeName);log.info("交换机删除成功 exchangeName = " + exchangeName);}return true;} catch (Exception e) {log.info("交换机删除失败 exchangeName = " + exchangeName);e.printStackTrace();return false;}}//创建队列public boolean queueDeclare(String queueName, boolean durable, boolean exclusive, boolean autoDelete,Map<String, Object> arguments) {//拼接队列名字queueName = virtualHostName + queueName;try {synchronized (queueLocker) {MSGQueue existsQueue = memoryDataCenter.getQueue(queueName);if (existsQueue != null) {log.info("队列已经存在 queueName = " + queueName);return true;}//创建队列对象MSGQueue queue = new MSGQueue();queue.setName(queueName);queue.setDurable(durable);queue.setExclusive(exclusive);queue.setAutoDelete(autoDelete);queue.setArguments(arguments);//写硬盘if (durable) {diskDataCenter.insertQueue(queue);}//写内存memoryDataCenter.insertQueue(queue);log.info("队列创建成功 queueName = " + queueName);}return true;} catch (Exception e) {log.info("队列创建失败 queueName = " + queueName);e.printStackTrace();return false;}}//删除队列public boolean queueDelete(String queueName) {queueName = virtualHostName + queueName;try {MSGQueue queue = memoryDataCenter.getQueue(queueName);if (queue == null) {throw new MqException("队列不存在 无法删除 queueName = " + queueName);}//删除硬盘数据if (queue.isDurable()) {diskDataCenter.deleteQueue(queueName);}memoryDataCenter.deleteQueue(queueName);log.info("交换机删除成功 exchangeName = " + queueName);return true;} catch (Exception e) {log.info("队列创建失败 + queueName = " + queueName);e.printStackTrace();return false;}}//绑定public boolean queueBind(String queueName, String exchangeName, String bindingKey) {queueName = virtualHostName + queueName;exchangeName = virtualHostName + exchangeName;try {synchronized (exchangeLocker) {synchronized (queueLocker) {Binding existsBinding = memoryDataCenter.getBinding(exchangeName, queueName);if (existsBinding != null) {throw new MqException("binding 已经存在 queueName = " + queueName);}//验证bindingkey是否合法if (!router.checkBindingKey(bindingKey)) {throw new MqException("bindingKey非法 bindingKey = " + bindingKey);}//创建Binding对象Binding binding = new Binding();binding.setExchangeName(exchangeName);binding.setQueueName(queueName);binding.setBindingKey(bindingKey);//获取对应的交换机和队列 如果交换机或者队列不存在 也无法创建MSGQueue queue = memoryDataCenter.getQueue(queueName);if (queue == null) {throw new MqException("队列不存在 queueName = " + queueName);}Exchange exchange = memoryDataCenter.getExchange(exchangeName);if (exchange == null) {throw new MqException("交换机不存在 exchangeName = " + exchangeName);}//先写硬盘if (queue.isDurable() && exchange.isDurable()) {diskDataCenter.insertBinding(binding);}//在写内存memoryDataCenter.insertBinding(binding);log.info("绑定成功 exchangeName = " + exchangeName + "queueName + " + queueName);}}return true;} catch (Exception e) {log.info("绑定失败 exchangeName = " + exchangeName + "queueName + " + queueName);e.printStackTrace();return false;}}public boolean queueUnbind(String queueName, String exchangeName) {queueName = virtualHostName + queueName;exchangeName = virtualHostName + exchangeName;try {synchronized (exchangeLocker) {synchronized (queueLocker) {//获取binding看是否已经存在Binding binding = memoryDataCenter.getBinding(exchangeName, queueName);if (binding == null) {throw new MqException("删除绑定失败 绑定不存在 exchangeName" + exchangeName + ", queueName" + queueName);}
// //获取一下对应的队列和交换机 看是否存在
// MSGQueue queue = memoryDataCenter.getQueue(queueName);
// if(queue == null){
// throw new MqException("对应的队列不存在 queueName = " + queueName);
// }
// Exchange exchange = memoryDataCenter.getExchange(exchangeName);
// if(exchange == null){
// throw new MqException("对应的交换机不存在 exchangeName" + exchangeName);
// }//删除硬盘上的数据
// if(queue.isDurable() && exchange.isDurable()){
// diskDataCenter.deleteBinding(binding);
// }//无论绑定是否持久化 都进行删除diskDataCenter.deleteBinding(binding);//删除内存的数据memoryDataCenter.deleteBinding(binding);}}return true;} catch (Exception e) {log.info("删除绑定失败");e.printStackTrace();return false;}}//发送交换机到指定的交换机/队列中public boolean basicPublish(String exchangeName, String routingKey, BasicProperties basicProperties, byte[] body) {try {//转换交换机的名字exchangeName = virtualHostName + exchangeName;//检查routingKey是否合法if (!router.checkRoutingKey(routingKey)) {throw new MqException("[VirtualHost] routingKey 非法 routingKey = " + routingKey);}//查找交换机对象Exchange exchange = memoryDataCenter.getExchange(exchangeName);if (exchange == null) {throw new MqException("[VirtualHost] 交换机不存在 exchangeName = " + exchangeName);}//判定交换机的类型if (exchange.getType() == ExchangeType.DIRECT) {//直接交换机的转发规则//以routingKey作为队列的名字 直接把消息写入指定的队列中//此时可以无视绑定关系String queueName = virtualHostName + routingKey;//构造消息对象Message message = Message.createMessageWithId(routingKey, basicProperties, body);//查找该队列名对应的对象MSGQueue queue = memoryDataCenter.getQueue(queueName);if (queue == null) {throw new Exception("[VirualHost] 队列不存在 queueName = " + queueName);}//队列存在 直接给队列中写入消息sendMessage(queue, message);} else {//以fanout和topic的方式转发//找到该交换机关联的所有绑定 并遍历这些绑定对象ConcurrentHashMap<String, Binding> bindingsMap = memoryDataCenter.getBindings(exchangeName);for (Map.Entry<String, Binding> entry : bindingsMap.entrySet()) {//获取到绑定对象Binding binding = entry.getValue();MSGQueue queue = memoryDataCenter.getQueue(binding.getQueueName());if (queue == null) {//此处不跑出异常 可能有多个这样的队列//希望不要因为一个队列的失败 影响其他队列消息的传输log.info("basicPublish 发送消息时 发现消息不存在 queueName = " + binding.getQueueName());continue;}//构造消息对象Message message = Message.createMessageWithId(exchangeName, basicProperties, body);//判断这个消息是否能转发给该队列//如果是fanout 则所有绑定的队列都要转发//如果是topic 还要判定下 bindkey和routingKey是否匹配if (!router.route(exchange.getType(), binding, message)) {continue;}//真正转发消息给队列sendMessage(queue, message);}}return true;} catch (Exception e) {log.info("消息发送失败");e.printStackTrace();return false;}}private void sendMessage(MSGQueue queue, Message message) throws IOException, MqException, InterruptedException {// 此处发送消息 就是把消息写入硬盘和内存中int deliverMode = message.getDeliverMode();//deliverMode 为1 不持久化 为2 不持久化if (deliverMode == 2) {diskDataCenter.sendMessage(queue, message);}memoryDataCenter.sendMessage(queue, message);//通知消费者可以消费消息了consumerManager.notifyConsumer(queue.getName());}//添加一个队列的订阅者 但队列收到消息之后 就要把消息推送给对应的订阅者public boolean basicConsume(String consumerTag, String queueName, boolean autoAck, Consumer consumer) {//构造一个consumerEnv对象 把这个对应的队列找到 再把这个Consumer对象添加到队列中queueName = virtualHostName + queueName;try {consumerManager.addConsumer(consumerTag, queueName, autoAck, consumer);log.info("basicConsumer成功 queueName :{}", queueName);return true;} catch (Exception e) {log.info("basicConsumer 失败 queueName :{}", queueName);e.printStackTrace();return false;}}public boolean basicAck(String queueName, String messageId){queueName = virtualHostName + queueName;try {//获取到消息和队列Message message = memoryDataCenter.getMessage(messageId);if (message == null){throw new MqException("[VirtualHost] 要确认的消息不存在 messageId = " +messageId);}MSGQueue queue = memoryDataCenter.getQueue(queueName);if(queue == null){throw new MqException("[virtualHost] 要确认的队列不存在 queueName = " + queueName);}//删除硬盘上的数据if(message.getDeliverMode() == 2){diskDataCenter.deleteMessage(queue,message);}//删除消息中心的数据memoryDataCenter.removeMessage(messageId);//删除待确认集合中的消息memoryDataCenter.removeMessageWaitAck(queueName,messageId);log.info("basicAck成功 消息被成功确认 queueName:{},messageId:{}",queueName,messageId);return true;} catch (MqException | IOException | ClassNotFoundException e) {log.info("basicAck失败 消息确认失败 queueName:{},messageId:{}",queueName,messageId);return false;}}
}
路由规则
//实现交换机的转发规则 和验证routingKey是否合法
public class Router {//bindingKey构造规则//数字字母下划线//使用.分割//允许*和#public boolean checkBindingKey(String bindingKey) {//todoif (bindingKey.length() == 0) {//空字符串 也是合法情况return true;}//检查字符串中不能存在非法字符for (int i = 0; i < bindingKey.length(); i++) {char ch = bindingKey.charAt(i);if (ch >= 'A' && ch <= 'Z') {continue;}if (ch >= 'a' && ch <= 'z') {continue;}if (ch >= '0' && ch <= '9') {continue;}if (ch == '_' || ch == '.' || ch == '*' || ch == '#') {continue;}return false;}//检查*或者#是否是独立的部分String[] words = bindingKey.split("\\.");for (String word : words) {//检查word长度 >1 且包含#或*就不合法if (word.length() > 1 && word.contains("*") || word.contains("#")) {return false;}}//约定通配符之间的相邻关系//形如这种 aaa.#.#.bbb aaa.*.#.bbb aaa.#.*.bbb非法//aaa.*.*.bbb合法for (int i = 0; i < words.length - 1; i++) {//是否为连续两个#if (words[i].equals("#") && words[i + 1].equals("#")) {return false;}//#连着*if (words[i].equals("#") && words[i + 1].equals("*")) {return false;}//*连着#if (words[i].equals("*") && words[i + 1].equals("#")) {return false;}}return true;}//数字字母下划线//使用.分割public boolean checkRoutingKey(String routingKey) {if (routingKey.length() == 0) {//空字符串 合法 在使用fanout交换机时 routingKey用不上return true;}for (int i = 0; i < routingKey.length(); i++) {char ch = routingKey.charAt(i);//判断该字符是否是大写字母if (ch >= 'A' && ch <= 'Z') {continue;}//判断该字母是否是小写字母if (ch >= 'a' && ch <= 'z') {continue;}//判断该字母是否是阿拉伯数字if (ch >= '0' && ch <= '9') {continue;}//判定是否是_或者.if (ch == '_' || ch == '.') {continue;}//该字符不满足任何一种情况 就返回falsereturn false;}return true;}//这个方法用来判定该消息是否可以转发给这个绑定对应的队列public boolean route(ExchangeType exchangeType, Binding binding, Message message) throws MqException {//TODO//根据不同的exchangeType使用不同的转发规则if (exchangeType == ExchangeType.FANOUT) {//所有都转发 直接返回true;return true;} else if (exchangeType == ExchangeType.TOPIC) {return routeTopic(binding, message);} else {//不应该存在这种情况throw new MqException("[Router] 交换机类型非法 exchangeType = " + exchangeType);}}private boolean routeTopic(Binding binding, Message message) {//先把这两个key进行切分String[] bindingTokens = binding.getBindingKey().split("\\.");String[] routingTokens = message.getRoutingKey().split("\\.");int bindingIndex = 0;int routingIndex = 0;//此处使用whilewhile (bindingIndex < bindingTokens.length && routingIndex < routingTokens.length) {//如果遇到* 直接进入下一轮 *可以匹配到任意一个部分if (bindingTokens[bindingIndex].equals("*")) {bindingIndex++;routingIndex++;continue;} else if(bindingTokens[bindingIndex].equals("#")){//如果遇到#号 要先看看有没有下一个位置bindingIndex++;if(bindingIndex == bindingTokens.length){//该#后面没有东西 是最后一个字符return true;}//后面还有东西//拿着这个内容 在routingKey中往后找 找到对应的位置//findNextMatch 这个方法用来查找该部分 在routingKey中的位置 返回该下标 没找到返回-1routingIndex = findNextMatch(routingTokens,routingIndex,bindingTokens[bindingIndex]);if(routingIndex == -1){return false;}//找到的匹配的情况 就继续往后匹配bindingIndex++;routingIndex++;}else {//如果遇到普通字符串if(!bindingTokens[bindingIndex].equals(routingTokens[routingIndex])){return false;}bindingIndex++;routingIndex++;}}//判定是否是双方到达末尾//比如 aaa.bbb.ccc和aaa.bbb是要匹配失败的if (bindingIndex == bindingTokens.length && routingIndex == routingTokens.length){return true;}return false;}private int findNextMatch(String[] routingTokens, int routingIndex, String bindingToken) {for (int i = routingIndex; i < routingTokens.length; i++) {if(routingTokens[i].equals(bindingToken)){return i;}}return -1;}}
订阅消息
添加一个订阅者
/添加一个队列的订阅者 但队列收到消息之后 就要把消息推送给对应的订阅者public boolean basicConsume(String consumerTag, String queueName, boolean autoAck, Consumer consumer) {//构造一个consumerEnv对象 把这个对应的队列找到 再把这个Consumer对象添加到队列中queueName = virtualHostName + queueName;try {consumerManager.addConsumer(consumerTag, queueName, autoAck, consumer);log.info("basicConsumer成功 queueName :{}", queueName);return true;} catch (Exception e) {log.info("basicConsumer 失败 queueName :{}", queueName);e.printStackTrace();return false;}}
Consumer相当于一个回调函数 放在common.Consumer
中
@FunctionalInterface
public interface Consumer {//每次服务器收到消息 调用 把消息推送给消费者//此处参考rabbitMqvoid handleDelivery(String consumerTag, BasicProperties basicProperties,byte[] body) throws MqException, IOException;
}
创建订阅者管理类
创建mqserver.core.ConsumerManager
//通过这个类来实现消费者消费消息的逻辑
public class ConsumerManager {//持有virualhost来操作数据private VirtualHost parent;//指定一个线程池 执行具体的回调函数private ExecutorService workPool = Executors.newFixedThreadPool(4);//存放令牌的队列 实际上就是队列名 为了让线程池知道是哪一个队列的消息private BlockingQueue<String> tokenQueue = new LinkedBlockingQueue<>();//扫描线程private Thread scannerThread = null;public ConsumerManager(VirtualHost parent) {this.parent = parent;scannerThread = new Thread(()->{while (true){try {//1.拿到令牌String queueName = tokenQueue.take();//2.根据令牌 找到队列MSGQueue queue = parent.getMemoryDataCenter().getQueue(queueName);if(queue == null){throw new MqException("[ConsumerManager] 取令牌后发现 该队列名不存在 queueName = "+ queueName);}//3.从这个队列中消费一个消息synchronized (queue){consumerMessage(queue);}} catch (InterruptedException | MqException e) {e.printStackTrace();}}});//设置为后台线程scannerThread.setDaemon(true);scannerThread.start();}public void notifyConsumer(String queueName) throws InterruptedException {tokenQueue.put(queueName);}public void addConsumer(String consumerTag, String queueName, boolean autoAck, Consumer consumer) throws MqException {//找到对应的队列MSGQueue queue = parent.getMemoryDataCenter().getQueue(queueName);if(queue == null){throw new MqException("[ConsumerManager] 队列不存在 queueName = " + queueName);}ConsumerEnv consumerEnv = new ConsumerEnv(consumerTag,queueName,autoAck,consumer);synchronized (queue){queue.addConsumerEnv(consumerEnv);//如果此时队列中已经有一些消息了 就需要立即消费int n = parent.getMemoryDataCenter().getMessageCount(queueName);for (int i = 0; i < n; i++) {//这个方法调用一次就消费一条消息consumerMessage(queue);}}}private void consumerMessage(MSGQueue queue) {//消费消息//1.按照轮询的方式 找到消费者ConsumerEnv luckyDog = queue.chooseConsumerEnv();if(luckyDog == null){//当前对象没有消费者 暂时不需要 等有消费者再说return;}//从队列中取出一条消息Message message = parent.getMemoryDataCenter().pollMessage(queue.getName());if(message == null){//当队列中还没有消息,也不需要消费return;}/*** 为了达成消息不丢失这样的效果* 1. 在真正执行回调之前 先把这个消息放在待确认集合中 避免因为回调失败 导致的消息丢失* 2. 真正执行回调* 3. 如消费者采用的是autoAck = true 默认回调函数执行结束之后不抛出异常 就算消费成功 然后就可以删除消息* 硬盘 内存消息中心的哈希表 上面的待确认消息集合* 4. 当前消费者采取的是autoAck = false 手动应答 需要消费者自己在自己的回调函数内部 调用basicAck这个API**///把消息带入到消费者的回调方法中 去给线程池执行parent.getMemoryDataCenter().addMessageWaitAck(queue.getName(), message);//真正执行回调workPool.submit(()->{try{luckyDog.getConsumer().handleDelivery(luckyDog.getConsumerTag(), message.getBasicProperties(), message.getBody());log.info("消息成功消费 queueName = " + queue.getName());//如果是自动应答 就可以直接把消息删除了if(luckyDog.isAutoAck()){//删硬盘if(message.getDeliverMode() == 2){parent.getDiskDataCenter().deleteMessage(queue,message);}//删待确认集合parent.getMemoryDataCenter().removeMessageWaitAck(queue.getName(),message.getMessageId());//删除内存中的消息中心里的消息parent.getMemoryDataCenter().removeMessage(message.getMessageId());log.info("消息成功消费 queueName :{}" ,queue.getName());}//如果是手动应答 先不处理 等消费者调用basicAck处理}catch (Exception e){e.printStackTrace();}});}
}
- parent ⽤来记录虚拟主机.
- 使⽤⼀个阻塞队列⽤来触发消息消费.称为令牌队列.每次有消息过来了,都往队列中放⼀个令牌(也就是队列名),然后消费者再去消费对应队列的消息.
- 使⽤⼀个线程池⽤来执⾏消息回调
这样令牌队列的设定避免搞出来太多线程.否则就需要给每个队列都安排⼀个单独的线程了,如果队列很多则开销就⽐较⼤了
消息确认
//只有在手动应答时 才调用 应答成功 删除这条消息public boolean basicAck(String queueName, String messageId){queueName = virtualHostName + queueName;try {//获取到消息和队列Message message = memoryDataCenter.getMessage(messageId);if (message == null){throw new MqException("[VirtualHost] 要确认的消息不存在 messageId = " +messageId);}MSGQueue queue = memoryDataCenter.getQueue(queueName);if(queue == null){throw new MqException("[virtualHost] 要确认的队列不存在 queueName = " + queueName);}//删除硬盘上的数据if(message.getDeliverMode() == 2){diskDataCenter.deleteMessage(queue,message);}//删除消息中心的数据memoryDataCenter.removeMessage(messageId);//删除待确认集合中的消息memoryDataCenter.removeMessageWaitAck(queueName,messageId);log.info("basicAck成功 消息被成功确认 queueName:{},messageId:{}",queueName,messageId);return true;} catch (MqException | IOException | ClassNotFoundException e) {log.info("basicAck失败 消息确认失败 queueName:{},messageId:{}",queueName,messageId);return false;}}
网络通信协议设计
⽣产者和消费者都是客⼾端,都需要通过⽹络和BrokerServer进⾏通信.
此处我们使⽤TCP协议,来作为通信的底层协议.同时在这个基础上⾃定义应⽤层协议,完成客⼾端对服务器这边功能的远程调⽤
要调⽤的功能有:
- 创建channel
- 关闭channel
- 创建exchange
- 删除exchange
- 创建queue
- 删除queue
- 创建binding
- 删除binding
- 发送message
- 订阅message
- 发送ack
- 返回message(服务器->客⼾端)
设计应用层协议
因为Message的消息体本⾝就是⼆进制的.因此不太⽅便使⽤json等⽂本格式的协议
其中type表⽰请求响应不同的功能.取值如下:
- 0x1 创建channel
- 0x2 关闭channel
- 0x3 创建exchange
- 0x4 销毁exchange
- 0x5 创建queue
- 0x6 销毁queue
- 0x7 创建binding
- 0x8 销毁binding
- 0x9 发送message
- 0xa 订阅message
- 0xb 返回ack
- 0xc 服务器给客⼾端推送的消息.(被订阅的消息)响应独有的.
其中payload部分,会根据不同的type,存在不同的格式.
对于请求来说,payload表⽰这次⽅法调⽤的各种参数信息.
对于响应来说,payload表⽰这次⽅法调⽤的返回值
定义Request/Response
创建common.Request common.Response
@Data
//表示一个网络通信中的请求对象 按照自定义协议的格式来展开的
public class Request {private int type;private int length;private byte[] payload;
}
//这个对象表示一个响应
@Data
public class Response {private int type;private int length;private byte[] payload;
}
定义参数父类
构造⼀个类表⽰⽅法的参数,作为Request的payload.
不同的⽅法中,参数形态各异,但是有些信息是通⽤的,使⽤⼀个⽗类表⽰出来.具体每个⽅法的参数再通过继承的⽅式体现
common.BasicArguments
@Data
//使用这个类表示方法的公共参数
//后续每个方法都会有一些不同的参数 不同的参数再分别使用不同的子类来表示
public class BasicArguments implements Serializable {//表示一次请求/响应的身份标识 可以把请求和响应对应protected String rid;//这次通信使用的channel的身份标识protected String channelId;
}
定义返回值父类
和参数同理,也需要构造⼀个类表⽰返回值,作为Response的payload
common.BasicReturns
@Data
//表示返回值的公共信息
public class BasicReturns implements Serializable {//表示一次请求/响应的身份标识 可以把请求和响应对应protected String rid;//这次通信使用的channel的身份标识protected String channelId;//表示方法的返回值protected boolean ok;
}
定义其他参数类
针对每个VirtualHost提供的⽅法,都需要有⼀个类表⽰对应的参数
ExchangeDeclareArguments
@Data
public class ExchangeDeclareArguments extends BasicArguments implements Serializable {private String exchangeName;private ExchangeType exchangeType;private boolean durable;private boolean autoDelete;private Map<String, Object> arguments;
}
⼀个创建交换机的请求,形如:
- 可以把ExchangeDeclareArguments转成byte[],就得到了下列图⽚的结构.
- 按照length⻓度读取出payload,就可以把读到的⼆进制数据转换成ExchangeDeclareArguments 对象
ExchangeDeleteArguments
@Data
public class ExchangeDeleteArguments extends BasicArguments implements Serializable {private String exchangeName;
}
QueueDeclareArguments
@Data
public class QueueDeclareArguments extends BasicArguments implements Serializable {private String queueName;private boolean durable;private boolean exclusive;private boolean autoDelete;private Map<String,Object> arguments;}
QueueDeleteArguments
@Data
public class QueueDeleteArguments extends BasicArguments implements Serializable {private String queueName;}
QueueBindArguments
@Data
public class QueueBindArguments extends BasicArguments implements Serializable {private String queueName;private String exchangeName;private String bindingKey;
}
QueueUnbindArguments
@Data
public class QueueUnbindArguments extends BasicArguments implements Serializable {private String queueName;private String exchangeName;}
BasicPublishArguments
@Data
public class BasicPublishArguments extends BasicArguments implements Serializable {private String exchangeName;private String routingKey;private BasicProperties basicProperties;private byte[] body;
}
BasicConsumeArguments
@Data
public class BasicConsumeArguments extends BasicArguments implements Serializable {private String consumerTag;private String queueName;private boolean autoAck;//这个类对应的方法中 还有一个参数 是回调函数 这个回调函数是不能网络传输的}
SubScribeReturns
- 这个不是参数,是返回值.是服务器给消费者推送的订阅消息.
- consumerTag其实是channelId.
- basicProperties 和 body 共同构成了Message
@Data
public class SubScribeReturns extends BasicReturns implements Serializable {private String consumerTag;private BasicProperties basicProperties;private byte[] body;
}
实现BrokerServer
/*** 本质上就是一个TCP服务器*/
@Slf4j
public class BrokerServer {private ServerSocket serverSocket = null;//当前考虑一个BrokerServer上只有一个虚拟主机private VirtualHost virtualHost = new VirtualHost("default");//使用这个哈希表 表示当前所有的会话//此处的key是channelId value为对应的socket对象private ConcurrentHashMap<String, Socket> sessions = new ConcurrentHashMap<>();private ExecutorService executorService = null;//引入一个boolean变量 控制服务器是否继续运行private volatile boolean runnable = true;public BrokerServer(int port) throws IOException {serverSocket = new ServerSocket(port);}public void start() throws IOException {log.info("brokerServer 启动!");executorService = Executors.newCachedThreadPool();try {while (runnable) {Socket clientSocket = serverSocket.accept();//把处理连接的逻辑丢给这个线程池executorService.submit(() -> {processConnection(clientSocket);});}}catch (SocketException e){log.info("服务器停止运行");}}//停止服务器public void stop() throws IOException {runnable = false;//把线程池中的任务都放弃了 让线程都销毁executorService.shutdown();serverSocket.close();}//通过这个方法 来处理一个客户端的连接//在这一个方法中 可能会涉及到多个请求和响应private void processConnection(Socket clientSocket) {try (InputStream inputStream = clientSocket.getInputStream();OutputStream outputStream = clientSocket.getOutputStream()) {//需要按照特定格式解析 使用DataInputStream和DataOutputStreamtry (DataInputStream dataInputStream = new DataInputStream(inputStream);DataOutputStream dataOutputStream = new DataOutputStream(outputStream)) {while (true) {//1.读取请求并解析Request request = readRequest(dataInputStream);//2.根据请求计算响应Response response = process(request, clientSocket);//3.把响应写回客户端writeResponse(dataOutputStream, response);}} catch (EOFException e) {//对于这个代码 如果DataInputStream 如果读到EOF 就会抛出一个eofException异常//需要借助这个异常来结束循环log.info("connection关闭! 客户端的地址: {} : {}", clientSocket.getInetAddress().toString(), clientSocket.getPort());}} catch (ClassNotFoundException | MqException | IOException e) {log.info("connection 出现异常");e.printStackTrace();} finally {try {clientSocket.close();//一个tcp连接中 可能包含多个channel 需要把这个socket对应的所有channel也顺便清理掉clearCloseSession(clientSocket);} catch (IOException e) {e.printStackTrace();}}}//遍历上述sessions哈希表 把被关闭的socket对应的键值对删除private void clearCloseSession(Socket clientSocket) {List<String> toDeleteChannelId = new ArrayList<>();for(Map.Entry<String,Socket> entry : sessions.entrySet()){if(entry.getValue() == clientSocket){//不能一边遍历一边删除 此时影响结构影响遍历toDeleteChannelId.add(entry.getKey());}}for (String channelId : toDeleteChannelId){sessions.remove(channelId);}log.info("清理session完成 被清理的sessionId :{}",toDeleteChannelId);}//处理一次请求 返回一次响应private Response process(Request request, Socket clientSocket) throws IOException, ClassNotFoundException, MqException {//先对request中的payload做一个初步的解析BasicArguments basicArguments = (BasicArguments) BinaryTool.fromBytes(request.getPayload());log.info("rid:{} , channelId:{} , type:{}, length:{}", basicArguments.getRid(), basicArguments.getChannelId(),request.getType(), request.getLength());//根据type的值 来进一步区分接下来这次请求要干什么boolean ok = true;if (request.getType() == 0x1) {//创建channelsessions.put(basicArguments.getChannelId(), clientSocket);log.info("创建channel完成 channelId:{}", basicArguments.getChannelId());} else if (request.getType() == 0x2) {//销毁channelsessions.remove(basicArguments.getChannelId());log.info("销毁channel完成 channelId:{}", basicArguments.getChannelId());} else if (request.getType() == 0x3) {//创建交换机 此时payload是ExchangeDeclareArguments 对象了ExchangeDeclareArguments arguments = (ExchangeDeclareArguments) basicArguments;ok = virtualHost.exchangeDeclare(arguments.getExchangeName(), arguments.getExchangeType(),arguments.isDurable(), arguments.isAutoDelete(), arguments.getArguments());} else if (request.getType() == 0x4) {//删除交换机ExchangeDeleteArguments arguments = (ExchangeDeleteArguments) basicArguments;ok = virtualHost.exchangeDelete(arguments.getExchangeName());} else if (request.getType() == 0x5) {//创建队列QueueDeclareArguments arguments = (QueueDeclareArguments) basicArguments;ok = virtualHost.queueDeclare(arguments.getQueueName(), arguments.isDurable(),arguments.isDurable(), arguments.isAutoDelete(), arguments.getArguments());} else if (request.getType() == 0x6) {//删除队列QueueDeclareArguments arguments = (QueueDeclareArguments) basicArguments;ok = virtualHost.queueDelete(arguments.getQueueName());} else if (request.getType() == 0x7) {//创建绑定QueueBindArguments arguments = (QueueBindArguments) basicArguments;ok = virtualHost.queueBind(arguments.getQueueName(), arguments.getExchangeName(),arguments.getBindingKey());} else if (request.getType() == 0x8) {//删除绑定QueueBindArguments arguments = (QueueBindArguments) basicArguments;ok = virtualHost.queueUnbind(arguments.getQueueName(), arguments.getExchangeName());} else if (request.getType() == 0x9) {//发布消息BasicPublishArguments arguments = (BasicPublishArguments) basicArguments;ok = virtualHost.basicPublish(arguments.getExchangeName(), arguments.getRoutingKey(),arguments.getBasicProperties(), arguments.getBody());} else if (request.getType() == 0xa) {//订阅消息BasicConsumeArguments arguments = (BasicConsumeArguments) basicArguments;ok = virtualHost.basicConsume(arguments.getConsumerTag(), arguments.getQueueName(), arguments.isAutoAck(),new Consumer() {//这个回调函数要做的工作 就是把服务器收到的消息可以直接推送到对应的消费者客户端@Overridepublic void handleDelivery(String consumerTag, BasicProperties basicProperties, byte[] body) throws MqException, IOException {//先知道当前收到的消息要发给那个客户端//此处consumerTag 其实是channelId 根据channelId去sessions中查询 就可以得到相应的socket\对象 就可以发送数据//1.根据channelId找到 socket对象Socket clientSocket = sessions.get(consumerTag);if (clientSocket == null || clientSocket.isClosed()) {throw new MqException("[BrokeServer] 订阅消息的客户端已经关闭");}//构造响应数据SubScribeReturns subScribeReturns = new SubScribeReturns();subScribeReturns.setChannelId(consumerTag);subScribeReturns.setRid("");//这里只有响应 没有请求 不需要ridsubScribeReturns.setOk(true);subScribeReturns.setConsumerTag(consumerTag);subScribeReturns.setBasicProperties(basicProperties);subScribeReturns.setBody(body);byte[] payload = BinaryTool.toBytes(subScribeReturns);Response response = new Response();//0xc表示服务器给消费者客户端推送的消息数据response.setType(0xc);response.setLength(payload.length);response.setPayload(payload);//把数据写回 给客户端DataOutputStream dataOutputStream = new DataOutputStream(clientSocket.getOutputStream());writeResponse(dataOutputStream, response);}});} else if (request.getType() == 0xb) {//调用basicAck确认消息BasicAckArguments arguments = (BasicAckArguments) basicArguments;ok = virtualHost.basicAck(arguments.getQueueName(), arguments.getMessageId());} else {//当前type是非法的throw new MqException("[BrokerServer] 未知的 type type:" + request.getType());}//构造响应BasicReturns basicReturns = new BasicReturns();basicReturns.setChannelId(basicArguments.getChannelId());basicReturns.setRid(basicArguments.getRid());basicReturns.setOk(ok);byte[] payload = BinaryTool.toBytes(basicReturns);Response response = new Response();response.setType(request.getType());response.setLength(payload.length);response.setPayload(payload);log.info("rid:{} , channelId:{}, type:{}, length:{}", basicReturns.getRid(), basicReturns.getChannelId(),response.getType(), response.getLength());return response;}private void writeResponse(DataOutputStream dataOutputStream, Response response) throws IOException {dataOutputStream.writeInt(response.getType());dataOutputStream.writeInt(response.getLength());dataOutputStream.write(response.getPayload());//刷新缓冲区dataOutputStream.flush();}private Request readRequest(DataInputStream dataInputStream) throws IOException {Request request = new Request();request.setType(dataInputStream.readInt());request.setLength(dataInputStream.readInt());byte[] payload = new byte[request.getLength()];int n = dataInputStream.read(payload);if (n != request.getLength()) {throw new IOException("读取请求格式出错");}request.setPayload(payload);return request;}
}
- virtualHost 表⽰服务器持有的虚拟主机.队列,交换机,绑定,消息都是通过虚拟主机管理.
- sessions ⽤来管理所有的客⼾端的连接.记录每个客⼾端的socket.
- serverSocket 是服务器⾃⾝的socket
- executorService 这个线程池⽤来处理响应.
- runnable 这个标志位⽤来控制服务器的运⾏停⽌
启动/停止服务器
public void start() throws IOException {log.info("brokerServer 启动!");executorService = Executors.newCachedThreadPool();try {while (runnable) {Socket clientSocket = serverSocket.accept();//把处理连接的逻辑丢给这个线程池executorService.submit(() -> {processConnection(clientSocket);});}}catch (SocketException e){log.info("服务器停止运行");}}//停止服务器public void stop() throws IOException {runnable = false;//把线程池中的任务都放弃了 让线程都销毁executorService.shutdown();serverSocket.close();}
实现处理连接
对于EOFException和SocketException,我们视为客⼾端正常断开连接. ◦
如果是客⼾端先close,后调⽤DataInputStream的read,则抛出EOFException ◦
如果是先调⽤DataInputStream的read,后客⼾端调⽤close,则抛出SocketException
//通过这个方法 来处理一个客户端的连接//在这一个方法中 可能会涉及到多个请求和响应private void processConnection(Socket clientSocket) {try (InputStream inputStream = clientSocket.getInputStream();OutputStream outputStream = clientSocket.getOutputStream()) {//需要按照特定格式解析 使用DataInputStream和DataOutputStreamtry (DataInputStream dataInputStream = new DataInputStream(inputStream);DataOutputStream dataOutputStream = new DataOutputStream(outputStream)) {while (true) {//1.读取请求并解析Request request = readRequest(dataInputStream);//2.根据请求计算响应Response response = process(request, clientSocket);//3.把响应写回客户端writeResponse(dataOutputStream, response);}} catch (EOFException e) {//对于这个代码 如果DataInputStream 如果读到EOF 就会抛出一个eofException异常//需要借助这个异常来结束循环log.info("connection关闭! 客户端的地址: {} : {}", clientSocket.getInetAddress().toString(), clientSocket.getPort());}} catch (ClassNotFoundException | MqException | IOException e) {log.info("connection 出现异常");e.printStackTrace();} finally {try {clientSocket.close();//一个tcp连接中 可能包含多个channel 需要把这个socket对应的所有channel也顺便清理掉clearCloseSession(clientSocket);} catch (IOException e) {e.printStackTrace();}}}
实现readRequest
private Request readRequest(DataInputStream dataInputStream) throws IOException {Request request = new Request();request.setType(dataInputStream.readInt());request.setLength(dataInputStream.readInt());byte[] payload = new byte[request.getLength()];int n = dataInputStream.read(payload);if (n != request.getLength()) {throw new IOException("读取请求格式出错");}request.setPayload(payload);return request;}
实现writeResponse
private void writeResponse(DataOutputStream dataOutputStream, Response response) throws IOException {dataOutputStream.writeInt(response.getType());dataOutputStream.writeInt(response.getLength());dataOutputStream.write(response.getPayload());//刷新缓冲区dataOutputStream.flush();}
实现处理请求
- 先把请求转换成BaseArguments,获取到其中的channelId和rid
- 再根据不同的type,分别处理不同的逻辑.(主要是调⽤virtualHost中不同的⽅法).
- 针对消息订阅操作,则需要在存在消息的时候通过回调,把响应结果写回给对应的客⼾端.
- 最后构造成统⼀的响应
private Response process(Request request, Socket clientSocket) throws IOException, ClassNotFoundException, MqException {//先对request中的payload做一个初步的解析BasicArguments basicArguments = (BasicArguments) BinaryTool.fromBytes(request.getPayload());log.info("rid:{} , channelId:{} , type:{}, length:{}", basicArguments.getRid(), basicArguments.getChannelId(),request.getType(), request.getLength());//根据type的值 来进一步区分接下来这次请求要干什么boolean ok = true;if (request.getType() == 0x1) {//创建channelsessions.put(basicArguments.getChannelId(), clientSocket);log.info("创建channel完成 channelId:{}", basicArguments.getChannelId());} else if (request.getType() == 0x2) {//销毁channelsessions.remove(basicArguments.getChannelId());log.info("销毁channel完成 channelId:{}", basicArguments.getChannelId());} else if (request.getType() == 0x3) {//创建交换机 此时payload是ExchangeDeclareArguments 对象了ExchangeDeclareArguments arguments = (ExchangeDeclareArguments) basicArguments;ok = virtualHost.exchangeDeclare(arguments.getExchangeName(), arguments.getExchangeType(),arguments.isDurable(), arguments.isAutoDelete(), arguments.getArguments());} else if (request.getType() == 0x4) {//删除交换机ExchangeDeleteArguments arguments = (ExchangeDeleteArguments) basicArguments;ok = virtualHost.exchangeDelete(arguments.getExchangeName());} else if (request.getType() == 0x5) {//创建队列QueueDeclareArguments arguments = (QueueDeclareArguments) basicArguments;ok = virtualHost.queueDeclare(arguments.getQueueName(), arguments.isDurable(),arguments.isDurable(), arguments.isAutoDelete(), arguments.getArguments());} else if (request.getType() == 0x6) {//删除队列QueueDeclareArguments arguments = (QueueDeclareArguments) basicArguments;ok = virtualHost.queueDelete(arguments.getQueueName());} else if (request.getType() == 0x7) {//创建绑定QueueBindArguments arguments = (QueueBindArguments) basicArguments;ok = virtualHost.queueBind(arguments.getQueueName(), arguments.getExchangeName(),arguments.getBindingKey());} else if (request.getType() == 0x8) {//删除绑定QueueBindArguments arguments = (QueueBindArguments) basicArguments;ok = virtualHost.queueUnbind(arguments.getQueueName(), arguments.getExchangeName());} else if (request.getType() == 0x9) {//发布消息BasicPublishArguments arguments = (BasicPublishArguments) basicArguments;ok = virtualHost.basicPublish(arguments.getExchangeName(), arguments.getRoutingKey(),arguments.getBasicProperties(), arguments.getBody());} else if (request.getType() == 0xa) {//订阅消息BasicConsumeArguments arguments = (BasicConsumeArguments) basicArguments;ok = virtualHost.basicConsume(arguments.getConsumerTag(), arguments.getQueueName(), arguments.isAutoAck(),new Consumer() {//这个回调函数要做的工作 就是把服务器收到的消息可以直接推送到对应的消费者客户端@Overridepublic void handleDelivery(String consumerTag, BasicProperties basicProperties, byte[] body) throws MqException, IOException {//先知道当前收到的消息要发给那个客户端//此处consumerTag 其实是channelId 根据channelId去sessions中查询 就可以得到相应的socket\对象 就可以发送数据//1.根据channelId找到 socket对象Socket clientSocket = sessions.get(consumerTag);if (clientSocket == null || clientSocket.isClosed()) {throw new MqException("[BrokeServer] 订阅消息的客户端已经关闭");}//构造响应数据SubScribeReturns subScribeReturns = new SubScribeReturns();subScribeReturns.setChannelId(consumerTag);subScribeReturns.setRid("");//这里只有响应 没有请求 不需要ridsubScribeReturns.setOk(true);subScribeReturns.setConsumerTag(consumerTag);subScribeReturns.setBasicProperties(basicProperties);subScribeReturns.setBody(body);byte[] payload = BinaryTool.toBytes(subScribeReturns);Response response = new Response();//0xc表示服务器给消费者客户端推送的消息数据response.setType(0xc);response.setLength(payload.length);response.setPayload(payload);//把数据写回 给客户端DataOutputStream dataOutputStream = new DataOutputStream(clientSocket.getOutputStream());writeResponse(dataOutputStream, response);}});} else if (request.getType() == 0xb) {//调用basicAck确认消息BasicAckArguments arguments = (BasicAckArguments) basicArguments;ok = virtualHost.basicAck(arguments.getQueueName(), arguments.getMessageId());} else {//当前type是非法的throw new MqException("[BrokerServer] 未知的 type type:" + request.getType());}//构造响应BasicReturns basicReturns = new BasicReturns();basicReturns.setChannelId(basicArguments.getChannelId());basicReturns.setRid(basicArguments.getRid());basicReturns.setOk(ok);byte[] payload = BinaryTool.toBytes(basicReturns);Response response = new Response();response.setType(request.getType());response.setLength(payload.length);response.setPayload(payload);log.info("rid:{} , channelId:{}, type:{}, length:{}", basicReturns.getRid(), basicReturns.getChannelId(),response.getType(), response.getLength());return response;}
实现clearClosedSessio
- 如果客⼾端只关闭了Connection,没关闭Connection中包含的Channel,也没关系,在这⾥统⼀进⾏清理.
- 注意迭代器失效问题
//遍历上述sessions哈希表 把被关闭的socket对应的键值对删除private void clearCloseSession(Socket clientSocket) {List<String> toDeleteChannelId = new ArrayList<>();for(Map.Entry<String,Socket> entry : sessions.entrySet()){if(entry.getValue() == clientSocket){//不能一边遍历一边删除 此时影响结构影响遍历toDeleteChannelId.add(entry.getKey());}}for (String channelId : toDeleteChannelId){sessions.remove(channelId);}log.info("清理session完成 被清理的sessionId :{}",toDeleteChannelId);}
实现客户端
创建包mqclient
创建 ConnectionFactory
⽤来创建连接的⼯⼚类.
@Data
public class ConnectionFactory {//broker server的ip地址private String host;//broker server的端口号private int port;public Connection newConnection() throws IOException {Connection connection = new Connection(host,port);return connection;}
}
Connection 和Channel的定义
⼀个客⼾端可以创建多个Connection.
⼀个Connection对应⼀个socket,⼀个TCP连接.
⼀个Connection可以包含多个Channel
Connection的定义
public class Connection {private Socket socket = null;//使用哈希表 把若干个channel对象组织起来private ConcurrentHashMap<String, Channel> channelMap = new ConcurrentHashMap<>();private InputStream inputStream;private OutputStream outputStream;private DataOutputStream dataOutputStream;private DataInputStream dataInputStream;private ExecutorService callbackPool = null;
}
封装请求响应读写操作
public void writeRequest(Request request) throws IOException {dataOutputStream.writeInt(request.getType());dataOutputStream.writeInt(request.getLength());dataOutputStream.write(request.getPayload());dataOutputStream.flush();log.info("发送请求 type:{} ,length:{}", request.getType(), request.getLength());}//读取响应public Response readResponse() throws IOException {Response response = new Response();response.setType(dataInputStream.readInt());response.setLength(dataInputStream.readInt());byte[] payload = new byte[response.getLength()];int n = dataInputStream.read(payload);if (n != response.getLength()) {throw new IOException("读取的响应数据不完整");}response.setPayload(payload);log.info("收到响应 type:{}, length:{}", response.getType(), response.getLength());return response;}
创建channel
//创建Channelpublic Channel createChannel() throws IOException {String channelId = "C-" + UUID.randomUUID();Channel channel = new Channel(channelId, this);//把这个 Channel对象 放到Connection 管理channel 的哈希表中channelMap.put(channelId, channel);//同时也需要把创建channel这个消息告诉服务器boolean ok = channel.createChannel();if (!ok) {//整个这次创建channel操作不顺利//把哈希表中的键值对删除channelMap.remove(channelId);return null;}return channel;}
Channel的定义
public class Channel {private String channelId;//当前channel属于哪个连接private Connection connection;//记录后续客户端收到的服务器的响应private ConcurrentHashMap<String, BasicReturns> basicReturnsMap = new ConcurrentHashMap<>();//如果当前Channel订阅了某个队列 就需要在此处记录下对应回调是啥,当该队列的消息返回来的时候,调用回调//此处约定一个Channel中只能有一个回调private Consumer consumer = null;public Channel(String channelId, Connection connection) {this.channelId = channelId;this.connection = connection;}
}
- channelId 为channel的⾝份标识,使⽤UUID标识.
- Connection 为channel对应的连接.
- baseReturnsMap ⽤来保存响应的返回值.放到这个哈希表中⽅便和请求匹配.
- consumer为消费者的回调(⽤⼾注册的).对于消息响应,应该调⽤这个回调处理消息.
创建channel
//在这个方法中 和服务器进行交互 告诉服务器 此时客户端创建了新的channelpublic boolean createChannel() throws IOException {//对于创建Channel操作来说 payload就是一个basicArguments对象BasicArguments arguments = new BasicArguments();arguments.setChannelId(channelId);arguments.setRid(generateRid());byte[] payload = BinaryTool.toBytes(arguments);//构造type = 0x1的对象Request request = new Request();request.setType(0x1);request.setLength(payload.length);request.setPayload(payload);//发送请求connection.writeRequest(request);//等待服务器的响应BasicReturns basicReturns = waitResult(arguments.getRid());return basicReturns.isOk();}
实现generateRid
private String generateRid() {return "R-" + UUID.randomUUID();}
实现waitResult
- 由于服务器的响应是异步的.此处通过waitResult实现同步等待的效果
private BasicReturns waitResult(String rid) {BasicReturns basicReturns = null;while ((basicReturns = basicReturnsMap.get(rid)) == null) {//如果查询结果为null 说明包裹还没有回来//此时就需要阻塞等待synchronized (this) {try {wait();} catch (InterruptedException e) {e.printStackTrace();}}}//读取成功后 把消息从哈希表中删除掉basicReturnsMap.remove(rid);return basicReturns;}
关闭channel
//关闭channel 发送type = 0x2public boolean close() throws IOException {BasicArguments basicArguments = new BasicArguments();basicArguments.setRid(generateRid());basicArguments.setChannelId(channelId);byte[] payload = BinaryTool.toBytes(basicArguments);Request request = new Request();request.setType(0x2);request.setLength(payload.length);request.setPayload(payload);connection.writeRequest(request);BasicReturns basicReturns = waitResult(basicArguments.getRid());return basicReturns.isOk();}
创建交换机
//创建交换机public boolean exchangeDeclare(String exchangeName, ExchangeType exchangeType, boolean durable, boolean autoDelete,Map<String, Object> arguments) throws IOException {ExchangeDeclareArguments exchangeDeclareArguments = new ExchangeDeclareArguments();exchangeDeclareArguments.setRid(generateRid());exchangeDeclareArguments.setChannelId(channelId);exchangeDeclareArguments.setExchangeName(exchangeName);exchangeDeclareArguments.setExchangeType(exchangeType);exchangeDeclareArguments.setDurable(durable);exchangeDeclareArguments.setAutoDelete(autoDelete);exchangeDeclareArguments.setArguments(arguments);byte[] payload = BinaryTool.toBytes(exchangeDeclareArguments);Request request = new Request();request.setType(0x3);request.setLength(payload.length);request.setPayload(payload);connection.writeRequest(request);BasicReturns basicReturns = waitResult(exchangeDeclareArguments.getRid());return basicReturns.isOk();}
删除交换机
//删除交换机public boolean exchangeDelete(String exchangeName) throws IOException {ExchangeDeleteArguments exchangeDeleteArguments = new ExchangeDeleteArguments();exchangeDeleteArguments.setRid(generateRid());exchangeDeleteArguments.setChannelId(channelId);exchangeDeleteArguments.setExchangeName(exchangeName);byte[] payload = BinaryTool.toBytes(exchangeDeleteArguments);Request request = new Request();request.setType(0x4);request.setLength(payload.length);request.setPayload(payload);connection.writeRequest(request);BasicReturns basicReturns = waitResult(exchangeDeleteArguments.getRid());return basicReturns.isOk();}
创建队列
//创建队列public boolean queueDeclare(String queueName, boolean durable, boolean exclusive, boolean autoDelete,Map<String, Object> arguments) throws IOException {QueueDeclareArguments queueDeclareArguments = new QueueDeclareArguments();queueDeclareArguments.setRid(generateRid());queueDeclareArguments.setChannelId(channelId);queueDeclareArguments.setQueueName(queueName);queueDeclareArguments.setDurable(durable);queueDeclareArguments.setExclusive(exclusive);queueDeclareArguments.setAutoDelete(autoDelete);queueDeclareArguments.setArguments(arguments);byte[] payload = BinaryTool.toBytes(queueDeclareArguments);Request request = new Request();request.setType(0x5);request.setLength(payload.length);request.setPayload(payload);connection.writeRequest(request);BasicReturns basicReturns = waitResult(queueDeclareArguments.getRid());return basicReturns.isOk();}```#### 删除队列```java
//删除队列public boolean queueDelete(String queueName) throws IOException {QueueDeleteArguments arguments = new QueueDeleteArguments();arguments.setRid(generateRid());arguments.setChannelId(channelId);arguments.setQueueName(queueName);byte[] payload = BinaryTool.toBytes(arguments);Request request = new Request();request.setType(0x6);request.setLength(payload.length);request.setPayload(payload);connection.writeRequest(request);BasicReturns basicReturns = waitResult(arguments.getRid());return basicReturns.isOk();}
创建绑定
//创建绑定public boolean queueBind(String queueName, String exchangeName, String bindingKey) throws IOException {QueueBindArguments arguments = new QueueBindArguments();arguments.setRid(generateRid());arguments.setChannelId(channelId);arguments.setQueueName(queueName);arguments.setExchangeName(exchangeName);arguments.setBindingKey(bindingKey);byte[] payload = BinaryTool.toBytes(arguments);Request request = new Request();request.setType(0x7);request.setLength(payload.length);request.setPayload(payload);connection.writeRequest(request);BasicReturns basicReturns = waitResult(arguments.getRid());return basicReturns.isOk();}
删除绑定
//解除绑定public boolean queueUnbind(String queueName, String exchangeName) throws IOException {QueueUnbindArguments arguments = new QueueUnbindArguments();arguments.setRid(generateRid());arguments.setChannelId(channelId);arguments.setQueueName(queueName);arguments.setExchangeName(exchangeName);byte[] payload = BinaryTool.toBytes(arguments);Request request = new Request();request.setType(0x8);request.setLength(payload.length);request.setPayload(payload);connection.writeRequest(request);BasicReturns basicReturns = waitResult(arguments.getRid());return basicReturns.isOk();}
发送消息
//发送消息public boolean basicPublish(String exchangeName, String routingKey, BasicProperties basicProperties, byte[] body) throws IOException {BasicPublishArguments arguments = new BasicPublishArguments();arguments.setRid(generateRid());arguments.setChannelId(channelId);arguments.setExchangeName(exchangeName);arguments.setBasicProperties(basicProperties);arguments.setRoutingKey(routingKey);arguments.setBody(body);byte[] payload = BinaryTool.toBytes(arguments);Request request = new Request();request.setType(0x9);request.setLength(payload.length);request.setPayload(payload);connection.writeRequest(request);BasicReturns basicReturns = waitResult(arguments.getRid());return basicReturns.isOk();}
订阅消息
//订阅消息public boolean basicConsume(String queueName, boolean autoAck, Consumer consumer) throws MqException, IOException {//先设置回调if (this.consumer != null) {throw new MqException("该channel已经设置过消费信息的回调了 不能重复设置");}this.consumer = consumer;BasicConsumeArguments arguments = new BasicConsumeArguments();arguments.setRid(generateRid());arguments.setChannelId(channelId);arguments.setConsumerTag(channelId);//consumerTag 也是用channelId来表示了arguments.setQueueName(queueName);arguments.setAutoAck(autoAck);byte[] payload = BinaryTool.toBytes(arguments);Request request = new Request();request.setType(0xa);request.setLength(payload.length);request.setPayload(payload);connection.writeRequest(request);BasicReturns basicReturns = waitResult(arguments.getRid());return basicReturns.isOk();}
确认消息
//确认消息public boolean basicAck(String queueName, String messageId) throws IOException {BasicAckArguments arguments = new BasicAckArguments();arguments.setRid(generateRid());arguments.setChannelId(channelId);arguments.setQueueName(queueName);arguments.setMessageId(messageId);byte[] payload = BinaryTool.toBytes(arguments);Request request = new Request();request.setType(0xb);request.setLength(payload.length);request.setPayload(payload);connection.writeRequest(request);BasicReturns basicReturns = waitResult(arguments.getRid());return basicReturns.isOk();}
处理响应
创建扫描线程
创建⼀个扫描线程,⽤来不停的读取socket中的响应数据
注意:⼀个Connection中可能包含多个channel,需要把响应分别放到对应的channel中
public Connection(String host, int port) throws IOException {socket = new Socket(host, port);inputStream = socket.getInputStream();outputStream = socket.getOutputStream();dataInputStream = new DataInputStream(inputStream);dataOutputStream = new DataOutputStream(outputStream);callbackPool = Executors.newFixedThreadPool(4);//创建一个扫描线程 由这个线程负责不停的对socket中读取响应数据 把这个响应数据再交给对应的channel负责处理Thread t = new Thread(() -> {try {while (!socket.isClosed()) {Response response = readResponse();dispatchResponse(response);}} catch (SocketException e) {//连接正常断开 忽略这个异常log.info("连接正常断开");} catch (IOException | ClassNotFoundException | MqException e) {log.info("连接异常断开");e.printStackTrace();}});t.start();}
实现响应的分发
给Connection创建dispatchResponse⽅法
//使用这个方法 分别来处理//这个消息是 针对控制请求的响应 还是服务器推送的消息private void dispatchResponse(Response response) throws IOException, ClassNotFoundException, MqException {if (response.getType() == 0xc) {//服务器推送来的消息数据SubScribeReturns subScribeReturns = (SubScribeReturns) BinaryTool.fromBytes(response.getPayload());//根据channelId 找到对应的channel对象Channel channel = channelMap.get(subScribeReturns.getChannelId());if (channel == null) {throw new MqException("[Connect] 该消息对应的channel 在客户端中不存在 channelId = " + channel.getChannelId());}//执行该channel对象内部的回调callbackPool.submit(() -> {try {channel.getConsumer().handleDelivery(subScribeReturns.getConsumerTag(), subScribeReturns.getBasicProperties(),subScribeReturns.getBody());} catch (MqException | IOException e) {e.printStackTrace();}});} else {//当前响应是针对刚才的控制请求的响应BasicReturns basicReturns = (BasicReturns) BinaryTool.fromBytes(response.getPayload());//把这个结果放在对应的channel的hash表中Channel channel = channelMap.get(basicReturns.getChannelId());if (channel == null) {throw new MqException("[Connection] 该队列对应的channel在客户端中不存在");}channel.putReturns(basicReturns);}}
public void putReturns(BasicReturns basicReturns) {basicReturnsMap.put(basicReturns.getRid(),basicReturns);synchronized (this){//当前也不知道多少线程在等待上述的这个响应//把所有的等待线程都唤醒notifyAll();}}
关闭Connection
//关闭channel 发送type = 0x2public boolean close() throws IOException {BasicArguments basicArguments = new BasicArguments();basicArguments.setRid(generateRid());basicArguments.setChannelId(channelId);byte[] payload = BinaryTool.toBytes(basicArguments);Request request = new Request();request.setType(0x2);request.setLength(payload.length);request.setPayload(payload);connection.writeRequest(request);BasicReturns basicReturns = waitResult(basicArguments.getRid());return basicReturns.isOk();}