45.自定义线程池(三)-拒绝策略

 拒绝策略采用函数式接口参数传入,策略模式

@FunctionalInterface
public interface RejectPolicy<T> {void reject(BlockingQueue<T> queue, T task);
}
package com.xkj.thread.pool;import com.aspose.words.Run;
import lombok.extern.slf4j.Slf4j;import java.util.HashSet;
import java.util.concurrent.TimeUnit;@Slf4j(topic = "c.ThreadPool")
public class ThreadPool {//任务队列private BlockingQueue<Runnable> taskQueue;//线程集合private HashSet<Worker> workers = new HashSet<>();//核心线程数private int coreSize;//获取任务的超时时间private long timeout;private TimeUnit timeUnit;private RejectPolicy<Runnable> rejectPolicy;public ThreadPool(int coreSize,int queueCapcity,long timeout,TimeUnit timeUnit,RejectPolicy<Runnable> rejectPolicy) {this.coreSize = coreSize;this.timeout = timeout;this.timeUnit = timeUnit;this.taskQueue = new BlockingQueue<>(queueCapcity);this.rejectPolicy = rejectPolicy;}class Worker extends Thread {private Runnable task;public Worker(Runnable task) {this.task = task;}@Overridepublic void run() {// 执行任务// 1.当task不为空执行任务// 2.当task执行完毕,再接着从任务队列获取任务并执行while(task != null || (task = taskQueue.take()) != null) {try {log.debug("正在执行...{}", task);task.run();}catch (Exception e) {}finally {task = null;}}synchronized (workers) {log.debug("worker 被移除{}", this);workers.remove(this);}}}//执行任务public void execute(Runnable task) {synchronized (workers) {if(workers.size() < coreSize) {Worker worker = new Worker(task);log.debug("新增worker{},{}", worker, task);// 当任务数没有超过coreSize时,直接交给worker对象执行workers.add(worker);worker.start();} else {// 当任务数超过coreSize时,加入任务队列暂存// 1) 死等
//                taskQueue.put(task);// 2)超时等待// 3)放弃任务执行// 4)抛出异常// 5)让调用者自己执行任务taskQueue.tryPut(rejectPolicy, task);}}}
}

在原来的基础上添加了 带超时时间的阻塞添加方法,offer方法

package com.xkj.thread.pool;import lombok.extern.slf4j.Slf4j;import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;@Slf4j(topic = "c.blockingQueue")
public class BlockingQueue<T> {//1.任务队列private Deque<T> queue = new ArrayDeque<>();//2.锁private Lock lock = new ReentrantLock();//3.生产者条件变量private Condition fullWaitSet = lock.newCondition();//4.消费者条件变量private Condition emptyWaitSet = lock.newCondition();//5.容量private int capcity;public BlockingQueue(int capcity) {this.capcity = capcity;}/*** 带超时的获取元素* @param timeout* @param unit* @return*/public T poll(long timeout, TimeUnit unit) {lock.lock();try {//将timeout统一转化成纳秒long nanos = unit.toNanos(timeout);while (queue.isEmpty()) { //判断队列是否为空try {if(nanos <= 0) {return null;}//阻塞等待,当被唤醒后,队列不会空,不满足while条件,程序继续向下执行//返回的是timeout - 已经等待的时间 = 剩余的时间//防止虚假唤醒nanos = emptyWaitSet.awaitNanos(nanos);} catch (InterruptedException e) {e.printStackTrace();}}//获取队列头部的元素返回,获取元素后应该从队列中移除T t = queue.removeFirst();//唤醒生产者,继续添加元素fullWaitSet.signal();return t;}finally {lock.unlock();}}/*** 获取元素* @return*/public T take() {lock.lock();try {while (queue.isEmpty()) { //判断队列是否为空try {//阻塞等待,当被唤醒后,队列不会空,不满足while条件,程序继续向下执行emptyWaitSet.await();} catch (InterruptedException e) {e.printStackTrace();}}//获取队列头部的元素返回,获取元素后应该从队列中移除T t = queue.removeFirst();//唤醒生产者,继续添加元素fullWaitSet.signal();return t;}finally {lock.unlock();}}/*** 添加元素* @param element*/public void put(T element) {lock.lock();try {while (queue.size() == capcity){try {log.debug("等待加入任务队列{}...", element);fullWaitSet.await();} catch (InterruptedException e) {e.printStackTrace();}}log.debug("加入任务队列{}", element);queue.addLast(element);//唤醒消费者,继续获取任务emptyWaitSet.signal();}finally {lock.unlock();}}/*** 带超时时间的阻塞添加* @param element 任务* @param timeout 超时时间* @param timeUnit 时间单位* @return*/public boolean offer(T element, long timeout, TimeUnit timeUnit) {lock.lock();try {long nanos = timeUnit.toNanos(timeout);while(queue.size() == capcity) {try {log.debug("等待加入任务队列{}...", element);if(nanos < 0) {return false;}nanos = fullWaitSet.awaitNanos(nanos);}catch (Exception e) {e.printStackTrace();}}log.debug("加入任务队列{}", element);queue.addLast(element);emptyWaitSet.signal();return true;}finally {lock.unlock();}}/*** 获取大小* @return*/public int size() {lock.lock();try {return queue.size();}finally {lock.unlock();}}public void tryPut(RejectPolicy<T> rejectPolicy, T task) {lock.lock();try {if(queue.size() == capcity) { //队列已满rejectPolicy.reject(this, task);}else { //队列有空闲log.debug("加入任务队列{}", task);queue.addLast(task);emptyWaitSet.signal();}}finally {lock.unlock();}}
}

