解锁新姿势 | 如何用配置中心实现全局动态流控?

摘要: 当资源成为瓶颈时,服务框架需要对消费者做限流,启动流控保护机制。流量控制有多种策略,比较常用的有:针对访问速率的静态流控、针对资源占用的动态流控、针对消费者并发连接数的连接控制和针对并行访问数的并发控制。在分布式架构中,应用和应用之间的调用类型分为以下两种,流控方式也略有不同。

点此查看原文:https://yq.aliyun.com/articles/380180?spm=a2c41.11181499.0.0

当资源成为瓶颈时,服务框架需要对消费者做限流,启动流控保护机制。流量控制有多种策略,比较常用的有:针对访问速率的静态流控、针对资源占用的动态流控、针对消费者并发连接数的连接控制和针对并行访问数的并发控制。在实践中,各种流量控制策略需要综合使用才能起到较好的效果。

在分布式架构中,应用和应用之间的调用类型分为以下两种,流控方式也略有不同。

同步RPC类调用,比如RESTful,Dubbo,HSF等都属于该类。对于该类同步调用,通常限流方式为两种:针对服务提供者的并发全局流控,或针对服务消费者的并发局部流控。两种的控制手段类似,都是通过限制服务端或客服端并发调用数来进行限制。

异步MQ类调用,典型如RocketMQ, Kafka,等。对于该类异步调用,通常限流方式是在订阅端限流。限流方式为两种:针对消息订阅者的并发流控,或针对消息订阅者的消费延时流控。

针对消息订阅者的消费延时流控基本原理是,在每次客户端消费时,可以增加一个延时来控制消费速度,这样理论消费并发最快速度为:

MaxRate = 1 / ConsumInterval * ConcurrentThreadNumber

比如如果消息并发消费线程为20,延时为100ms,则理论上可以将并发消费控制在200以下。具体公式如下:

200 = 1 / 0.1 * 20

相比并发线程数流控,消费延时流控优点在于实现相对简单,对MQ类客户端包依赖较少,不需要客户端提供控制并发线程数的动态调整接口。

以上各种流量控制方法,在分布式架构下,如果要做到全局动态控制,一个简单的技术方法是依赖配置中心,即通过配置中心来进行流控参数的下发。

下面章节详细介绍如何基于配置中心来实现异步消息消费的全局动态流控。使用的例子为阿里云上的 MQ (消息队列)和 ACM (应用配置管理)两款产品。

注:之所以用MQ为示例是因为在本文撰写之时,正好MQ Consumer Client SDK并不支持动态调整现成并发数,因此通过基于ACM来动态调整消费延迟的方法正好可以解决MQ消费流控动态的问题。

基于消费延时流控的基本原理

基本原理如下。其中,管理员或应用程序通过ACM控制台发布消费延时配置(RCV_INTERVAL_TIME),所有MQ消费程序订阅该配置。理论上,该配置从发布到下发所有客户端,可以在1秒内完成(取决于网络延时)。

图片描述

代码示例

该章节基于配置中心来实现异步消息消费的全局动态流控的代码示例。使用的例子为阿里云上的MQ(消息队列)和ACM(应用配置管理)两款产品,基于Java语言。关于SDK的详细介绍,可参见两款产品的官方文档。

在ACM上创建消费延时的参数,截屏如下。

图片描述

设置全局消费延时变量

首先,设置消费接收延时的全局变量, 如下。

     // 初始化消息接收延时参数,单位为millisecondstatic int RCV_INTERVAL_TIME = 10000;// 初始化配置服务,控制台通过示例代码自动获取下面参数ConfigService.init("acm.aliyun.com", /*租户ID*/"xxx", /*AK*/"xxx", /*SK*/"yyy");    // 主动获取配置String content = ConfigService.getConfig("app.mq.qos", "DEFAULT_GROUP", 6000);Properties p = new Properties();try {p.load(new StringReader(content));RCV_INTERVAL_TIME = Integer.valueOf(p.getProperty("RCV_INTERVAL_TIME"));} catch (IOException e) {e.printStackTrace();}

