rocketmq实现限流

目录

问题背景

技术方向

方案确认

消息队列(√)

分布式锁(×)

方案实现

监控方向

业务方向


问题背景

公司邮件服务token有 分钟内超200封的熔断机制,当前token被熔断后,系统发邮件操作会被忽略,所以邮件服务也没有重试操作

人工发现token被熔断后,需要联系邮件群中值班人,将token恢复

分货业务依赖邮件来查看分货通知以及结果,并且分货层层依赖,如果不能及时收到邮件会影响业务的分货时效等,所以通过三个方面去解决这个问题

技术方向

系统内发邮件收口做限流

方案确认

方向:限流发邮件方法1分钟内最大200次

实现:改造系统发邮件底层方法,1分钟内最多发200个

消息队列(√)

面临问题:多出来的怎么处理?消息队列(需要持久化)

实现:新建一个topic,调用发邮件方法的请求全部扔到MQ中,自己消费,通过设置消费者的拉取间隔以及最大拉取数量限制,分钟内消费消息条数不超过200条

面临问题:多分区多消费者?

实现:默认拉取数量为32,目前MQ服务端设置,限制最大拉取数量为32

(可行)设置1个分区,一个消费者组,目前有2个实例(此时其中一个实例不会消费),设置拉取间隔为10s

(不可行,有自动加实例机制)设置2个分区,一个消费者组,目前有2个实例,设置拉取间隔为10s,最大拉取条数为16;系统在流量激增的情况下会增加实例来分摊流量

最终实现方式

topic设置1个分区,一个消费者组,使用默认负载均衡策略:平均分配

//平均分配负载均衡核心逻辑
int index = cidAll.indexOf(currentCID);
int mod = mqAll.size() % cidAll.size();
int averageSize = mqAll.size() <= cidAll.size() ? 1 : (mod > 0 && index < mod ? mqAll.size() / cidAll.size() + 1 : mqAll.size() / cidAll.size());
int startIndex = mod > 0 && index < mod ? index * averageSize : index * averageSize + mod;
int range = Math.min(averageSize, mqAll.size() - startIndex);for(int i = 0; i < range; ++i) {result.add(mqAll.get((startIndex + i) % mqAll.size()));
}return result;

解析两个实例负载均衡过程

//第一个实例起来,触发负载均衡
//index = 0
int index = cidAll.indexOf(currentCID);
//mod = 1%2 = 1
int mod = mqAll.size() % cidAll.size();
//averageSize = 1
int averageSize = mqAll.size() <= cidAll.size() ? 1 : (mod > 0 && index < mod ? mqAll.size() / cidAll.size() + 1 : mqAll.size() / cidAll.size());
//startIndex = 0 * 1 = 0
int startIndex = mod > 0 && index < mod ? index * averageSize : index * averageSize + mod;
//range = min(1, 1 - 0) = 1
int range = Math.min(averageSize, mqAll.size() - startIndex);for(int i = 0; i < range; ++i) {//(0+0)%1 = 0,所以将第一个分区分给当前实例result.add(mqAll.get((startIndex + i) % mqAll.size()));
}//第二个实例起来,触发负载均衡
//index = 1
int index = cidAll.indexOf(currentCID);
//mod = 1%2 = 1
int mod = mqAll.size() % cidAll.size();
//averageSize = 1
int averageSize = mqAll.size() <= cidAll.size() ? 1 : (mod > 0 && index < mod ? mqAll.size() / cidAll.size() + 1 : mqAll.size() / cidAll.size());
//startIndex = 1 * 1 + 1 = 2
int startIndex = mod > 0 && index < mod ? index * averageSize : index * averageSize + mod;
//range = min(1, 1 - 2) = -1
int range = Math.min(averageSize, mqAll.size() - startIndex);//不会进入循环分配分区
for(int i = 0; i < range; ++i) {result.add(mqAll.get((startIndex + i) % mqAll.size()));
}

所以只会有一个实例去消费当前这个分区,在集群消费模式下,一个分区只会被消费组内的一个消费者消费,rmq默认拉取数量为32,设置拉取间隔为10s,所以每分钟内消费:32*6 = 192

分布式锁(×)

当前场景的点,在于需要将超出1分钟两百条的那些邮件持久化存储,等到下一个一分钟去发送,而分布式锁只能实现控制接口的流量,没法保证超出流量那部分的存储,所以没法解决当前问题

方案实现

最终采用消息队列,RocketMQ解决该问题

实现代码,使用Java SDK,设置拉取间隔为10s即可

