从 0 到 1 ,手把手教你编写《消息队列》项目(Java实现) —— 创建虚拟机

文章目录

  • 一、虚拟机
  • 二、关于消息的API
    • 发布消息
      • 直接交换机 DIRECT 转发规则
      • 扇出交换机 FANOUT 转发规则
      • 主题交换机 TOPIC 转发规则
      • 匹配规则
      • Router类
    • 订阅消息
      • 消费者
      • 队列如何给订阅的消费者发送消息
      • 自动发送消息至订阅者
    • 应答消息
  • 三、代码编写


一、虚拟机

接下来要创建虚拟机,每个虚拟主机下面都管理着自己的 交换机, 队列, 绑定, 消息 数据.同时提供 api 供上层调用.

在这里咱们实现的单虚拟机,并没有提供创建虚拟机,销毁虚拟机的功能,但是为了方便后续的扩展,咱们要约定好如何区分多个虚拟机之间的交换机,队列,绑定关系.

不同虚拟机当然可以拥有相同名字的交换机等.
比如
虚拟机A中 拥有 交换机C,
虚拟机B中 拥有 交换机C,
咱们视为以上情况是合法的.

咱们这里采取的方案是,在客户提供的交换机等的身份标识(交换机名字),前加上虚拟机的名字.
即 客户要在虚拟机 VirtualHostA 中创建交换机 exchangeC,咱们服务器存储的交换机名字是 VirtualHostAexchangeC.

当然也有其他方案,大家可以自由发挥.

二、关于消息的API

这是虚拟机要提供给上层的API
在这里插入图片描述

前6个API咱们已经写好了,只需要直接调用下层的API即可.
咱们现在来考虑后 7 - 9 这三个API的实现.

发布消息

发布消息API:其实就是生产者将消息发送给对应的交换机,交换机再根据不同的转发规则,转发给与之相绑定且符合规则的消息队列.

绑定关系 Binding 中有一个 bindingKey 属性
消息 Message 中 有一个 routingKey 属性

下面就来讲解一下三种交换机的转发规则已经这两个 Key 的不同含义.

直接交换机 DIRECT 转发规则

在直接交换机中,
bindingKye是无意义的,
routingKey是要转发到的队列的队列名.

直接交换机的转发规则, 是无视 bindingKey的,即 直接交换机是否与这个队列绑定都没有关系,而直接将消息转发到 routingKey指定的队列名的队列中.


扇出交换机 FANOUT 转发规则

在扇出交换机中,
bindingKye是无意义的,
routingKey是无意义的.

扇出交换机的转发规则,是将收到的消息转发到与之绑定的所有队列中.与bindingKye和routingKey是没有任何关系的.


主题交换机 TOPIC 转发规则

在主题交换机中,
bindingKey是创建绑定时,给绑定指定的特殊字符串(相当于一把锁),
routingKey是转发消息时,给消息指定的特殊字符串(相当于一把钥匙).

主题交换机的转发规则,是将收到的消息的routingKey与绑定的所有队列中的 bindingKey 进行匹配,当且仅当匹配成功时,才将消息转发给该队列.

匹配规则

routingKey规则

  • 由数字,字母,下划线组成
  • 使用 . 将routingKey分成多个部分.

bindingKey规则

  • 由数字,字母,下划线组成
  • 使用 . 将routingKey分成多个部分.
  • 支持两种特殊的符号作为通配符 * 与 # (*和#必须是作为被 . 分割出来的单独部分如 aaa*.bb就是非法的)
    * 可以匹配任何一个独立的部分
    # 可以匹配0个或多个的独立部分

匹配规则
在这里插入图片描述

在这里插入图片描述
在这里插入图片描述

Router类

在core包中创建 Router类,来完成对bindingKey与routingKey的校验与匹配.