其次,设置ACM listener,确保当配置被修改时,即使更新 RCV_INTERVAL_TIME 参数, 如下。

     // 初始化的时候,给配置添加监听,配置变更会回调通知ConfigService.addListener("app.mq.qos", "DEFAULT_GROUP", new ConfigChangeListener() {public void receiveConfigInfo(String configInfo) {Properties p = new Properties();try {p.load(new StringReader(configInfo));RCV_INTERVAL_TIME = Integer.valueOf(p.getProperty("RCV_INTERVAL_TIME"));} catch (IOException e) {e.printStackTrace();}}});

设置 MQ 消费延时逻辑

完整实例如下。

注:这里 RCV_INTERVAL_TIME 参数的访问是故意没有加锁的,读者可以自行思考原因。Aliyun ONS Client不提供动态线程并发数,默认并发为20。因此这里正好使用消费延时参数来动态调节QoS。

    //以下代码可直接贴在Main()函数里Properties properties = new Properties();properties.put(PropertyKeyConst.ConsumerId, "CID_consumer_group");properties.put(PropertyKeyConst.AccessKey,"xxx");properties.put(PropertyKeyConst.SecretKey, "yyy");properties.setProperty(PropertyKeyConst.SendMsgTimeoutMillis, "3000");// 设置 TCP 接入域名(此处以公共云生产环境为例)properties.put(PropertyKeyConst.ONSAddr,"http://onsaddr-internet.aliyun.com/rocketmq/nsaddr4client-internet");Consumer consumer = ONSFactory.createConsumer(properties);consumer.subscribe(/*Topic*/"topic-name", /*Tag*/null, new MessageListener() {public Action consume(Message message, ConsumeContext context) {// MQ Subscribe QoS logical start, // Each consuming process will sleep for RCV_INTERVAL_TIME seconds with 100 ms sleeping cycle.// Within each cycle, the thread will check RCV_INTERVAL_TIME in case it's set to a smaller value. // RCV_INTERVAL_TIME <= 0 means no sleeping.int rcvIntervalTimeLeft = RCV_INTERVAL_TIME;while (rcvIntervalTimeLeft > 0) {if (rcvIntervalTimeLeft > RCV_INTERVAL_TIME) {rcvIntervalTimeLeft = RCV_INTERVAL_TIME;}try {if (rcvIntervalTimeLeft >= 100) {rcvIntervalTimeLeft -= 100;Thread.sleep(100);} else {Thread.sleep(rcvIntervalTimeLeft);rcvIntervalTimeLeft = 0;}} catch (InterruptedException e) {e.printStackTrace();}}// MQ Subscribe interval logical endsSystem.out.println("Receive: " + message);/** Put your business logic here.*/doSomething();return Action.CommitMessage;}});consumer.start();

运行结果

单机运行consumer进行消费,假设queue内的消息无限多,不存在消费万的情况,分三段测试,分别运行约5分钟,通过ACM配置推送来达到以下效果。

RCV_INTERVAL_TIME = 100 ms

RCV_INTERVAL_TIME = 5000 ms

RCV_INTERVAL_TIME = 1000 ms

结果如下,在单MQ消费业务处理耗时约100ms情况下的,单机并发20线程的测试结果。

RCV_INTERVAL_TIME = 100 ms:平均消费性能约为 9000 tpm 左右

RCV_INTERVAL_TIME = 5000 ms:平均消费性能被限制到了 200 tpm 左右

RCV_INTERVAL_TIME = 1000 ms:平均消费性能回升到到了 1100 tpm 左右

以上结果基本达到消费和 tpm 成反比的预期,最关键的是整个过程中,应用不中断,流控推送结果秒级生效到分布式集群。单机性能结果如下所示。

图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/522870.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

53K!拿下阿里Python岗,这些技术点全考了!

Python又上热搜了&#xff01;”&#xff0c;最近笔者在逛脉脉时&#xff0c;发现这样的一条信息&#xff1a;看完后&#xff0c;我相信大家和我一样&#xff0c;what&#xff0c;Python这么时候值钱了&#xff1f;本篇文章&#xff0c;我将帮大家搞定两大疑问&#xff1a;1. P…

