文章目录
- 一、持久化存储的方式与路径
- 二、公共模块
- 序列化 / 反序列化
- 异常规定
- 三、持久化存储
- 数据库数据管理
- 文件数据管理
- 读写规定
- 新增 /删除规定
- 内存中 Message 的规定
- 存储规定
- 代码编写
- 硬盘数据管理
一、持久化存储的方式与路径
交换机,队列,绑定关系,这些我们使用数据库来管理,
而 Message 消息 并不会涉及到复杂的增删改查操作.且 消息 的数量可能会非常多,数据库的访问效率并不高
因此在Message持久化的存储,我们不存储在数据库中,我们直接存储到文件中.
- 我们来规定一下数据存储的文件及路径
二、公共模块
序列化 / 反序列化
上面提到了,消息存储到文件中,那么存储到文件中,就要将 Message 对象 序列化成二进制数据,再存储到文件中,而读取消息时,就要将二进制数据反序列化成 Message 对象.
因此咱们先去写公共模块的 序列化方法与反序列化方法 .
创建一个 common 包,再创建一个 BinaryTool类 来写序列化 / 反序列化方法
public class BinaryTool {// 要用 Java 标准库提供的流对象完成 序列化/反序列化 操作,一定要继承 Serializable 接口,不然运行时,会抛出异常// 把一个对象序列化成一个字节数组public static byte[] toBytes(Object object) throws IOException {// ByteArrayOutputStream 这个流对象是相当于一个中间过渡,解决无法确定数组长度的问题try (ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream()) {// ObjectOutputStream 这个流对象是 Java 标准库提供的序列化的流对象,将过渡流对象绑定到序列化流对象中,最后直接转换成数组返回try (ObjectOutputStream objectOutputStream = new ObjectOutputStream(byteArrayOutputStream)) {// 将对象进行序列化后,存储到绑定的流对象中 即 过渡流对象objectOutputStream.writeObject(object);// 序列化完成后,存储到了绑定的流对象中,然后再通过绑定的流对象转换成数组返回}return byteArrayOutputStream.toByteArray();}}// 把一个字节数组反序列化成一个对象public static Object fromBytes(byte[] data) throws IOException, ClassNotFoundException {Object object = null;// ByteArrayInputStream 这个流对象直接绑定 data 数组,相当于直接从 data 数组中取数据try (ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(data)) {// ObjectInputStream 这个流对象是 Java 标准库提供的反序列化的流对象,绑定中间流对象,从中间流对象取数据try (ObjectInputStream objectInputStream = new ObjectInputStream(byteArrayInputStream)) {// readObject方法,从中间流对象取数据,中间流对象从 data 数组取数据,=》 将 data 数组 反序列化成对象object = objectInputStream.readObject();}}return object;}
}
异常规定
对文件进行读写操作,可能会出现意料之外的情况,因此我们自定义一个异常,来针对咱们这个程序中出现意料之外的情况时.
public class MqException extends Exception{public MqException(String message) {super(message);}
}
三、持久化存储
数据库数据管理
MySQL数据库本身就比较重量,
此处为了使用方便,我们使用更轻量的数据库 SQLite,
在Java中使用SQLite,不需要额外安装,只需要引入依赖即可.(记得去maven找到与你jdk相匹配的版本)
SQLite,只是一个本地的数据库,这个数据库相当于直接操作本地硬盘文件,
因此需要在配置文件中配置好数据库文件的路径
使用 yaml格式的配置文件
创建一个 mapper包,通过里面的接口来操作数据库
/*** 使用这个类来操作数据库*/
@Mapper // 注入到容器中
public interface MetaMapper {// 提供三个核心建表方法,create 可以直接使用 update代替@Update("create table if not exists exchange (name varchar(50) primary key,type int,durable boolean,autoDelete boolean,arguments varchar(1024))")void createExchangeTable();@Update("create table if not exists queue(name varchar(50) primary key,durable boolean,exclusive boolean,autoDelete boolean,arguments varchar(1024))")void createQueueTable();@Update("create table if not exists binding(exchangeName varchar(50),queueName varchar(50),bindingKey varchar(256))")void createBindingTable();// 针对上述三个表,进行 增加删除查找 操作@Insert("insert into exchange values (#{name},#{type},#{durable},#{autoDelete},#{arguments})")void insertExchange(Exchange exchange);@Delete("delete from exchange where name = #{exchangeName}")void deleteExchange(String exchangeName);@Select("select * from exchange")List<Exchange> selectAllExchanges();@Insert("insert into queue values (#{name},#{durable},#{exclusive},#{autoDelete},#{arguments})")void insertQueue(MSGQueue queue);@Delete("delete from queue where name = #{queueName}")void deleteQueue(String queueName);@Select("select * from queue")List<MSGQueue> selectAllQueues();@Insert("insert into binding values (#{exchangeName},#{queueName},#{bindingKey})")void insertBinding(Binding binding);@Delete("delete from binding where exchangeName = #{exchangeName} and queueName = #{queueName}")void deleteBinding(Binding binding);@Select("select * from binding")List<Binding> selectAllBindings();
}
我们再创建一个 datacenter包,通过里面的类来 整合对硬盘与内存上数据的管理 ,
首先来整合数据库操作,
注意了!此时我并不想将 DataBaseManager 这个类的控制权交给容器,
因此我就不给这个类加 类注解,所以在这里也没办法用 @Autowired依赖注入得到这个类,
因此我们需要用 依赖查找 得到这个类.故而需要给项目启动类 增添 一个 静态属性:容器上下文 ,
我直接贴出代码,代码中有详细注释!
/*** 通过这个类,来整合数据库操作*/
public class DataBaseManager {private MetaMapper metaMapper;// 数据库的初始化方法public void init() {// 通过启动类中的 上下文对象,进行依赖查找给 metaMapper 赋值metaMapper = MessageQueueApplication.context.getBean(MetaMapper.class);if (!checkDBExists()) {// 如果数据库不存在,则建库建表,构造默认数据// 创建 data 目录File dataDir = new File("./data");dataDir.mkdirs();createTable();createDefaultData();System.out.println("[DataBaseManger]数据库初始化完成");} else {// 如果数据库存在(有表有数据),不做任何操作System.out.println("[DataBaseManger]数据库已存在");}}// 删除数据库文件public void deleteDB() {File file = new File("./data/meta.db");if (file.delete()) {System.out.println("[DataBaseManager] 删除数据库文件成功");} else {System.out.println("[DataBaseManager] 删除数据库文件失败");}// 删除数据库目录(一定要先删除文件,目录是空的才能删除成功)File dataDir = new File("./data");if (dataDir.delete()) {System.out.println("[DataBaseManager] 删除数据库目录成功");} else {System.out.println("[DataBaseManager] 删除数据库目录失败");}}// 判断数据库是否存在private boolean checkDBExists() {File file = new File("./data/meta/.db");return file.exists();}// 建表方法// 建库操作并不需要手动执行,(不需要手动创建 meta.db 文件)// 首次执行这里的数据库操作时,就会自动创建 meta.db 文件(MyBatis 完成的)private void createTable() {metaMapper.createBindingTable();metaMapper.createExchangeTable();metaMapper.createQueueTable();System.out.println("[DataBaseManger]创建表完成");}// 给数据库表中,添加默认数据// 此处主要是添加一个默认的交换机// RabbiMQ 里有一个这样的设定,带有一个 匿名 的交换机,类型是 DIRECTprivate void createDefaultData() {Exchange exchange = new Exchange();exchange.setName("");exchange.setType(ExchangeType.DIRECT);exchange.setDurable(true);exchange.setAutoDelete(false);metaMapper.insertExchange(exchange);System.out.println("[DataBaseManger]插入初始数据完成");}// 将其他数据库操作,也在这个类中封装成方法public void insertExchange(Exchange exchange) {metaMapper.insertExchange(exchange);}public void deleteExchange(String exchangeName) {metaMapper.deleteExchange(exchangeName);}public List<Exchange> selectAllExchanges() {return metaMapper.selectAllExchanges();}public void insertQueue(MSGQueue queue) {metaMapper.insertQueue(queue);}public void deleteQueue(String queueName) {metaMapper.deleteQueue(queueName);}public List<MSGQueue> selectAllQueues() {return metaMapper.selectAllQueues();}public void insertBinding(Binding binding) {metaMapper.insertBinding(binding);}public void deleteBinding(Binding binding) {metaMapper.deleteBinding(binding);}public List<Binding> selectAllBindings() {return metaMapper.selectAllBindings();}
}
文件数据管理
读写规定
在上面已经规定好了文件的存储路径,每个队列的消息都放在 ./data/队列名 下.
大家想,文件里所有的消息都是二进制数据,因此文件中都是二进制数据,没办法找到消息之间有效的界限.
故而我们规定,写入文件的 Message对象 格式如下
在每个 Message 对象 的二进制数据前,加一个 大小为 四个字节 的整数 来描述 Message二进制数据的长度.
先读取 四个字节 获得 Message 的 长度,再读取该长度个 字节.
这样就可以使每个 Message 的二进制数据有一个可读界限.
在写入的时候只需写入下面的这两个 属性,只有这三个属性,才是 Message 对象的 核心属性,其他的属性都是为管理内存中的 Message而设置的,不需要写入到文件中.
新增 /删除规定
对于 消息来说,是需要频繁的新增和删除的,
当生产者 生产新的消息,就需要新增,
当消费者 消费了一个消息,就需要删除这个消息.
对于新增来说,比较简单,可以直接写到文件末尾,
但是对于删除来说,就比较复杂了,因为要删除的消息 不一定就是开头或末尾的消息,
文件类似于一个顺序表的结构,如果删除中间的消息,就需要将后面的消息向前搬运,
因此我们使用 逻辑删除 ,即使用 isValid 这个属性来表示该消息 是否有效.
内存中 Message 的规定
上面说到了使用 逻辑删除 来让一个消息失效,那么在要如何在文件中找到这个要删除的消息呢?
所以我们设置了 下面这两个属性
根据这两个属性,去读取对应的字节.
存储规定
针对逻辑删除,
我们可以队列文件中,再创建两个文件,
queue_data.txt(消息信息文件) 用来存储所有消息的本体,
queue_stat.txt(消息统计文件) 用来存储一行数据,这一行数据只有两个数字,用 "/t"分隔
所有消息的数量,
有效消息的数量.
代码编写
咱们再创建一个类,来集中管理消息.
/*** 通过这个类,来针对硬盘上的消息管理*/
public class MessageFileManager {public void init() {// 暂时不需要做啥额外的初始化工作, 以备后续扩展}// 定义一个内部类,来表示该队列的统计信息// 优先考虑使用 static,静态内部类static public class Stat {// 此处直接定义成 public,就不再搞 get set 方法了// 对于这样简单的类,就直接使用成员,类似于 C 的结构体了public int totalCount; // 总消息数量public int validCount; // 有效消息数量}// 约定消息文件所在的目录和文件名// 这个方法用来获取指定队列对应的消息文件所在路径private String getQueueDir(String queueName) {return "./data/" + queueName;}// 这个方法用来获取该队列的消息数据文件路径// 注意,二进制文件,使用 txt 作为后缀,不太合适,txt 一般表示文本(.bin或.dat 较为合适)private String getQueueDataPath(String queueName) {return getQueueDir(queueName) + "/queue_data.txt";}// 这个方法用来获取该队列的消息统计文件路径private String getQueueStatPath(String queueName) {return getQueueDir(queueName) + "/queue_stat.txt";}// 读取该队列的消息统计文件private Stat readStat(String queueName) {Stat stat = new Stat();try (InputStream inputStream = new FileInputStream(getQueueStatPath(queueName))) {Scanner scanner = new Scanner(inputStream);stat.totalCount = scanner.nextInt();stat.validCount = scanner.nextInt();return stat;} catch (IOException e) {e.printStackTrace();}return null;}// 向该队列的消息统计文件中写入数据private void writeStat(String queueName,Stat stat) {// 使用 PrintWrite 来写文件// OutputStream 打开文件,默认情况下,会直接把原文件清空,此时相当于新的数据覆盖了旧的try (OutputStream outputStream = new FileOutputStream(getQueueStatPath(queueName))) {PrintWriter printWriter = new PrintWriter(outputStream);printWriter.write(stat.totalCount + "\t" + stat.validCount);printWriter.flush();} catch (IOException e) {e.printStackTrace();}}// 创建队列对应的文件和目录public void createQueueFiles(String queueName) throws IOException {// 1.先创建队列对应的消息目录File baseDir = new File(getQueueDir(queueName));if (!baseDir.exists()) {// 不存在则创建boolean ok = baseDir.mkdirs();if (!ok) {throw new IOException("创建队列对应的消息目录失败!baseDir=" + baseDir.getAbsolutePath());}}// 2.创建队列对应的 data 文件File queueDataFile = new File(getQueueDataPath(queueName));if (!queueDataFile.exists()) {// 不存在则创建boolean ok = queueDataFile.createNewFile();if (!ok) {throw new IOException("创建该队列的消息数据文件失败!queueDataFile=" + queueDataFile.getAbsolutePath());}}// 3.创建队列对应的 stat 文件File queueStatFile = new File(getQueueStatPath(queueName));if (!queueStatFile.exists()) {boolean ok = queueStatFile.createNewFile();if (!ok) {throw new IOException("创建该队列的消息统计文件失败!queueStatFile="+queueStatFile.getAbsolutePath());}}// 4.给消息统计文件设置初始值Stat stat = new Stat();stat.totalCount = 0;stat.validCount = 0;writeStat(queueName,stat);}// 删除队列的目录和文件// 队列可以被删除,当队列删除后,对应的消息文件等,自然也要随之删除public void destroyQueueFiles(String queueName) throws IOException {// 先删除文件再删除目录File queueDataFile = new File(getQueueDataPath(queueName));boolean ok1 = queueDataFile.delete();File queueStatFile = new File(getQueueStatPath(queueName));boolean ok2 = queueStatFile.delete();File baseDir = new File(getQueueDir(queueName));boolean ok3 = baseDir.delete();if (!ok1 || !ok2 || !ok3) {// 有任意一个删除失败,都算整体失败,throw new IOException("删除队列目录和文件失败!baseDir="+getQueueDir(queueName));}}// 检查文件的目录和文件是否存在// 后续有生产者给 broker server 生产消息了,这个消息就可能需要记录到文件上(取决是否要持久化)// 在记录之前就要先检查文件是否存在public boolean checkFilesExits(String queueName) {// 判断队列和消息文件是否都存在!File queueDataFile = new File(getQueueDataPath(queueName));if (!queueDataFile.exists()) {return false;}File queueStatFile = new File(getQueueStatPath(queueName));if (!queueStatFile.exists()) {return false;}return true;}// 将新消息写入到对应的队列的数据文件并更新统计文件的方法// queue 表示要对应的队列,message 表示要写入的消息public void sendMessage(MSGQueue queue, Message message) throws MqException, IOException {// 1.检查一下当前要写入的队列对应的文件是否存在if (!checkFilesExits(queue.getName())) {throw new MqException("[MessageFileManager] 队列对应的文件不存在!queueName=" + queue.getName());}// 2.把要写入的 message 序列化byte[] messageBinary = BinaryTool.toBytes(message);// 写入消息时,可能会与其他线程对该队列的操作产生线程安全问题,因此针对这个 队列加锁// (如其他线程此时也向该队列写入消息,那么此时offsetBeg与offsetEnd就可能是不准确的,那么后续操作就也可能会出现BUG)synchronized (queue) {// 3.先获取当前队列的文件长度,计算 message 的 offsetBeg 和 offsetEnd// 把新的Message数据写入到队列数据文件的末尾,// 此时 offsetBeg = 文件长度 + 4,offsetEnd = 文件长度 + 4 + message长度File queueDataFile = new File(getQueueDataPath(queue.getName()));message.setOffsetBeg(queueDataFile.length() + 4);message.setOffsetEnd(queueDataFile.length() + 4 + messageBinary.length);// 4.写入消息到数据文件,注意,是写入到文件末尾 第二个属性,一定要写true,不然会直接覆盖掉之前的数据try (OutputStream outputStream = new FileOutputStream(queueDataFile,true)) {try (DataOutputStream dataOutputStream = new DataOutputStream(outputStream)){// 先写入当前消息的长度,占据 4 个字节,writeInt()方法,写入的整形占四个字节dataOutputStream.writeInt(messageBinary.length);// 将数据本体写入到队列数据文件中outputStream.write(messageBinary);}}// 5.更新消息统计文件Stat stat = readStat(queue.getName());stat.totalCount += 1;stat.validCount += 1;writeStat(queue.getName(),stat);}}// 删除消息的方法public void deleteMessage(MSGQueue queue,Message message) throws IOException, ClassNotFoundException {// 此处的删除指的是逻辑上的删除,即将 isValid 由 0x1 改为0x0// 1.先把文件中对应的 [offsetBeg,offsetEnd) 的 消息读出来// 2.然后修改 isValid 为 0x0// 3.再将消息写回对应的 [offsetBeg,offsetEnd) 位置// 删除消息时,可能会与其他线程对该队列的操作产生线程安全问题,因此针对这个 队列加锁// (如在删除消息时,进行了gc操作,那么就很有可能导致,这个被逻辑删除的消息,覆盖其他消息,且也有可能会覆盖正确的统计文件)synchronized (queue) {// RandomAccessFile 这个流,可以自由移动光标(读取位置), 第二个参数 "rw"代表读写try (RandomAccessFile randomAccessFile = new RandomAccessFile(getQueueDataPath(queue.getName()), "rw")) {// 1.先从文件中读取 messagebyte[] bufferSrc = new byte[(int) (message.getOffsetEnd() - message.getOffsetBeg())];// 移动光标randomAccessFile.seek(message.getOffsetBeg());randomAccessFile.read(bufferSrc);// 2.把读取出来的数据 反序列化 成 MessageMessage diskMessage = (Message) BinaryTool.fromBytes(bufferSrc);// 3.把 isValid 设置成无效diskMessage.setIsValid((byte) 0x0);// 4.将修改完成后的 message 序列化成二进制数据byte[] bufferDest = BinaryTool.toBytes(diskMessage);// 5.重新移动光标randomAccessFile.seek(message.getOffsetBeg());randomAccessFile.write(bufferDest);}// 更新 统计文件 stat,逻辑删除了一个消息,有效消息数量 validCount 需要 -1Stat stat = readStat(queue.getName());if (stat.validCount > 0) {stat.validCount -= 1;}writeStat(queue.getName(), stat);}}// 使用这个方法, 从文件中, 读取出所有的消息内容, 加载到内存中(具体来说是放到一个链表里)// 这个方法, 准备在程序启动/重启时, 进行调用.// 这里使用一个 LinkedList, 主要目的是为了后续进行头删操作.// 这个方法的参数, 只是一个 queueName 而不是 MSGQueue 对象. 因为这个方法不需要加锁, 只使用 queueName 就够了.// 由于该方法是在程序启动时调用, 此时服务器还不能处理请求呢~~ 不涉及多线程操作文件.public LinkedList<Message> loadAllMessageFromQueue (String queueName) throws IOException, ClassNotFoundException, MqException {LinkedList<Message> messages = new LinkedList<>();try (InputStream inputStream = new FileInputStream(getQueueDataPath(queueName))){// DataInputStream 提供了 readInt()方法,会读取四个字节的然后转化为整数,普通的 read()方法只会读取一个字节就转化为整数了try (DataInputStream dataInputStream = new DataInputStream(inputStream)){// 记录光标位置// 由于当下使用的 DataInputStream 并不方便直接获取到文件光标位置因此就需要手动记录下文件光标.long currentOffset = 0;while (true) {// 1.读取每个消息前用来记录消息长度的 4 个字节大小的 int// readInt()方法,会读取 4 个字节的大小 的intint messageSize = dataInputStream.readInt();currentOffset += 4;// 2.按照 messageSize 读取对应的字节数byte[] buffer = new byte[messageSize];int actualSize = dataInputStream.read(buffer);if (messageSize != actualSize) {// 如果不匹配, 说明文件有问题, 格式错乱了!!throw new MqException("[MessageFileManager] 文件格式错误! queueName=" + queueName);}// 3.将读取的二进制数据 反序列化成 对象Message message = (Message) BinaryTool.fromBytes(buffer);// 4.判断该数据是否有效if (message.getIsValid() == 0x1) {// 设置 message 的 offsetBeg 与 offsetEndmessage.setOffsetBeg(currentOffset);currentOffset += messageSize;message.setOffsetEnd(currentOffset);// 5.填加到链表中messages.add(message);} else {// 数据无效也要更新光标位置currentOffset += messageSize;}}} catch (EOFException e) {// 这个 catch 并非真是处理 "异常", 而是处理 "正常" 的业务逻辑. 文件读到末尾, 会被 readInt 抛出该异常.// 这个 catch 语句中也不需要做啥特殊的事情System.out.println("[MessageFileManager] 恢复 Message 数据完成!");}}return messages;}// 检查当前是否需要针对该队列的消息数据文件进行 GC(无效消息回收)public boolean checkGC(String queueName) {Stat stat = readStat(queueName);if (stat.totalCount > 2000 && (double)(stat.validCount / stat.totalCount) < 0.5) {return true;}return false;}// 获取 GC 复制算法的 新文件路径private String getQueueDataNewPath (String queueName) {return getQueueDir(queueName) + "/queue_data_new.txt";}// 通过这个方法, 真正执行消息数据文件的垃圾回收操作.// 使用复制算法来完成.// 创建一个新的文件, 名字就是 queue_data_new.txt// 把之前消息数据文件中的有效消息都读出来, 写到新的文件中.// 删除旧的文件, 再把新的文件改名回 queue_data.txt// 同时要记得更新消息统计文件.public void gc(MSGQueue queue) throws IOException, MqException, ClassNotFoundException {// 进行 gc 的时候,是针对消息数据文件进行大洗牌,在这个过程中,其他线程不能针对该队列的消息文件做任何修改synchronized (queue) {// 由于 gc 操作可能比较耗时, 此处统计一下执行消耗的时间.long gcBeg = System.currentTimeMillis();// 1.创建新数据文件File queueDataNewFile = new File(getQueueDataNewPath(queue.getName()));boolean ok = queueDataNewFile.exists();if (ok) {throw new MqException("[MessageFileManager] gc 时发现该队列的 queue_data_new 已经存在! queueName="+ queue.getName());}ok = queueDataNewFile.createNewFile();if (!ok) {throw new MqException("[MessageFileManager] gc 时创建新文件失败! queueDataNewFile=" + queueDataNewFile.getAbsolutePath());}// 2.获取原数据文件中的有效消息,并写入到新数据文件中LinkedList<Message> messages = loadAllMessageFromQueue(queue.getName());try (OutputStream outputStream = new FileOutputStream(queueDataNewFile)) {try (DataOutputStream dataOutputStream = new DataOutputStream(outputStream)) {for (Message message : messages) {byte[] buffer = BinaryTool.toBytes(message);// 先写入记录 消息长度 的占 4 个字节的 intdataOutputStream.writeInt(buffer.length);dataOutputStream.write(buffer);}}}// 3.删除原数据文件File queueDataOldFile = new File(getQueueDataPath(queue.getName()));ok = queueDataOldFile.delete();if (!ok) {throw new MqException("[MessageFileManager] 删除旧的数据文件失败! queueDataOldFile=" + queueDataOldFile.getAbsolutePath());}// 4.将新数据文件重命名ok = queueDataNewFile.renameTo(queueDataOldFile);if (!ok) {throw new MqException("[MessageFileManager] 文件重命名失败! queueDataNewFile=" + queueDataNewFile.getAbsolutePath()+ ", queueDataOldFile=" + queueDataOldFile.getAbsolutePath());}// 5.更新统计文件Stat stat = readStat(queue.getName());stat.totalCount = messages.size();stat.validCount = messages.size();writeStat(queue.getName(),stat);long gcEnd = System.currentTimeMillis();System.out.println("[MessageFileManager] gc 执行完毕! queueName=" + queue.getName() + ", time="+ (gcEnd - gcBeg) + "ms");}}
}
硬盘数据管理
咱们再创建一个类, 集中管理 数据库数据与文件数据 .
/*** 使用这个类来管理所有硬盘上的数据* 1. 数据库: 交换机,绑定,队列* 2. 数据文件: 消息* 上层逻辑如果需要操作硬盘,统一都通过这个类来使用,(上层代码不关心当前数据是存储在数据库还是文件中的)*/
public class DiskDataCenter {// 这个实例用来管理数据库private DataBaseManager dataBaseManager = new DataBaseManager();// 这个实践用来管理数据文件中的数据private MessageFileManager messageFileManager = new MessageFileManager();public void init() {// 针对上述两个实例进行初始化dataBaseManager.init();// 当前 messageFileManager.init() 方法,是空的,只是先放在这里,方便后续扩展messageFileManager.init();}// 封装交换机操作public void insertExchange(Exchange exchange) {dataBaseManager.insertExchange(exchange);}public void deleteExchange(String exchangeName) {dataBaseManager.deleteExchange(exchangeName);}public List<Exchange> selectAllExchanges() {return dataBaseManager.selectAllExchanges();}// 封装队列操作public void insertQueue(MSGQueue queue) throws IOException {dataBaseManager.insertQueue(queue);// 创建队列的同时, 不仅仅是把队列对象写到数据库中, 还需要创建出对应的目录和文件messageFileManager.createQueueFiles(queue.getName());}public void deleteQueue(String queueName) throws IOException {dataBaseManager.deleteQueue(queueName);// 删除队列的同时, 不仅仅是把队列从数据库中删除, 还需要删除对应的目录和文件messageFileManager.destroyQueueFiles(queueName);}public List<MSGQueue> selectAllQueues() {return dataBaseManager.selectAllQueues();}// 封装绑定操作public void insertBinding(Binding binding) {dataBaseManager.insertBinding(binding);}public void deleteBinding(Binding binding) {dataBaseManager.deleteBinding(binding);}public List<Binding> selectAllBindings() {return dataBaseManager.selectAllBindings();}// 封装消息方法public void sendMessage(MSGQueue queue, Message message) throws IOException, MqException {messageFileManager.sendMessage(queue,message);}public void deleteMessage(MSGQueue queue,Message message) throws IOException, ClassNotFoundException, MqException {messageFileManager.deleteMessage(queue,message);if (messageFileManager.checkGC(queue.getName())) {messageFileManager.gc(queue);}}public LinkedList<Message> loadAllMessageFromQueue(String queueName) throws IOException, MqException, ClassNotFoundException {return messageFileManager.loadAllMessageFromQueue(queueName);}
}