前言:
1、主流浏览器在 HTTP/1.1 下对同一域名的最大并发请求数通常是 6~8 个。超过限制的请求会进入队列,等待空闲的连接。
2、可以利用Promise模拟任务队列,控制并发请求数量,以避免对服务器造成过大的压力。(先进先出)
代码
一个简单的请求队列调度器,用于控制并发请求的最大数量
import axios from 'axios'export const handQueue = (reqs // 请求总数
) => {reqs = reqs || []const requestQueue = (concurrency) => {concurrency = concurrency || 6 // 最大并发数const queue = [] // 请求池let current = 0const dequeue = () => {while (current < concurrency && queue.length) {current++;const requestPromiseFactory = queue.shift() // 出列requestPromiseFactory().then(() => { // 成功的请求逻辑}).catch(error => { // 失败console.log(error)}).finally(() => {current--dequeue()});}}return (requestPromiseFactory) => {queue.push(requestPromiseFactory) // 入队dequeue()}}const enqueue = requestQueue(6)for (let i = 0; i < reqs.length; i++) {enqueue(() => axios.get('/api/test' + i))}
}
功能拆解
-
handQueue 函数:
参数 reqs: 是一个数组,包含需要发送的请求。
函数的主要目的是对这些请求进行队列管理,确保并发请求的数量不会超过设定的上限。 -
requestQueue 函数:
concurrency:最大并发数
enqueue:用于将新的请求添加到队列并处理它们
queue: 请求池,用于存储待处理的请求。
current: 当前正在执行的请求数。 -
dequeue 函数:
1、从请求池中取出请求并发送。它在一个循环中运行,直到当前并发请求数current达到最大并发数concurrency或请求池queue为空。 2、对于每个出队的请求,它首先增加current的值,然后调用请求函数requestPromiseFactory来发送请求。 3、当请求完成(无论成功还是失败)后,它会减少current的值并再次调用dequeue,以便处理下一个请求。
const dequeue = () => { while (current < concurrency && queue.length) { current++; const requestPromiseFactory = queue.shift() // 出列 requestPromiseFactory() .then(() => { // 成功的请求逻辑 }) .catch(error => { // 失败 console.log(error) }) .finally(() => { current-- dequeue() }); } }
// 定义返回请求入队函数 return (requestPromiseFactory) => { queue.push(requestPromiseFactory) // 入队 dequeue() } // 函数返回一个函数,这个函数接受一个参数requestPromiseFactory,表示一个返回Promise的请求工厂函数。 // 这个返回的函数将请求工厂函数加入请求池queue,并调用dequeue来尝试发送新的请求, // 当然也可以自定义axios,利用Promise.all统一处理返回后的结果。
使用一个 while 循环,当当前请求数 current 小于最大并发数 concurrency 并且队列 queue 中有待处理请求时:
1、从队列中取出第一个请求工厂 requestPromiseFactory。
2、调用 requestPromiseFactory() 开始请求。
3、使用 then 和 catch 处理请求成功和失败的逻辑。
4、在 finally 中,减少当前请求数 current,并递归调用 dequeue,以确保队列持续运行。 -
分发逻辑:
for 循环遍历传入的请求数组 reqs。
对于每个请求,通过 enqueue 将其加入队列。
每个请求通过工厂函数 () => axios.get(‘/api/test’ + i) 包装,保证每次调用都返回新的请求 Promise。
优化
-
超时管理:
如果某个请求卡住,会阻塞队列,可为请求设置超时,例如:const timeout = (promise, ms) =>Promise.race([promise,new Promise((_, reject) => setTimeout(() => reject('Timeout'), ms)),]);
-
错误处理:
重试或告警机制:const retry = (fn, retries = 3) =>fn().catch(err => (retries > 0 ? retry(fn, retries - 1) : Promise.reject(err)));
-
队列清空检测:
在所有请求处理完成后,触发回调或事件通知:if (queue.length === 0 && current === 0) {console.log('All requests completed'); }
-
优化后的代码
import axios from "axios"; /*** 处理并发请求队列* @param {Array<Function>} reqs 请求总数* @param {Object} options 配置选项* @param {number} options.concurrency 最大并发数* @param {number} options.timeout 请求超时时间 (ms)* @param {Function} options.onQueueEmpty 队列清空后的回调*/ export const handQueue = (reqs, options = {}) => {const { concurrency = 6, timeout = 10000, onQueueEmpty = () => {} } = options;// 确保请求队列为数组reqs = reqs || [];const queue = [];let current = 0;// 超时管理器const timeoutRequest = (requestPromise, timeout) =>new Promise((resolve, reject) => {const timer = setTimeout(() => reject(new Error("Request timeout")), timeout);requestPromise.then((res) => {clearTimeout(timer);resolve(res);}).catch((err) => {clearTimeout(timer);reject(err);});});// 出列处理const dequeue = async () => {while (current < concurrency && queue.length) {current++;const requestPromiseFactory = queue.shift(); // 出列try {await timeoutRequest(requestPromiseFactory(), timeout); // 包装超时处理console.log("Request success");} catch (error) {console.error("Request failed:", error.message); // 错误处理} finally {current--;dequeue();// 如果队列为空且没有正在处理的请求,调用队列清空回调if (queue.length === 0 && current === 0) {onQueueEmpty();}}}};// 入队函数const enqueue = (requestPromiseFactory) => {queue.push(requestPromiseFactory);dequeue();};// 将请求添加到队列for (let i = 0; i < reqs.length; i++) {enqueue(() => axios.get(`/api/test${i}`));} };
-
使用
const reqs = Array.from({ length: 20 }, (_, i) => () => axios.get(`/api/test${i}`)); handQueue(reqs, {concurrency: 4, // 最大并发数timeout: 5000, // 超时时间 5 秒onQueueEmpty: () => {console.log("All requests have been processed!");}, });