Node.js 中 cluster 模块全部 API 详解
1. 模块属性
const cluster = require('cluster');// 1. isMaster
// 判断当前进程是否为主进程
console.log('是否为主进程:', cluster.isMaster);// 2. isWorker
// 判断当前进程是否为工作进程
console.log('是否为工作进程:', cluster.isWorker);// 3. schedulingPolicy
// 获取或设置调度策略
// SCHED_NONE: 由操作系统调度
// SCHED_RR: 轮询调度
console.log('当前调度策略:', cluster.schedulingPolicy);
cluster.schedulingPolicy = cluster.SCHED_RR;// 4. workers
// 获取所有工作进程的引用
console.log('工作进程数量:', Object.keys(cluster.workers).length);
2. 主进程方法
// 1. fork()
// 创建新的工作进程
const worker = cluster.fork();// 2. setupMaster([settings])
// 配置主进程
cluster.setupMaster({exec: 'worker.js', // 工作进程文件args: ['--use', 'http'], // 传递给工作进程的参数silent: false, // 是否将工作进程的输出重定向到主进程stdio: ['pipe', 'pipe', 'pipe', 'ipc'], // 标准输入输出配置uid: 1000, // 用户 IDgid: 1000, // 组 IDinspectPort: 0 // 调试端口
});// 3. disconnect([callback])
// 断开所有工作进程的连接
cluster.disconnect(() => {console.log('所有工作进程已断开连接');
});// 4. settings
// 获取当前配置
console.log('当前配置:', cluster.settings);
3. 工作进程属性
// 1. worker.id
// 获取工作进程 ID
console.log('工作进程 ID:', cluster.worker.id);// 2. worker.process
// 获取工作进程的进程对象
console.log('进程 ID:', cluster.worker.process.pid);// 3. worker.exitedAfterDisconnect
// 判断工作进程是否在断开连接后退出
console.log('是否在断开连接后退出:', cluster.worker.exitedAfterDisconnect);// 4. worker.isDead()
// 判断工作进程是否已死亡
console.log('是否已死亡:', cluster.worker.isDead());// 5. worker.isConnected()
// 判断工作进程是否已连接
console.log('是否已连接:', cluster.worker.isConnected());
4. 工作进程方法
// 1. worker.send(message[, sendHandle][, callback])
// 发送消息给主进程
cluster.worker.send('hello from worker', (err) => {if (err) console.error('发送消息失败:', err);
});// 2. worker.disconnect()
// 断开工作进程连接
cluster.worker.disconnect();// 3. worker.kill([signal])
// 终止工作进程
cluster.worker.kill('SIGTERM');
5. 事件
5.1 主进程事件
// 1. fork
// 当创建新的工作进程时触发
cluster.on('fork', (worker) => {console.log('工作进程已创建:', worker.id);
});// 2. online
// 当工作进程上线时触发
cluster.on('online', (worker) => {console.log('工作进程已上线:', worker.id);
});// 3. listening
// 当工作进程开始监听时触发
cluster.on('listening', (worker, address) => {console.log('工作进程正在监听:', worker.id, address);
});// 4. message
// 当收到工作进程消息时触发
cluster.on('message', (worker, message, handle) => {console.log('收到工作进程消息:', worker.id, message);
});// 5. disconnect
// 当工作进程断开连接时触发
cluster.on('disconnect', (worker) => {console.log('工作进程已断开连接:', worker.id);
});// 6. exit
// 当工作进程退出时触发
cluster.on('exit', (worker, code, signal) => {console.log('工作进程已退出:', worker.id, code, signal);
});// 7. error
// 当工作进程发生错误时触发
cluster.on('error', (worker, code, signal) => {console.log('工作进程错误:', worker.id, code, signal);
});
5.2 工作进程事件
// 1. message
// 当收到主进程消息时触发
cluster.worker.on('message', (message, handle) => {console.log('收到主进程消息:', message);
});// 2. disconnect
// 当工作进程断开连接时触发
cluster.worker.on('disconnect', () => {console.log('工作进程已断开连接');
});// 3. error
// 当工作进程发生错误时触发
cluster.worker.on('error', (code, signal) => {console.log('工作进程错误:', code, signal);
});// 4. exit
// 当工作进程退出时触发
cluster.worker.on('exit', (code, signal) => {console.log('工作进程已退出:', code, signal);
});
6. 完整示例
const cluster = require('cluster');
const http = require('http');
const numCPUs = require('os').cpus().length;if (cluster.isMaster) {console.log(`主进程 ${process.pid} 正在运行`);// 配置主进程cluster.setupMaster({exec: 'worker.js',args: ['--use', 'http'],silent: false});// 启动工作进程for (let i = 0; i < numCPUs; i++) {cluster.fork();}// 主进程事件处理cluster.on('fork', (worker) => {console.log('工作进程已创建:', worker.id);});cluster.on('online', (worker) => {console.log('工作进程已上线:', worker.id);});cluster.on('listening', (worker, address) => {console.log('工作进程正在监听:', worker.id, address);});cluster.on('message', (worker, message, handle) => {console.log('收到工作进程消息:', worker.id, message);worker.send('消息已收到');});cluster.on('disconnect', (worker) => {console.log('工作进程已断开连接:', worker.id);});cluster.on('exit', (worker, code, signal) => {console.log('工作进程已退出:', worker.id, code, signal);// 重启工作进程cluster.fork();});cluster.on('error', (worker, code, signal) => {console.log('工作进程错误:', worker.id, code, signal);});// 定期检查工作进程状态setInterval(() => {for (const id in cluster.workers) {const worker = cluster.workers[id];console.log(`工作进程 ${worker.id} 状态:`, {pid: worker.process.pid,isDead: worker.isDead(),isConnected: worker.isConnected(),exitedAfterDisconnect: worker.exitedAfterDisconnect});}}, 5000);// 优雅关闭process.on('SIGTERM', () => {console.log('收到 SIGTERM 信号,开始优雅关闭');for (const id in cluster.workers) {cluster.workers[id].send('shutdown');cluster.workers[id].disconnect();}});
} else {// 工作进程代码const server = http.createServer((req, res) => {res.writeHead(200);res.end(`你好世界,来自工作进程 ${process.pid}\n`);});server.listen(8000);// 工作进程事件处理cluster.worker.on('message', (message) => {console.log('收到主进程消息:', message);if (message === 'shutdown') {server.close(() => {console.log(`工作进程 ${process.pid} 已关闭`);process.exit(0);});}});cluster.worker.on('disconnect', () => {console.log('工作进程已断开连接');});cluster.worker.on('error', (code, signal) => {console.log('工作进程错误:', code, signal);});cluster.worker.on('exit', (code, signal) => {console.log('工作进程已退出:', code, signal);});// 定期发送心跳setInterval(() => {cluster.worker.send('heartbeat');}, 30000);
}
7. 高级用法
// 1. 动态调整工作进程数量
const cluster = require('cluster');
const http = require('http');
const os = require('os');if (cluster.isMaster) {let workerCount = os.cpus().length;console.log(`初始工作进程数量: ${workerCount}`);// 启动工作进程for (let i = 0; i < workerCount; i++) {cluster.fork();}// 动态调整工作进程数量process.on('SIGUSR1', () => {workerCount = Math.max(1, workerCount - 1);console.log(`减少工作进程数量至: ${workerCount}`);const workers = Object.values(cluster.workers);if (workers.length > workerCount) {workers[workers.length - 1].disconnect();}});process.on('SIGUSR2', () => {workerCount = Math.min(os.cpus().length * 2, workerCount + 1);console.log(`增加工作进程数量至: ${workerCount}`);if (Object.keys(cluster.workers).length < workerCount) {cluster.fork();}});
}// 2. 工作进程负载均衡
const cluster = require('cluster');
const http = require('http');
const os = require('os');if (cluster.isMaster) {const workerCount = os.cpus().length;const workers = [];// 启动工作进程for (let i = 0; i < workerCount; i++) {const worker = cluster.fork();workers.push({worker,load: 0,connections: 0});}// 负载均衡http.createServer((req, res) => {// 选择负载最低的工作进程const target = workers.reduce((min, w) => w.load < min.load ? w : min, workers[0]);target.connections++;target.load = target.connections / 100; // 简单的负载计算// 转发请求target.worker.send('request', { url: req.url });}).listen(8000);// 处理工作进程响应cluster.on('message', (worker, message) => {if (message.type === 'response') {const workerInfo = workers.find(w => w.worker.id === worker.id);if (workerInfo) {workerInfo.connections--;workerInfo.load = workerInfo.connections / 100;}}});
}// 3. 工作进程健康检查
const cluster = require('cluster');
const http = require('http');if (cluster.isMaster) {const workers = new Map();// 启动工作进程for (let i = 0; i < 4; i++) {const worker = cluster.fork();workers.set(worker.id, {worker,healthy: true,lastHeartbeat: Date.now()});}// 健康检查setInterval(() => {const now = Date.now();for (const [id, info] of workers) {if (now - info.lastHeartbeat > 30000) {console.log(`工作进程 ${id} 可能已死亡`);info.healthy = false;info.worker.kill();const newWorker = cluster.fork();workers.set(newWorker.id, {worker: newWorker,healthy: true,lastHeartbeat: now});}}}, 5000);// 处理心跳cluster.on('message', (worker, message) => {if (message === 'heartbeat') {const info = workers.get(worker.id);if (info) {info.healthy = true;info.lastHeartbeat = Date.now();}}});
}
cluster 模块的主要特点:
- 支持多核 CPU 并行处理
- 提供完整的事件系统
- 支持进程间通信
- 支持动态调整工作进程数量
- 支持负载均衡和健康检查
使用建议:
- 根据 CPU 核心数合理设置工作进程数量
- 实现完善的错误处理和重启机制
- 使用事件系统进行进程间通信
- 实现健康检查确保系统稳定性
- 考虑使用更高级的进程管理工具(如 PM2)