如何使用队列处理 API 速率限制

对于遇到速率限制的应用程序来说也是一个挑战,因为它需要“放慢速度”或暂停。这是一个典型的场景:

  • 初始请求:当应用程序发起与 API 的通信时,它会请求特定的数据或功能。
  • API 响应: API 处理请求并响应请求的信息或执行所需的操作。
  • 速率限制:如果应用程序已达到限制,通常需要等到下一个指定的时间范围(例如一分钟到一小时)才能发出其他请求。如果它是“软”速率限制并且时间范围已知并且是线性的,则更容易处理。通常,每个区块的等待时间都会增加,需要对每个 API 进行完全不同的自定义处理。
  • 处理超出速率限制:如果应用程序超出速率限制,它可能会收到来自 API 的错误响应(例如“429 Too Many Requests”状态代码)。应用程序需要优雅地处理这个问题,可能是通过对请求进行排队、实施退避策略(在重试之前等待逐渐延长的时间)或通知用户已达到速率限制。

为了在速率限制内有效运行,应用程序通常采用以下策略:

  • 限制:调节传出请求的速率以符合 API 的速率限制。
  • 缓存:将频繁请求的数据存储在本地,以减少重复 API 调用的需要。
  • 指数退避:实施一种策略,使应用程序在达到速率限制后在后续重试之间等待的时间越来越长,以减少服务器负载并防止立即重试。
  • 队列? 下一节将详细介绍

使用队列

由于队列能够系统地处理任务,因此可以作为出色的“助手”或工具来帮助服务管理速率限制。然而,虽然它提供了显着的好处,但它并不是用于此目的的独立解决方案。

在构建健壮的架构时,用于与受速率限制的外部 API 交互的服务或应用程序通常会异步处理任务。该服务通常由从队列派生的任务启动。当服务遇到速率限制时,它可以轻松地将作业返回到主队列或将其分配到指定用于延迟任务的单独队列,并在特定的等待时间(例如 X 秒)后重新访问它。

这种对队列系统的依赖是非常有利的,主要是因为它的临时性质和排序。然而,仅靠队列并不能完全解决速率限制问题;它需要额外的功能或服务本身的帮助才能有效地处理这些限制。

使用队列时可能会出现挑战

  • 重新进入队列的任务可能会比必要的时间更早返回,因为它们的时间不直接由您的服务控制。
  • 由于在有限时间内频繁拨打电话而超出速率限制。这可能需要实施睡眠或等待机制,由于它们对性能和响应能力的潜在影响,通常被认为是不好的做法。

RabbitMQ

