高可用服务 AHAS 在消息队列 MQ 削峰填谷场景下的应用

在消息队列中,当消费者去消费消息的时候,无论是通过 pull 的方式还是 push 的方式,都可能会出现大批量的消息突刺。如果此时要处理所有消息,很可能会导致系统负载过高,影响稳定性。但其实可能后面几秒之内都没有消息投递,若直接把多余的消息丢掉则没有充分利用系统处理消息的能力。我们希望可以把消息突刺均摊到一段时间内,让系统负载保持在消息处理水位之下的同时尽可能地处理更多消息,从而起到“削峰填谷”的效果:

上图中红色的部分代表超出消息处理能力的部分。

我们可以看到消息突刺往往都是瞬时的、不规律的,其后一段时间系统往往都会有空闲资源。我们希望把红色的那部分消息平摊到后面空闲时去处理,这样既可以保证系统负载处在一个稳定的水位,又可以尽可能地处理更多消息,这时候我们就需要一个能够控制消费端消息匀速处理的利器 — AHAS 流控降级,来为消息队列削峰填谷,保驾护航。

AHAS 是如何削峰填谷的

AHAS 的流控降级是面向分布式服务架构的专业流量控制组件,主要以流量为切入点,从流量控制、熔断降级、系统保护等多个维度来帮助您保障服务的稳定性,同时提供强大的聚合监控和历史监控查询功能。

AHAS 专门为这种场景提供了匀速排队的控制特性,可以把突然到来的大量请求以匀速的形式均摊,以固定的间隔时间让请求通过,以稳定的速度逐步处理这些请求,起到“削峰填谷”的效果,从而避免流量突刺造成系统负载过高。同时堆积的请求将会排队,逐步进行处理;当请求排队预计超过最大超时时长的时候则直接拒绝,而不是拒绝全部请求。

比如在 RocketMQ 的场景下配置了匀速模式下请求 QPS 为 5,则会每 200 ms 处理一条消息,多余的处理任务将排队;同时设置了超时时间,预计排队时长超过超时时间的处理任务将会直接被拒绝。示意图如下图所示:

RocketMQ Consumer 接入示例

本部分将引导您快速在 RocketMQ 消费端接入 AHAS 流控降级 Sentinel。

1. 开通 AHAS

首先您需要到AHAS 控制台开通 AHAS 功能(免费)。可以根据 开通 AHAS 文档 里面的指引进行开通。

2. 代码改造

在结合阿里云 RocketMQ Client 使用 Sentinel 时,用户需要引入 AHAS Sentinel 的依赖 ahas-sentinel-client (以 Maven 为例):

<dependency><groupId>com.alibaba.csp</groupId><artifactId>ahas-sentinel-client</artifactId><version>1.1.0</version>
</dependency>