/*** 这个类用来检查 bindingKey与routingKey 是否合法* 以及 bindingKey与routingKey 的匹配功能,* 以及 根据不同交换机的转发规则判断 消息 Message 是否可以转发给对应的绑定队列*/
public class Router {// bindingKey 的构造规则:// 1. 数字, 字母, 下划线// 2. 使用 . 分割成若干部分// 3. 允许存在 * 和 # 作为通配符. 但是通配符只能作为独立的分段.// 检查 BindingKey 是否合法public boolean checkBindingKey(String bindingKey) {if (bindingKey.length() == 0) {// 空字符串, 也是合法情况. 比如在使用 direct / fanout 交换机的时候, bindingKey 是用不上的.return true;}// 检查字符串中不能存在非法字符for (int i = 0; i < bindingKey.length(); i++) {char ch = bindingKey.charAt(i);if (ch >= 'A' && ch <= 'Z') {continue;}if (ch >= 'a' && ch <= 'z') {continue;}if (ch >= '0' && ch <= '9') {continue;}if (ch == '_' || ch == '.' || ch == '*' || ch == '#') {continue;}return false;}// 检查 * 或者 # 是否是独立的部分.// aaa.*.bbb 合法情况;  aaa.a*.bbb 非法情况.String[] words = bindingKey.split("\\.");for (String word : words) {// 检查 word 长度 > 1 并且包含了 * 或者 # , 就是非法的格式了.if (word.length() > 1 && (word.contains("*") || word.contains("#"))) {return false;}}// 约定一下, 通配符之间的相邻关系(人为约定的).// 这样约定是因为前三种相邻的时候, 实现匹配的逻辑会非常繁琐, 同时功能性 提升不大~~// 1. aaa.#.#.bbb    => 非法// 2. aaa.#.*.bbb    => 非法// 3. aaa.*.#.bbb    => 非法// 4. aaa.*.*.bbb    => 合法for (int i = 0; i < words.length - 1; i++) {// 连续两个 ##if (words[i].equals("#") && words[i + 1].equals("#")) {return false;}// # 连着 *if (words[i].equals("#") && words[i + 1].equals("*")) {return false;}// * 连着 #if (words[i].equals("*") && words[i + 1].equals("#")) {return false;}}return true;}// 检查 RoutingKey 是否合法// routingKey 的构造规则:// 1. 数字, 字母, 下划线// 2. 使用 . 分割成若干部分public boolean checkRoutingKey(String routingKey) {if (routingKey.length() == 0) {// 空字符串. 合法的情况. 比如在使用 fanout 交换机的时候, routingKey 用不上, 就可以设为 ""return true;}for (int i = 0; i < routingKey.length(); i++) {char ch = routingKey.charAt(i);// 判定该字符是否是大写字母if (ch >= 'A' && ch <= 'Z') {continue;}// 判定该字母是否是小写字母if (ch >= 'a' && ch <= 'z') {continue;}// 判定该字母是否是阿拉伯数字if (ch >= '0' && ch <= '9') {continue;}// 判定是否是 _ 或者 .if (ch == '_' || ch == '.') {continue;}// 该字符, 不是上述任何一种合法情况, 就直接返回 falsereturn false;}// 把每个字符都检查过, 没有遇到非法情况. 此时直接返回 truereturn true;}// 用来判断该消息是否可以转发给这个绑定对应的队列public boolean route(ExchangeType exchangeType,Binding binding,Message message) throws MqException {// 根据不同的 exchangeType 使用不同的转发规则if (exchangeType == ExchangeType.FANOUT) {// 如果是 FANOUT 类型,则该交换机上绑定的所有队列都需要转发return true;} else if (exchangeType == ExchangeType.TOPIC) {return routeTopic(binding,message);} else {throw new MqException("[Router] 交换机类型非法! exchangeType="+exchangeType);}}// 用来匹配 bindingKey与routingKeyprivate boolean routeTopic(Binding binding, Message message) {// 先把这两个 key 进行切分String[] bindingTokens = binding.getBindingKey().split("\\.");String[] routingTokens = message.getRoutingKey().split("\\.");// 引入两个下标, 指向上述两个数组. 初始情况下都为 0int bindingIndex = 0;int routingIndex = 0;// 此处使用 while 更合适, 每次循环, 下标不一定就是 + 1, 不适合使用 forwhile (bindingIndex < bindingTokens.length && routingIndex < routingTokens.length) {if (bindingTokens[bindingIndex].equals("*")) {// [情况二] 如果遇到 * , 直接进入下一轮. * 可以匹配到任意一个部分!!bindingIndex++;routingIndex++;continue;} else if (bindingTokens[bindingIndex].equals("#")) {// 如果遇到 #, 需要先看看有没有下一个位置.bindingIndex++;if (bindingIndex == bindingTokens.length) {// [情况三] 该 # 后面没东西了, 说明此时一定能匹配成功了!return true;}// [情况四] # 后面还有东西, 拿着这个内容, 去 routingKey 中往后找, 找到对应的位置.// findNextMatch 这个方法用来查找该部分在 routingKey 的位置. 返回该下标. 没找到, 就返回 -1routingIndex = findNextMatch(routingTokens, routingIndex, bindingTokens[bindingIndex]);if (routingIndex == -1) {// 没找到匹配的结果. 匹配失败return false;}// 找到的匹配的情况, 继续往后匹配.bindingIndex++;routingIndex++;} else {// [情况一] 如果遇到普通字符串, 要求两边的内容是一样的.if (!bindingTokens[bindingIndex].equals(routingTokens[routingIndex])) {return false;}bindingIndex++;routingIndex++;}}// [情况五] 判定是否是双方同时到达末尾// 比如 aaa.bbb.ccc  和  aaa.bbb 是要匹配失败的.if (bindingIndex == bindingTokens.length && routingIndex == routingTokens.length) {return true;}return false;}private int findNextMatch(String[] routingTokens, int routingIndex, String bindingToken) {for (int i = routingIndex; i < routingTokens.length; i++) {if (routingTokens[i].equals(bindingToken)) {return i;}}return -1;}
}

