目录
需求分析
内存管理
实现 MemoryDataCenter 类
封装交换机操作
封装队列操作
封装绑定操作
封装消息操作
封装未确认消息操作
封装恢复数据操作
关于线程安全
针对 MemoryDataCenter 单元测试
需求分析
- 当前我们已经实现了 数据库管理 交换机、绑定、队列,数据文件管理 消息
- 即已经实现在硬盘上存储上述数据
注意:
- 对于 MQ 来说,内存存储数据为主,硬盘存储数据为辅
- 硬盘数据主要是为了能够进行持久化保存,重启之后,数据不丢失
内存管理
- 交换机:使用 ConcurrentHashMap,其中 key 为 name,value 为 Exchange 对象
// key 是 exchangeName,value 是 exchange 对象private ConcurrentHashMap<String, Exchange> exchangeMap = new ConcurrentHashMap<>();
- 队列:使用 ConcurrentHashMap,其中 key 为 name,value 为 MSGQueue 对象
// key 是 queueName,value 是 MSGQueue 对象private ConcurrentHashMap<String, MSGQueue> queueMap = new ConcurrentHashMap<>();
- 绑定:使用嵌套 ConcurrentHashMap
- key1 为 exchangeName,value1 为 ConcurrentHashMap
- key2 为 queueName,value2 为 Binding 对象
// 第一个 key 是 exchangeName,第二个 key 是 queueNameprivate ConcurrentHashMap<String, ConcurrentHashMap<String, Binding>> bindingsMap = new ConcurrentHashMap<>();
- 消息:使用 ConcurrentHashMap,其中 key 为 messageId,Value 为 Message 对象
// key 是 messageId,value 是 Message 对象private ConcurrentHashMap<String, Message> messageMap = new ConcurrentHashMap<>();
- 队列和消息之间的关联(每个队列中有哪些消息):使用 ConcurrentHashMap
- key 为 queueName,value 为 LinkedList
- LinkedList 中的每个元素为 Message 对象
// key 是 queueName,value 是一个 Message 的链表private ConcurrentHashMap<String, LinkedList<Message>> queueMessageMap = new ConcurrentHashMap<>();
- "未被确认" 消息(用来存储当前队列中哪些消息被消费者取走了,但还未应答):
- 使用嵌套 ConcurrentHashMap
- key1 为 queueName,value1 为 ConcurrentHashMap
- key2 为 messageId,value 为 Message 对象
// 第一个 key 是 queueName,第二个 key 是 MessageIdprivate ConcurrentHashMap<String,ConcurrentHashMap<String,Message>> queueMessageWaitAckMap = new ConcurrentHashMap<>();
注意点一:
- 消息 对应的 ConcurrentHashMap 中的 value 为 Message 对象
- 此处 value 中存的是引用,由引用指向 Message 对象
- 所以此处 value 中存的 Message 对象并没有占用重复的空间
- 队列和消息之间的关联 对应的 LinkedList 中的元素 Message 对象同理
- "未被确认" 消息 对应的 value2 中的 Message 对象同理
注意点二:
- 后续实现消息确认逻辑,需要根据 ACK 响应的内容,该内容中包含一个确认的 messageId
- 然后再根据这个 messageId 来把上述结构中的 Message 对象找到并移除
注意点三:
- 此处实现的 MQ 支持两种应答模式(ACK)
- 自动应答:消费者取了元素,该消息就被应答了,此时该消息就可以被干掉了
- 手动应答:消费者取了元素之后,该消息还不算被应答,需消费者主动再调用一个 basicAck 方法,此时才认为是真正应答了,才能删除该消息
注意点四:
- 此处我们主要使用 ConcurrentHashMap 来进行内存管理,其主要是为了线程安全
实现 MemoryDataCenter 类
封装交换机操作
- 针对 exchangeMap 进行插入、获取、删除
public void insertExchange(Exchange exchange) {exchangeMap.put(exchange.getName(),exchange);System.out.println("[MemoryDataCenter] 新交换机添加成功! exchangeName = " + exchange.getName());}public Exchange getExchange(String exchangeName) {return exchangeMap.get(exchangeName);}public void deleteExchange(String exchangeName) {exchangeMap.remove(exchangeName);System.out.println("[MemoryDataCenter] 交换机删除成功! exchangeName = " + exchangeName);}public void insertQueue(MSGQueue queue) {queueMap.put(queue.getName(),queue);System.out.println("[MemoryDataCenter] 新队列添加成功!queueName = " + queue.getName());}
封装队列操作
- 针对 queueMap 进行插入、获取、删除
public void insertQueue(MSGQueue queue) {queueMap.put(queue.getName(),queue);System.out.println("[MemoryDataCenter] 新队列添加成功!queueName = " + queue.getName());}public MSGQueue getQueue(String queueName) {return queueMap.get(queueName);}public void deleteQueue(String queueName) {queueMap.remove(queueName);System.out.println("[MemoryDataCenter] 队列删除成功!queueName = " + queueName);}
封装绑定操作
- 针对 bindingsMap 进行插入、获取、删除
public void insertBinding(Binding binding) throws MqException { // 先使用 exchangeName 查一下,对应的哈希表是否存在,不存在就创建一个 // ConcurrentHashMap<String,Binding> bindingMap = bindingsMap.get(binding.getExchangeName()); // if(bindingMap == null) { // bindingMap = new ConcurrentHashMap<>(); // bindingsMap.put(binding.getExchangeName(),bindingMap); // } // 改行代码等价于 上方注释掉的代码段ConcurrentHashMap<String,Binding> bindingMap = bindingsMap.computeIfAbsent(binding.getExchangeName(),k -> new ConcurrentHashMap<>());// 针对数据进行进一步的插入,根据 queueName 查一下,如果已经存在,就抛出异常,不存在才能插入synchronized (bindingMap) {if(bindingMap.get(binding.getQueueName()) != null){throw new MqException("[MemoryDataCenter] 绑定已经存在!exchangeName = " + binding.getExchangeName() +", queueName = " + binding.getQueueName());}bindingMap.put(binding.getQueueName(),binding);}System.out.println("[MemoryDataCenter] 新绑定添加成功!exchangeName = " + binding.getExchangeName() +", queueName = " + binding.getQueueName());}// 获取绑定,写两个版本: // 1、根据 exchangeName 和 queueName 确定唯一一个 Binding // 2、根据 exchangeName 获取到所有的 Bindingpublic Binding getBinding(String exchangeName,String queueName) {ConcurrentHashMap<String,Binding> bindingMap = bindingsMap.get(exchangeName);if(bindingMap == null) {return null;}return bindingMap.get(queueName);}public ConcurrentHashMap<String,Binding> getBindings(String exchangeName) {return bindingsMap.get(exchangeName);}public void deleteBinding(Binding binding) throws MqException {ConcurrentHashMap<String,Binding> bindingMap = bindingsMap.get(binding.getExchangeName());if(bindingMap == null) { // 该交换机没有绑定任何队列,报错throw new MqException("[MemoryDataCenter] 绑定不存在!exchangeName = " + binding.getExchangeName() +", queueName = " + binding.getQueueName());}bindingMap.remove(binding.getQueueName());System.out.println("[MemoryDataCenter] 绑定删除成功!exchangeName = " + binding.getExchangeName() +", queueName = " + binding.getQueueName());}
封装消息操作
- 针对 messageMap 进行添加、查询、删除
- 针对 queueMessageMap 实现 发送消息到指定队列、从指定队列获取消息、获取指定队列消息个数
// 添加消息public void addMessage(Message message) {messageMap.put(message.getMessageId(), message);System.out.println("[MemoryDataCenter] 新消息添加成功! messageId = " + message.getMessageId());}// 根据 id 查询消息public Message getMessage(String messageId) {return messageMap.get(messageId);}// 根据 id 删除消息public void removeMessage(String messageId) {messageMap.remove(messageId);System.out.println("[MemoryDataCenter] 消息被移除! messageId = " + messageId);}// 发送消息到指定队列public void sendMessage(MSGQueue queue,Message message) { // 把消息放到对应的队列数据结构中 // 先根据队列的名字,找到该队列对应的消息链表 // LinkedList<Message> messages = queueMessageMap.get(queue.getName()); // if(messages == null) { // messages = new LinkedList<>(); // queueMessageMap.put(queue.getName(),messages); // }LinkedList<Message> messages = queueMessageMap.computeIfAbsent(queue.getName(), k -> new LinkedList<>()); // 再把数据加到 messages 里面synchronized (messages) {messages.add(message);} // 在这里把该消息也往消息中心插入一下 // 假设如果 message 已经在消息中心存在,重复插入也没关系 // 主要就是相同 messageId,对应的 message 的内容一定是一样的(服务器代码不会对 Message 内容做修改 basicProperties 和 body)addMessage(message);System.out.println("[MemoryDataCenter] 消息被投递到队列中! messageId = " + message.getMessageId());} // 从队列中取消息public Message pollMessage(String queueName) { // 根据队列名,查找一下,对应的队列的消息链表LinkedList<Message> messages = queueMessageMap.get(queueName); // 如果没有找到,说明队列中没有任何消息if(messages == null){return null;}synchronized (messages) { // 如果存在消息链表但其中没有元素,也说明队列中没有任何消息if(messages.size() == 0) {return null;} // 链表中有元素,就进行头删Message currentMessage = messages.remove(0);System.out.println("[MemoryDataCenter] 消息从队列中取出! messageId = " + currentMessage.getMessageId());return currentMessage;}}// 获取指定队列中消息的个数public int getMessageCount(String queueName) {LinkedList<Message> messages = queueMessageMap.get(queueName);if(messages == null) { // 队列中没有消息return 0;}synchronized (messages) {return messages.size();}}
封装未确认消息操作
- 针对 queueMessageWaitAckMap 进行插入、获取、删除
// 添加未确认消息public void addMessageWaitAck(String queueName,Message message) {ConcurrentHashMap<String,Message> messageHashMap = queueMessageWaitAckMap.computeIfAbsent(queueName,k -> new ConcurrentHashMap<>());messageHashMap.put(message.getMessageId(),message);System.out.println("[MemoryDataCenter] 消息进入待确认队列! messageId = " + message.getMessageId());}// 删除未确认消息 (消息已经确认了)public void removeMessageWaitAck(String queueName,String messageId) {ConcurrentHashMap<String,Message> messageHashMap = queueMessageWaitAckMap.get(queueName);if(messageHashMap == null) {return;}messageHashMap.remove(messageId);System.out.println("[MemoryDataCenter] 消息从待确认队列删除! messageId = " + messageId);}// 获取指定的未确认的消息public Message getMessageWaitAck(String queueName,String messageId) {ConcurrentHashMap<String,Message> messageHashMap = queueMessageWaitAckMap.get(queueName);if(messageHashMap == null) {return null;}return messageHashMap.get(messageId);}
封装恢复数据操作
- 通过 recovery 方法将硬盘中存储的数据都恢复到内存中
// 这个方法就是从硬盘上读取数据,把硬盘中之前持久化存储的各个纬度的数据都恢复到内存中public void recovery(DiskDataCenter diskDataCenter) throws IOException, MqException, ClassNotFoundException { // 0、清空之前的所有数据exchangeMap.clear();queueMap.clear();bindingsMap.clear();messageMap.clear();queueMessageMap.clear(); // 1、恢复所有的交换机数据List<Exchange> exchanges = diskDataCenter.selectAllExchanges();for (Exchange exchange : exchanges) {exchangeMap.put(exchange.getName(),exchange);} // 2、恢复所有的队列数据List<MSGQueue> queues = diskDataCenter.selectAllQueues();for (MSGQueue queue : queues) {queueMap.put(queue.getName(),queue);} // 3、恢复所有的绑定数据List<Binding> bindings = diskDataCenter.selectAllBindings();for (Binding binding : bindings) {ConcurrentHashMap<String,Binding> bindingMap = bindingsMap.computeIfAbsent(binding.getExchangeName(),k -> new ConcurrentHashMap<>());bindingMap.put(binding.getQueueName(),binding);} // 4、恢复所有的消息数据 // 遍历所有的队列,根据每个队列的名字,获取到所有的消息for (MSGQueue queue: queues) {LinkedList<Message> messages = diskDataCenter.loadAllMessageFromQueue(queue.getName());queueMessageMap.put(queue.getName(),messages);for (Message message : messages) {messageMap.put(message.getMessageId(),message);}} // 注意!!针对 "未确认的消息" 这部分内存中的数据,不需要从硬盘恢复,之前考虑硬盘存储的数据,也没设定这一块 // 一旦在等待 ack 的过程中,服务器重启了,此时这些 "未被确认的消息",就恢复成 "未被取走的消息" // 这个消息在硬盘上存储的时候,就是当作 "未被取走"}
关于线程安全
问题:
- 上述代码中,哪一段需要加锁?加锁的范围是什么?使用哪个对象作为锁对象?
回答:
- 此处我们很难一概而论,只能具体问题具体分析
总的原则:
- 分析该段代码如果不加锁,会造成什么样的后果/问题?该后果/问题是否严重?
实例理解:
- 红框代码中的 if 逻辑 和 bindingMap 的 put 操作是分开的
- 即可能存在 bindingsMap 中,同一个交换机绑定两个 queueName 相同的 queue 对象
- 如果这两个 binding 对象中的 bindingKey 不同,那么该 queueName 对应的 bindingKey 是哪一个 bindingKey?
- 该问题会造成十分严重的后果,所以我们要给该段代码加锁!
- 绿框代码中的 if 逻辑 和 messageHashMap 的 remove 操作是分开的
- 但是该段代码所产生的线程安全问题微乎其微
- 线程A 执行了 remove 操作,此时线程B 同样执行 remove 操作,如果将 null 传递给 ConcurrentHashMap 的 remove 方法,该方法将不会进行任何操作,并返回 null
- 所以该段代码无需加锁!
针对 MemoryDataCenter 单元测试
- 编写测试用例代码是十分重要的!
package com.example.demo;import com.example.demo.common.MqException; import com.example.demo.mqserver.core.*; import com.example.demo.mqserver.datacenter.DiskDataCenter; import com.example.demo.mqserver.datacenter.MemoryDataCenter; import org.apache.tomcat.util.http.fileupload.FileUtils; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.springframework.boot.SpringApplication; import org.springframework.boot.test.context.SpringBootTest;import java.io.File; import java.io.IOException; import java.util.ArrayList; import java.util.List; import java.util.concurrent.ConcurrentHashMap;@SpringBootTest public class MemoryDataCenterTests {private MemoryDataCenter memoryDataCenter = null;@BeforeEachpublic void setUp(){memoryDataCenter = new MemoryDataCenter();}@AfterEachpublic void tearDown() {memoryDataCenter = null;}// 创建测试交换机private Exchange createTestExchange(String exchangeName) {Exchange exchange = new Exchange();exchange.setName(exchangeName);exchange.setType(ExchangeType.DIRECT);exchange.setAutoDelete(false);exchange.setDurable(true);return exchange;}// 创建测试队列private MSGQueue createTestQueue(String queueName) {MSGQueue queue = new MSGQueue();queue.setName(queueName);queue.setDurable(true);queue.setExclusive(false);queue.setAutoDelete(false);return queue;}// 针对交换机进行测试@Testpublic void testExchange() { // 1、先构造一个交换机并插入Exchange expectExchange = createTestExchange("testExchange");memoryDataCenter.insertExchange(expectExchange); // 2、查询出这个交换机,比较结果是否一致 此处直接比较这俩引用指向同一个对象Exchange actualExchange = memoryDataCenter.getExchange("testExchange");Assertions.assertEquals(expectExchange,actualExchange); // 3、删除这个交换机memoryDataCenter.deleteExchange("testExchange"); // 4、再查一次,看是否就差不到了actualExchange = memoryDataCenter.getExchange("testExchange");Assertions.assertNull(actualExchange);}// 针对队列进行测试@Testpublic void testQueue() { // 1、构造一个队列MSGQueue expectedQueue = createTestQueue("testQueue");memoryDataCenter.insertQueue(expectedQueue); // 2、查询这个队列MSGQueue actualQueue = memoryDataCenter.getQueue("testQueue");Assertions.assertEquals(expectedQueue,actualQueue); // 3、删除这个队列memoryDataCenter.deleteQueue("testQueue"); // 4、再次查询队列,看是否能查到actualQueue = memoryDataCenter.getQueue("testQueue");Assertions.assertNull(actualQueue);}// 针对绑定进行测试@Testpublic void testBinding() throws MqException {Binding expectedBinding = new Binding();expectedBinding.setExchangeName("testExchange");expectedBinding.setQueueName("testQueue");expectedBinding.setBindingKey("testBindingKey");memoryDataCenter.insertBinding(expectedBinding);Binding actualBinding = memoryDataCenter.getBinding("testExchange","testQueue");Assertions.assertEquals(expectedBinding,actualBinding);ConcurrentHashMap<String,Binding> bindingMap = memoryDataCenter.getBindings("testExchange");Assertions.assertEquals(1,bindingMap.size());Assertions.assertEquals(expectedBinding,bindingMap.get("testQueue"));memoryDataCenter.deleteBinding(expectedBinding);actualBinding = memoryDataCenter.getBinding("testExchange","testQueue");Assertions.assertNull(actualBinding);}private Message createTestMessage(String content) {Message message = Message.createMessageWithId("testRoutingKey",null,content.getBytes());return message;}@Testpublic void testMessage() {Message expectedMessage = createTestMessage("testMessage");memoryDataCenter.addMessage(expectedMessage);Message actualMessage = memoryDataCenter.getMessage(expectedMessage.getMessageId());Assertions.assertEquals(expectedMessage,actualMessage);memoryDataCenter.removeMessage(expectedMessage.getMessageId());actualMessage = memoryDataCenter.getMessage(expectedMessage.getMessageId());Assertions.assertNull(actualMessage);}@Testpublic void testSendMessage() { // 1、创建一个队列,创建 10 条消息,把这些消息都插入队列中MSGQueue queue = createTestQueue("testQueue");List<Message> expectedMessages = new ArrayList<>();for (int i = 0; i < 10; i++) {Message message = createTestMessage("testMessage" + i);memoryDataCenter.sendMessage(queue,message);expectedMessages.add(message);} // 2、从队列中取出这些消息List<Message> actualMessages = new ArrayList<>();while (true) {Message message = memoryDataCenter.pollMessage("testQueue");if (message == null) {break;}actualMessages.add(message);} // 3、比较取出的消息和之前的消息是否一致Assertions.assertEquals(expectedMessages.size(),actualMessages.size());for (int i = 0; i < expectedMessages.size(); i++) {Assertions.assertEquals(expectedMessages.get(i),actualMessages.get(i));}}@Testpublic void testMessageWaitAck() {Message expectedMessage = createTestMessage("expectedMessage");memoryDataCenter.addMessageWaitAck("testQueue",expectedMessage);Message actualMessage = memoryDataCenter.getMessageWaitAck("testQueue",expectedMessage.getMessageId());Assertions.assertEquals(expectedMessage,actualMessage);memoryDataCenter.removeMessageWaitAck("testQueue", expectedMessage.getMessageId());actualMessage = memoryDataCenter.getMessageWaitAck("testQueue",expectedMessage.getMessageId());Assertions.assertNull(actualMessage);}//@Testpublic void testRecovery() throws IOException, MqException, ClassNotFoundException { // 由于后续需要进行数据库操作,依赖 MyBatis,就需要先启动 SpringApplication,这样才能进行后续的数据库操作DemoApplication.context = SpringApplication.run(DemoApplication.class); // 1、在硬盘上构造好数据DiskDataCenter diskDataCenter = new DiskDataCenter();diskDataCenter.init();// 构造交换机Exchange expectedExchange = createTestExchange("testExchange");diskDataCenter.insertExchange(expectedExchange);// 构造队列MSGQueue expectedQueue = createTestQueue("testQueue");diskDataCenter.insertQueue(expectedQueue);// 构造绑定Binding expectedBinding = new Binding();expectedBinding.setExchangeName("testExchange");expectedBinding.setQueueName("testQueue");expectedBinding.setBindingKey("testBindingKey");diskDataCenter.insertBinding(expectedBinding);// 构造消息Message expectedMessage = createTestMessage("testContent");diskDataCenter.sendMessage(expectedQueue,expectedMessage); // 2、执行恢复操作memoryDataCenter.recovery(diskDataCenter);// 3、对比结果Exchange actualExchange = memoryDataCenter.getExchange("testExchange");Assertions.assertEquals(expectedExchange.getName(),actualExchange.getName());Assertions.assertEquals(expectedExchange.getType(),actualExchange.getType());Assertions.assertEquals(expectedExchange.isDurable(),actualExchange.isDurable());Assertions.assertEquals(expectedExchange.isAutoDelete(),actualExchange.isAutoDelete());MSGQueue actualQueue = memoryDataCenter.getQueue("testQueue");Assertions.assertEquals(expectedQueue.getName(),actualQueue.getName());Assertions.assertEquals(expectedQueue.isDurable(),actualQueue.isDurable());Assertions.assertEquals(expectedQueue.isAutoDelete(),actualQueue.isAutoDelete());Assertions.assertEquals(expectedQueue.isExclusive(),actualQueue.isExclusive());Binding actualBinding = memoryDataCenter.getBinding("testExchange","testQueue");Assertions.assertEquals(expectedBinding.getExchangeName(),actualBinding.getExchangeName());Assertions.assertEquals(expectedBinding.getQueueName(),actualBinding.getQueueName());Assertions.assertEquals(expectedBinding.getBindingKey(),actualBinding.getBindingKey());Message actualMessage = memoryDataCenter.pollMessage("testQueue");Assertions.assertEquals(expectedMessage.getMessageId(),actualMessage.getMessageId());Assertions.assertEquals(expectedMessage.getRoutingKey(),actualMessage.getRoutingKey());Assertions.assertEquals(expectedMessage.getDeliverMode(),actualMessage.getDeliverMode());Assertions.assertArrayEquals(expectedMessage.getBody(),actualMessage.getBody());// 4、清理硬盘的数据,把整个 data 目录里的内容都删掉(包含了 meta.db 和 队列的目录)DemoApplication.context.close();File dataDir = new File("./data");FileUtils.deleteDirectory(dataDir);} }