消息队列 (9)-消费者核心类的实现

目录

  • 前言
  • 消费者类设计思路
    • 核心API
    • 总体代码

前言

我们上一篇博客,写了虚拟主机的实现, 在虚拟主机中需要用到俩个未实现的类,分别是验证绑定关键字和消费者类,接下来我们实现消费者类的核心代码

消费者类设计思路

在这个类中,首先我们要持有virtualHost对象来操作数据, 然后我们指定一个线程池负责具体的回调函数,通过一个扫描队列来不停的扫描所有的队列,看那个队列有新的消息,如果有就放到阻塞队列中去,消费者每次从阻塞队列中取出一个消息来响应。如果是多个消费者都订阅了一个消息,那么就使用轮询的方式来获取消息
在这里插入图片描述

核心API

属性

虚拟主机
线程池
阻塞 队列
扫描线程

方法
①往阻塞队列中添加消息

// 往阻塞队列中添加消息public void notifyConsume(String queueName) throws InterruptedException {tokenQueue.put(queueName);}

②订阅消息

我们的思路是,先找到对应的队列,然后去查看队列中是否有消息,如果有就要消费掉这些消息

 //添加订阅者public void addConsumer(String consumerTag, String queueName, boolean autoAck, Consumer consumer) throws MqException {// 1 找到对应的队列MSGQueue queue = virtuaHost.getMemoryDataCenter().getQueue(queueName);if (queue == null){throw new MqException("[ConsumerManager] 队列不存在! queueName=" + queueName);}ConsumEnv consumEnv = new ConsumEnv(consumerTag,queueName,autoAck,consumer);synchronized (queue){queue.addConsumEnv(consumEnv);// 如果当前队列中已经有了一些消息了, 需要立即就消费掉.int n = virtuaHost.getMemoryDataCenter().getMessageCount(queueName);for (int i = 0; i < n; i++) {// 这个方法调用一次就消费一条消息.consumeMessage(queue);}}}

③消费消息
关于消费消息,我们按照轮询的方式来依次消费

 // 消费消息private void consumeMessage(MSGQueue queue) {// 1. 按照轮询的方式, 找个消费者出来.ConsumEnv luckyDog = queue.chooseConsumEnv();if (luckyDog == null){// 说明没有消费者return;}// 2. 从队列中取出一个消息Message message = virtuaHost.getMemoryDataCenter().pollMessage(queue.getName());if (message == null){// 说明没有消息,不能消费return;}// 3. 把消息带入到消费者的回调方法中, 丢给线程池执行.workPool.submit(()->{try {//1,将消息放到待确认的集合中, 这个操作在回调函数之前virtuaHost.getMemoryDataCenter().addMessageWaitAck(queue.getName(),message);//2. 执行回调函数luckyDog.getConsumer().handleDelivery(luckyDog.getConsumerTag(),message.getBasicProperties(),message.getBody());System.out.println("[ConsumerManager] 消息被成功消费, queueName = "+queue.getName());//3. 如果是自动应答, 就可以之间删除消息// 如果是手动应答,  就先什么也不做if (luckyDog.isAutoAck()){// 1删除硬盘上的消息if (message.getDeliverMode() == 2){virtuaHost.getDiskDataCenter().deleteMessage(queue,message);}//2 删除待确认的消息virtuaHost.getMemoryDataCenter().removeMessageWaitAck(queue.getName(), message.getMessageId());// 3 删除内存中的消息virtuaHost.getMemoryDataCenter().removeMessage(message.getMessageId());System.out.println("[ConsumerManager] 消息被成功消费! queueName=" + queue.getName());}} catch (IOException | ClassNotFoundException | MqException e) {e.printStackTrace();}});}

总体代码

