消息队列 (9)-消费者核心类的实现

目录

  • 前言
  • 消费者类设计思路
    • 核心API
    • 总体代码

前言

我们上一篇博客,写了虚拟主机的实现, 在虚拟主机中需要用到俩个未实现的类,分别是验证绑定关键字和消费者类,接下来我们实现消费者类的核心代码

消费者类设计思路

在这个类中,首先我们要持有virtualHost对象来操作数据, 然后我们指定一个线程池负责具体的回调函数,通过一个扫描队列来不停的扫描所有的队列,看那个队列有新的消息,如果有就放到阻塞队列中去,消费者每次从阻塞队列中取出一个消息来响应。如果是多个消费者都订阅了一个消息,那么就使用轮询的方式来获取消息
在这里插入图片描述

核心API

属性

虚拟主机
线程池
阻塞 队列
扫描线程

方法
①往阻塞队列中添加消息

// 往阻塞队列中添加消息public void notifyConsume(String queueName) throws InterruptedException {tokenQueue.put(queueName);}

②订阅消息

我们的思路是,先找到对应的队列,然后去查看队列中是否有消息,如果有就要消费掉这些消息

 //添加订阅者public void addConsumer(String consumerTag, String queueName, boolean autoAck, Consumer consumer) throws MqException {// 1 找到对应的队列MSGQueue queue = virtuaHost.getMemoryDataCenter().getQueue(queueName);if (queue == null){throw new MqException("[ConsumerManager] 队列不存在! queueName=" + queueName);}ConsumEnv consumEnv = new ConsumEnv(consumerTag,queueName,autoAck,consumer);synchronized (queue){queue.addConsumEnv(consumEnv);// 如果当前队列中已经有了一些消息了, 需要立即就消费掉.int n = virtuaHost.getMemoryDataCenter().getMessageCount(queueName);for (int i = 0; i < n; i++) {// 这个方法调用一次就消费一条消息.consumeMessage(queue);}}}

③消费消息
关于消费消息,我们按照轮询的方式来依次消费

 // 消费消息private void consumeMessage(MSGQueue queue) {// 1. 按照轮询的方式, 找个消费者出来.ConsumEnv luckyDog = queue.chooseConsumEnv();if (luckyDog == null){// 说明没有消费者return;}// 2. 从队列中取出一个消息Message message = virtuaHost.getMemoryDataCenter().pollMessage(queue.getName());if (message == null){// 说明没有消息,不能消费return;}// 3. 把消息带入到消费者的回调方法中, 丢给线程池执行.workPool.submit(()->{try {//1,将消息放到待确认的集合中, 这个操作在回调函数之前virtuaHost.getMemoryDataCenter().addMessageWaitAck(queue.getName(),message);//2. 执行回调函数luckyDog.getConsumer().handleDelivery(luckyDog.getConsumerTag(),message.getBasicProperties(),message.getBody());System.out.println("[ConsumerManager] 消息被成功消费, queueName = "+queue.getName());//3. 如果是自动应答, 就可以之间删除消息// 如果是手动应答,  就先什么也不做if (luckyDog.isAutoAck()){// 1删除硬盘上的消息if (message.getDeliverMode() == 2){virtuaHost.getDiskDataCenter().deleteMessage(queue,message);}//2 删除待确认的消息virtuaHost.getMemoryDataCenter().removeMessageWaitAck(queue.getName(), message.getMessageId());// 3 删除内存中的消息virtuaHost.getMemoryDataCenter().removeMessage(message.getMessageId());System.out.println("[ConsumerManager] 消息被成功消费! queueName=" + queue.getName());}} catch (IOException | ClassNotFoundException | MqException e) {e.printStackTrace();}});}

总体代码

