ThreadPool 类
const { Worker,parentPort,isMainThread } = require('worker_threads')
//主线程
if(isMainThread){class ThreadPool {size = 5;queue = [];workerGroup = [];free=0;maxFree=2;monitor=null;constructor(size) {this.size = size;}//初始化子线程init(){for (let i = 0; i < this.size; i++) {this.workerGroup.push({id: i,status: false,worker: new Worker(__filename)});this.workerGroup[i].worker.on("message", (message) => {if (message === 'end') {this.workerGroup[i].status = false;this.check();}});}this.monitor=setInterval(()=>{if(this.isFree()){this.free++;console.log(`空闲次数: ${this.free},如果超过${this.maxFree}次,线程池将关闭,后续提交任务将自动开启`)}else{this.free=0;}this.check();if(this.free>this.maxFree){this.shutdown();clearInterval(this.monitor);this.monitor=null;this.workerGroup = [];this.free=0;}},10000)}isFree(){for (let i = 0; i < this.workerGroup.length; i++) {if (this.workerGroup[i].status) {return false;}}if(this.queue.length>0){return false;}return true;}//清理所有子线程shutdown() {this.workerGroup.forEach(e => {e.worker.terminate();});}/*** 提交异步任务* @param {*} taskContext 任务函数* @param {*} data 任务所需参数*/submitAsync(taskContext,data) {this.add(true,taskContext,data)}/*** 提交同步任务* @param {*} taskContext 任务函数* @param {*} data 任务所需参数*/submit(taskContext,data) {this.add(false,taskContext,data)}//添加任务排队add(isAsync,taskContext,data){if(this.workerGroup.length<1)this.init();//懒加载this.queue.push({isAsync:isAsync,data:data,taskContext:taskContext.toString()});this.check();}//检查任务check() {if(this.queue.length>0){for (let i = 0; i < this.workerGroup.length; i++) {if (!this.workerGroup[i].status) {this.workerGroup[i].status = true;this.workerGroup[i].worker.postMessage(this.queue.pop());break;}}}}}module.exports = ThreadPool;
}else{//子线程//监听任务parentPort.on("message", (task) => {if(task.isAsync){let taskContext=eval("("+task.taskContext+")")taskContext(()=>{parentPort.postMessage('end')//通知主线程任务结束},task.data);}else{let taskContext=eval("("+task.taskContext+")")taskContext(task.data);parentPort.postMessage('end')//通知主线程任务结束}})
}
使用方式
const ThreadPool = require('./ThreadPool');
const pool = new ThreadPool(2);let data = { name: '你好' }// 异步任务
pool.submitAsync(end => {setTimeout(() => {console.log("任务一开始" + new Date().getTime());let num = 0;while (true) {num++;if (num > 100000000) {console.log("任务一结束" + new Date().getTime());break;}}end();}, 1000);
});//异步任务+入参
pool.submitAsync((end, data1) => {setTimeout(() => {let num = 0;console.log("任务二开始" + new Date().getTime() + ",data=" + JSON.stringify(data1));while (true) {num++;if (num > 100000000) {console.log("任务二结束" + new Date().getTime());break;}}end();}, 1000);
}, data);//同步任务
pool.submit(data1 => {console.log("任务一开始" + new Date().getTime() + ",data" + JSON.stringify(data1));let num = 0;while (true) {num++;if (num > 100000000) {console.log("任务一结束" + new Date().getTime());break;}}}, data);//同步任务+入参
pool.submit(data1 => {console.log("任务二开始" + new Date().getTime() + ",data" + JSON.stringify(data1));let num = 0;while (true) {num++;if (num > 100000000) {console.log("任务二结束" + new Date().getTime());break;}}
}, data);
输出日志
任务一开始1706802924114
任务二开始1706802924114,data={"name":"你好"}
任务二结束1706802924196
任务一结束1706802924197
任务二开始1706802924197,data{"name":"你好"}
任务二结束1706802924271
任务一开始1706802924197,data{"name":"你好"}
任务一结束1706802924272
空闲次数: 1,如果超过2次,线程池将关闭,后续提交任务将自动开启
空闲次数: 2,如果超过2次,线程池将关闭,后续提交任务将自动开启
空闲次数: 3,如果超过2次,线程池将关闭,后续提交任务将自动开启
如果是简单的setTimeout,输出日志就是顺序执行的,非并发。