场景一:拒绝策略死等 

package com.xkj.thread.pool;import lombok.extern.slf4j.Slf4j;import java.util.concurrent.TimeUnit;@Slf4j(topic = "c.TestPool")
public class TestPool {public static void main(String[] args) {ThreadPool threadPool = new ThreadPool(1,1,1000,TimeUnit.MILLISECONDS,(queue, task) -> {// 1.死等queue.put(task);});for (int i = 0; i < 3; i++) {int j = i;threadPool.execute(() -> {try {Thread.sleep(1000000);} catch (InterruptedException e) {e.printStackTrace();}log.debug("{}", j);});}}
}

场景二:带超时等待

@Slf4j(topic = "c.TestPool")
public class TestPool {public static void main(String[] args) {ThreadPool threadPool = new ThreadPool(1,1,1000,TimeUnit.MILLISECONDS,(queue, task) -> {// 1.死等
//                    queue.put(task);// 2.带超时时间的等待queue.offer(task, 500, TimeUnit.MILLISECONDS);});for (int i = 0; i < 3; i++) {int j = i;threadPool.execute(() -> {try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}log.debug("{}", j);});}}
}

注意:ThreadPool的区别,取任务也要调用超时的poll方法

package com.xkj.thread.pool;import lombok.extern.slf4j.Slf4j;import java.util.HashSet;
import java.util.concurrent.TimeUnit;@Slf4j(topic = "c.ThreadPool")
public class ThreadPool {//任务队列private BlockingQueue<Runnable> taskQueue;//线程集合private HashSet<Worker> workers = new HashSet<>();//核心线程数private int coreSize;//获取任务的超时时间private long timeout;private TimeUnit timeUnit;private RejectPolicy<Runnable> rejectPolicy;public ThreadPool(int coreSize,int queueCapcity,long timeout,TimeUnit timeUnit,RejectPolicy<Runnable> rejectPolicy) {this.coreSize = coreSize;this.timeout = timeout;this.timeUnit = timeUnit;this.taskQueue = new BlockingQueue<>(queueCapcity);this.rejectPolicy = rejectPolicy;}class Worker extends Thread {private Runnable task;public Worker(Runnable task) {this.task = task;}@Overridepublic void run() {// 执行任务// 1.当task不为空执行任务// 2.当task执行完毕,再接着从任务队列获取任务并执行while(task != null || (task = taskQueue.poll(timeout, timeUnit)) != null) {try {log.debug("正在执行...{}", task);task.run();}catch (Exception e) {}finally {task = null;}}synchronized (workers) {log.debug("worker 被移除{}", this);workers.remove(this);}}}//执行任务public void execute(Runnable task) {synchronized (workers) {if(workers.size() < coreSize) {Worker worker = new Worker(task);log.debug("新增worker{},{}", worker, task);// 当任务数没有超过coreSize时,直接交给worker对象执行workers.add(worker);worker.start();} else {// 当任务数超过coreSize时,加入任务队列暂存// 1) 死等
//                taskQueue.put(task);// 2)超时等待// 3)放弃任务执行// 4)抛出异常// 5)让调用者自己执行任务taskQueue.tryPut(rejectPolicy, task);}}}
}

流程分析

一共三个任务,一个核心线程,队列的容量为1。

第一个任务占用核心线程

第二个任务进入队列

因为队列已满,第三个任务无法放入队列中,只有等待,等待超时时间为500ms

第一个任务执行完成需要1s

1s后才会执行队列中的任务

但是,第三个任务等待500ms就会超时,就不会等待了,也就是不会添加到队列中了。第三个任务也不会被执行了。

这个时候还没有结束,取任务的也会超时,超时时间为1s,所以取任务等了1s后队列中没有新的任务所以也会超时。超时后,核心线程会被移除。

结果

如果第三个任务等待的超时时间变大,设置为1500ms,那么当第一个线程执行完毕花费1s时间,然后从队列中取出第二个线程执行,此时队列为空,第三个线程添加到队列还未超时,成功添加到队列中,等待执行。所以三个线程都可以得到执行。

@Slf4j(topic = "c.TestPool")
public class TestPool {public static void main(String[] args) {ThreadPool threadPool = new ThreadPool(1,1,1000,TimeUnit.MILLISECONDS,(queue, task) -> {// 1.死等
//                    queue.put(task);// 2.带超时时间的等待queue.offer(task, 1500, TimeUnit.MILLISECONDS);});for (int i = 0; i < 3; i++) {int j = i;threadPool.execute(() -> {try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}log.debug("{}", j);});}}
}

 场景三:放弃任务的执行

队列满了,不做任何的操作,任务就不会加入到队列中,就等于放弃任务的执行。

@Slf4j(topic = "c.TestPool")
public class TestPool {public static void main(String[] args) {ThreadPool threadPool = new ThreadPool(1,1,1000,TimeUnit.MILLISECONDS,(queue, task) -> {// 1.死等
//                    queue.put(task);// 2.带超时时间的等待
//                    queue.offer(task, 1500, TimeUnit.MILLISECONDS);// 3.让调用者放弃任务执行(不要添加任务到队列就是放弃)log.debug("放弃{}", task);});for (int i = 0; i < 3; i++) {int j = i;threadPool.execute(() -> {try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}log.debug("{}", j);});}}
}

 场景四:让调用者抛出异常

package com.xkj.thread.pool;import lombok.extern.slf4j.Slf4j;import java.util.concurrent.TimeUnit;@Slf4j(topic = "c.TestPool")
public class TestPool {public static void main(String[] args) {ThreadPool threadPool = new ThreadPool(1,1,1000,TimeUnit.MILLISECONDS,(queue, task) -> {// 1.死等
//                    queue.put(task);// 2.带超时时间的等待
//                    queue.offer(task, 1500, TimeUnit.MILLISECONDS);// 3.让调用者放弃任务执行(不要添加任务到队列就是放弃)
//                    log.debug("放弃{}", task);// 4.让调用者抛出异常throw new RuntimeException("任务执行失败" + task);});for (int i = 0; i < 3; i++) {int j = i;threadPool.execute(() -> {try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}log.debug("{}", j);});}}
}

第三个任务抛出异常后,如果后面还有任务也不会再执行了。

场景五:让调用者自己执行任务

package com.xkj.thread.pool;import lombok.extern.slf4j.Slf4j;import java.util.concurrent.TimeUnit;@Slf4j(topic = "c.TestPool")
public class TestPool {public static void main(String[] args) {ThreadPool threadPool = new ThreadPool(1,1,1000,TimeUnit.MILLISECONDS,(queue, task) -> {// 1.死等
//                    queue.put(task);// 2.带超时时间的等待
//                    queue.offer(task, 1500, TimeUnit.MILLISECONDS);// 3.让调用者放弃任务执行(不要添加任务到队列就是放弃)
//                    log.debug("放弃{}", task);// 4.让调用者抛出异常
//                    throw new RuntimeException("任务执行失败" + task);// 5.让调用者自己执行任务task.run();});for (int i = 0; i < 3; i++) {int j = i;threadPool.execute(() -> {try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}log.debug("{}", j);});}}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/web/21682.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

SaaS 电商设计 (十一) 那些高并发电商系统的限流方案设计

目录 一.什么是限流二.怎么做限流呢2.1 有哪些常见的系统限流算法2.1.1 固定窗口2.1.1 滑动窗口2.1.2 令牌桶2.1.3 漏桶算法 2.2 常见的限流方式2.2.1 单机限流&集群限流2.2.2 前置限流&后置限流 2.3 实际落地是怎么做的2.3.1 流量链路2.3.2 各链路限流2.3.2.1 网关层2…

重学java 56. Map集合

我们要拥有一定成功的信念 —— 24.6.3 一、双列集合的集合框架 HashMap 1.特点: a.key唯一,value可重复 b.无序 c.无索引 d.线程不安全 e.可以存null键,null值 2.数据结构:哈希表 LinkedHashMap&#xff08;继承HashMap&#xff09; 1.特点: a.key唯一,value可重复 b.有序 c.无…

矩阵连乘问题

#include<iostream> using namespace std; #define N 7 void MatrixChain(int p[N],int n,int m[N][N],int s[N][N]) {for(int i1;i<n;i)m[i][i]0;for(int r2;r<n;r)//有多少个相乘(规模){for(int i1;i<n-r1;i){int jir-1;m[i][j]m[i][i]m[i1][j]p[i]*p[i1]*p[j…

小熊家务帮day10- 门户管理

门户管理 1 门户介绍1.1 介绍1.2 常用技术方案 2 缓存技术方案2.1 需求分析2.1.1 C端用户界面原型2.1.2 缓存需求2.1.3 使用的工具 2.2 项目基础使用2.2.1 项目集成SpringCache2.2.2 测试Cacheable需求Service测试 2.1.3 缓存管理器&#xff08;设置过期时间&#xff09;2.1.4 …

深入理解序列化:概念、应用与技术

在计算机科学中&#xff0c;序列化&#xff08;Serialization&#xff09;是指将数据结构或对象状态转换为可存储或传输的格式的过程。这个过程允许将数据保存到文件、内存缓冲区&#xff0c;或通过网络传输至其他计算机环境&#xff0c;不受原始程序语言的限制。相对地&#x…

URL编码:讲解,抓包

URL 编码&#xff08;也称为百分号编码&#xff09;是一种在 URLs 中编码数据的方法。它将特殊字符转换为由百分号&#xff08;%&#xff09;后跟两个十六进制数字组成的格式。URL 编码通常用于将数据传递到网页或 Web 服务器时&#xff0c;以确保 URL 在传输过程中保持一致和安…

167.二叉树:另一棵树的字树(力扣)

代码解决 /*** Definition for a binary tree node.* struct TreeNode {* int val;* TreeNode *left;* TreeNode *right;* TreeNode() : val(0), left(nullptr), right(nullptr) {}* TreeNode(int x) : val(x), left(nullptr), right(nullptr) {}* Tre…

2.3 OpenCV随手简记(四)

阈值处理是很多高级算法底层处理的预方法之一。 自己求图像平均阈值&#xff1a; # -*- codingGBK -*- import cv2 as cv import numpy as np #求出图像均值作为阈值来二值化 def custom_image(image): gray cv.cvtColor(image, cv.COLOR_BGR2GRAY) cv.imshow("原来&qu…

【JavaScript】---DOM操作1:获取元素

【JavaScript】—DOM操作1&#xff1a;获取元素 文章目录 【JavaScript】---DOM操作1&#xff1a;获取元素一、什么是DOM&#xff1f;1.1 概念1.2 图例演示 二、查找HTML元素2.1 getElementById()2.2 getElementsByTagName()2.3 getElementsByClassName()2.4 querySelector()2.…

Go语言 几种常见的IO模型用法 和 netpoll与原生GoNet对比

【go基础】16.I/O模型与网络轮询器netpoller_go中的多路io复用模型-CSDN博客 字节开源的netPoll多路复用器源码解析-CSDN博客 一、几种常见的IO模型 1. 阻塞I/O (1) 解释&#xff1a; 用户调用如accept、read等系统调用&#xff0c;向内核发起I/O请求后&#xff0c;应用程序…

【Spring Cloud Alibaba】服务注册与发现+远程调用

目录 注册微服务到Nacos&#xff08;服务提供者&#xff09;创建项目修改依赖信息添加启动注解添加配置信息启动服务&#xff0c;Nacos控制台查看服务列表 注册微服务到Nacos&#xff08;服务消费者&#xff09;创建项目添加依赖信息添加启动注解添加配置信息启动服务&#xff…

基于卷积神经网络(CNN)的深度迁移学习在声发射(AE)监测螺栓连接状况的应用

螺栓结构在工业中用于组装部件&#xff0c;它们在多种机械系统中扮演着关键角色。确保这些连接结构的健康状态对于航空航天、汽车和建筑等各个行业至关重要&#xff0c;因为螺栓连接的故障可能导致重大的安全风险、经济损失、性能下降和监管合规问题。 在早期阶段检测到螺栓松动…

vue3路由详解,从0开始手动配置路由(vite,vue-router)

创建一个不含路由的vue项目 &#xff08;查看路由配置可以直接跳过这一段&#xff09; 输入npm指令&#xff0c;然后写一个项目名称&#xff0c;之后一路回车即可 npm create vuelatest 注意这里我们不选引入vue router&#xff0c;成功后可以 查看目录 然后按提示信息输入指…

新闻出版署发布新规定,腾讯游戏限制未成年人端午期间每天一小时

原标题&#xff1a;腾讯游戏端午节期间针对未成年人的游戏时间限制措施 易采游戏网6月3日消息&#xff1a;近日国家新闻出版署针对未成年人沉迷网络游戏问题发布了《关于进一步严格管理 切实防止未成年人沉迷网络游戏的通知》&#xff0c;旨在加强对未成年人保护的力度&#xf…

GIS之arcgis系列06:线划图缓冲区分析

缓冲区工具将在输入要素周围指定距离内创建缓冲区面。 缓冲区例程将遍历输入要素的每个折点并创建缓冲区偏移。 通过这些偏移创建输出缓冲区要素 原理&#xff1a; 01.打开文件 02.确定单位&#xff0c;在文件属性里。 03.工具箱-->分析工具-->邻域分析-->缓冲区。 …

派派派森02

目录 1.容器 1.列表 2.元组 3.字符串 3.序列 4.集合 5.字典 2.数据容器通用操作 • max最大元素 • min最小元素 • 容器的通用转换功能 • 通用排序功能 3.字符串大小比较 4.函数中多个返回值 5.函数参数多种传递方式 1.位置参数 2.关键字参数 3.缺省参数 …

【C++/STL】list(常见接口、模拟实现、反向迭代器)

&#x1f308;个人主页&#xff1a;秦jh_-CSDN博客&#x1f525; 系列专栏&#xff1a;https://blog.csdn.net/qinjh_/category_12575764.html?spm1001.2014.3001.5482 目录 前言 list的常见接口 对迭代器的封装 节点 重载-> const迭代器 list与vector的对比 反向迭代…

2020长安杯

链接成功 检材一 1检材 1 的操作系统版本是 ()A. CentOS release 6.5 (Final)B. Ubuntu 16.04.3 LTSC. Debian GNU/ Linux 7.8 (wheezy)D. CentOS Linux release 7.6.1810 (Core)D 2检材 1 中&#xff0c;操作系统的内核版本是 ()(答案格式&#xff1a; “1.2.34” 数字和半角…

JVMの堆、栈内存存储

1、JVM栈的数据存储 通过前面的学习&#xff0c;我们知道&#xff0c;将源代码编译成字节码文件后&#xff0c;JVM会对其中的字节码指令解释执行&#xff0c;在解释执行的过程中&#xff0c;又利用到了栈区的操作数栈和局部变量表两部分。 而局部变量表又分为一个个的槽位&…

前端将DOM元素导出为图片

前端工作中经常会用到把一些元素导出&#xff0c;比如表格&#xff0c;正好项目有遇到导出为excel和导出为图片&#xff0c;就都封装实现了一下&#xff0c;以供其他需求的开发者使用&#xff1a; 1.导出为文档 这个说白了就是下载的功能&#xff0c;传过去检索参数&#xff…