用WEB技术栈开发NATIVE应用:WEEX SDK原理详解

摘要&#xff1a; WEEX依旧采取传统的web开发技术栈进行开发&#xff0c;同时app在终端的运行体验不输native app。其同时解决了开发效率、发版速度以及用户体验三个核心问题。那么WEEX是如何实现的&#xff1f;目前WEEX已经完全开源&#xff0c;并捐给Apache基金会&#xff0c…

什么是java枚举_什么是java枚举

什么是java枚举&#xff1f;java 枚举的定义与用法一、枚举的定义&#xff1a;枚举是一种特殊的数据类型&#xff0c;之所以特殊是因为它既是一种类(class)类型却又比类型多了些特殊的约束&#xff0c;但是这些约束的存在也造就了枚举类型的简洁&#xff0c;安全性以及便捷性。…

基于阿里云Serverless架构下函数计算的最新应用场景详解(一)

摘要&#xff1a; Serverless概念是近年来特别火的一个技术概念&#xff0c;基于这种架构能构建出很多应用场景&#xff0c;适合各行各业&#xff0c;只要对轻计算、高弹性、无状态等场景有诉求的用户都可以通过本文来普及一些基础概念&#xff0c;看看这些场景是否对用户有一些…

浪潮云完成6亿元B轮融资,正推进上市;VMware收购AI初创公司Bitfusion;小爱同学App在苹果应用商店下架……...

关注并标星星CSDN云计算极客头条&#xff1a;速递、最新、绝对有料。这里有企业新动、这里有业界要闻&#xff0c;打起十二分精神&#xff0c;紧跟fashion你可以的&#xff01;每周三次&#xff0c;打卡即read更快、更全了解泛云圈精彩newsgo go go 华为Mate30 Lite贴膜曝光&am…

基于阿里云Serverless架构下函数计算的最新应用场景详解(二)

摘要&#xff1a; Serverless概念是近年来特别火的一个技术概念&#xff0c;基于这种架构能构建出很多应用场景&#xff0c;适合各行各业&#xff0c;只要对轻计算、高弹性、无状态等场景有诉求的用户都可以通过本文来普及一些基础概念&#xff0c;看看这些场景是否对用户有一些…

增长黑客系列:今天比昨天增长多少?快使用环比函数来分析日志

摘要&#xff1a; 增长黑客系列&#xff1a;今天比昨天增长多少&#xff1f;快使用环比函数来分析日志 在我们平时分析业务时&#xff0c;一个最重要的指标就是&#xff0c;今天比昨天增长多少&#xff0c;本周比上周增长多少&#xff1b;或者同上一个周期相比增长最大的分类是…

云计算将会让数据中心消失?

戳蓝字“CSDN云计算”关注我们哦&#xff01;作者 | 王洪鹏出品 | CSDN云计算&#xff08;ID&#xff1a;CSDNcould&#xff09;近年来企业应用云化明显&#xff0c;越来越多的企业开始将自己的公司业务转移到云平台之上&#xff0c;可能是迁到公有云&#xff0c;也可能是私有云…

巧用 Img / JavaScript 采集页面数据

摘要&#xff1a; 当我们有一个新内容时&#xff08;例如新功能、新活动、新游戏、新文章&#xff09;&#xff0c;作为运营人员总是迫不及待地希望能尽快传达到用户&#xff0c;因为这是获取用户的第一步、也是最重要的一步。 点此查看原文:http://click.aliyun.com/m/40929/…

时间序列数据的存储和计算 - 开源时序数据库解析

摘要&#xff1a; Prometheus 开源时序数据库解析的系列文章在之前已经完成了几篇&#xff0c;对比分析了Hbase系的OpenTSDB、Cassandra系的KairosDB、BlueFlood及Heroic&#xff0c;最后是tsdb ranking top 1的InfluxDB。 点此查看原文&#xff1a;http://click.aliyun.com/m/…

flowable modler为任务节点增加自定义属性

如何在modler设计器中为任务节点&#xff0c;添加自定义的属性。 文章目录1、modler设计器中所有元素的定义2、flowable如何处理stencilset_bpmn.json文件3、为UserTask任务节点添加一个扩展属性3.1. 添加属性的名称3.2. 为任务节点添加扩展属性3.3. 查看效果1、modler设计器中…