package com.example.demo.mqServer.core;import com.example.demo.Common.ConsumEnv;
import com.example.demo.Common.Consumer;
import com.example.demo.Common.MqException;
import com.example.demo.mqServer.VirtuaHost;import java.io.IOException;
import java.util.concurrent.*;/*
* 通过这个类, 来实现来实现消费者消费消息的核心功能
* */
public class ConsumerManager {// 持有上层对象 VirtualHost 调用 ,来操作数据private VirtuaHost virtuaHost;// 指定一个线程池, 负责执行具体的回调函数private ExecutorService workPool = Executors.newFixedThreadPool(4);//  存放令牌的队列  - 阻塞队列private BlockingDeque<String> tokenQueue = new LinkedBlockingDeque<>();//  扫描线程private Thread scannerThread = null;//public ConsumerManager(VirtuaHost virtuaHost) {this.virtuaHost = virtuaHost;scannerThread =new Thread(()->{while (true){try {String queueName = tokenQueue.take();MSGQueue queue = virtuaHost.getMemoryDataCenter().getQueue(queueName);if (queue == null){throw new MqException("[ConsumerManager] 取令牌后发现, 该队列名不存在! queueName=" + queueName);}synchronized (queue){consumeMessage(queue);}} catch (InterruptedException e) {e.printStackTrace();} catch (MqException e) {e.printStackTrace();}}});scannerThread.setDaemon(true);scannerThread.start();}// 往阻塞队列中添加消息public void notifyConsume(String queueName) throws InterruptedException {tokenQueue.put(queueName);}// 增加订阅public void addConsumer(String consumerTag, String queueName, boolean autoAck, Consumer consumer) throws MqException {// 先找到对应的队列MSGQueue queue = virtuaHost.getMemoryDataCenter().getQueue(queueName);if (queue == null){throw new MqException("[ConsumerManager] 队列不存在! queueName=" + queueName);}ConsumEnv consumEnv = new ConsumEnv(consumerTag,queueName,autoAck,consumer);synchronized (queue){queue.addConsumEnv(consumEnv);// 如果当前队列中已经有了一些消息了, 需要立即就消费掉.int n = virtuaHost.getMemoryDataCenter().getMessageCount(queueName);for (int i = 0; i < n; i++) {// 这个方法调用一次就消费一条消息.consumeMessage(queue);}}}private void consumeMessage(MSGQueue queue) {// 1. 按照轮询的方式, 找个消费者出来.ConsumEnv luckyDog = queue.chooseConsumEnv();if (luckyDog == null){// 说明没有消费者return;}// 2. 从队列中取出一个消息Message message = virtuaHost.getMemoryDataCenter().pollMessage(queue.getName());if (message == null){// 说明没有消息,不能消费return;}// 3. 把消息带入到消费者的回调方法中, 丢给线程池执行.workPool.submit(()->{try {//1,将消息放到待确认的集合中, 这个操作在回调函数之前virtuaHost.getMemoryDataCenter().addMessageWaitAck(queue.getName(),message);//2. 执行回调函数luckyDog.getConsumer().handleDelivery(luckyDog.getConsumerTag(),message.getBasicProperties(),message.getBody());System.out.println("[ConsumerManager] 消息被成功消费, queueName = "+queue.getName());//3. 如果是自动应答, 就可以之间删除消息// 如果是手动应答,  就先什么也不做if (luckyDog.isAutoAck()){// 1删除硬盘上的消息if (message.getDeliverMode() == 2){virtuaHost.getDiskDataCenter().deleteMessage(queue,message);}//2 删除待确认的消息virtuaHost.getMemoryDataCenter().removeMessageWaitAck(queue.getName(), message.getMessageId());// 3 删除内存中的消息virtuaHost.getMemoryDataCenter().removeMessage(message.getMessageId());System.out.println("[ConsumerManager] 消息被成功消费! queueName=" + queue.getName());}} catch (IOException | ClassNotFoundException | MqException e) {e.printStackTrace();}});}}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/28877.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【BASH】回顾与知识点梳理(十)

【BASH】回顾与知识点梳理 十 十. 文件的格式化与相关处理10.1 格式化打印&#xff1a; printf10.2 awk&#xff1a;好用的数据处理工具awk 的逻辑运算字符 10.3 文件比对工具diffcmppatch 10.4 文件打印准备&#xff1a; pr 该系列目录 --> 【BASH】回顾与知识点梳理&#…

移动端开发基础总结

移动端学习总结 (适合于复习) 移动端基础 技术选型&#xff1a; 单独制作移动端页面&#xff08;主流&#xff09; 流式布局&#xff08;百分比布局&#xff09;flex弹性布局&#xff08;强烈推荐&#xff09;lessrem媒体查询布局混合布局 响应式页面兼容移动端&#xff08;…

postman----传参格式(json格式、表单格式)

本文主要讲解postman使用post请求方法的2中传参方式&#xff1a;json格式、表单格式 首先了解下&#xff0c;postman进行接口测试&#xff0c;必须条件是&#xff1a; ♥请求地址 ♥请求协议 ♥请求方式 ♥请求头 ♥参数 json格式 先看一下接口文档&#xff0c;根据接口文档&…

深度学习环境安装依赖时常见错误解决

1.pydantic 安装pydantic时报以下错误&#xff1a; ImportError: cannot import name Annotated from pydantic.typing (C:\Users\duole\anaconda3\envs\vrh\lib\site-packages\pydantic\typing.py) 这个是版本错误&#xff0c;删除装好的版本&#xff0c;重新指定版本安装就…

项目规范 编写规范(范例)

项目目录 目录接口参考 项目目录结构设计&#xff0c;增加部分领域模型后缀强制定义&#xff0c;方便统一编码风格。 controller&#xff1a;请求处理 RestController module&#xff1a;按大业务区分&#xff0c;对多个业务对象数据聚合处理 Component manager&#xff1a;…

web-xss

一、简介 XSS 又称CSS(Cross Site Scripting)或跨站脚本攻击&#xff0c;攻击者在网页中插入由JavaScript编写的恶意代码&#xff0c;当用户浏览被嵌入恶意代码的网页时&#xff0c;恶意代码将会在用户的浏览器上执行。 二、xss的攻击方式 Dom&#xff1a;这是一种将任意 Jav…

zookeeper入门学习

zookeeper入门学习 zookeeper应用场景 分布式协调组件 客户端第一次请求发给服务器2&#xff0c;将flag值修改为false&#xff0c;第二次请求被负载均衡到服务器1&#xff0c;访问到的flag也会是false 一旦有节点发生改变&#xff0c;就会通知所有监听方改变自己的值&#…

一键开启ChatGPT“危险发言”

‍ ‍ 大数据文摘授权转载自学术头条 作者&#xff1a;Hazel Yan 编辑&#xff1a;佩奇 随着大模型技术的普及&#xff0c;AI 聊天机器人已成为社交娱乐、客户服务和教育辅助的常见工具之一。 然而&#xff0c;不安全的 AI 聊天机器人可能会被部分人用于传播虚假信息、操纵舆…

“深入探索JVM:Java虚拟机的工作原理解析“

标题&#xff1a;深入探索JVM&#xff1a;Java虚拟机的工作原理解析 摘要&#xff1a;本文将深入探索Java虚拟机&#xff08;JVM&#xff09;的工作原理&#xff0c;从类加载、内存管理、垃圾回收、即时编译器等方面进行详细解析&#xff0c;帮助读者更好地理解JVM的内部机制。…

SSH 免密登录

SSH SSH&#xff08;Secure Shell&#xff09;是一种安全通道协议&#xff0c;主要用来实现字符界面的远程登录、远程复制等功能 SSH 协议对通信双方的数据传输进行了加密处理&#xff0c;其中包括用户登录时输入的用户口令 SSH 为建立在应用层和传输层基础上的安全协议。对数…

命令行非明文密码连接 TiDB

作者&#xff1a; GangShen 原文来源&#xff1a; https://tidb.net/blog/6794a34b 在命令行中连接TiDB的过程中&#xff0c;为了保护密码不被明文获取&#xff0c;可以使用非明文密码连接。本文记录了几种非明文连接 TiDB 的方式。 方式一&#xff1a;命令行输入方式 [ro…

使用go获取链上数据之主动拉取-搭建环境(一)

使用go获取链上数据之主动拉取-搭建环境&#xff08;一&#xff09; 1、配置文件1.1、新建配置文件1.2、新建setting.go文件1.3、新建config.go文件 2、全局变量配置2.1、新建global.go2.2、初始化配置2.3、验证配置 在我们实际开发项目中&#xff0c;很多时候都需要从链上获取…

PyTorch 微调终极指南:第 1 部分 — 预训练模型及其配置

一、说明 如今&#xff0c;在训练深度学习模型时&#xff0c;通过在自己的数据上微调预训练模型来迁移学习已成为首选方法。通过微调这些模型&#xff0c;我们可以利用他们的专业知识并使其适应我们的特定任务&#xff0c;从而节省宝贵的时间和计算资源。本文分为四个部分&…

数据结构:插入排序

直接插入排序 插入排序算法是所有排序方法中最简单的一种算法&#xff0c;其主要的实现思想是将数据按照一定的顺序一个一个的插入到有序的表中&#xff0c;最终得到的序列就是已经排序好的数据。 直接插入排序是插入排序算法中的一种&#xff0c;采用的方法是&#xff1a;在…

解析隧道代理被封的几个主要原因

Hey&#xff0c;各位爬虫高手&#xff0c;你是不是经常遇到爬虫代理HTTP被封的问题&#xff1f;不要慌&#xff0c;今天我来分享一些信息&#xff0c;帮你解析这个问题&#xff01;告别封禁&#xff0c;让你的爬虫工作更顺利&#xff0c;赶快跟随我一起了解吧&#xff01; 在爬…

opencv基础45-图像金字塔01-高斯金字塔cv2.pyrDown()

什么是图像金字塔&#xff1f; 图像金字塔&#xff08;Image> Pyramid&#xff09;是一种用于多尺度图像处理和分析的技术&#xff0c;它通过构建一系列不同分辨率的图像&#xff0c;从而使得图像可以在不同尺度下进行处理和分析。图像金字塔在计算机视觉、图像处理和计算机…

APT80DQ60BG-ASEMI快恢复二极管APT80DQ60BG

编辑&#xff1a;ll APT80DQ60BG-ASEMI快恢复二极管APT80DQ60BG 型号&#xff1a;APT80DQ60BG 品牌&#xff1a;ASEMI 芯片个数&#xff1a;双芯片 封装&#xff1a;TO-3P 恢复时间&#xff1a;≤80ns 工作温度&#xff1a;-55C~150C 浪涌电流&#xff1a;600A 正向电…

UEFI build报错:‘build‘ is not recognized as an internal or external command

UEFI学习&#xff0c;某一次进行build时&#xff0c;提示&#xff1a; build is not recognized as an internal or external command,operable program or batch file. 用的命令是&#xff1a; C:\UEFIWorkspace>build -a X64 -p edk2\OvmfPkg\OvmfPkgX64.dsc -b NOOPT -…

【性能类】—页面性能类

一、提升页面性能的方法有哪些&#xff1f; 1. 资源压缩合并&#xff0c;减少HTTP请求 图片、视频、js、css等资源压缩合并&#xff0c;开启HTTP压缩&#xff0c;把资源文件变小 2. 非核心代码异步加载 →异步加载的方式 → 异步加载的区别 异步加载的方式 ① 动态脚本加载…

【OpenCV常用函数:视频捕获函数】cv2.VideoCapture

文章目录 1、cv2.VideoCapture() 1、cv2.VideoCapture() 输入视频路径&#xff0c;创建VideoCapture的对象 cv2.VideoCapture(filename) filename: 视频文件的路径视频名扩展名该类的函数有&#xff1a; 1&#xff09;video.isOpened: 检查视频捕获是否成功 2&#xff09;vid…