【并发编程】手写线程池阻塞队列

       📝个人主页:五敷有你      
 🔥系列专栏:并发编程
⛺️稳重求进,晒太阳

示意图 

步骤1:自定义任务队列

变量定义

  1. 用Deque双端队列来承接任务
  2. 用ReentrantLock 来做锁
  3. 并声明两个条件变量 Condition fullWaitSet emptyWaitSet
  4. 最后定义容量 capcity

方法:

  1. 添加任务
    1. 注意点:
      1. 任务容量慢了 用await
      2. 每个添加都进行一个emptyWaitSet.signalAll 唤醒沉睡的线程
      3. 考虑万一死等的情况,加入时间的判断
  2. 取出任务
    1. 注意点:
      1. 任务空了 用await
      2. 每个任务取出来都进行一个fullWaitSet.signAll来唤醒沉睡的线程
      3. 考虑超时的情况,加入时间的判断
public class MyBlockQueue<T> {//1.任务队列private Deque<T> deque=new ArrayDeque();//2.锁private ReentrantLock lock=new ReentrantLock();//3.生产者条件变量private Condition fullWaitSet=lock.newCondition();//4.消费者条件变量private Condition emptyWaitSet=lock.newCondition();//5.容量private int capcity;public MyBlockQueue(int capcity) {this.capcity = capcity;}//带超时的阻塞获取public T poll(long timeOut, TimeUnit unit){lock.lock();try {//将timeOUt转换成统一转换为nslong nanos = unit.toNanos(timeOut);while (deque.isEmpty()) {try {//返回值=等待时间-经过的时间if(nanos<=0){return null;}nanos= emptyWaitSet.awaitNanos(nanos);}catch (InterruptedException e) {throw new RuntimeException(e);}}T t = deque.removeFirst();fullWaitSet.signalAll();return t;}finally {lock.unlock();}}//6. 阻塞获取public T take() {lock.lock();try {while (deque.isEmpty()) {try {emptyWaitSet.await();}catch (InterruptedException e) {throw new RuntimeException(e);}}T t = deque.removeFirst();fullWaitSet.signalAll();return t;}finally {lock.unlock();}}//阻塞添加public void put(T element){lock.lock();try {while (deque.size()==capcity){try {fullWaitSet.await();}catch (Exception e){}}deque.addLast(element);emptyWaitSet.signalAll();} finally {lock.unlock();}}public int size(){lock.lock();try {return deque.size();}finally {lock.unlock();}}}

步骤2:自定义线程池