订阅消息

消费者

咱们要创建一个消费者类,其中有消费者的信息,以及该消费者订阅的队列的名字,
以及消息应答模式,以及回调函数.

回调函数是什么?
是让消费者自己设定一个函数,当有新的消息转发给该消费者后,就执行这个回调函数

在这里插入图片描述

/*** 消费者的回调函数*/
@FunctionalInterface
public interface Consumer {// Delivery 是 “投递” 的意思,这个方法预期是在每次服务器收到消息之后,来调用// 通过这个方法把消息推送给对应的消费者// 这里的方法名与参数列表都是参考 RabbitMQ 的void handleDelivery(String consumerTag, BasicProperties basicProperties,byte[] body) throws MqException, IOException;
}
/*** 表示一个消费者(完整的执行环境)*/
@Data
public class ConsumerEnv {// 消费者信息private String consumerTag;// 订阅队列的名字private String queueName;// true -> 自动应答, false -> 手动应答private boolean autoAck;// 回调函数private Consumer consumer;public ConsumerEnv(String consumerTag, String queueName, boolean autoAck, Consumer consumer) {this.consumerTag = consumerTag;this.queueName = queueName;this.autoAck = autoAck;this.consumer = consumer;}
}

队列如何给订阅的消费者发送消息

这里咱们要想清楚的是,一个队列可以有多个消费者,
新来的消息要转发给哪个消费者呢?
咱们在这里采取轮询策略,即让消费者排队,依次将消息发送给消费者,当消费者收到消息后,则移动到队伍的最后等待下个消息.

因此咱们要给核心类 Message类再增加几个属性和方法,来管理消费者,

/*** 表示一个存储消息的队列* MSG =》Message* 消息队列的使用者是消费者*/
@Data
public class MSGQueue {// 表示队列的身份标识private String name;// 表示队列是否持久化private boolean durable = false;// true -> 这个队列只能被一个消费者使用,false -> 大家都能使用这个队列// 后续代码不实现相关功能private boolean exclusive = false;// true -> 没人使用后,自动删除,false -> 没人使用,不自动删除private boolean autoDelete = false;// 表示扩展参数,后续代码没有实现private Map<String,Object> arguments = new HashMap<>();// 当前队列有哪些消费者订阅了private List<ConsumerEnv> consumerEnvList = new ArrayList<>();// 记录当前取到了第几个消费者,方便实现轮询策略private AtomicInteger consumerSeq = new AtomicInteger(0);// 添加一个新的订阅者public void addConsumerEnv(ConsumerEnv consumerEnv) {consumerEnvList.add(consumerEnv);}// 删除订阅者暂不考虑// 挑选一个订阅者,用来处理当前的消息(按照轮询的方式)public ConsumerEnv chooseConsumer() {// 无人订阅if (consumerEnvList.size() == 0) {return null;}int index = consumerSeq.get() % consumerEnvList.size();consumerSeq.getAndIncrement();return consumerEnvList.get(index);}public String getArguments() {ObjectMapper objectMapper = new ObjectMapper();try {return objectMapper.writeValueAsString(arguments);} catch (JsonProcessingException e) {e.printStackTrace();}return null;}public void setArguments(String arguments) {ObjectMapper objectMapper = new ObjectMapper();try {this.arguments = objectMapper.readValue(arguments, new TypeReference<HashMap<String,Object>>() {});} catch (JsonProcessingException e) {e.printStackTrace();}}public void setArguments(Map<String,Object> arguments) {this.arguments = arguments;}public Object getArguments(String key) {return arguments.get(key);}public void setArguments(String key,Object value) {arguments.put(key, value);}
}