package com.example.demo.mqServer.core;import com.example.demo.Common.ConsumEnv;
import com.example.demo.Common.Consumer;
import com.example.demo.Common.MqException;
import com.example.demo.mqServer.VirtuaHost;import java.io.IOException;
import java.util.concurrent.*;/*
* 通过这个类, 来实现来实现消费者消费消息的核心功能
* */
public class ConsumerManager {// 持有上层对象 VirtualHost 调用 ,来操作数据private VirtuaHost virtuaHost;// 指定一个线程池, 负责执行具体的回调函数private ExecutorService workPool = Executors.newFixedThreadPool(4);//  存放令牌的队列  - 阻塞队列private BlockingDeque<String> tokenQueue = new LinkedBlockingDeque<>();//  扫描线程private Thread scannerThread = null;//public ConsumerManager(VirtuaHost virtuaHost) {this.virtuaHost = virtuaHost;scannerThread =new Thread(()->{while (true){try {String queueName = tokenQueue.take();MSGQueue queue = virtuaHost.getMemoryDataCenter().getQueue(queueName);if (queue == null){throw new MqException("[ConsumerManager] 取令牌后发现, 该队列名不存在! queueName=" + queueName);}synchronized (queue){consumeMessage(queue);}} catch (InterruptedException e) {e.printStackTrace();} catch (MqException e) {e.printStackTrace();}}});scannerThread.setDaemon(true);scannerThread.start();}// 往阻塞队列中添加消息public void notifyConsume(String queueName) throws InterruptedException {tokenQueue.put(queueName);}// 增加订阅public void addConsumer(String consumerTag, String queueName, boolean autoAck, Consumer consumer) throws MqException {// 先找到对应的队列MSGQueue queue = virtuaHost.getMemoryDataCenter().getQueue(queueName);if (queue == null){throw new MqException("[ConsumerManager] 队列不存在! queueName=" + queueName);}ConsumEnv consumEnv = new ConsumEnv(consumerTag,queueName,autoAck,consumer);synchronized (queue){queue.addConsumEnv(consumEnv);// 如果当前队列中已经有了一些消息了, 需要立即就消费掉.int n = virtuaHost.getMemoryDataCenter().getMessageCount(queueName);for (int i = 0; i < n; i++) {// 这个方法调用一次就消费一条消息.consumeMessage(queue);}}}private void consumeMessage(MSGQueue queue) {// 1. 按照轮询的方式, 找个消费者出来.ConsumEnv luckyDog = queue.chooseConsumEnv();if (luckyDog == null){// 说明没有消费者return;}// 2. 从队列中取出一个消息Message message = virtuaHost.getMemoryDataCenter().pollMessage(queue.getName());if (message == null){// 说明没有消息,不能消费return;}// 3. 把消息带入到消费者的回调方法中, 丢给线程池执行.workPool.submit(()->{try {//1,将消息放到待确认的集合中, 这个操作在回调函数之前virtuaHost.getMemoryDataCenter().addMessageWaitAck(queue.getName(),message);//2. 执行回调函数luckyDog.getConsumer().handleDelivery(luckyDog.getConsumerTag(),message.getBasicProperties(),message.getBody());System.out.println("[ConsumerManager] 消息被成功消费, queueName = "+queue.getName());//3. 如果是自动应答, 就可以之间删除消息// 如果是手动应答,  就先什么也不做if (luckyDog.isAutoAck()){// 1删除硬盘上的消息if (message.getDeliverMode() == 2){virtuaHost.getDiskDataCenter().deleteMessage(queue,message);}//2 删除待确认的消息virtuaHost.getMemoryDataCenter().removeMessageWaitAck(queue.getName(), message.getMessageId());// 3 删除内存中的消息virtuaHost.getMemoryDataCenter().removeMessage(message.getMessageId());System.out.println("[ConsumerManager] 消息被成功消费! queueName=" + queue.getName());}} catch (IOException | ClassNotFoundException | MqException e) {e.printStackTrace();}});}}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/28877.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

postman----传参格式(json格式、表单格式)

本文主要讲解postman使用post请求方法的2中传参方式&#xff1a;json格式、表单格式 首先了解下&#xff0c;postman进行接口测试&#xff0c;必须条件是&#xff1a; ♥请求地址 ♥请求协议 ♥请求方式 ♥请求头 ♥参数 json格式 先看一下接口文档&#xff0c;根据接口文档&…

深度学习环境安装依赖时常见错误解决

1.pydantic 安装pydantic时报以下错误&#xff1a; ImportError: cannot import name Annotated from pydantic.typing (C:\Users\duole\anaconda3\envs\vrh\lib\site-packages\pydantic\typing.py) 这个是版本错误&#xff0c;删除装好的版本&#xff0c;重新指定版本安装就…

web-xss

一、简介 XSS 又称CSS(Cross Site Scripting)或跨站脚本攻击&#xff0c;攻击者在网页中插入由JavaScript编写的恶意代码&#xff0c;当用户浏览被嵌入恶意代码的网页时&#xff0c;恶意代码将会在用户的浏览器上执行。 二、xss的攻击方式 Dom&#xff1a;这是一种将任意 Jav…

zookeeper入门学习

zookeeper入门学习 zookeeper应用场景 分布式协调组件 客户端第一次请求发给服务器2&#xff0c;将flag值修改为false&#xff0c;第二次请求被负载均衡到服务器1&#xff0c;访问到的flag也会是false 一旦有节点发生改变&#xff0c;就会通知所有监听方改变自己的值&#…

一键开启ChatGPT“危险发言”

‍ ‍ 大数据文摘授权转载自学术头条 作者&#xff1a;Hazel Yan 编辑&#xff1a;佩奇 随着大模型技术的普及&#xff0c;AI 聊天机器人已成为社交娱乐、客户服务和教育辅助的常见工具之一。 然而&#xff0c;不安全的 AI 聊天机器人可能会被部分人用于传播虚假信息、操纵舆…

SSH 免密登录

SSH SSH&#xff08;Secure Shell&#xff09;是一种安全通道协议&#xff0c;主要用来实现字符界面的远程登录、远程复制等功能 SSH 协议对通信双方的数据传输进行了加密处理&#xff0c;其中包括用户登录时输入的用户口令 SSH 为建立在应用层和传输层基础上的安全协议。对数…

使用go获取链上数据之主动拉取-搭建环境(一)

使用go获取链上数据之主动拉取-搭建环境&#xff08;一&#xff09; 1、配置文件1.1、新建配置文件1.2、新建setting.go文件1.3、新建config.go文件 2、全局变量配置2.1、新建global.go2.2、初始化配置2.3、验证配置 在我们实际开发项目中&#xff0c;很多时候都需要从链上获取…

PyTorch 微调终极指南:第 1 部分 — 预训练模型及其配置

一、说明 如今&#xff0c;在训练深度学习模型时&#xff0c;通过在自己的数据上微调预训练模型来迁移学习已成为首选方法。通过微调这些模型&#xff0c;我们可以利用他们的专业知识并使其适应我们的特定任务&#xff0c;从而节省宝贵的时间和计算资源。本文分为四个部分&…

数据结构:插入排序

直接插入排序 插入排序算法是所有排序方法中最简单的一种算法&#xff0c;其主要的实现思想是将数据按照一定的顺序一个一个的插入到有序的表中&#xff0c;最终得到的序列就是已经排序好的数据。 直接插入排序是插入排序算法中的一种&#xff0c;采用的方法是&#xff1a;在…

解析隧道代理被封的几个主要原因

Hey&#xff0c;各位爬虫高手&#xff0c;你是不是经常遇到爬虫代理HTTP被封的问题&#xff1f;不要慌&#xff0c;今天我来分享一些信息&#xff0c;帮你解析这个问题&#xff01;告别封禁&#xff0c;让你的爬虫工作更顺利&#xff0c;赶快跟随我一起了解吧&#xff01; 在爬…

opencv基础45-图像金字塔01-高斯金字塔cv2.pyrDown()

什么是图像金字塔&#xff1f; 图像金字塔&#xff08;Image> Pyramid&#xff09;是一种用于多尺度图像处理和分析的技术&#xff0c;它通过构建一系列不同分辨率的图像&#xff0c;从而使得图像可以在不同尺度下进行处理和分析。图像金字塔在计算机视觉、图像处理和计算机…

APT80DQ60BG-ASEMI快恢复二极管APT80DQ60BG

编辑&#xff1a;ll APT80DQ60BG-ASEMI快恢复二极管APT80DQ60BG 型号&#xff1a;APT80DQ60BG 品牌&#xff1a;ASEMI 芯片个数&#xff1a;双芯片 封装&#xff1a;TO-3P 恢复时间&#xff1a;≤80ns 工作温度&#xff1a;-55C~150C 浪涌电流&#xff1a;600A 正向电…

UEFI build报错:‘build‘ is not recognized as an internal or external command

UEFI学习&#xff0c;某一次进行build时&#xff0c;提示&#xff1a; build is not recognized as an internal or external command,operable program or batch file. 用的命令是&#xff1a; C:\UEFIWorkspace>build -a X64 -p edk2\OvmfPkg\OvmfPkgX64.dsc -b NOOPT -…

【性能类】—页面性能类

一、提升页面性能的方法有哪些&#xff1f; 1. 资源压缩合并&#xff0c;减少HTTP请求 图片、视频、js、css等资源压缩合并&#xff0c;开启HTTP压缩&#xff0c;把资源文件变小 2. 非核心代码异步加载 →异步加载的方式 → 异步加载的区别 异步加载的方式 ① 动态脚本加载…

重试框架入门:Spring-RetryGuava-Retry

前言 在日常工作中&#xff0c;随着业务日渐庞大&#xff0c;不可避免的涉及到调用远程服务&#xff0c;但是远程服务的健壮性和网络稳定性都是不可控因素&#xff0c;因此&#xff0c;我们需要考虑合适的重试机制去处理这些问题&#xff0c;最基础的方式就是手动重试&#xf…

YOLOv5源码中的参数超详细解析(2)— 配置文件yolov5s.yaml

前言&#xff1a;Hello大家好&#xff0c;我是小哥谈。YOLOv5配置了5种不同大小的网络模型&#xff0c;分别是YOLOv5n、YOLOv5s、YOLOv5m、YOLOv5l、YOLOv5x&#xff0c;其中YOLOv5n是网络深度和宽度最小但检测速度最快的模型&#xff0c;其他4种模型都是在YOLOv5n的基础上不断…

《Python入门到精通》os模块详解,Python os标准库

「作者主页」&#xff1a;士别三日wyx 「作者简介」&#xff1a;CSDN top100、阿里云博客专家、华为云享专家、网络安全领域优质创作者 「推荐专栏」&#xff1a;小白零基础《Python入门到精通》 os模块详解 1、文件目录操作os.stat() 获取文件状态os.utime() 修改文件时间os.r…

IPC之三:使用 System V 消息队列进行进程间通信的实例

IPC 是 Linux 编程中一个重要的概念&#xff0c;IPC 有多种方式&#xff0c;本文主要介绍消息队列(Message Queues)&#xff0c;消息队列可以完成同一台计算机上的进程之间的通信&#xff0c;相比较管道&#xff0c;消息队列要复杂一些&#xff0c;但使用起来更加灵活和方便&am…

FFmpeg中AVIOContext的使用

通过FFmpeg对视频进行编解码时&#xff0c;如果输入文件存在本机或通过USB摄像头、笔记本内置摄像头获取数据时&#xff0c;可通过avformat_open_input接口中的第二个参数直接指定即可。但如果待处理的视频数据存在于内存块中时&#xff0c;该如何指定&#xff0c;可通过FFmpeg…

用MiCoNE工具对16S序列数据进行共现网络分析

谷禾健康 微生物群通常由数百个物种组成的群落&#xff0c;这些物种之间存在复杂的相互作用。绘制微生物群落中不同物种之间的相互关系&#xff0c;对于理解和控制其结构和功能非常重要。 微生物群高通量测序的激增导致创建了数千个包含微生物丰度信息的数据集。这些丰度可以转…