  1. 定义变量:
    1. 任务队列 taskQueue
    2. 队列的容量
    3. 线程的集合
    4. 核心线程数
    5. 获取任务的超时时间
    6. 时间单位
  2. 方法
    1. 构造方法 初始化一些核心的参数
    2. 执行方法 execute(task) 里面处理任务
      1. 每执行一个任务就放入一个worker中,并开启线程执行 同时放入workers集合中
      2. 当任务数量>核心数量时,就加入到阻塞队列中
  3. 自定义的类worker
    1. 继承Thread 重写Run方法
      1. 执行传递的任务,每次任务执行完毕,不回收,
      2. 去队列中拿任务 当队列也空了之后 workers集合中移除线程,线程停止。
package com.aqiuo.juc;import java.util.HashSet;
import java.util.concurrent.TimeUnit;public class ThreadPool {//任务队列private MyBlockQueue<Runnable> taskQueue;//队列容量int queueCapcity;//线程集合private HashSet<Worker> workers=new HashSet();//线程池的核心线程private int coreSize;//获取任务的超时时间private long timeOut;//时间单位private TimeUnit timeUnit;public ThreadPool(int coreSize, long timeOut, TimeUnit timeUnit,int queueCapcity) {this.coreSize = coreSize;this.timeOut = timeOut;this.timeUnit = timeUnit;taskQueue=new MyBlockQueue<>(queueCapcity);}public void exectue(Runnable task){//当任务数没有超过coreSize时,直接交给work对象执行//如果任务超过coreSize时,加入任务队列synchronized (workers){if(workers.size()<coreSize){Worker worker=new Worker(task);System.out.println("新增worker");workers.add(worker);worker.start();//任务数超过了核心数}else{System.out.println(task+"加入任务队列");taskQueue.put(task);}}}class Worker extends Thread{private Runnable task;public Worker(Runnable task){this.task=task;}@Overridepublic void run() {//执行任务//1)当task不为空,执行任务//2)当task执行完毕,再接着从任务队列中获取任务while (task!=null||(task=taskQueue.take())!=null){try {System.out.println("正在执行worker"+this);sleep(10000);task.run();} catch (Exception e) {}finally {task=null;}}//执行完任务后销毁线程synchronized (workers){workers.remove(this);}}}}

测试

开启15个线程测试

public static void main(String[] args) {ThreadPool threadPool = new ThreadPool(2, 1000, TimeUnit.MILLISECONDS, 10);for (int i=0;i<15;i++){int j=i;threadPool.exectue(()->{System.out.println(j);});}}

        执行过程中,超过了队列容量之后,就会发生fullWaitSet阻塞。这个阻塞的线程就开始等待,当有队列不满之后,唤醒fullWaitSet阻塞的队列,

        同理,当队列为空,emptyWaitSet小黑屋阻塞,当有任务被放入,EmptyWaitSet唤醒所有的线程。

这就有一个执行完毕之后,线程不会停止,他会一定等待拿去任务,线程阻塞了EmptyWaitSet

改进

获取任务的超时结束

获取任务take的增强 超时

  //带超时的阻塞获取public T poll(long timeOut, TimeUnit unit){lock.lock();try {//将timeOUt转换成统一转换为nslong nanos = unit.toNanos(timeOut);while (deque.isEmpty()) {try {//返回值=等待时间-经过的时间if(nanos<=0){return null;}nanos= emptyWaitSet.awaitNanos(nanos);}catch (InterruptedException e) {throw new RuntimeException(e);}}T t = deque.removeFirst();fullWaitSet.signalAll();return t;}finally {lock.unlock();}}

修改worker的run函数

      public void run() {//执行任务//1)当task不为空,执行任务//2)当task执行完毕,再接着从任务队列中获取任务
//            while (task!=null||(task=taskQueue.take())!=null){//修改如下while (task!=null||(task=taskQueue.poll(timeOut,timeUnit))!=null){try {System.out.println("正在执行worker"+this);sleep(1000);task.run();} catch (Exception e) {}finally {task=null;}}

正常结束了

放入任务的超时结束offer()

那么有装入任务 的增强 ,就再提供一个超时装入入offer()吧 ,当放入一个满的队列时,超时后返回false不再放入

//带有超时的队列添加
public Boolean offer(T element,long timeOut, TimeUnit unit){lock.lock();long nanos = unit.toNanos(timeOut);try {while (deque.size()==capcity){try {long l = fullWaitSet.awaitNanos(nanos);if(l<=0){return false;}}catch (Exception e){}}deque.addLast(element);emptyWaitSet.signalAll();return true;} finally {lock.unlock();}
}

拒绝策略

函数式接口

@FunctionalInterface // 拒绝策略
interface RejectPolicy<T> {void reject(MyBlockQueue<T> queue, T task);
}

代码改进

如下部分代码是存入任务的部分

public void exectue(Runnable task){//当任务数没有超过coreSize时,直接交给work对象执行//如果任务超过coreSize时,加入任务队列synchronized (workers){if(workers.size()<coreSize){Worker worker=new Worker(task);System.out.println("新增worker");workers.add(worker);worker.start();//任务数超过了核心数}else{//存入任务//taskQueue.put(task);//当队列满了之后 执行的策略//1) 死等//2)带有超时的等待//3)当调用者放弃任务执行//4)让调用者抛出异常//5)让调用者自己执行任务...//为了增加灵活性,这里不写死,交给调用者//重新写了一个放入任务的方法taskQueue.tryPut(rejectPolicy,task);}}}

阻塞队列里的tryPut

public void tryPut(ThreadPool.RejectPolicy<T> rejectPolicy, T task) {lock.lock();try {//如果队列容量满了,就开始执行拒绝策略if(capcity>= deque.size()){rejectPolicy.reject(this,task);}else{//不满就正常加入到队列中System.out.println(task+"正常加入到队列");deque.addLast(task);}}finally {lock.unlock();}}

//1) 死等

//2)带有超时的等待

//3)当调用者放弃任务执行

//4)让调用者抛出异常

//5)让调用者自己执行任务...

谁调用方法,谁写拒绝策略

为了传入策略,就再构造函数里面加入一个方法的参数传入

//部分代码...
//拒绝策略
RejectPolicy<Runnable> rejectPolicy;public ThreadPool(int coreSize, long timeOut, TimeUnit timeUnit,int queueCapcity,RejectPolicy<Runnable> rejectPolicy) {this.coreSize = coreSize;this.timeOut = timeOut;this.timeUnit = timeUnit;taskQueue=new MyBlockQueue<>(queueCapcity);this.rejectPolicy=rejectPolicy;
}

主函数编写拒绝的策略,就lamda表达式会把...

public static void main(String[] args) {ThreadPool threadPool = new ThreadPool(1, 1000, TimeUnit.MILLISECONDS, 1,(queue,task)->{//死等
//            queue.put(task);//超时添加
//            System.out.println(queue.offer(task, 100, TimeUnit.NANOSECONDS));//放弃执行
//            System.out.print("我放弃");//调用者抛出异常
//            throw new RuntimeException("任务执行失败");//调用者执行
//            task.run();});for (int i=0;i<5;i++){int j=i;threadPool.exectue(()->{System.out.println(j);});}}

五种拒绝策略的结果(我不会用slog4j)

1.死等的结果

2.超时拒绝的结果(每个false都是时间到了,每加进去)

3.不作为,调用者放弃任务

4.抛出异常,停止

5.调用者线程执行了

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/670883.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

网站不收录,与服务器不备案有关吗

随着互联网的快速发展&#xff0c;网站已经成为企业、个人和机构宣传和展示自己的重要平台。然而&#xff0c;许多网站在建设完成后却面临着不收录的问题&#xff0c;这给网站的管理者和拥有者带来了很大的困扰。其中&#xff0c;一些人认为&#xff0c;网站不收录的原因与服务…

什么是Instagram Reels?用好Reels 让你的流量暴涨!

Instagram Reels是Instagram在2020年全新推出的短视频功能&#xff0c;旨在与TikTok展开竞争。作为跨境卖家的你&#xff0c;利用 Reels 这一神器&#xff0c;将为你带去更多的流量。那该如何利用好这一神器呢&#xff1f;本篇文章&#xff0c;大白将带大家深入了解 Reels 并用…

推动海外云手机发展的几个因素

随着科技的不断发展&#xff0c;海外云手机作为一种新兴技术&#xff0c;在未来呈现出令人瞩目的发展趋势。本文将在用户需求、技术创新和全球市场前景等方面&#xff0c;探讨海外云手机在未来的发展。 1. 用户需求的引领&#xff1a; 随着人们对移动性和便捷性的需求不断增长&…

《Git 简易速速上手小册》第2章:理解版本控制(2024 最新版)

文章目录 2.1 本地仓库与版本历史2.1.1 基础知识讲解2.1.2 重点案例&#xff1a;回滚错误提交2.1.3 拓展案例 1&#xff1a;利用 git bisect 查找引入 bug 的提交2.1.4 拓展案例 2&#xff1a;合并提交历史 2.2 远程仓库的使用2.2.1 基础知识讲解2.2.2 重点案例&#xff1a;在 …

通过Harbor构建docker私服仓库

Harbor是一个用于存储和分发Docker镜像的企业级Registry服务器&#xff0c;它扩展了开源的Docker Distribution&#xff0c;通过添加一些企业必需的功能特性&#xff0c;如安全、标识和管理等。Harbor由VMware公司开发并开源&#xff0c;旨在帮助用户迅速搭建一个企业级的Docke…

点云transformer算法: FlatFormer 论文阅读笔记

代码&#xff1a;https://github.com/mit-han-lab/flatformer论文&#xff1a;https://arxiv.org/abs/2301.08739[FlatFormer.pdf] Flatformer是对点云检测中的 backbone3d部分的改进工作&#xff0c;主要在探究怎么高效的对点云应用transformer 具体的工作如下&#xff1a;一…

PostgreSQL的wal文件回收问题

引子 将PostgreSQL的GUC参数wal_recycle设置为on&#xff0c;然后对数据库执行一定业务量的操作&#xff0c;会发现在pg_wal目录下&#xff0c;有很多未来使用的wal文件&#xff0c;且创建时间比现在正在使用的wal文件更早&#xff0c;下文将描述和分析这种情况。 问题描述 …

springboot160社区智慧养老监护管理平台设计与实现

简介 【毕设源码推荐 javaweb 项目】基于springbootvue 的 适用于计算机类毕业设计&#xff0c;课程设计参考与学习用途。仅供学习参考&#xff0c; 不得用于商业或者非法用途&#xff0c;否则&#xff0c;一切后果请用户自负。 看运行截图看 第五章 第四章 获取资料方式 **项…

【数据分享】1929-2023年全球站点的逐年平均能见度(Shp\Excel\免费获取)

气象数据是在各项研究中都经常使用的数据&#xff0c;气象指标包括气温、风速、降水、能见度等指标&#xff0c;说到气象数据&#xff0c;最详细的气象数据是具体到气象监测站点的数据&#xff01; 之前我们分享过1929-2023年全球气象站点的逐年平均气温数据、逐年最高气温数据…

在django中集成markdown文本框

首先需要下载开源组件&#xff1a;http://editor.md.ipandao.com/&#xff0c;可能需要挂梯子。 百度网盘&#xff1a; 链接&#xff1a;https://pan.baidu.com/s/1D9o3P8EQDqSqfhAw10kYkw 提取码&#xff1a;eric 1.在html代码中生成一个div&#xff0c;ideditor <div c…

Oracle数据表ID自增操作

一、Oracle ID自增长功能介绍 Oracle数据库默认不支持像 SQLServer、MySQL中的自增长&#xff08;auto increment&#xff09;功能&#xff0c;即自动为每一行记录的自增长字段生成下一个值。 二、Oracle ID自增长方法 第一种&#xff0c;通过序列&#xff08;sequence&#…

蓝桥杯Web应用开发-CSS 基础语法4(字体属性、链接中的伪类、列表样式)

专栏持续更新中 字体属性 字体属性用于定义字体的类型、字号大小、加粗、斜体等方面样式。常用的字体属性如下表所示&#xff1a; 属 性可 取 值描 述fontfont-style、font-variant、font-weight、font-size&#xff08;或 line-height&#xff09;、font-family在一个声明中…

《计算机网络简易速速上手小册》第5章:无线网络和移动通信(2024 最新版)

5.1 WLAN的工作原理 - 揭秘无线局域网络的魔法 5.1.1 基础知识 无线局域网络&#xff08;WLAN&#xff09;允许设备通过无线方式连接到一个局部区域网络&#xff0c;主要基于IEEE 802.11标准&#xff0c;俗称Wi-Fi。WLAN的核心是无线路由器&#xff0c;它不仅充当着网络中各设…

【多模态】27、Vary | 通过扩充图像词汇来提升多模态模型在细粒度感知任务(OCR等)上的效果

论文&#xff1a;Vary: Scaling up the Vision Vocabulary for Large Vision-Language Models 代码&#xff1a;https://github.com/Ucas-HaoranWei/Vary 出处&#xff1a;旷视 时间&#xff1a;2023.12 一、背景 当前流行的大型视觉-语言模型 Large Vision-Language Mode…

挑战杯 python+opencv+深度学习实现二维码识别

0 前言 &#x1f525; 优质竞赛项目系列&#xff0c;今天要分享的是 &#x1f6a9; pythonopencv深度学习实现二维码识别 &#x1f947;学长这里给一个题目综合评分(每项满分5分) 难度系数&#xff1a;3分工作量&#xff1a;3分创新点&#xff1a;3分 该项目较为新颖&…

探索Gin框架:Golang Gin框架请求参数的获取

前些天发现了一个巨牛的人工智能学习网站&#xff0c;通俗易懂&#xff0c;风趣幽默&#xff0c;忍不住分享一下给大家。点击跳转到网站https://www.captainbed.cn/kitie。 前言 我们在专栏的前面几篇文章内讲解了Gin框架的路由配置&#xff0c;服务启动等内容。 专栏地址&…

后台弱口令问题

网站的运营管理不能缺少后台管理系统的支持&#xff0c;若能成功进入后台管理系 统&#xff0c;就意味着在Web渗透测试中成功了一大半。进行非授权登录有很多种方法&#xff0c; 这里主要介绍的是弱口令问题&#xff0c;破解弱口令是进入系统的最常见也是最有效的方 法&#xf…

如何在 Java 中通过 Map.Entry 访问 Map 的元素

我们使用 Map.Entry 来遍历 ConcurrentHashMap 的代码片段如下&#xff1a; for (Map.Entry<String, String> entry : map.entrySet()) { System.out.println("Key: " entry.getKey() ", Value: " entry.getValue()); } 在 Map.java 中&…

RabbitMQ_00000

MQ的相关概念 RabbitMQ官网地址&#xff1a;https://www.rabbitmq.com RabbitMQ API地址&#xff1a;https://rabbitmq.github.io/rabbitmq-java-client/api/current/ 什么是MQ&#xff1f; MQ(message queue)本质是个队列&#xff0c;FIFO先入先出&#xff0c;只不过队列中…

web前后端小坑记录

游戏服务器过年这段时间忙完了&#xff0c;好久没看web了&#xff0c;重温一下。发现竟然没有文章记录这些修BUG的过程&#xff0c;记录一下。 目录 如何处理F5刷新&#xff1f; 如何处理F5刷新&#xff1f; 后端应该发现路由不存在&#xff0c;直接返回打包好的index.html就…