自动发送消息至订阅者

那么消费者要如何拿到消息呢?即如何将消息发送给消费者,咱们这里采取的是自动发送,即队列中来了新消息,就自动将新消息发送给订阅了这个队列的消费者.

咱们实现的方法是,使用一个阻塞队列,当生产者发布消息到交换机时,交换机转发消息到对应的队列后,就把队列名当作令牌添加到这个阻塞队列中,再配置一个扫描线程,去时刻扫描这个阻塞队列中是否有新的令牌了,有了新令牌,则根据令牌去对应的队列中,去把新消息安装轮询策略转发给消费者.


应答消息

应答消息共有两种模式.

  • 自动应答:将消息发送给消费者就算应答了(不关心消费者收没收到,相当于没应答)
  • 手动应答:需要消费者手动调用应答方法(确保消费者收到消息了)

三、代码编写

/*** 通过这个类, 来表示 虚拟主机.* 每个虚拟主机下面都管理着自己的 交换机, 队列, 绑定, 消息 数据.* 同时提供 api 供上层调用.* 针对 VirtualHost 这个类, 作为业务逻辑的整合者, 就需要对于代码中抛出的异常进行处理了.*/@Data
public class VirtualHost {private String virtualHostName;private Router router = new Router();private DiskDataCenter diskDataCenter = new DiskDataCenter();private MemoryDataCenter memoryDataCenter = new MemoryDataCenter();// 操作交换机的锁对象,防止多线程操作交换机时,出现线程安全问题,如 创建了两个拥有相同身份标识的交换机private final Object exchangeLocker = new Object();// 操作队列的锁对象,防止多线程,操作队列时,出现线程安全问题,如 创建了两个拥有相同身份标识的队列private final Object queueLocker = new Object();// 消费者管理中心ConsumerManager consumerManager = new ConsumerManager(this);public VirtualHost(String virtualHostName) {this.virtualHostName = virtualHostName;// MemoryDataCenter 并不需要初始化,当 new MemoryDataCenter();时,里面所需的数据结构也都已经创建好了// DiskDataCenter 就需要初始化操作,去建库建表建文件和初始化数据的设定diskDataCenter.init();try {// 将硬盘中已有的数据恢复到内存中memoryDataCenter.recovery(diskDataCenter);} catch (IOException | MqException | ClassNotFoundException e) {e.printStackTrace();}}// 创建交换机// 如果交换机不存在, 就创建. 如果存在, 直接返回.// 返回值是 boolean. 创建成功, 返回 true. 失败返回 falsepublic boolean exchangeDeclare(String exchangeName, ExchangeType exchangeType, boolean durable, boolean autoDelete,Map<String,Object> arguments) {exchangeName = virtualHostName + exchangeName;try {synchronized (exchangeLocker) {// 把交换机的名字, 加上虚拟主机作为前缀.// 1.判断交换机是否存在,直接从内存查询if (memoryDataCenter.getExchange(exchangeName) != null) {// 该交换机已经存在!System.out.println("[VirtualHost] 交换机已经存在! exchangeName=" + exchangeName);return true;}// 2.创建交换机Exchange exchange = new Exchange();exchange.setName(exchangeName);exchange.setType(exchangeType);exchange.setDurable(durable);exchange.setAutoDelete(autoDelete);exchange.setArguments(arguments);// 3.把交换机对象写入硬盘if (durable) {diskDataCenter.insertExchange(exchange);}// 4.把交换机对象写入内存memoryDataCenter.insertExchange(exchange);System.out.println("[VirtualHost] 交换机创建成功! exchangeName=" + exchangeName);// 上述逻辑, 先写硬盘, 后写内存. 目的就是因为硬盘更容易写失败. 如果硬盘写失败了, 内存就不写了.// 要是先写内存, 内存写成功了, 硬盘写失败了, 还需要把内存的数据给再删掉. 就比较麻烦了.}return true;} catch (Exception e) {System.out.println("[VirtualHost] 交换机创建失败! exchangeName=" + exchangeName);e.printStackTrace();return false;}}// 删除交换机public boolean exchangeDelete(String exchangeName) {exchangeName = virtualHostName + exchangeName;try {synchronized (exchangeLocker) {// 1.判断交换机是否存在Exchange existsExchange = memoryDataCenter.getExchange(exchangeName);if (existsExchange == null) {throw new MqException("[virtualHostName] 交换机不存在无法删除!");}if (existsExchange.isDurable()) {// 2.删除硬盘上的数据diskDataCenter.deleteExchange(exchangeName);}// 3.删除内存上的数据memoryDataCenter.deleteExchange(exchangeName);System.out.println("[VirtualHost] 交换机删除成功! exchangeName=" + exchangeName);}return true;}catch (Exception e) {System.out.println("[VirtualHost] 交换机删除失败! exchangeName=" + exchangeName);e.printStackTrace();return false;}}// 创建队列public boolean queueuDeclare(String queueName,boolean durable,boolean exclusive,boolean autoDelete,Map<String,Object> arguments) {// 给队列名字加前缀queueName = virtualHostName + queueName;try {synchronized (queueLocker) {// 1.判断队列是否存在if (memoryDataCenter.getQueue(queueName) != null) {System.out.println("[VirtualHost] 队列已经存在! queueName=" + queueName);return true;}// 2.创建队列MSGQueue queue = new MSGQueue();queue.setName(queueName);queue.setDurable(durable);queue.setExclusive(exclusive);queue.setAutoDelete(autoDelete);queue.setArguments(arguments);// 3.向硬盘中写入数据if (durable) {diskDataCenter.insertQueue(queue);}// 4.向内存中写入数据memoryDataCenter.insertQueue(queue);System.out.println("[VirtualHost] 队列创建成功! queueName=" + queueName);}return true;} catch (Exception e) {System.out.println("[VirtualHost] 队列创建失败! queueName=" + queueName);e.printStackTrace();return false;}}// 删除队列public boolean queueDelete(String queueeName) {queueeName = virtualHostName + queueeName;try {synchronized (queueLocker) {MSGQueue existsQueue = memoryDataCenter.getQueue(queueeName);// 1.判断交换机是否存在if (existsQueue == null) {throw new MqException("[virtualHostName] 队列不存在无法删除!");}// 2.删除硬盘上的数据if (existsQueue.isDurable()) {diskDataCenter.deleteQueue(queueeName);}// 3.删除内存上的数据memoryDataCenter.deleteQueue(queueeName);System.out.println("[VirtualHost] 队列删除成功! queueeName=" + queueeName);}return true;}catch (Exception e) {System.out.println("[VirtualHost] 队列删除失败! queueeName=" + queueeName);e.printStackTrace();return false;}}// 创建绑定public boolean bindingDeclare(String exchangeName,String queueName,String bindingKey) {exchangeName = virtualHostName + exchangeName;queueName = virtualHostName + queueName;try {synchronized (exchangeLocker) {synchronized (queueLocker) {// 1.判断绑定是否已存在,对应的交换机 队列是否存在,bindingKey是否合法if (memoryDataCenter.getBinding(exchangeName, queueName) != null) {return true;}Exchange exchange = memoryDataCenter.getExchange(exchangeName);if (exchange == null) {throw new MqException("[VirtualHost] 交换机不存在! exchangeName=" + exchangeName);}MSGQueue queue = memoryDataCenter.getQueue(queueName);if (queue == null) {throw new MqException("[VirtualHost] 队列不存在! queueName=" + queueName);}if (!router.checkBindingKey(bindingKey)) {throw new MqException("[VirtualHost] bindingKey 非法! bindingKey=" + bindingKey);}// 2.创建绑定Binding binding = new Binding();binding.setExchangeName(exchangeName);binding.setQueueName(queueName);binding.setBindingKey(bindingKey);// 3.写入硬盘if (exchange.isDurable() && queue.isDurable()) {diskDataCenter.insertBinding(binding);}// 4.写入内存memoryDataCenter.insertBinding(binding);System.out.println("[VirtualHost] 绑定创建成功! exchangeName=" + exchangeName+ ", queueName=" + queueName);}}return true;} catch (Exception e) {System.out.println("[VirtualHost] 绑定创建失败! exchangeName=" + exchangeName+ ", queueName=" + queueName);e.printStackTrace();return false;}}// 删除绑定public boolean bindingDelete(String exchangeName,String queueName) {exchangeName = virtualHostName + exchangeName;queueName = virtualHostName + queueName;try {synchronized (exchangeLocker) {synchronized (queueLocker) {// 1.判断绑定是否已存在,对应的交换机 队列是否存在,bindingKey是否合法Binding binding = memoryDataCenter.getBinding(exchangeName, queueName);if (binding == null) {throw new MqException("[VirtualHost] 删除绑定失败! 绑定不存在! exchangeName=" + exchangeName + ", queueName=" + queueName);}// 2. 无论绑定是否持久化了, 都尝试从硬盘删一下. 就算不存在, 这个删除也无副作用.diskDataCenter.deleteBinding(binding);// 3. 删除内存的数据memoryDataCenter.deleteBinding(binding);System.out.println("[VirtualHost] 删除绑定成功!");}}return true;} catch (Exception e) {System.out.println("[VirtualHost] 删除绑定失败!");e.printStackTrace();return false;}}// 发送消息到指定的交换机/队列中public boolean basicPublish(String exchangeName, String routingKey, BasicProperties basicProperties, byte[] body) {try {exchangeName = virtualHostName + exchangeName;// 1.检查 routingKey是否合法if (!router.checkRoutingKey(routingKey)) {throw new MqException("[VirtualHost] routingKey 非法! routingKey=" + routingKey);}// 2.查找交换机对象Exchange exchange = memoryDataCenter.getExchange(exchangeName);if (exchange == null) {throw new MqException("[VirtualHost] 交换机不存在! exchangeName=" + exchangeName);}// 3.判断交换机绑定类型if (exchange.getType() == ExchangeType.DIRECT) {// 按照直接交换机的方式来转发消息// 以 routingKey 作为队列的名字, 直接把消息写入指定的队列中.// 此时, 可以无视绑定关系.// 4.查找对应的队列String queueName = virtualHostName + routingKey;MSGQueue queue = memoryDataCenter.getQueue(queueName);if (queue == null) {throw new MqException("[VirtualHost] 队列不存在! queueName=" + queueName);}// 5.构造 messageMessage message = Message.createMessageWithId(routingKey, basicProperties, body);// 6.发送消息sendMessage(queue,message);return true;} else {// 按照 fanout 和 topic 的方式来转发.// 5. 找到该交换机关联的所有绑定, 并遍历这些绑定对象ConcurrentHashMap<String, Binding> bindingsMap = memoryDataCenter.getBinding(exchangeName);for (Map.Entry<String,Binding> entry : bindingsMap.entrySet()) {// 6.获取绑定对象,判断对应的队列是否存在Binding binding = entry.getValue();MSGQueue queue = memoryDataCenter.getQueue(binding.getQueueName());if (queue == null){// 此处咱们就不抛出异常了. 可能此处有多个这样的队列.// 希望不要因为一个队列的失败, 影响到其他队列的消息的传输.System.out.println("[VirtualHost] basicPublish 发送消息时, 发现队列不存在! queueName=" + binding.getQueueName());continue;}// 7.构造消息Message message = Message.createMessageWithId(routingKey, basicProperties, body);// 8.判断该消息是否可以发送给该队列if (router.route(exchange.getType(),binding,message)) {sendMessage(queue,message);}}}return true;} catch (Exception e) {System.out.println("[VirtualHost] 消息发送失败!");e.printStackTrace();return false;}}// 发送消息到硬盘与内存private void sendMessage(MSGQueue queue, Message message) throws IOException, MqException, InterruptedException {// 此处发送消息,即 把消息写入到 硬盘 和 内存 中int deliverMode = message.getDeliverMode();// deliverMode 为 1 不持久化,为 2 持久化if (deliverMode == 2) {diskDataCenter.sendMessage(queue,message);}// 写入内存memoryDataCenter.sendMessage(queue,message);// 通知消费者可以消费消息了consumerManager.notifyConsume(queue.getName());}// 订阅消息// 添加一个队列的订阅者,当队列收到消息之后,就要把消息推送给对应的订阅者// consumerTag: 消费者的身份标识// autoAck: 消息被消费完成后,应答的方式  为 true 自动应答,为 false 手动应答// consumer: 是一个回调函数,此处类型设定成函数式接口,这样后续调用 basicConsume 并且传实参的时候,就可以写作 lambda样子public boolean basicConsume(String consumerTag, String queueName, boolean autoAck, Consumer consumer) {queueName = virtualHostName + queueName;try {consumerManager.addConsumer(consumerTag,queueName,autoAck,consumer);System.out.println("[VirtualHost] basicConsume成功 queueName=" + queueName);return true;} catch (Exception e) {System.out.println("[VirtualHost] basicConsume失败 queueName=" + queueName);e.printStackTrace();return false;}}// 消息的手动应答public boolean basicAck(String queueName,String messageId) {queueName = virtualHostName + queueName;try {// 1.获取消息和队列Message message = memoryDataCenter.getMessage(messageId);if (message == null) {throw new MqException("[VirtualHost] 要确认的消息不存在! messageId=" + messageId);}MSGQueue queue = memoryDataCenter.getQueue(queueName);if (queue == null) {throw new MqException("[VirtualHost] 要确认的队列不存在! queueName=" + queueName);}// 2.删除硬盘上的消息if (message.getDeliverMode() == 2) {diskDataCenter.deleteMessage(queue, message);}// 3.删除消息中心的消息memoryDataCenter.removeMessage(messageId);// 4.删除待确认消息中的消息memoryDataCenter.removeMessageWaitAck(queueName, messageId);System.out.println("[VirtualHost] basicAck 成功! 消息被成功确认! queueName=" + queueName + ", messageId="+ messageId);return true;} catch (Exception e) {System.out.println("[VirtualHost] basicAck 失败! 消息被成功确认! queueName=" + queueName + ", messageId="+ messageId);e.printStackTrace();return false;}}}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/100495.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

el-date-picker增加默认值 修改样式

预期效果 默认是这样的 但希望是直接有一个默认的当天日期&#xff0c;并且字体颜色啥的样式也要修改&#xff08;在这里假设今天是2023/10/6 功能实现 踩了坑挺多坑的&#xff0c;特此记录 官方文档 按照官方的说明&#xff0c;给v-model绑定一个字符串就可以了 在j…

AI能否取代程序员:探讨人工智能在编程领域的角色

引言&#xff1a; 随着人工智能&#xff08;AI&#xff09;技术的快速发展&#xff0c;人们开始思考&#xff1a;AI是否能够取代程序员&#xff1f;这个问题引发了广泛的讨论和辩论。一些人认为&#xff0c;AI的出现将彻底改变编程的面貌&#xff0c;而另一些人则坚信&#xf…

大数据之LibrA数据库系统介绍

简介 LibrA是一个基于开源数据库Postgres-XC开发的分布式并行关系型数据库系统。 LibrA提供了以下功能&#xff1a; 标准SQL支持 支持标准的SQL92/SQL2003规范&#xff0c;支持GBK和UTF-8字符集&#xff0c;支持SQL标准函数与OLAP分析函数&#xff0c;支持存储过程。 数据库…

微信支付v2

文档&#xff1a; https://pay.weixin.qq.com/wiki/doc/api/index.html 微信小程序&#xff1a;https://pay.weixin.qq.com/wiki/doc/api/jsapi.php?chapter11_1 需要一个微信认证后的小程序&#xff0c;&#xff0c;还需要一个&#xff0c;在微信商户平台&#xff0c;&…

串级/级联控制知识点整理

串级控制系统是改善控制质量的有效方法之一&#xff0c;在过程控制中得到了广泛的应用。所谓串级控制&#xff0c;就是采用两个控制器串联工作&#xff0c;外环控制器的输出作为内环控制器的设定值&#xff0c;由内环控制器的输出去操纵控制阀&#xff0c;从而对外环被控量具有…

mysql8压缩包安装

MySQL 8.0 版压缩包安装教程_mysql 压缩包 8.0安装-CSDN博客 1、mysql压缩包 2、参考链接一步一步操作即可。 3、安装&#xff0c;破解navicat. 4、无法连接&#xff0c;参考该链接修改&#xff1a; Mysql 解决1251- Client does not support authentication protocol reques…

android app开发环境搭建

Android是流行的移动设备原生应用开发平台&#xff0c;其支持Java语言以及Kotlin语言的开发环境&#xff0c;本文主要描述官方提供的Android studio集成开发环境搭建。 https://developer.android.google.cn/ 如上所示&#xff0c;从官方上下载最新版本的Android studio集成开…

【DevOps】搭建你的第一个 Docker 应用栈

搭建你的第一个 Docker 应用栈 1.Docker 集群部署2.第一个 Hello World2.1 获取应用栈各节点所需镜像2.2 应用栈容器节点互联2.3 应用栈容器节点启动2.4 应用栈容器节点的配置2.4.1 Redis Master 主数据库容器节点的配置2.4.2 Redis Slave 从数据库容器节点的配置2.4.3 Redis 数…

[NISACTF 2022]join-us - 报错注入无列名注入

[NISACTF 2022]join-us 解题流程 解题流程 点击登录&#xff0c;找到注入点 这种框&#xff0c;可以直接爆破关键字&#xff0c;看是否拦截&#xff0c;也可以手动尝试&#xff0c;发现、union、and、or、substr、database等关键字都拦截了 1、学到了&#xff1a;可以用数据库…

pytorch学习------TensorBoard的使用

目录 简介使用方式1、单条曲线(scalar)2、多条曲线(scalars)3、直方图(histogram)4、图片(image)5、渲染(figure)6、网络(graph) 简介 建好一个神经网络&#xff0c;其实我们也不知道神经网络里头具体细节到底做了什么&#xff0c;要人工调试十分困难(就好比你无法想象出递归的…

SpringCloud之Stream框架集成RocketMQ消息中间件

Spring Cloud Stream 是一个用来为微服务应用构建消息驱动能力的框架。它可以基于 Spring Boot 来创建独立的、可用于生产的 Spring 应用程序。Spring Cloud Stream 为一些供应商的消息中间件产品提供了个性化的自动化配置实现&#xff0c;并引入了发布-订阅、消费组、分区这三…

SQL Server 简介与 Docker Compose 部署

今天我翻阅了在之前公司工作时的笔记&#xff0c;发现了有关数据库的一些记录。当时&#xff0c;我们的项目开始使用 Oracle 数据库&#xff0c;但后来由于一些项目需求的变更&#xff0c;我们切换到了 SQL Server 。值得一提的是&#xff0c;公司当时也开始采用 Docker 技术&a…

c++模板库容器list vector map set操作和性能对比

文章目录 listvectormapset性能比较总结 list 列表&#xff08;list&#xff09;是C STL中的一种容器类型&#xff0c;它是一个双向链表&#xff0c;可以在任意位置高效地添加、删除、移动元素。 以下是一些常用的列表操作&#xff1a; 创建列表 #include <list> std…

Jetson Orin NX 开发指南(1): 系统烧录

一、SDK Manager SDK Manager 工具是 NVIDIA 官方推荐的烧写和管理 Jetpack 系统组件的一个图形化烧写工具&#xff0c;使用起来非常的简单方便&#xff0c;但是该软件需要在 x86 的 Ubuntu 18.04 或 Ubuntu 20.04 的系统上运行&#xff0c;因此我们需要准备一台安装了 Ubuntu…

记录:Unity脚本的编写3.0

目录 前言前置控制方法查看效果移动方式 前言 前面记录了一些通过脚本控制对象模型移动和通过用户的操作对模型进行变化的方法&#xff0c;那么为了让我们创造的不论是地形还是模型都拥有真实的物理引擎&#xff08;大雾&#xff09;&#xff0c;那么这次就使用脚本控制模型感…

函数reshape(-1,)里的-1的意思

reshape函数是对narray的数据结构进行维度变换&#xff0c;由于变换遵循对象元素个数不变&#xff0c;在进行变换时&#xff0c;假设一个数据对象narray的总元素个数为N&#xff0c;如果我们给出一个维度为&#xff08;m&#xff0c;-1&#xff09;时&#xff0c;我们就理解为将…

【Linux C】Linux如何执行一个程序(程序存储空间、系统调用、内核调用)

文章目录 一、程序存储空间1.1 C语言程序存储空间1.2 用户空间和内核空间1.3 用户模式和内核模式 二、内核调用-系统调用-C语言库函数2.1 系统调用和内核调用2.2 C语言库函数 三、Linux如何执行一个程序 一、程序存储空间 本节说的空间主要是指内存空间&#xff0c;即程序如何分…

链表(2)——带头双向循环链表

&#x1f341;一、链表的分类 &#x1f315;1.单向或者双向 &#x1f315;2.带头或者不带头&#xff08;有无哨兵&#xff09; &#x1f315;3.循环或者不循环 &#x1f315;4.无头单向非循环链表&#xff08;常用&#xff09; &#x1f315;5.带头双向循环链表&#xff08;常用…

案例分享:原生广告如何助力app实现高效变现收益的转化

原生广告是指将广告嵌入到APP的内容中&#xff0c;使其与APP内容融为一体&#xff0c;达到获得用户关注的效果。在形式上&#xff0c;原生广告并不像传统广告那样显眼&#xff0c;而是以一种更加自然的方式展现在用户面前。 它采用了与APP相似的设计风格和交互方式&#xff0c…

深度学习DAY1:神经网络NN;二元分类

深度学习笔记 DAY1 深度学习基本知识 1.神经网络 1.1 单一神经元 所有神经元将房屋大小size作为输入x,计算线性方程&#xff0c;结果取max&#xff08;0&#xff0c;y&#xff09;,输出预测房价y ReLU函数&#xff08;线性整流函数&#xff09;–max&#xff08;0&#xf…