并发设计模式实战系列(3):工作队列

🌟 ​大家好,我是摘星!​ 🌟

今天为大家带来的是并发设计模式实战系列,第三章工作队列(Work Queue)​​,废话不多说直接开始~

目录

一、核心原理深度拆解

1. 生产者-消费者架构

2. 核心组件

二、生活化类比:餐厅厨房系统

三、Java代码实现(生产级Demo)

1. 完整可运行代码

2. 关键配置解析

四、横向对比表格

1. 多线程模式对比

2. 队列实现对比

五、高级优化技巧

1. 动态线程池调整

2. 优先级任务处理

3. 监控指标埋点

六、扩展设计模式集成

1. 责任链+工作队列(复杂任务处理)

七、高级错误处理机制

1. 重试策略设计

2. 代码实现(带重试的Worker)

八、分布式工作队列扩展

1. 基于Kafka的分布式架构

2. 关键配置参数

九、性能调优实战指南

1. 性能瓶颈定位四步法

2. JVM优化参数建议

十、行业应用案例解析

1. 电商秒杀系统实现

2. 日志处理流水线

十一、虚拟线程(Loom)前瞻

1. 新一代线程模型对比

2. 虚拟线程工作队列示例

十二、设计模式决策树


一、核心原理深度拆解

1. 生产者-消费者架构

                                                                                                              ┌─────────────┐       ┌─────────────┐       ┌─────────────┐
│  Producers  │───>   │ Work Queue   │───>   │ Consumers   │
│ (多线程生成)  │<───   │ (任务缓冲)    │<───   │ (线程池处理) │
└─────────────┘       └─────────────┘       └─────────────┘
  • 解耦设计:分离任务创建(生产者)与任务执行(消费者)
  • 流量削峰:队列缓冲突发流量,防止系统过载
  • 资源控制:通过线程池限制最大并发处理数

2. 核心组件

  • BlockingQueue:线程安全的任务容器(支持put/take阻塞操作)
  • ThreadPool:可配置核心/最大线程数,保持CPU利用率与响应速度平衡
  • 任务拒绝策略:定义队列满时的处理方式(丢弃/抛异常/生产者处理)

二、生活化类比:餐厅厨房系统

系统组件

现实类比

核心机制

生产者

服务员接收顾客点单

快速记录订单,不参与烹饪

工作队列

悬挂式订单传送带

暂存待处理订单,平衡前后台节奏

消费者

厨师团队

按订单顺序并行烹饪

  • 高峰期应对:10个服务员接收订单 → 传送带缓冲50单 → 5个厨师并行处理

三、Java代码实现(生产级Demo)

1. 完整可运行代码

import java.util.concurrent.*;public class WorkQueuePattern {// 任务队列(建议根据内存设置合理容量)private final BlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<>(100);// 线程池配置private final ExecutorService workerPool = new ThreadPoolExecutor(4,                              // 核心厨师数8,                              // 最大厨师数(应对高峰期)30, TimeUnit.SECONDS,          // 闲置线程存活时间new LinkedBlockingQueue<>(20), // 线程池等待队列new ThreadFactory() {          // 定制线程命名private int count = 0;@Overridepublic Thread newThread(Runnable r) {return new Thread(r, "worker-" + count++);}},new ThreadPoolExecutor.AbortPolicy() // 队列满时拒绝任务);// 生产者模拟class OrderProducer implements Runnable {@Overridepublic void run() {int orderNum = 0;while (!Thread.currentThread().isInterrupted()) {try {Runnable task = () -> {System.out.println("处理订单: " + Thread.currentThread().getName());// 模拟处理耗时try { Thread.sleep(500); } catch (InterruptedException e) {}};workQueue.put(task);  // 阻塞式提交System.out.println("生成订单: " + (++orderNum));Thread.sleep(200);    // 模拟下单间隔} catch (InterruptedException e) {Thread.currentThread().interrupt();}}}}// 启动系统public void start() {// 启动2个生产者线程new Thread(new OrderProducer(), "producer-1").start();new Thread(new OrderProducer(), "producer-2").start();// 消费者自动从队列取任务new Thread(() -> {while (!Thread.currentThread().isInterrupted()) {try {Runnable task = workQueue.take();workerPool.execute(task);} catch (InterruptedException e) {Thread.currentThread().interrupt();}}}).start();}public static void main(String[] args) {WorkQueuePattern kitchen = new WorkQueuePattern();kitchen.start();// 模拟运行后关闭try { Thread.sleep(5000); } catch (InterruptedException e) {}kitchen.shutdown();}// 优雅关闭public void shutdown() {workerPool.shutdown();try {if (!workerPool.awaitTermination(3, TimeUnit.SECONDS)) {workerPool.shutdownNow();}} catch (InterruptedException e) {workerPool.shutdownNow();}}
}

