一、前提介绍
我在前文实现过一个【批量任务调度管理器】的 demo,能实现简单的任务批量并发分组,过滤等操作。但是还有很多优化空间,所以查找一些优化的库,
主要想优化两个方面, 上篇提到的:
针对 3,其实可以自己手写一个,也可以依靠 如什么来实现。
针对 2,最难的是根据【当前系统负载或网络状况调整并发请求数量】,这必须需要引入一个检测系统的东西。
而动态调整数量,我现在的代码其实已经可以满足,我只是无法【获取当前系统负载或网络状况】。
当然我搜了一些,也有推荐 P-Queue 这类队列库来现在我现在的功能的,它有一些优点,比如:动态调整并发数、任务优先级、延迟、减少手动实现的复杂性,简化代码逻辑,提高可读性和维护性
等等。
最后实现功能:基于 P-Queue 和 AbortController 的批量操作功能,其中包括了动态并发控制、网络状态监测和请求取消功能。
二、P-Queue 介绍
P-Queue 是一个基于 Promise 的优先队列,它允许你控制并发任务的数量,并且可以设置任务的优先级。P-Queue 的使用非常简单,只需要创建一个队列实例,然后使用 add
方法添加任务即可。
优点:
1. 固定并发数: 创建 P-Queue 实例时指定一个并发数,例如 5。这表示队列最多同时处理 5 个任务。
2. 自动调度:只需要将任务添加到队列中,而不需要关心任务的调度和执行顺序。P-Queue 会根据你指定的并发数自动处理任务——我现在自己手写的那个只是考虑使用 for 循环启动多个并发请求,并通过递归调用来处理下一任务。这种方法不涉及任务队列的概念,而是直接处理任务并发
3. 任务执行:任务被添加到队列时,P-Queue 会立即检查当前的并发任务数量。如果当前执行的任务数少于并发数限制,它会立即开始执行新的任务。
如果当前正在执行的任务数达到并发限制,P-Queue 会将新任务放入队列中,等待直到有空闲的并发槽位(即当前正在运行的任务数减少到低于并发限制)。
4. 任务完成:当一个任务完成时,P-Queue 会检查队列中的其他待处理任务。如果有任务在队列中等待并且当前并发数仍然未达到限制,它会自动开始执行下一个任务。
使用案例:
import PQueue from 'p-queue'
import axios from 'axios'// 创建一个队列,设置并发数为 5
const queue = new PQueue({ concurrency: 5 })const tasks = [() => axios.get('/api/task1'),() => axios.get('/api/task2')// 更多任务...
]// 添加任务到队列中
tasks.forEach((task) => {queue.add(task)
})// 动态调整并发数——可以根据系统的情况来调整
setTimeout(() => {queue.concurrency = 10 // 动态调整并发数
}, 5000)// 等待所有任务完成
queue.onIdle().then(() => {console.log('All tasks completed')
})
三、动态检测的实现
我曾想过使用一些第三方库来实现检测当前系统的情况,然后动态控制并发数量,这样会更友好,查了一些方案,感觉没有必要。
实际的负载检测逻辑通常依赖于系统性能指标和资源使用情况。一些常见的负载检测方法和第三方库:
- os-utils:提供系统负载和资源使用情况的简单接口。
- systeminformation:提供详细的系统信息,包括 CPU、内存、网络等。
- node-os-utils:获取系统的 CPU、内存、磁盘和网络信息,适合用于 Node.js 环境。
对于大多数应用,特别是当任务负载和并发需求相对稳定时,动态调整可能显得过于复杂。遂放弃。
四、优化效果
最后,我的方案如下:
基于 P-Queue 和 AbortController 的批量操作功能,其中包括了动态并发控制、网络状态监测和请求取消功能。以下是实现功能的详细描述:
功能优化概述
- 动态并发控制:getConcurrency:根据设备的硬件线程数(CPU 核心数)动态计算并发请求数量,一个比较基础的检测,也可以考虑其他的检查
- 队列 queue:使用 P-Queue 创建任务队列,初始化并发数量。
- 监听网络是否中断,再批量请求处理:每个请求的处理函数,使用 AbortController 允许在网络断开时中断请求。
- 注意:组件挂载时初始化网络状态监听、也可以考虑监听网络的情况更新 P-Queue 的并发数量
优点是:可扩展性强,还是比较简单灵活,结合了并发控制、请求取消和网络状态检测。能够提升性能,优化用户体验。
代码实现
import PQueue from 'p-queue'
import axios from 'axios'onMounted(() => {window.addEventListener('online', handleNetworkChange)window.addEventListener('offline', handleNetworkChange)
})
// 创建 AbortController 实例
let abortController = new AbortController()// 网络状态检测函数
const isOnline = () => navigator.onLine
// 定义动态并发控制函数(根据需要自定义)
const getConcurrency = () => {//目的是根据系统的硬件资源或其他条件动态调整并发请求的数量。具体来说,它通过 navigator.hardwareConcurrency 获取设备的硬件线程数(CPU核心数),然后将其限制在一个合适的范围内(在这个例子中是 1 到 10)// 你可以根据系统负载、网络状况等动态调整并发数return Math.max(1, Math.min(10, navigator.hardwareConcurrency || 4))
}// 创建 P-Queue 实例
const queue = new PQueue({ concurrency: getConcurrency() })// 更新并发数的函数
const updateConcurrency = () => {queue.concurrency = getConcurrency()
}
// 存储每个请求的 AbortController 实例
const controllers = ref([])
const batchOperation = (title, operationType, axiosConfig, shouldFilterList) => {const successCount = ref(0)const errorCount = ref(0)// 执行批量操作startLoading()const requestTaskList = shouldFilterList? tableData.value.filter((item, index, arr) => arr.findIndex((val) => val.id === item.id) === index): tableData.valueif (requestTaskList.length === 0) {ElMessage.error('没有可操作的任务!')stopLoading()return}// 定义请求函数const requestFunction = async (row) => {// 创建新的 AbortController 实例const controller = new AbortController()controllers.value.push(controller) // 将控制器添加到集合中const params = {}try {const res = await axios.request({url: axiosConfig.url,method: axiosConfig.method,params: params,signal: abortController.signal // 使用 AbortController 实例})if (res.data.code === 200) {successCount.value++} else {errorCount.value++}} catch (error) {if (error.name === 'AbortError') {console.warn('Request was aborted due to network being offline.')} else {errorCount.value++console.error('Request failed:', error)}}}requestTaskList.forEach((row) => {queue.add(() => requestFunction(row))})queue.onIdle().then(() => {stopLoading(tabs)ElNotification({title: `${title}结果`,message: `共 ${requestTaskList.length} 个任务,${successCount.value} 个处理成功,${errorCount.value} 个处理失败。`,type: errorCount.value === 0 ? 'success' : 'warning'})controllers.value = [] // 清空控制器集合})// 动态调整并发数的定时器(每隔 5 分钟更新一次)——感觉作用不大,会导致不必要的性能开销// setInterval(() => {// queue.concurrency = getConcurrency()// }, 300000)
}
// 取消所有请求
const cancelAllRequests = () => {controllers.value.forEach((controller) => {controller.abort()})controllers.value = [] // 清空控制器集合
}
// 监听网络状态变化
const handleNetworkChange = () => {if (!isOnline()) {console.log('Network is offline. Cancelling ongoing requests.')cancelAllRequests() // 取消所有请求} else {console.log('Network is back online. Resuming requests.')abortController = new AbortController() // 创建新的 AbortController 实例}
}// 在组件卸载之前,取消所有未完成的请求
onBeforeUnmount(() => {isUnmount.value = truewindow.removeEventListener('online', handleNetworkChange)window.removeEventListener('offline', handleNetworkChange)cancelAllRequests() // 组件卸载时取消所有请求
})
效果评价
1.并发数测试
同样 20 条,优化前:
优化第一版,上篇:
优化第二版,这版:
其实吧,我也看不出来,性能有啥变化,累了,还是太少了吧……
但是,代码友好了一些些吧,并发上来讲,比之前好很多。
2.取消请求测试
同样 20 条,优化前,没有加取消的功能,断网会——500!如果你的 axios 拦截里对 500 进行处理的话,就会误伤。
优化后:
会取消。
五、总结
待完善~