微软对OpenAI投资10亿美元欲开发AI技术;华晨宝马宣布建成全球首个5G汽车生产基地;传苹果将收购英特尔调制解调器芯片业务...

关注并标星星CSDN云计算极客头条&#xff1a;速递、最新、绝对有料。这里有企业新动、这里有业界要闻&#xff0c;打起十二分精神&#xff0c;紧跟fashion你可以的&#xff01;每周三次&#xff0c;打卡即read更快、更全了解泛云圈精彩newsgo go go 索尼Xperia 1R信息曝光&…

java se 与j2se_javaSE和 j2SE的区别?

javaSE和 j2SE的区别?关注:71 答案:4 mip版解决时间 2021-01-28 18:29提问者耍硪ミ倪配么2021-01-27 20:19javaSE和 j2SE的区别?最佳答案二级知识专家自然卷的气质2021-01-27 21:53JavaEE 比 se多了许多包&#xff0c;用于开发大规模的&#xff0c;分布式的Java应用/服务器…

[高速通道进阶一]如何理解高速通道的就近接入和一点接入连接全球

摘要&#xff1a; 经常有用户问&#xff0c;我在上海有个IDC&#xff0c;在北京有个VPC&#xff0c;我想通过专线把IDC和VPC连接起来&#xff0c;我是不是要找运营商直接拉一根上海到北京的专线&#xff1f;其实不是这样的&#xff0c;用户只需要接入到最近的接入点即可。高速通…

Flowable决策任务(decision task)

摘要&#xff1a;Flowable中引入了一个decision task&#xff08;我们可以将其称之为决策任务&#xff09;。在讲解decision task之前&#xff0c;我们不妨看一下dmn引擎。因为如果大家不了解dmn&#xff0c;那肯定不知道如何使用decision task了。 dmn是decision Modeling Not…

5G精华问答 | 5G技术已经成熟了吗?

1G时我们用手机打电话&#xff0c;2G时我们能互发短信、看文字信息&#xff0c;3G时上网看图片&#xff0c;而4G时我们看视频和直播&#xff0c;从1G到4G&#xff0c;不仅信号越来越好&#xff0c;安全性越来越高&#xff0c;上网也越来越快了。1Q&#xff1a;5G技术已经成熟了…

PyODPS DataFrame:统一的数据查询语言

摘要&#xff1a; 前几天&#xff0c;PyODPS发布了0.7版本&#xff0c;这篇文章给大家介绍下PyODPS新版本带来的重要特性。 之前也有若干篇文章介绍过了&#xff0c;我们PyODPS DataFrame是延迟执行的&#xff0c;在调用立即执行的方法&#xff0c;比如execute、persist等之前&…

AliOS Things 持续集成(CI)系统介绍

摘要&#xff1a; AliOS Things在快速的迭代进化之中&#xff0c;如何保证提交的代码质量&#xff0c;并保证在各个硬件平台上的稳定性&#xff0c;是一个非常大的挑战。同时物联网硬件碎片化&#xff0c;资源紧张&#xff0c;对持续集成&#xff08;CI&#xff09;系统也提出了…

从青铜到王者,助力企业轻松上云的四大绝招!

戳蓝字“CSDN云计算”关注我们哦!IBM在7月份发生了很多大事&#xff0c;其中这两件你知道吗&#xff1f;第一&#xff0c;IBM&#xff08;NYSE: IBM&#xff09;与美国电话电报公司&#xff08;AT&T&#xff09;&#xff08;NYSE: T&#xff09;达成一项为期多年的战略联盟…

PyODPS 中使用 Python UDF

摘要&#xff1a; PyODPS 中使用 Python UDF 包含两方面&#xff0c;一个是直接使用&#xff0c;也就是在 MaxCompute SQL 中使用&#xff1b;一个是间接的方式&#xff0c;也就是 PyODPS DataFrame&#xff0c;这种方式你不需要直接写 Python UDF&#xff0c;而是写普通的 Pyt…