2. 关键配置解析

// 线程池参数调优公式(参考)
最佳线程数 = CPU核心数 * (1 + 平均等待时间/平均计算时间)// 四种拒绝策略对比:
- AbortPolicy:直接抛出RejectedExecutionException(默认)
- CallerRunsPolicy:由提交任务的线程自己执行
- DiscardPolicy:静默丢弃新任务
- DiscardOldestPolicy:丢弃队列最旧任务

四、横向对比表格

1. 多线程模式对比

模式

任务调度方式

资源管理

适用场景

Work Queue

集中队列分配

精确控制线程数

通用任务处理

Thread-Per-Task

直接创建线程

容易资源耗尽

简单低并发场景

ForkJoin Pool

工作窃取算法

自动负载均衡

计算密集型任务

Event Loop

单线程事件循环

极低资源消耗

IO密集型任务

2. 队列实现对比

队列类型

排序方式

阻塞特性

适用场景

LinkedBlockingQueue

FIFO

可选有界/无界

通用任务排队

PriorityBlockingQueue

自定义优先级

无界队列

紧急任务优先处理

SynchronousQueue

无缓冲

直接传递

实时任务处理

DelayQueue

延迟时间

时间触发

定时任务调度


五、高级优化技巧

1. 动态线程池调整

// 根据队列负载动态扩容
if (workQueue.size() > threshold) {ThreadPoolExecutor pool = (ThreadPoolExecutor) workerPool;pool.setMaximumPoolSize(newMaxSize);
}

2. 优先级任务处理

// 使用PriorityBlockingQueue需实现Comparable
class PriorityTask implements Runnable, Comparable<PriorityTask> {private int priority;@Overridepublic int compareTo(PriorityTask other) {return Integer.compare(other.priority, this.priority);}// run()方法实现...
}

3. 监控指标埋点

// 监控队列深度
Metrics.gauge("workqueue.size", workQueue::size);// 线程池监控
ThreadPoolExecutor pool = (ThreadPoolExecutor) workerPool;
Metrics.gauge("pool.active.threads", pool::getActiveCount);
Metrics.gauge("pool.queue.size", () -> pool.getQueue().size());

六、扩展设计模式集成

1. 责任链+工作队列(复杂任务处理)

┌───────────┐     ┌───────────┐     ┌───────────┐
│  Task     │     │  Task     │     │  Task     │
│ Splitter  │───> │ Processor │───> │ Aggregator│
└───────────┘     └───────────┘     └───────────┘↓                ↓                ↓[拆分子任务]      [并行处理]       [结果合并]
  • 场景:电商订单处理(拆分子订单→并行校验→合并结果)
  • 代码片段
// 任务拆分器
class OrderSplitter {List<SubOrder> split(MainOrder order) { /* 拆分为N个子订单 */ }
}// 子任务处理器
class OrderValidator implements Runnable {public void run() { /* 库存校验/地址校验等 */ }
}// 结果聚合器
class ResultAggregator {void aggregate(List<SubResult> results) { /* 合并校验结果 */ }
}

七、高级错误处理机制

1. 重试策略设计

策略类型

实现方式

适用场景

立即重试

失败后立即重试最多3次

网络抖动等临时性问题

指数退避

等待时间=2^n秒(n为失败次数)

服务过载类错误

死信队列