const amqp = require('amqplib');
const axios = require('axios');// Function to make API requests, simulating rate limitations
async function makeAPICall(url) {try {const response = await axios.get(url);console.log('API Response:', response.data);} catch (error) {console.error('API Error:', error.message);}
}// Connect to RabbitMQ server
async function connect() {try {const connection = await amqp.connect('amqp://localhost');const channel = await connection.createChannel();const queue = 'rateLimitedQueue';channel.assertQueue(queue, { durable: true });// Consume messages from the queuechannel.consume(queue, async msg => {const { url, delayInSeconds } = JSON.parse(msg.content.toString());// Simulating rate limitationawait new Promise(resolve => setTimeout(resolve, delayInSeconds * 1000));await makeAPICall(url); // Make the API callchannel.ack(msg); // Acknowledge message processing completion});} catch (error) {console.error('RabbitMQ Connection Error:', error.message);}
}// Function to send a message to the queue
async function addToQueue(url, delayInSeconds) {try {const connection = await amqp.connect('amqp://localhost');const channel = await connection.createChannel();const queue = 'rateLimitedQueue';channel.assertQueue(queue, { durable: true });const message = JSON.stringify({ url, delayInSeconds });channel.sendToQueue(queue, Buffer.from(message), { persistent: true });console.log('Task added to the queue');} catch (error) {console.error('RabbitMQ Error:', error.message);}
}// Usage example
addToQueue('https://api.example.com/data', 5); // Add an API call with a delay of 5 seconds// Start the consumer
connect();

Kafka

const { Kafka } = require('kafkajs');
const axios = require('axios');// Function to make API requests, simulating rate limitations
async function makeAPICall(url) {try {const response = await axios.get(url);console.log('API Response:', response.data);} catch (error) {console.error('API Error:', error.message);}
}// Kafka configuration
const kafka = new Kafka({clientId: 'my-app',brokers: ['localhost:9092'], // Replace with your Kafka broker address
});// Create a Kafka producer
const producer = kafka.producer();// Connect to Kafka and send messages
async function produceToKafka(topic, message) {await producer.connect();await producer.send({topic,messages: [{ value: message }],});await producer.disconnect();
}// Create a Kafka consumer
const consumer = kafka.consumer({ groupId: 'my-group' });// Consume messages from Kafka topic
async function consumeFromKafka(topic) {await consumer.connect();await consumer.subscribe({ topic });await consumer.run({eachMessage: async ({ message }) => {const { url, delayInSeconds } = JSON.parse(message.value.toString());// Simulating rate limitationawait new Promise(resolve => setTimeout(resolve, delayInSeconds * 1000));await makeAPICall(url); // Make the API call},});
}// Usage example - Sending messages to Kafka topic
async function addToKafka(topic, url, delayInSeconds) {const message = JSON.stringify({ url, delayInSeconds });await produceToKafka(topic, message);console.log('Message added to Kafka topic');
}// Start consuming messages from Kafka topic
const kafkaTopic = 'rateLimitedTopic';
consumeFromKafka(kafkaTopic);// Usage example - Adding messages to Kafka topic
addToKafka('rateLimitedTopic', 'https://api.example.com/data', 5); // Add an API call with a delay of 5 seconds

这两种方法都是合法的,但它们需要您的服务包含“睡眠”机制。
借助 Memphis,您可以使用专门为此目的而设计的称为“延迟消息”的简单功能,将延迟从客户端转移到队列。当您的消费者应用程序需要额外的处理时间时,延迟消息允许您将收到的消息发送回代理。

孟菲斯实施的独特之处在于消费者能够独立且原子地控制这种延迟。
在站内,未消费消息的计数不会影响延迟消息的消费。例如,如果需要 60 秒的延迟,它会精确配置该特定消息的不可见时间。

Memphis.dev 延迟消息

  1. 消费者组收到一条消息。
  2. 发生事件,提示消费者组暂停处理消息。
  3. 假设maxMsgDeliveries尚未达到其限制,消费者将激活message.delay(delayInMilliseconds),绕过消息。代理不会立即重新处理同一消息,而是将其保留指定的持续时间。
  4. 后续消息将被消费。
  5. 一旦请求delayInMilliseconds通过,代理将停止主要消息流并将延迟的消息重新引入循环。

孟菲斯

const { memphis } = require('memphis-dev');// Function to make API requests, simulating rate limitations 
async function makeAPICall(message) 
{ try { const response = await axios.get(message.getDataAsJson()['url']); console.log('API Response:', response.data); message.ack();} catch (error) { console.error('API Error:', error.message); console.log("Delaying message for 1 minute"); message.delay(60000);} 
}(async function () {let memphisConnection;try {memphisConnection = await memphis.connect({host: '<broker-hostname>',username: '<application-type username>',password: '<password>'});const consumer = await memphisConnection.consumer({stationName: '<station-name>',consumerName: '<consumer-name>',consumerGroup: ''});consumer.setContext({ key: "value" });consumer.on('message', (message, context) => {await makeAPICall(url, message);});consumer.on('error', (error) => { });} catch (ex) {console.log(ex);if (memphisConnection) memphisConnection.close();}
})();

结论

了解并遵守速率限制对于使用 API 的应用程序开发人员至关重要。它涉及管理请求频率、达到限制时处理错误、实施退避策略以防止 API 服务器过载以及利用 API 提供的速率限制信息来优化应用程序性能,现在您也知道如何使用队列来做到这一点!


作者:Idan Asulin

更多技术干货请关注公号【云原生数据库

squids.cn,云数据库RDS,迁移工具DBMotion,云备份DBTwin等数据库生态工具。

irds.cn,多数据库管理平台(私有云)。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/578270.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

12.25

led.c #include "led.h" void all_led_init() {RCC_GPIO | (0X3<<4);//时钟使能GPIOE_MODER &(~(0X3<<20));//设置PE10输出GPIOE_MODER | (0X1<<20);//设置PE10为推挽输出GPIOE_OTYPER &(~(0x1<<10));//PE10为低速输出GPIOE_OSPEED…

单集群400TB,OceanBase稳定支撑快手核心业务场景

一款日均超过千万人访问的短视频 App 快手&#xff0c;面对高并发流量如何及时有效地处理用户请求&#xff1f;通过在后端配置多套 MySQL 集群来支撑高流量访问&#xff0c;以解决大数据量存储和性能问题&#xff0c;这种传统的 MySQL 分库分表方案有何问题&#xff1f;快手对分…

评估回馈电子负载的重要指标?

回馈电子负载是用于测试电源、电池和其他电子设备性能的设备。它可以模拟实际负载&#xff0c;同时将多余的能量回馈到电网或电池中。在选择和使用回馈电子负载时&#xff0c;有几个重要的指标需要考虑&#xff1a; 功率范围&#xff1a;回馈电子负载的功率范围是指其能够提供的…

巅峰画师Midjourney:新时代的独角兽

介绍 AI绘画领域中&#xff0c;Midjourney处于绝对地位&#xff0c;并且一年时间就登顶。 Midjourney是一家独立的AI研究实验室,探索新的思维媒介,拓展人类的想象力。 它由一个小型的自筹资金团队组成,专注于设计、人类基础设施和AI。 在AI绘画领域,Midjourney取得了非常突出…

百度Apollo五步入门自动驾驶:Dreamview与离线数据包分析(文末赠送apollo周边)

&#x1f3ac; 鸽芷咕&#xff1a;个人主页 &#x1f525; 个人专栏:《linux深造日志》《粉丝福利》 ⛺️生活的理想&#xff0c;就是为了理想的生活! ⛳️ 粉丝福利活动 ✅参与方式&#xff1a;通过连接报名观看课程&#xff0c;即可免费获取精美周边 ⛳️活动链接&#xf…

pytest 断言异常

一、前置说明 在 pytest 中,断言异常是通过 pytest 内置的 pytest.raises 上下文管理器来实现的。通过使用 pytest.raises,可以捕获并断言代码中引发的异常。 二、操作步骤 1. 编写测试代码 atme/demos/demo_pytest_tutorials/test_pytest_raises.py import pytest# 示例…

【PDF密码】 一键强制去掉pdf密码

想要给PDF文件设置一个密码防止他人对文件进行编辑&#xff0c;那么我们可以对PDF文件设置限制编辑&#xff0c;设置方法很简单&#xff0c;我们在PDF编辑器中点击文件 – 属性 – 安全&#xff0c;在权限下拉框中选中【密码保护】 然后在密码保护界面中&#xff0c;我们勾选【…

通过three.js玩转车展项目

1.项目搭建 1.1 创建文件夹 mkdir 文件名1.2 初始化package.json npm init -y1.3 安装打包工具并配置相关依赖 npm i parcel -d在package.json中打包路径和指令 1.4 安装three.js npm i three -d2.项目搭建 2.1 新建index.html&#xff0c;并再index.html引入car.js,在…

【es6】async、await原理

async、await是es6新出的&#xff0c;主要是为了解决多个promise函数产生的嵌套层级过多的问题。 async、await是基于generator实现的代码中断操作&#xff08;上一个await未处理完时&#xff0c;代码不会继续向下执行&#xff0c;看上去就是中断了代码&#xff09; generator…

Mysql数据库批量更新表编码及排序规则

SELECT CONCAT( ALTER TABLE , TABLE_NAME, DEFAULT CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci; ) AS 修正SQL FROM information_schema.TABLES WHERE TABLE_COLLATION ! utf8mb4_unicode_ci AND TABLE_SCHEMA edu-integrated; SELECT…

从流星雨启程:Python和Pygame下载与安装全过程

文章目录 一、前言二、下载安装过程1.官网下载安装包2.安装python过程第一步第二步第三步第四步第五步安装完成 3.简单测试Python3.1 检查 Python 版本号3.2 打开 Python 解释器3.3 输入你的第一个代码3.4 运行 Python 脚本 4.安装Pygame4.1 cmd命令安装Pygame4.2 pip升级4.3 安…

zookeeper 面试

1zookeeper 是什么&#xff1f; 是一个开源的分布式协调服务&#xff0c;它提供了一个具有高可用性和一致性的分布式环境&#xff0c;用于协调和管理分布式系统中的各种数据和状态。 2 zookeeper 都有哪些功能&#xff1f; 分布式锁&#xff1a;可以通过 ZooKeeper 实现分布式…

实战:朴素贝叶斯文本分类器搭建与性能评估

&#x1f497;&#x1f497;&#x1f497;欢迎来到我的博客&#xff0c;你将找到有关如何使用技术解决问题的文章&#xff0c;也会找到某个技术的学习路线。无论你是何种职业&#xff0c;我都希望我的博客对你有所帮助。最后不要忘记订阅我的博客以获取最新文章&#xff0c;也欢…

ROS程序中常用循环结构的用途和用法

在 ROS (Robot Operating System) 程序中,循环结构是核心的一部分,用于控制节点的行为和处理消息。下面是一些常用的循环结构及其用途和用法: while (ros::ok()) 循环: 用途: 保持节点运行,用于执行持续的任务或周期性检查。这个循环确保节点在 ROS 环境下正常运行,直到接…

Java 快速入门

简介 跨平台性&#xff1a;Java 最大的优势之一就是跨平台性&#xff0c;即一份 Java 程序可以在多平台上运行&#xff0c;而无需重写。 简单易学&#xff1a;Java 的语法和面向对象的开发方式非常简单易学。 安全性&#xff1a;Java 对于安全性的处理非常慎重&#xff0c;对…

接口测试和功能测试

本文主要分为两个部分&#xff1a; 第一部分&#xff1a;主要从问题出发&#xff0c;引入接口测试的相关内容并与前端测试进行简单对比&#xff0c;总结两者之前的区别与联系。但该部分只交代了怎么做和如何做&#xff1f;并没有解释为什么要做&#xff1f; 第二部分&#xf…

oracle11体系结构二-存储结构

数据区&#xff1a; 数据区&#xff08;数据扩展区&#xff09;由一组连续的oracle数据块所构成的存储结构&#xff0c;一个或多个数据块组成一个数据区&#xff0c;一个或多个数据区组成一个段。当段中所有空间被使用完后&#xff0c;oracle系统将自动为该段分配一个新的数据…

2312clang,基于访问者的前端动作

原文 基于RecursiveASTVisitor的ASTFrontendActions. 创建用RecursiveASTVisitor查找特定名字的CXXRecordDeclAST节点的FrontendAction. 创建FrontendAction 编写基于clang的工具(如Clang插件或基于LibTooling的独立工具)时,常见入口是允许在编译过程中执行用户特定操作的F…

如何在生产环境正确使用Redis

一、在生产环境使用Redis 如果在生产环境使用Redis&#xff0c;需要遵守一定的使用规范&#xff0c;以保障服务稳定、高效。。 1.1、明确Redis集群的服务定位 1、仅适用于缓存场景&#xff1a;Redis定位于高性能缓存服务&#xff0c;强调快速读写和低延迟的特性&#xff0c;…

Adobe Application Manager丢失或损坏 - 解决方案

前言 Adobe Application Manager&#xff08;简称AAM&#xff09;&#xff0c;是用来管理旧版Adobe软件的管理器&#xff0c;后来已经升级为Adobe Creative Cloud&#xff08;简称ACC&#xff09;。 使用Adobe系列软件时可能会报错提示需要使用Adobe Application Manager解决…