OpenAI API接口请求速率限制
速率限制以五种方式衡量:RPM(每分钟请求数)、RPD(每天请求数)、TPM(每分钟令牌数)、TPD(每天令牌数)和IPM(每分钟图像数)。
任何选项都可能会达到速率限制,具体取决于首先发生的情况。例如,向 ChatCompletions 端点发送仅包含 100 个令牌的 20 个请求,这将达到限制(如果RPM 为 20),即使在这 20 个请求中没有发送 150k 令牌(如果TPM 限制为 150k) 。
其他值得注意的重要事项:
- 速率限制是在组织级别而不是用户级别施加的。
- 速率限制因所使用的模型而异。
- 组织每月可以在 API 上花费的总金额也受到限制。这些也称为“使用限制”。
解决方法
OpenAI Cookbook 有一个Python 笔记本,解释了如何避免速率限制错误,以及一个用于在批处理 API 请求时保持速率限制的示例Python 脚本。
在提供编程访问、批量处理功能和自动社交媒体发布时,考虑只为部分用户启用这些功能。
为了防止自动和大量滥用,请在指定时间范围内(每日、每周或每月)为单个用户设置使用限制。考虑对超出限制的用户实施硬上限或手动审核流程。
方法一:使用指数退避重试
避免速率限制错误的一种简单方法是使用随机指数退避自动重试请求。使用指数退避重试意味着在遇到速率限制错误时执行短暂睡眠,然后重试不成功的请求。如果请求仍然不成功,则增加睡眠长度并重复该过程。这将持续到请求成功或达到最大重试次数为止。这种方法有很多好处:
- 自动重试意味着您可以从速率限制错误中恢复,而不会崩溃或丢失数据
- 指数退避意味着您可以快速尝试第一次重试,同时如果前几次重试失败,仍然可以从更长的延迟中受益
- 在延迟中添加随机抖动有助于同时重试所有命中。
请注意,不成功的请求会影响您的每分钟限制,因此连续重新发送请求将不起作用。
下面是一些使用指数退避的Python解决方案示例。
示例 1:使用 Tenacity 库
Tenacity 是一个 Apache 2.0 许可的通用重试库,用 Python 编写,用于简化向任何事物添加重试行为的任务。要为您的请求添加指数退避,您可以使用tenacity.retry装饰器。下面的示例使用该tenacity.wait_random_exponential函数向请求添加随机指数退避。
from openai import OpenAI
client = OpenAI()from tenacity import (retry,stop_after_attempt,wait_random_exponential,
) # 指数退避@retry(wait=wait_random_exponential(min=1, max=60), stop=stop_after_attempt(6))
def completion_with_backoff(**kwargs):return client.completions.create(**kwargs)completion_with_backoff(model="gpt-3.5-turbo-instruct", prompt="Once upon a time,")
请注意,Tenacity 库是第三方工具,OpenAI 不保证其可靠性或安全性。
示例 2:使用backoff库
另一个为退避和重试提供函数装饰器的 python 库是backoff:
import backoff
import openai
from openai import OpenAI
client = OpenAI()@backoff.on_exception(backoff.expo, openai.RateLimitError)
def completions_with_backoff(**kwargs):return client.completions.create(**kwargs)completions_with_backoff(model="gpt-3.5-turbo-instruct", prompt="Once upon a time,")
与 Tenacity 一样,backoff 库是第三方工具,OpenAI 不保证其可靠性或安全性。
示例 3:手动退避实现
如果您不想使用第三方库,您可以按照以下示例实现自己的退避逻辑:
import random
import timeimport openai
from openai import OpenAI
client = OpenAI()# 定义一个重试装饰器
def retry_with_exponential_backoff(func,initial_delay: float = 1,exponential_base: float = 2,jitter: bool = True,max_retries: int = 10,errors: tuple = (openai.RateLimitError,),
):"""Retry a function with exponential backoff."""def wrapper(*args, **kwargs):# 初始化变量num_retries = 0delay = initial_delay# 循环直到成功响应或达到 max_retries 或引发异常while True:try:return func(*args, **kwargs)# 重试特定错误except errors as e:# 增量重试num_retries += 1# 检查是否已达到最大重试次数if num_retries > max_retries:raise Exception(f"Maximum number of retries ({max_retries}) exceeded.")# 增加延迟delay *= exponential_base * (1 + jitter * random.random())time.sleep(delay)# 针对任何未指定的错误引发异常except Exception as e:raise ereturn wrapper@retry_with_exponential_backoff
def completions_with_backoff(**kwargs):return client.completions.create(**kwargs)
同样,OpenAI 不保证该解决方案的安全性或效率,但它可以成为您自己的解决方案的良好起点。
方法二:充分利用max_tokens以匹配您完成的规模
max_tokens您的速率限制是根据您的请求的字符数计算的令牌的最大值和估计数量。尝试将该max_tokens值设置为尽可能接近您的预期响应大小。
批量请求
OpenAI API 对每分钟请求数和每分钟令牌数有单独的限制。
如果您达到了每分钟的请求限制,但每分钟的令牌有可用容量,则可以通过将多个任务批处理到每个请求中来提高吞吐量。这将使您每分钟处理更多令牌,特别是对于我们较小的模型。
发送一批提示的工作方式与普通 API 调用完全相同,只不过您将字符串列表而不是单个字符串传递给提示参数。
- 没有批处理的示例
from openai import OpenAI
client = OpenAI()num_stories = 10
prompt = "Once upon a time,"# 示例,每个请求完成一个故事
for _ in range(num_stories):response = client.completions.create(model="curie",prompt=prompt,max_tokens=20,)# 输出故事print(prompt + response.choices[0].text)
- 批处理示例
from openai import OpenAI
client = OpenAI()num_stories = 10
prompts = ["Once upon a time,"] * num_stories# 批量示例,每个请求完成 10 个故事
response = client.completions.create(model="curie",prompt=prompts,max_tokens=20,
)# 按索引将完成与提示进行匹配
stories = [""] * len(prompts)
for choice in response.choices:stories[choice.index] = prompts[choice.index] + choice.text# 输出故事
for story in stories:print(story)