记录失败任务供人工处理

数据错误等需干预问题

2. 代码实现(带重试的Worker)

class RetryWorker implements Runnable {private final Runnable task;private int retries = 0;public RetryWorker(Runnable task) {this.task = task;}@Overridepublic void run() {try {task.run();} catch (Exception e) {if (retries++ < MAX_RETRY) {long delay = (long) Math.pow(2, retries);executor.schedule(this, delay, TimeUnit.SECONDS);} else {deadLetterQueue.put(task);}}}
}

八、分布式工作队列扩展

1. 基于Kafka的分布式架构

                          ┌────────────┐│  Kafka     ││ (Partition)│└─────┬──────┘│
┌───────────┐              ┌───┴────┐              ┌───────────┐
│ Producer  ├───orders───>  │        │  ──workers─>  │ Consumer  │
│ Service   │              │  Topic  │               │ Group     │
└───────────┘              └─────────┘               └───────────┘
  • 特性
    • 分区机制实现并行处理
    • 消费者组自动负载均衡
    • 持久化保证不丢消息

2. 关键配置参数

# 生产者端
acks=all                  # 确保消息持久化
retries=10                # 发送失败重试次数
max.in.flight=5           # 最大未确认请求数# 消费者端
enable.auto.commit=false  # 手动提交offset
max.poll.records=100      # 单次拉取最大记录数
session.timeout.ms=30000  # 心跳检测时间

九、性能调优实战指南

1. 性能瓶颈定位四步法

  1. 监控队列深度workQueue.size() > 阈值时报警
  2. 分析线程状态
ThreadMXBean bean = ManagementFactory.getThreadMXBean();
for (long tid : bean.getAllThreadIds()) {System.out.println(bean.getThreadInfo(tid));
}
  1. JVM资源检查
jstat -gcutil <pid> 1000  # GC情况
jstack <pid>              # 线程dump
  1. 压测工具验证
ab -n 10000 -c 500 http://api/endpoint

2. JVM优化参数建议

-XX:+UseG1GC                           # G1垃圾回收器
-XX:MaxGCPauseMillis=200               # 目标暂停时间
-Xms4g -Xmx4g                          # 固定堆大小
-XX:MetaspaceSize=256m                 # 元空间初始值
-XX:+ParallelRefProcEnabled            # 并行处理引用

十、行业应用案例解析

1. 电商秒杀系统实现

┌───────────────┐     ┌───────────────┐     ┌───────────────┐
│  请求入口       │     │  库存预扣      │     │  订单生成       │
│ (Nginx限流)    │───> │ (Redis队列)   │───> │ (DB批量写入)   │
└───────────────┘     └───────────────┘     └───────────────┘
  • 关键设计
    • 使用Redis List作为分布式队列
    • 库存预扣与订单生成解耦
    • 数据库批量写入合并操作

2. 日志处理流水线

