消息的并发处理

看一下实现消息并发处理的代码,并发处理会增大实现流量控制、保证消息顺序方面的难度。

1 并发处理过程

处理效率的高低是反应Consumer实现好坏的重要指标,本节以ConsumeMessageConcurrentlyService类为例来分析RocketMQ的实现方式。ConsumeMessageConcurrentlyService类在org.apache.rocketmq.client.impl.consumer包中。

这个类定义了三个线程池,一个主线程池用来正常执行收到的消息,用户可以自定义通过consumeThreadMin和consumeThreadMax来自定义线程个数。另外两个都是单线程的线程池,一个用来执行推迟消费的消息,另一个用来定期清理超时消息(15分钟),如代码清单11-8所示。

代码清单11-8 三个线程池

this.consumeExecutor = new ThreadPoolExecutor(
    this.defaultMQPushConsumer.getConsumeThreadMin(),
    this.defaultMQPushConsumer.getConsumeThreadMax(), 1000 * 60,
    TimeUnit.MILLISECONDS, this.consumeRequestQueue,
    new ThreadFactoryImpl("ConsumeMessageThread_"));
this.scheduledExecutorService =
    Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl(
        "ConsumeMessageScheduledThread_"));
this.cleanExpireMsgExecutors =
    Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl(
        "CleanExpireMsgScheduledThread_"));

 

从Broker获取到一批消息以后,根据BatchSize的设置,把一批消息封装到一个ConsumeRequest中,然后把这个ConsumeRequest提交到consumeExecutor线程池中执行,如代码清单11-9所示。

代码清单11-9 任务分发逻辑

if (msgs.size() <= consumeBatchSize) {
    ConsumeRequest consumeRequest = new ConsumeRequest(msgs,
        processQueue, messageQueue);
    try {
        this.consumeExecutor.submit(consumeRequest);
    } catch (RejectedExecutionException e) {
        this.submitConsumeRequestLater(consumeRequest);
    }
} else {
    for (int total = 0; total < msgs.size(); ) {
        List<MessageExt> msgThis = new ArrayList<MessageExt>
            (consumeBatchSize);
        for (int i = 0; i < consumeBatchSize; i++, total++) {
            if (total < msgs.size()) {
                msgThis.add(msgs.get(total));
            } else {
                break;
            }
        }
        ConsumeRequest consumeRequest = new ConsumeRequest(msgThis,
            processQueue, messageQueue);
        try {
            this.consumeExecutor.submit(consumeRequest);
        } catch (RejectedExecutionException e) {
            for (; total < msgs.size(); total++) {
                msgThis.add(msgs.get(total));
            }

            this.submitConsumeRequestLater(consumeRequest);
        }
    }
}

 

消息的处理结果可能有不同的值,主要的两个是CONSUME_SUCCESS和RECONSUME_LATER。如果消费不成功,要把消息提交到上面说的scheduledExecutorService线程池中,5秒后再执行;如果消费模式是CLUSTERING模式,未消费成功的消息会先被发送回Broker,供这个ConsumerGroup里的其他Consumer消费,如果发送回Broker失败,再调用RECONSUME_LATER,消息消费的Status处理逻辑如代码清单11-10所示。

代码清单11-10 消息消费的Status处理逻辑

switch (this.defaultMQPushConsumer.getMessageModel()) {
    case BROADCASTING:
        for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size
            (); i++) {
            MessageExt msg = consumeRequest.getMsgs().get(i);
            log.warn("BROADCASTING, the message consume failed, drop " +
                "it, {}", msg.toString());
        }
        break;
    case CLUSTERING:
        List<MessageExt> msgBackFailed = new ArrayList<MessageExt>
            (consumeRequest.getMsgs().size());
        for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size
            (); i++) {
            MessageExt msg = consumeRequest.getMsgs().get(i);
            boolean result = this.sendMessageBack(msg, context);
            if (!result) {
                msg.setReconsumeTimes(msg.getReconsumeTimes() + 1);
                msgBackFailed.add(msg);
            }
        }
        if (!msgBackFailed.isEmpty()) {
            consumeRequest.getMsgs().removeAll(msgBackFailed);
            this.submitConsumeRequestLater(msgBackFailed,
                consumeRequest.getProcessQueue(), consumeRequest
                    .getMessageQueue());
        }
        break;
    default:
        break;
}

 

处理逻辑是用户自定义的,当消息量大的时候,处理逻辑执行效率的高低影响系统的吞吐量。可以把多条消息组合起来处理,或者提高线程数,以提高系统的吞吐量。

2 ProcessQueue对象