由于 RocketMQ Client 未提供相应拦截机制,而且每次收到都可能是批量的消息,因此用户在处理消息时需要手动进行资源定义(埋点)。我们可以在处理消息的逻辑处手动进行埋点,资源名可以根据需要来确定(如 groupId + topic 的组合):

    private static Action handleMessage(Message message, String groupId, String topic) {Entry entry = null;try {// 资源名称为 groupId 和 topic 的组合,便于标识,同时可以针对不同的 groupId 和 topic 配置不同的规则entry = SphU.entry("handleMqMessage:" + groupId + ":" + topic);// 在此处编写真实的处理逻辑System.out.println(System.currentTimeMillis() + " | handling message: " + message);return Action.CommitMessage;} catch (BlockException ex) {// 在编写处理被流控的逻辑// 示例:可以在此处记录错误或进行重试System.err.println("Blocked, will retry later: " + message);return Action.ReconsumeLater; // 会触发消息重新投递} finally {if (entry != null) {entry.exit();}}}

消费者订阅消息的逻辑示例:

Consumer consumer = ONSFactory.createConsumer(properties);
consumer.subscribe(topic, "*", (message, context) -> {return handleMessage(message);
});
consumer.start();

更多关于 RocketMQ SDK 的信息可以参考 消息队列 RocketMQ 入门文档。

3. 获取 AHAS 启动参数

注意:若在本地运行接入 AHAS Sentinel 控制台需要在页面左上角选择 公网 环境,若在阿里云 ECS 环境则在页面左上角选择对应的 Region 环境。

我们可以进入 AHAS 控制台,点击左侧侧边栏的 流控降级,进入 AHAS 流控降级控制台应用总览页面。在页面右上角,单击添加应用,选择 SDK 接入页签,到 配置启动参数 页签拿到需要的启动参数(详情请参考 SDK 接入文档),类似于:

-Dproject.name=AppName -Dahas.license=<License>

其中 project.name 配置项代表应用名(会显示在控制台,比如 MqConsumerDemo),ahas.license配置项代表自己的授权 license(ECS 环境不需要此项)。

4. 启动 Consumer,配置规则

接下来我们添加获取到的启动参数,启动修改好的 Consumer 应用。由于 AHAS 流控降级需要进行资源调用才能触发初始化,因此首先需要向对应 group/topic 发送一条消息触发初始化。消费端接收到消息后,我们就可以在 AHAS Sentinel 控制台上看到我们的应用了。点击应用卡片,进入详情页面后点击左侧侧边栏的“机器列表”。我们可以在机器列表页面看到刚刚接入的机器,代表接入成功:

点击“请求链路”页面,我们可以看到之前定义的资源。点击右边的“流控”按钮添加新的流控规则:

我们在“流控方式”中选择“排队等待”,设置 QPS 为 10,代表每 100ms 匀速通过一个请求;并且设置最大超时时长为 2000ms,超出此超时时间的请求将不会排队,立即拒绝。配置完成后点击新建按钮。

5. 发送消息,查看效果

下面我们可以在 Producer 端批量发送消息,然后在 Consumer 端的控制台输出处观察效果。可以看到消息消费的速率是匀速的,大约每 100 ms 消费一条消息:

1550732955137 | handling message: Hello MQ 2453
1550732955236 | handling message: Hello MQ 9162
1550732955338 | handling message: Hello MQ 4944
1550732955438 | handling message: Hello MQ 5582
1550732955538 | handling message: Hello MQ 4493
1550732955637 | handling message: Hello MQ 3036
1550732955738 | handling message: Hello MQ 1381
1550732955834 | handling message: Hello MQ 1450
1550732955937 | handling message: Hello MQ 5871

同时不断有排队的处理任务完成,超出等待时长的处理请求直接被拒绝。注意在处理请求被拒绝的时候,需要根据需求决定是否需要重新消费消息。

我们也可以点击左侧侧边栏的“监控详情”进入监控详情页面,查看处理消息的监控曲线:

对比普通限流模式的监控曲线(最右面的部分):

如果不开启匀速模式,只是普通的限流模式,则只会同时处理 10 条消息,其余的全部被拒绝,即使后面的时间系统资源充足多余的请求也无法被处理,因而浪费了许多空闲资源。两种模式对比说明匀速模式下消息处理能力得到了更好的利用。

Kafka 接入代码示例

Kafka 消费端接入 AHAS 流控降级的思路与上面的 RocketMQ 类似,这里给出一个简单的代码示例:

private static void handleMessage(ConsumerRecord<String, String> record, String groupId, String topic) {pool.submit(() -> {Entry entry = null;try {// 资源名称为 groupId 和 topic 的组合,便于标识,同时可以针对不同的 groupId 和 topic 配置不同的规则entry = SphU.entry("handleKafkaMessage:" + groupId + ":" + topic);// 在此处理消息.System.out.printf("[%d] Receive new messages: %s%n", System.currentTimeMillis(), record.toString());} catch (BlockException ex) {// Blocked.// NOTE: 在处理请求被拒绝的时候,需要根据需求决定是否需要重新消费消息System.err.println("Blocked: " + record.toString());} finally {if (entry != null) {entry.exit();}}});
}

消费消息的逻辑:

while (true) {try {ConsumerRecords<String, String> records = consumer.poll(1000);// 必须在下次 poll 之前消费完这些数据, 且总耗时不得超过 SESSION_TIMEOUT_MS_CONFIG// 建议开一个单独的线程池来消费消息,然后异步返回结果for (ConsumerRecord<String, String> record : records) {handleMessage(record, groupId, topic);}} catch (Exception e) {try {Thread.sleep(1000);} catch (Throwable ignore) {}e.printStackTrace();}
}

其它

以上介绍的只是 AHAS 流控降级的其中一个场景 —— 请求匀速,它还可以处理更复杂的各种情况,比如:

  • 流量控制:可以针对不同的调用关系,以不同的运行指标(如 QPS、线程数、系统负载等)为基准,对资源调用进行流量控制,将随机的请求调整成合适的形状(请求匀速、Warm Up 等)。
  • 熔断降级:当调用链路中某个资源出现不稳定的情况,如平均 RT 增高、异常比例升高的时候,会使对此资源的调用请求快速失败,避免影响其它的资源导致级联失败。
  • 系统负载保护:对系统的维度提供保护。当系统负载较高的时候,提供了对应的保护机制,让系统的入口流量和系统的负载达到一个平衡,保证系统在能力范围之内处理最多的请求。


原文链接
本文为云栖社区原创内容,未经允许不得转载。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/519633.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

32岁程序员,补偿N+2:“谢谢裁我,让我翻倍!” 网友:榜样!

2019年的冬天&#xff0c;“冷”的有些频繁。12月19日&#xff0c;《马蜂窝被曝裁员40% UGC模式变现难&#xff1f;》爆火&#xff0c;据悉马蜂窝将裁员40%&#xff0c;交易中心成了“重灾区”&#xff0c;赔偿N2&#xff0c;留下的除搜索推荐、内容中心等核心部门外&#xff0…

TableStore:爬虫数据存储和查询利器

TableStore是阿里云自研的在线数据平台&#xff0c;提供高可靠的存储&#xff0c;实时和丰富的查询功能&#xff0c;适用于结构化、半结构化的海量数据存储以及各种查询、分析。 爬虫数据特点 在众多大数据场景中&#xff0c;爬虫类型的数据非常适合存储在TableStore。主要是…

PageHelper分页时超过最大数量的页数仍然返回数据,PageHelper分页失效

最近使用PageHelper来进行分页查询&#xff0c;发现一个问题&#xff1a;明明查询出来的总数只有5个&#xff0c;分页的时候每页10个&#xff0c;按理说只有第一页返回会有数据&#xff0c;第二页开始就没有数据了&#xff0c;但是实际情况却是第二页返回的数据与第一页一致&am…

三七女生节,解密阿里女程序员们的代码诗!

女生节快要到了&#xff0c;小编火速在阿里的程序员群体中发掘出了一群才华横溢的程序媛&#xff01;今天就和大家分享下&#xff0c;阿里背后写代码、修bug的女生们&#xff01; 巾帼不让须眉&#xff0c;如今越来越多的女性同胞参与到IT行业&#xff0c;这个行业因为她们的参…

云+X案例展 | 传播类:九州云 SD-WAN 携手上海电信,助力政企客户网络重构换新颜...

戳蓝字“CSDN云计算”关注我们哦&#xff01;本案例由九州云投递并参与评选&#xff0c;CSDN云计算独家全网首发&#xff1b;更多关于【云X 案例征集】的相关信息&#xff0c;点击阅读原文丨挖掘展现更多优秀案例&#xff0c;为不同行业领域带来启迪&#xff0c;进而推动整个“…

利用blink+MQ实现流计算中的超时统计问题

案例与解决方案汇总页&#xff1a;阿里云实时计算产品案例&解决方案汇总 一. 背景介绍 菜鸟的物流数据本身就有链路复杂、实操节点多、汇总维度多、考核逻辑复杂的特点&#xff0c;对于实时数据的计算存在很大挑战。经过仓配ETL团队的努力&#xff0c;目前仓配实时数据已覆…

Spring Schema整合Quartz_01

文章目录一、实现思路二、第一种实现方式2.1. 新建web项目2.2. 导入依赖2.3. 创建一个job类2.4. 创建配置文件2.5. 配置web.xml2.6.运行web服务&#xff0c;观察Quartz定时任务三、第二种实现方式3.1. 创建job类3.2. 修改spring-config.xml3.3. 运行web服务&#xff0c;观察Qua…

使用Grab的实验平台进行混沌实验编排

背景 对每个用户来说&#xff0c;Grab是一个可以叫车&#xff0c;叫外卖或付款的一个APP。对工程师来说&#xff0c;Grab是一个有许多服务并通过RPC交互的分布式系统&#xff0c;有时也可以叫做微服务架构。在数千台服务器上运行的数百个服务每天都有工程师在上面进行变更。每…

c++ 麦克风 录音 wav_小米有品上线新品,手机麦克风得到史诗级加强

手机里面是自带录音功能的&#xff0c;所以很多朋友都喜欢用手机来记录会议等内容&#xff0c;但是手机自带的麦克风用来录音并不合适&#xff0c;要么声音小&#xff0c;要么录下来的都是杂音&#xff0c;难道非要买一个专用的录音笔或者麦克风吗&#xff1f;其实没有那个必要…

云+X案例展 | 民生类: “中企通信 × TutorABC”共创全球数字教育科技新里程

本案例由中企通信投递并参与评选&#xff0c;CSDN云计算独家全网首发&#xff1b;更多关于【云X 案例征集】的相关信息&#xff0c;点击了解详情丨挖掘展现更多优秀案例&#xff0c;为不同行业领域带来启迪&#xff0c;进而推动整个“云行业”的健康发展。TutorABC荣获IDC 【讯…

ESB接口调用异常汇总

文章目录一、ESB接口前置知识1. ESB接口简述2. 生成的代码组成部分二、常见的异常汇总2.1. 场景1&#xff1a;不能解析某域名2.2. 场景2&#xff1a;调用服务连接超时三、调用服务前异常3.1. 异常描述3.2. CXF相关的jar和jdk的rt.jar中冲突3.3. 异常现象3.4. 异常日志3.5. 异常…

基于实时计算(Flink)与高斯模型构建实时异常检测系统

案例与解决方案汇总页&#xff1a;阿里云实时计算产品案例&解决方案汇总 1. 概述 异常检测&#xff08;anomaly detection&#xff09;指的是对不符合预期模式或数据集&#xff08;英语&#xff1a;dataset&#xff09;中其他项目的项目、事件或观测值的识别。实际应用包括…

hive 中某个字段等于0_快速了解hive

作者丨HappyMint文章选摘&#xff1a;大数据与人工智能这是作者的第7篇文章本文主要针对从事大数据分析和架构相关工作&#xff0c;需要与hive打交道但目前对hive还没有进行深层次了解的小伙伴&#xff0c;希望本文会让你对hive有一个快速的了解。内容主要包括什么是hive、为什…

利用blink CEP实现流计算中的超时统计问题

案例与解决方案汇总页&#xff1a;阿里云实时计算产品案例&解决方案汇总 一. 背景介绍 如<利用blinkMQ实现流计算中的延时统计问题>一文中所描述的场景&#xff0c;我们将其简化为以下案例&#xff1a; 实时流的数据源结构如下&#xff1a; 物流订单号支付时间仓接…

PPT素材网

PPT素材推荐 官网&#xff1a;http://www.1ppt.com/ 背景色采用这个&#xff0c;模板才用这个 简洁微立体创业融资计划书PPT模板免费下载 http://www.1ppt.com/article/33315.html

云+X案例展 | 民生类:中国电信天翼云携手国家天文台打造“大国重器”

本案例由天翼云投递并参与评选&#xff0c;CSDN云计算独家全网首发&#xff1b;更多关于【云X 案例征集】的相关信息&#xff0c;点击了解详情丨挖掘展现更多优秀案例&#xff0c;为不同行业领域带来启迪&#xff0c;进而推动整个“云行业”的健康发展。FAST是由中国科学院国家…

阿里云TSDB在大数据集群监控中的方案与实战

目前大部分的互联网企业基本上都有搭建自己的大数据集群&#xff0c;为了能更好让我们的大数据集群更加高效安全的工作&#xff0c;一个优秀的监控方案是必不可少的&#xff1b;所以今天给大家带来的这篇文章就是讲阿里云TSDB在上海某大型互联网企业中的大数据集群监控方案中的…

linux上java解加密(AES/CBC)异常:java.lang.SecurityException: JCE cannot authenticate the provider BC办法

对接第三方厂商需求时&#xff0c;需要对数据AES256进行解密&#xff0c;由于java本身不支持&#xff0c;需要添加依赖。 文章目录一、版本适配1. 版本对应关系2. maven仓库地址3. maven坐标二、linux jdk策略下载2.1. JDK6 jce2.2. JDK7 jce2.3. JDK8 jce三、linux jdk策略配置…

云+X案例展 | 民生类:易趋云全面提升三德科技管理效能

本案例由深圳蓝云投递并参与评选&#xff0c;CSDN云计算独家全网首发&#xff1b;更多关于【云X 案例征集】的相关信息&#xff0c;点击了解详情丨挖掘展现更多优秀案例&#xff0c;为不同行业领域带来启迪&#xff0c;进而推动整个“云行业”的健康发展。湖南三德科技股份有限…

redis 公网 安全_redis漏洞复现

一、漏洞简介什么是redisredis是一个key-value存储系统。和Memcached类似&#xff0c;它支持存储的value类型相对更多&#xff0c;包括string(字符串)、list(链表)、set(集合)、zset(sorted set --有序集合)和hash&#xff08;哈希类型&#xff09;。这些数据类型都支持push/po…