public void run(String... args) throws Exception {Properties properties = new Properties();properties.setProperty(ConfigKey.CONSUMER_GROUP, emailNotifyMqProperties.getConsumerGroup());properties.setProperty(ConfigKey.ACCESS_KEY, rocketMqProperties.getAccessKey());properties.setProperty(ConfigKey.SECRET_KEY, rocketMqProperties.getSecretKey());properties.setProperty(ConfigKey.NAME_SERVER_ADDR, rocketMqProperties.getServer());properties.setProperty(ConfigKey.ENABLE_MSG_TRACE, "true");//消费限流:解决发邮件分钟内超过200封会被熔断的问题properties.setProperty(ConfigKey.PULL_INTERVAL, "10000");NormalConsumer consumer = ClientFactory.createNormalConsumer(properties, this::consumeMessage);consumer.subscribe(emailNotifyMqProperties.getTopic(), null);consumer.start();
}

监控方向

1. 系统日志报警,配置邮件发送失败报警

2. 关注token熔断消息通知

业务方向

梳理当前系统中邮件通知的场景,分析报警内容,从以下方向减少邮件次数发送

1. 用户是否需要关注(用户长时间使用下来,部分通知发现自己并不关注的,比如节点报错可重试成功的

2. 是否可以批量发送(多条通知集合到一条邮件发送:多用户,多单据等)

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/web/41856.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

责任链模式(大话设计模式)C/C++版本

责任链模式 C #include <iostream> #include <memory>using namespace std; // 请求类 struct Request {std::string requestType; // 请求类型int number; // 该请求类型的数量std::string requestContent; // 请求内容 };// 抽象经理类 clas…

MySQL学习记录 —— 십칠 CentOS7.9环境下的MySQL8.4 安装和配置

文章目录 1、安装和配置2、MySQL 包位置3、主要程序介绍 本篇开始在之前mysql博客的基础上继续延伸&#xff0c;适合有一定基础的mysql使用者阅读 环境 &#xff1a;CentOS 7.9 root 用户&#xff0c;MySQL 8.4 1、安装和配置 看一下当前系统版本 cat /etc/redhat-release应当…

前端重点之:Vue+websocket通信详细用法和websocket心跳机制的使用,websocket断开实时监测,websocket实时通信

今年年初找工作,好多gou面试官总喜欢问关于websocket通信的使用方式,此次又用到了,在此做个总结:主要包含websocket的具体使用方法,和重点:(心跳机制的使用),就是主要是前端实时监测websocket是否有断连和数据的处理 在前端开发中,WebSocket 是一种常见的技术,用于…

众所周知沃尔玛1P是怎么运营?

​​沃尔玛的1P模式&#xff0c;即第一方供应商模式&#xff0c;是其独特的采购策略。在这种模式下&#xff0c;供应商先将商品卖给沃尔玛&#xff0c;由沃尔玛负责库存管理和销售。沃尔玛通过强大的采购和物流能力控制库存&#xff0c;确保商品品质&#xff0c;为客户提供更加…

FPGA问题

fpga 问题 第一道坎&#xff0c;安装软件&#xff1b;没有注册&#xff0c;无法产生sop文件&#xff0c;无法下载 没有相应的库的quartus ii版本&#xff0c;需要另下载 第二道坎&#xff0c;模拟器的下载&#xff0c;安装&#xff1b; 第三道&#xff0c;verilog 语法&#x…

deepspeed huggingface传入参数 optimizer和lr_scheduler测试

Trainer中 首先&#xff1a; WarmupDecayLR --lr_scheduler_type linear WarmupLR --lr_scheduler_type constant_with_warmup 1 TrainArgument不传lr_scheduler_type、optim&#xff0c;warmup_steps15 ds config文件中定义如下&#xff1a; 注意&#xff1a;如果不在Trai…

LangChain(四)工具调用的底层原理!给大模型按上双手吧!(新手向)

背景 经过前面三篇的内容&#xff0c;我想大家对于大模型的构建、Langchain的优势、Chain的构建有了相当程度的理解&#xff08;虽然只是最简单的示例&#xff0c;但是足够有代表性&#xff09;。 后续Chain的使用将会更加丰富多彩&#xff0c;您会了解Langchain开发的大模型…

14-31 剑和诗人5 - 使用 AirLLM 和分层推理在单个 4GB GPU 上运行 LLama 3 70B

利用分层推理实现大模型语言(LLM) 大型语言模型 (LLM) 领域最近取得了显著进展&#xff0c;LLaMa 3 70B 等模型突破了之前认为可能实现的极限。然而&#xff0c;这些模型的庞大规模给其部署和实际使用带来了巨大挑战&#xff0c;尤其是在资源受限的设备上&#xff0c;例如内存…

怎么压缩pdf文件的大小?减小PDF文件大小的四种方法

怎么压缩pdf文件的大小&#xff1f;文件大小不仅影响传输速度&#xff0c;还可能涉及存储空间的管理。当处理大型PDF文件时&#xff0c;可能会面临电子邮件附件限制或云存储容量不足的问题。此外&#xff0c;过大的文件在浏览和加载时也会导致延迟&#xff0c;影响阅读体验。这…

3款自己电脑就可以运行AI LLM的项目

AnythingLLM、LocalGPT和PrivateGPT都是与大语言模型&#xff08;LLM&#xff09;相关的项目&#xff0c;它们允许用户在本地环境中与文档进行交互&#xff0c;但它们在实现方式和特点上存在一些差异。AnythingLLM使用Pinecone和ChromaDB来处理矢量嵌入&#xff0c;并使用OpenA…

【C语言】return 关键字详解

在C语言中&#xff0c;return是一个关键字&#xff0c;用于从函数中返回值或者结束函数的执行。它是函数的重要组成部分&#xff0c;负责将函数的计算结果返回给调用者&#xff0c;并可以提前终止函数的执行。 主要用途和原理&#xff1a; 返回值给调用者&#xff1a; 当函数执…

【论文阅读】-- Visual Traffic Jam Analysis Based on Trajectory Data

基于轨迹数据的可视化交通拥堵分析 摘要1 引言2 相关工作2.1 交通事件检测2.2 交通可视化2.3 传播图可视化 3 概述3.1 设计要求3.2 输入数据说明3.3 交通拥堵数据模型3.4 工作流程 4 预处理4.1 路网处理4.2 GPS数据清理4.3 地图匹配4.4 道路速度计算4.5 交通拥堵检测4.6 传播图…

掌握【Python异常处理】:打造健壮代码的现代编程指南

目录 ​编辑 1. 什么是异常&#xff1f; 知识点 示例 小李的理解 2. 常见的内置异常类型 知识点 示例 小李的理解 3. 异常机制的意义 知识点 示例 小李的理解 4. 如何处理异常 知识点 示例 小李的理解 5. 抛出异常 知识点 示例 小李的理解 6. Python内置…

Springboot整合Jsch-Sftp

背景 开发一个基于jsch的sftp工具类&#xff0c;方便在以后的项目中使用。写代码的过程记录下来&#xff0c;作为备忘录。。。 Maven依赖 springboot依赖 <parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-par…

codeforces 1633A

文章目录 1. 题目链接2. 题目代码正确代码错误代码 3. 题目总结 1. 题目链接 Div. 7 2. 题目代码 正确代码 #include<iostream> using namespace std; int main(){int testCase;cin >> testCase;while(testCase --){int ingeter;cin >> ingeter;if(!(inget…

SpringBoot彩蛋之定制启动画面

写在前面 在日常开发中&#xff0c;我们经常会看到各种各样的启动画面。例如以下几种 ① spring项目启动画面 ② mybatisplus启动画面 ③若依项目启动画面 还有很多各式各样好看的启动画面&#xff0c;那么怎么定制这些启动画面呢&#xff1f; 一、小试牛刀 ① 新建一个Spr…

SQL 之 concat_ws和concat的区别

concat_ws和concat都是用于连接字符串的函数&#xff0c;但它们在使用上有一些区别&#xff1a; 一、concat、concat_ws函数格式&#xff1a; concat格式&#xff1a; concat&#xff08;参数1,参数2,…参数n&#xff09;&#xff0c;如果要加’分隔符’直接写在 各参数中间就…

关于微信支付-商户平台:查询订单提示“查询失败:操作失败,请稍候重试”的分析

目录 引子 分析 应对 小结 引子 在开发和实施微信 JSAPI 支付的应用后&#xff0c;我们遇到了一些问题&#xff0c;订单的状态更新不正常&#xff0c;当然我们首先需要从自身寻找原因和完善解决问题的办法和方案。在支付的过程中&#xff0c;客户会给我们一些反馈&#xf…

Open-Sora1.2环境搭建推理测试

引子 前阵子写了一篇Open-Sora1.0环境搭建&推理测试&#xff08;Open-Sora1.0环境搭建&推理测试_自己搭建sora服务-CSDN博客&#xff0c;感兴趣的童鞋&#xff0c;请移步&#xff09;。Open-Sora1.1发布的时候&#xff0c;撇了一眼新闻。后面一转头&#xff0c;忘记这…

ARL联动AWVS实现自动化漏洞扫描

0x01 前言 很多场景下需要大范围的扫描漏洞和快速排查互联网暴露面的漏洞&#xff0c;需要使用这种自动化的手段&#xff0c;常规渗透测试的找互联网暴露面是&#xff0c;域名>子域名>IP>C段>端口&#xff0c;可以手动收集&#xff0c;也可以借助一些网络搜索引擎…