在前面的源码中,有个ProcessQueue类型的对象,这个对象的功能是什么呢?从Broker获得的消息,因为是提交到线程池里并行执行,很难监控和控制执行状态,比如如何获得当前消息堆积的数量,如何解决处理超时情况等。RocketMQ定义了一个快照类ProcessQueue来解决这些问题,在PushConsumer运行的时候,每个Message Queue都会有一个对应的ProcessQueue对象,保存了这个Message Queue消息处理状态的快照,如代码清单11-11所示。ProcessQueue对象里主要的内容是一个TreeMap和一个读写锁。TreeMap里以Message Queue的Offset作为Key,以消息内容的引用为Value,保存了所有从MessageQueue获取到但是还未被处理的消息,读写锁控制着多个线程对TreeMap对象的并发访问。

代码清单11-11 保存消息消费的状态

private final ReadWriteLock lockTreeMap = new ReentrantReadWriteLock();
private final TreeMap<Long, MessageExt> msgTreeMap = new TreeMap<Long, MessageExt>();
private final AtomicLong msgCount = new AtomicLong();
private final AtomicLong msgSize = new AtomicLong();
private final Lock lockConsume = new ReentrantLock();

 

有了ProcessQueue对象,可以随时停止、启动消息的消费,同时也可用于帮助实现顺序消费消息。顺序消息是通过ConsumeMessageOrderlyService类实现的,主要流程和ConsumeMessageConcurrentlyService类似,区别只是在对并发消费的控制上。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/157854.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

紧跟热点:教你如何快速掌握ChatGPT

2023年随着OpenAI开发者大会的召开&#xff0c;最重磅更新当属GPTs&#xff0c;多模态API&#xff0c;未来自定义专属的GPT。微软创始人比尔盖茨称ChatGPT的出现有着重大历史意义&#xff0c;不亚于互联网和个人电脑的问世。360创始人周鸿祎认为未来各行各业如果不能搭上这班车…

图解Spark Graphx基于connectedComponents函数实现连通图底层原理

原创/朱季谦 第一次写这么长的graphx源码解读&#xff0c;还是比较晦涩&#xff0c;有较多不足之处&#xff0c;争取改进。 一、连通图说明 连通图是指图中的任意两个顶点之间都存在路径相连而组成的一个子图。 用一个图来说明&#xff0c;例如&#xff0c;下面这个叫graph…

【教3妹学编程-算法题】最大异或乘积

3妹&#xff1a;2哥&#xff0c;你有没有看到新闻“18岁父亲为4岁儿子落户现身亲子鉴定” 2哥 : 啥&#xff1f;18岁就当爹啦&#xff1f; 3妹&#xff1a;确切的说是14岁好吧。 2哥 : 哎&#xff0c;想我30了&#xff0c; 还是个单身狗。 3妹&#xff1a;别急啊&#xff0c; 2…

已完结7个,再启动1个新项目,嘎嘎强!

作者&#xff1a;小傅哥 博客&#xff1a;https://bugstack.cn 沉淀、分享、成长&#xff0c;让自己和他人都能有所收获&#xff01;&#x1f604; 大家好&#xff0c;我是技术UP主小傅哥。 &#x1f490;又到了启动新项目的时候&#xff0c;死鬼开心嘛。小傅哥的星球&#xf…

数据库课后习题加真题

文章目录 第二章第三章第四到六章某年真题 第二章 第三章 3.8 对于教学数据库的三个基本表&#xff1a; s( 学号 ‾ \underline{学号} 学号​&#xff0c;姓名&#xff0c;年龄, 性别) sc( 学号 , 课程号 ‾ \underline{学号, 课程号} 学号,课程号​, 成绩) c( 课程号 ‾ \un…

【C++】类与对象(中)

一、类的默认成员函数 如果一个类中什么成员都没有&#xff0c;简称为空类。 空类中真的什么都没有吗&#xff1f;并不是&#xff0c;任何类在什么都不写时&#xff0c;编译器会自动生成以下6个默认成员函数。 默认成员函数&#xff1a;用户没有显式实现&#xff0c;编译器会自…

[超详细]基于YOLO&OpenCV的人流量统计监测系统(源码&部署教程)

1.图片识别 2.视频识别 [YOLOv7]基于YOLO&#xff06;Deepsort的人流量统计系统(源码&#xff06;部署教程)_哔哩哔哩_bilibili 3.Deepsort目标追踪 &#xff08;1&#xff09;获取原始视频帧 &#xff08;2&#xff09;利用目标检测器对视频帧中的目标进行检测 &#xff08…

oracle21c报错 【ORA-65096: 公用用户名或角色名无效】

1.数据库版本 oracle21c 2.问题提示 创建用户提示【ORA-65096: 公用用户名或角色名无效】 create user 自定义用户名 identified by 密码;--例:用户为test1&#xff0c;密码为123456 create user test1 identified by 123456;三.解决办法及结果 oracle11g之后的版本&#xff…

将kali系统放在U盘中插入电脑直接进入kali系统