// 使用Disruptor高性能队列
class LogEventProcessor {void onEvent(LogEvent event, long sequence, boolean endOfBatch) {// 1. 格式清洗// 2. 敏感信息过滤// 3. 批量写入ES}
}
  • 性能对比
    • 传统队列:10万条/秒
    • Disruptor:2000万条/秒

十一、虚拟线程(Loom)前瞻

1. 新一代线程模型对比

维度

平台线程

虚拟线程

内存消耗

1MB/线程

1KB/线程

切换成本

涉及内核调度

用户态轻量级切换

适用场景

CPU密集型任务

IO密集型高并发场景

2. 虚拟线程工作队列示例

ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor();void handleRequest(Request request) {executor.submit(() -> {try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {Future<String> user = scope.fork(() -> queryUser(request));Future<String> order = scope.fork(() -> queryOrder(request));scope.join();return new Response(user.get(), order.get());}});
}

十二、设计模式决策树

graph TDA[任务类型?] --> B{CPU密集型}A --> C{IO密集型}B --> D[线程数=CPU核心数+1]C --> E[线程数=CPU核心数*2]E --> F{是否需资源隔离?}F --> |是| G[使用多个独立线程池]F --> |否| H[共享线程池+队列]H --> I{是否需优先级?}I --> |是| J[PriorityBlockingQueue]I --> |否| K[LinkedBlockingQueue]

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/web/76716.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

云账号安全事件应急响应指南:应对来自中国IP的异常访问

在当今数字化时代,云服务已成为企业IT基础设施的核心。然而,随之而来的安全挑战也日益突出。本文将详细介绍当发现云账号被来自中国的IP地址异常利用时,应如何快速有效地响应,以确保账户安全并最小化潜在风险。 1. 确认异常活动 首先,我们需要确认是否真的发生了安全事件…

三网通电玩城平台系统结构与源码工程详解(五):客户端热更机制与多端资源分发流程

本篇将聚焦三网通平台在多客户端部署中的资源热更机制设计、跨平台同步策略、版本控制与前端资源发布管理&#xff0c;帮助开发者搭建高效稳定的资源更新系统。 一、资源分发平台架构 为实现安卓端、iOS端、PC端的统一更新分发&#xff0c;平台采用 Node.js Express 构建资源…

spark和hadoop的区别

一、spark概述 二、处理速度 三、 编程模型 四、实时性处理 五、spark内置模块 六、spark的运行模式

AI写代码之GO+Python写个爬虫系统

下面我们我们来利用AI&#xff0c;来用GOPython写个爬虫系统。 帮我写一个Python语言爬取数据写入Mysql的案例&#xff0c;信息如下&#xff1a; 1、Mysql数据库地址是&#xff1a;192.168.1.20 &#xff0c;mysql用户名是&#xff1a;root&#xff0c; Mysql密码是&#xff1…

从单模态到多模态:深度生成模型的演进历程

在人工智能领域&#xff0c;生成模型的发展一直是研究热点。从最早的自编码器到如今的多模态扩散模型&#xff0c;这一技术路线不断突破&#xff0c;为创意内容生成、数据增强和表示学习等领域带来革命性变化。本文将详细介绍几种关键生成模型的技术原理和演进路径&#xff0c;…

【系统架构设计师】嵌入式微处理器

目录 1. 说明2. 微处理器(MPU)3. 微控制器(MCU)4. 信号处理器(DSP)5. 图形处理器(GPU)6. 片上系统(SoC)7. 例题7.1 例题1 1. 说明 1.嵌入式微处理器主要用于处理相关任务。2.由于嵌入式系统通常都在室外使用&#xff0c;可能处于不同环境&#xff0c;因此&#xff0c;选择处理…

Cursor Free VIP 重置进程错误,轻松恢复使用!

快速修复 Cursor Free VIP 重置进程错误&#xff0c;轻松恢复使用&#xff01; 在使用 Cursor Free VIP 的过程中&#xff0c;突然遭遇 “重置进程错误” 是不是让你手忙脚乱&#xff1f;当屏幕弹出 “文件未找到: C:\Users\用户\AppData\Local\Programs\Cursor\resources\app…

dolphinscheduler实现(oracle-hdfs-doris)数据ETL

dolphinscheduler执行 完整脚本(自行替换相关变量)配置文件conf配置文件解析脚本转base64脚本 完整脚本(自行替换相关变量) user_olsh conf/getInfo.sh Oracle user conf/databases.conf password_olsh conf/getInfo.sh Oracle password conf/databases.conf dblink_olsh conf…

小小矩阵设计

在电气设计图中&#xff0c;矩阵设计的接线方法是通过结构化布局实现多灵活链接的技术&#xff0c;常用于信号切换、配电调压或更加复杂的控制场景。 今天聊一种在电气图纸中用到的一种简单矩阵接法&#xff0c;一眼就看明白&#xff0c;很大程度简化了程序控制点和继电器的使用…

【音视频】FFmpeg解封装

解封装 复用器&#xff0c;比如MP4/FLV 解复用器&#xff0c;MP4/FLV 封装格式相关函数 avformat_alloc_context(); 负责申请一个AVFormatContext结构的内存,并进行简单初始化avformat_free_context(); 释放该结构里的所有东西以及该结构本身avformat_close_input();关闭解复…

1️⃣5️⃣three.js_GUI辅助调试器

15、GUI辅助调试器 3D虚拟工厂在线体验 GUI辅助调试器将原本需要修改代码调整参数并刷新页面的操作&#xff0c;简化为直接在GUI中实时调整&#xff0c;实现所见即所得的效果。 导入GUI 库 //引入GUI辅助调试器 import { GUI } from three/addons/libs/lil-gui.module.min.js…

Redis 的指令执行方式:Pipeline、事务与 Lua 脚本的对比

Pipeline 客户端将多条命令打包发送&#xff0c;服务器顺序执行并一次性返回所有结果。可以减少网络往返延迟&#xff08;RTT&#xff09;以提升吞吐量。 需要注意的是&#xff0c;Pipeline 中的命令按顺序执行&#xff0c;但中间可能被其他客户端的命令打断。 典型场景&…

Linux下的网络管理配置

一、 IPv4原理 IPv4&#xff08;Internet Protocol version 4&#xff09;&#xff0c;采用32位地址。IPv4地址通常用点分十进制表示&#xff0c;如 192.168.1.10。 IPv4网络通信基于数据包交换原理&#xff0c;当一台主机要向另一台主机发送数据时&#xff0c;会将数据分割成…

基于Python(Django)+SQLite实现(Web)校园助手

校园助手 本校园助手采用 B/S 架构。并已将其部署到服务器上。在网址上输入 db.uplei.com 即可访问。 使用说明 可使用如下账号体验&#xff1a; 学生界面: 账号1&#xff1a;123 密码1&#xff1a;123 账户2&#xff1a;201805301348 密码2&#xff1a;1 # --------------…

unity动态骨骼架设+常用参数分享(包含部分穿模解决方案)

Unity骨骼物理模拟插件Dynamic Bone Dynamic Bone 可用于对角色的骨骼&#xff08;bones&#xff09;或者铰链系统&#xff08;joints&#xff09;施加物理效果。 物理效果可以使得游戏角色的头发、衣服、胸部或者是其他的任何部位&#xff0c;都可以以近似真实的状态运动。 …

科技天眼守望农田:珈和卫星遥感监测赋能智慧农业,护航粮食安全新未来

农情监测与粮食安全密切相关&#xff0c;以往农作物的长势、环境、病虫害、灾情等相关数据和图像信息都是靠物联网硬件及县、镇、村等人力来完成&#xff0c;不仅要耗费大量人力、物力&#xff0c;而且数据时效性、准确性较差。珈和科技开发建设农情遥感监测系统&#xff0c;运…

【TeamFlow】4.2 Yew库详细介绍

Yew 是一个用于构建高效、交互式前端 Web 应用程序的现代 Rust 框架&#xff0c;它借鉴了 React 和 Elm 等框架的设计理念&#xff0c;同时充分利用 Rust 的语言特性。 核心特性 基于组件的架构 Yew 采用组件化开发模式&#xff0c;类似于 React: 组件是可重用的 UI 构建块 …

毕设 - 数字孪生智慧农场(vue+高德地图)项目分享

感兴趣的同学可以私信我或者在下方添加我的qq 在线地址: 数字孪生智慧农场

深入理解 VMware 虚拟机网络模式:为虚拟化管理铺平道路

随着云计算和虚拟化技术的快速发展&#xff0c;VMware作为行业领军者&#xff0c;在企业的IT基础设施中扮演着越来越重要的角色。无论是开发、测试还是生产环境&#xff0c;虚拟机&#xff08;VM&#xff09;都成为了我们不可或缺的工具。在VMware中&#xff0c;网络是虚拟机能…

安恒安全渗透面试题

《网安面试指南》https://mp.weixin.qq.com/s/RIVYDmxI9g_TgGrpbdDKtA?token1860256701&langzh_CN 5000篇网安资料库https://mp.weixin.qq.com/s?__bizMzkwNjY1Mzc0Nw&mid2247486065&idx2&snb30ade8200e842743339d428f414475e&chksmc0e4732df793fa3bf39…