首先准备一个空白的 U 盘。 Kali Linux | Penetration Testing and Ethical Hacking Linux Distribution 在 Windows 上制作 Kali 可启动 USB 驱动器 Making a Kali Bootable USB Drive on Windows | Kali Linux Documentation 1. 首先下载 .iso 镜像 Index of /kali-images…

构建 App 的方法

目录 构建 App 使用 App 设计工具以交互方式构建 App 使用 MATLAB 函数以编程方式构建 App 构建实时编辑器任务 可以使用 MATLAB 来构建可以集成到各种环境中的交互式用户界面。可以构建两种类型的用户界面&#xff1a; App - 基于用户交互执行操作的自包含界面 实时编辑器…

css中5种属性选择器

属性选择器 语法&#xff1a; [属性名]{} 选择含有指定属性的元素 [属性名属性值]{} 选择含有指定属性和属性值的元素 [属性名^属性值]{} 选择属性值以指定值开头的元素 [属性名$属性值]{} 选择属性值以指定值结尾的元素 [属性名*属性值]{} 选择属性值含有某值的元素 以下为演…

【HCSD大咖直播】亲授大厂面试秘诀【云驻共创】

同学们&#xff0c;毕业季是否找到了自己心仪的工作呢&#xff1f;是否了解大厂面试流程、要求以及技巧呢&#xff1f;华为云IoT高级工程师&#xff0c;传授大厂面试秘诀&#xff0c;教大家如何轻松get大厂offer&#xff01;提前为大厂面试做准备&#xff0c;赢在起跑线&#x…

uniapp和vue3+ts创建自定义下拉选择框组件

使用uniapp开发小程序的时候&#xff0c;使用了uview的ui组件&#xff0c;但是里面没有下拉选择组件&#xff0c;只有Picker 选择器&#xff0c;但是我们想要使用下拉选择的组件&#xff0c;所以需要自定义个一个下拉选择的自定义组件&#xff0c;我就只能自己动手创建这个自定…

43、vue导出pdf文件,并解决滚动条外内容无法获取的问题

使用插件html2canvas和jspdf插件 下载完两个插件后引入所需要的页面 import html2canvas from "html2canvas" import jsPDF from "jspdf"1、在导出之前将元素的高度或者宽度设置为滚动高度或者宽度&#xff0c;如&#xff1a; el.style.height el.scro…

【MySQL】CONCAT、CONCAT_WS、GROUP_CONCAT 函数用法

CONCAT 含义 将多个字符串连接成一个字符串 语法 CONCAT(str1, str2,...)数据模拟&#xff08;user_score&#xff09; idnamescore1小明652小红703小兰904(Null)1005小李(Null) SQL 代码 SELECT CONCAT(name, ,, score) AS score FROM user_score结果 score小明,65小红…

31、Flink的SQL Gateway介绍及示例

Flink 系列文章 1、Flink 部署、概念介绍、source、transformation、sink使用示例、四大基石介绍和示例等系列综合文章链接 13、Flink 的table api与sql的基本概念、通用api介绍及入门示例 14、Flink 的table api与sql之数据类型: 内置数据类型以及它们的属性 15、Flink 的ta…

梳理一名Go后端程序员日常用的软件~

大家好&#xff0c;我是豆小匠。 这期分享下我日常工作用到的软件和工具&#xff01; 省流版图片↓↓↓ 工具分为四类&#xff1a;编码软件、笔记/文档软件、开发工具和日常软件等。 1. 编码软件 1.1. Goland 出自JetBrain家族&#xff0c;IDE的王者&#xff0c;作为我的…

hash模式和history模式

在Vue Router中&#xff0c;有两种路由模式可供选择&#xff1a;hash模式和history模式。它们各自有一些优点和缺点&#xff0c;下面是它们的简要介绍&#xff1a; hash模式的原理是通过hashchange事件&#xff0c;通过监听hash变化来驱动界面变化。它的url中有 # 号 1、监听…

Vuetify:定制化、响应式的 Vue UI 库 | 开源日报 No.83

vuetifyjs/vuetify Stars: 38.1k License: MIT Vuetify 是一个无需设计技能的 UI 库&#xff0c;具有精美手工制作的 Vue 组件。它具有以下核心优势和主要功能&#xff1a; 可定制性&#xff1a;使用 SASS/SCSS 进行广泛自定义&#xff0c;并提供默认配置和蓝图。响应式布局&…

C++标准模板(STL)- 类型支持 (类型修改,从给定类型移除 const 或/与 volatile 限定符,std::remove_cv)

类型特性 类型特性定义一个编译时基于模板的结构&#xff0c;以查询或修改类型的属性。 试图特化定义于 <type_traits> 头文件的模板导致未定义行为&#xff0c;除了 std::common_type 可依照其所描述特化。 定义于<type_traits>头文件的模板可以用不完整类型实例…