Java并发编程之线程池ThreadPoolExecutor解析

线程池存在的意义

平常使用线程即new Thread()然后调用start()方法去启动这个线程,但是在频繁的业务情况下如果在生产环境大量的创建Thread对象是则会浪费资源,不仅增加GC回收压力,并且还浪费了时间,创建线程是需要花时间的;

线程池的存在就是降低频繁的创建线程,降低资源的消耗以及创建时间的浪费,并且可以同一管理。

ThreadPoolExecutor

在JDK中所有的线程池的父类就是ThreadPoolExecutor,以下是它的构造方法

    /*** Creates a new {@code ThreadPoolExecutor} with the given initial* parameters.** @param corePoolSize the number of threads to keep in the pool, even*        if they are idle, unless {@code allowCoreThreadTimeOut} is set* @param maximumPoolSize the maximum number of threads to allow in the*        pool* @param keepAliveTime when the number of threads is greater than*        the core, this is the maximum time that excess idle threads*        will wait for new tasks before terminating.* @param unit the time unit for the {@code keepAliveTime} argument* @param workQueue the queue to use for holding tasks before they are*        executed.  This queue will hold only the {@code Runnable}*        tasks submitted by the {@code execute} method.* @param threadFactory the factory to use when the executor*        creates a new thread* @param handler the handler to use when execution is blocked*        because the thread bounds and queue capacities are reached* @throws IllegalArgumentException if one of the following holds:<br>*         {@code corePoolSize < 0}<br>*         {@code keepAliveTime < 0}<br>*         {@code maximumPoolSize <= 0}<br>*         {@code maximumPoolSize < corePoolSize}* @throws NullPointerException if {@code workQueue}*         or {@code threadFactory} or {@code handler} is null*/public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler) {if (corePoolSize < 0 ||maximumPoolSize <= 0 ||maximumPoolSize < corePoolSize ||keepAliveTime < 0)throw new IllegalArgumentException();if (workQueue == null || threadFactory == null || handler == null)throw new NullPointerException();this.corePoolSize = corePoolSize;this.maximumPoolSize = maximumPoolSize;this.workQueue = workQueue;this.keepAliveTime = unit.toNanos(keepAliveTime);this.threadFactory = threadFactory;this.handler = handler;}

int corePoolSize :线程池中核心线程数,小于corePoolSize ,就会创建新线程,等于corePoolSize ,这个任务就会保存到BlockingQueue,如果调用prestartAllCoreThreads()方法就会一次性的启动corePoolSize个数的线程。

int maximumPoolSize: 允许的最大线程数,BlockingQueue也满了,小于maximumPoolSize时候就会再次创建新的线程

long keepAliveTime:线程空闲下来后,存活的时间,这个参数只在大于corePoolSize才有用

TimeUnit unit:存活时间的单位值

BlockingQueue<Runnable> workQueue:保存任务的阻塞队列

ThreadFactory threadFactory:创建线程的工厂,给新建的线程赋予名字

RejectedExecutionHandler handler:饱和策略

          AbortPolicy :直接抛出异常,默认;

         CallerRunsPolicy:用调用者所在的线程来执行任务

         DiscardOldestPolicy:丢弃阻塞队列里最老的任务,队列里最靠前的任务

         DiscardPolicy :当前任务直接丢弃

也可以实现自己的饱和策略,实现RejectedExecutionHandler接口即可

实现基本原理

主要是依赖BlockingQueue<Runnable>队列和HashSet<Worker>实现的,Worker继承了Runnable以及AQS的一个内部类,所以这个类具体等待并且开启线程的功能

在提交Runnable可执行的线程时,

当前线程数小于corePoolSize  的时候,仅仅是将Runnable添加到HashSet<Worker>当中,并且执行start()方法,调用的是runWorker()方法

当前线程数大于或等于corePoolSize  的时候,会将Runnable添加到workerQueue队列中等待并且会添加一个null的Runnable到addWorker()方法当中。如果队列满了offer失败就会执相应的reject拒绝策略。

    public void execute(Runnable command) {if (command == null)throw new NullPointerException();/** Proceed in 3 steps:** 1. If fewer than corePoolSize threads are running, try to* start a new thread with the given command as its first* task.  The call to addWorker atomically checks runState and* workerCount, and so prevents false alarms that would add* threads when it shouldn't, by returning false.** 2. If a task can be successfully queued, then we still need* to double-check whether we should have added a thread* (because existing ones died since last checking) or that* the pool shut down since entry into this method. So we* recheck state and if necessary roll back the enqueuing if* stopped, or start a new thread if there are none.** 3. If we cannot queue task, then we try to add a new* thread.  If it fails, we know we are shut down or saturated* and so reject the task.*/int c = ctl.get();if (workerCountOf(c) < corePoolSize) {if (addWorker(command, true))return;c = ctl.get();}if (isRunning(c) && workQueue.offer(command)) {int recheck = ctl.get();if (! isRunning(recheck) && remove(command))reject(command);else if (workerCountOf(recheck) == 0)addWorker(null, false);}else if (!addWorker(command, false))reject(command);}private boolean addWorker(Runnable firstTask, boolean core) {retry:for (;;) {int c = ctl.get();int rs = runStateOf(c);// Check if queue empty only if necessary.if (rs >= SHUTDOWN &&! (rs == SHUTDOWN &&firstTask == null &&! workQueue.isEmpty()))return false;for (;;) {int wc = workerCountOf(c);if (wc >= CAPACITY ||wc >= (core ? corePoolSize : maximumPoolSize))return false;if (compareAndIncrementWorkerCount(c))break retry;c = ctl.get();  // Re-read ctlif (runStateOf(c) != rs)continue retry;// else CAS failed due to workerCount change; retry inner loop}}boolean workerStarted = false;boolean workerAdded = false;Worker w = null;try {w = new Worker(firstTask);final Thread t = w.thread;if (t != null) {final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {// Recheck while holding lock.// Back out on ThreadFactory failure or if// shut down before lock acquired.int rs = runStateOf(ctl.get());if (rs < SHUTDOWN ||(rs == SHUTDOWN && firstTask == null)) {if (t.isAlive()) // precheck that t is startablethrow new IllegalThreadStateException();workers.add(w);int s = workers.size();if (s > largestPoolSize)largestPoolSize = s;workerAdded = true;}} finally {mainLock.unlock();}if (workerAdded) {t.start();workerStarted = true;}}} finally {if (! workerStarted)addWorkerFailed(w);}return workerStarted;}

在addWorker()方法当中,如果Runnable为空的话,会直接返回false,否则将创建一个Worker对象并且启动它,在runWorker中,首先执行完后传输过来的Runnable对象中的run(),然后循环去workerQueue队列使用take方法拿等待队列中的Runnable对象,并且执行相应的run()方法。

final void runWorker(Worker w) {Thread wt = Thread.currentThread();Runnable task = w.firstTask;w.firstTask = null;w.unlock(); // allow interruptsboolean completedAbruptly = true;try {while (task != null || (task = getTask()) != null) {w.lock();// If pool is stopping, ensure thread is interrupted;// if not, ensure thread is not interrupted.  This// requires a recheck in second case to deal with// shutdownNow race while clearing interruptif ((runStateAtLeast(ctl.get(), STOP) ||(Thread.interrupted() &&runStateAtLeast(ctl.get(), STOP))) &&!wt.isInterrupted())wt.interrupt();try {beforeExecute(wt, task);Throwable thrown = null;try {task.run();} catch (RuntimeException x) {thrown = x; throw x;} catch (Error x) {thrown = x; throw x;} catch (Throwable x) {thrown = x; throw new Error(x);} finally {afterExecute(task, thrown);}} finally {task = null;w.completedTasks++;w.unlock();}}completedAbruptly = false;} finally {processWorkerExit(w, completedAbruptly);}}

关闭线程池的方法:

shutdownNow():设置线程池的状态,还会尝试停止正在运行或者暂停任务的线程

shutdown()设置线程池的状态,只会中断所有没有执行任务的线程

工作机制

合理配置线程池

根据任务的性质来:计算密集型(CPU),IO密集型,混合型

计算密集型:加密,大数分解,正则……., 线程数适当小一点,最大推荐:机器的Cpu核心数+1,为什么+1,防止页缺失,(机器的Cpu核心=Runtime.getRuntime().availableProcessors();)

IO密集型:读取文件,数据库连接,网络通讯, 线程数适当大一点,机器的Cpu核心数*2,

混合型:尽量拆分,IO密集型>>计算密集型,拆分意义不大,IO密集型~计算密集型

队列的选择上,应该使用有界,无界队列可能会导致内存溢出

Executors预定义的线程池

FixedThreadPool:创建固定线程数量的,适用于负载较重的服务器,使用了无界队列

SingleThreadPoolExecutor:创建单个线程,需要顺序保证执行任务,不会有多个线程活动,使用了无界队列

CachedThreadPool:会根据需要来创建新线程的,执行很多短期异步任务的程序,使用了SynchronousQueue
WorkStealingPool(JDK7以后): 基于ForkJoinPool实现

Executor框架

还有一个是定时器,待会儿再说吧

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/537159.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Java并发编程之线程定时器ScheduledThreadPoolExecutor解析

定时器 就是需要周期性的执行任务&#xff0c;也叫调度任务&#xff0c;在JDK中有个类Timer是支持周期性执行&#xff0c;但是这个类不建议使用了。 ScheduledThreadPoolExecutor 继承自ThreadPoolExecutor线程池&#xff0c;在Executors默认创建了两种&#xff1a; newSin…

Spring中BeanFactory和FactoryBean的区别

先介绍一下Spring的IOC容器到底是个什么东西&#xff0c;都说是一个控制反转的容器&#xff0c;将对象的控制权交给IOC容器&#xff0c;其实在看了源代码之后&#xff0c;就会发现IOC容器只是一个存储单例的一个ConcurrentHashMap<String, BeanDefinition> BeanDefiniti…

Spring中Aware的用法以及实现

Aware 在Spring当中有一些内置的对象是未开放给我们使用的&#xff0c;例如Spring的上下文ApplicationContext、环境属性Environment&#xff0c;BeanFactory等等其他的一些内置对象&#xff0c;而在我们可以通过实现对应的Aware接口去拿到我们想要的一些属性&#xff0c;一般…

Spring Bean的生命周期以及IOC源码解析

IOC源码这一块太多只能讲个大概吧&#xff0c;建议还是去买本Spring IOC源码解析的书来看比较好&#xff0c;我也是自己看源代码以及视频整理的笔记 Bean的生命周期大概可以分为四个阶段&#xff0c;具体的等会再说&#xff0c;先看看IOC的源码吧 1、bean的创建 2、bean的属…

MongoDB位运算基本使用以及位运算应用场景

最近在公司业务上用到了二进制匹配数据&#xff0c;但是MongoDB进行二进制运算&#xff08;Bitwise&#xff09;没用过&#xff0c;网上博客文章少&#xff0c;所以就上官网看API&#xff0c;因此记录一下&#xff0c;顺便在普及一下使用二进制位运算的一些应用。 在MongoDB的…

Mybatis源码日志模块分析

看源码需要先下载源码&#xff0c;可以去Mybatis的github上的仓库进行下载&#xff0c;Mybatis 这次就先整理一下日志这一块的源码分析&#xff0c;这块相对来说比较简单而且这个模块是Mybatis的基础模块。 之前的文章有谈到过Java的日志实现&#xff0c;大家也可以参考一下&…

python手机端给电脑端发送数据_期货交易软件有哪些比较好用?分手机端和电脑端...

一、电脑端交易软件期货电脑端交易软件目前市场上用的最多的是文华财经和博易大师&#xff0c;这两个软件都是免费交易使用的。从投资者使用角度来看&#xff0c;目前电脑端文华财经的评价比博易大师高一些。当然每个投资者有自己的使用习惯&#xff0c;博易大师也有自己优点&a…

Find the Difference(leetcode389)

2019独角兽企业重金招聘Python工程师标准>>> Given two strings s and t which consist of only lowercase letters. String t is generated by random shuffling string s and then add one more letter at a random position. Find the letter that was added in …

Mybatis源码之数据源模块分析

先来看看java纯jdbc查询数据的示例&#xff1a; try {//加载对应的驱动类Class.forName("com.mysql.cj.jdbc.Driver");//创建连接Connection connection DriverManager.getConnection("jdbc:mysql://127.0.0.1:3306/test?serverTimezoneUTC", "roo…

Mybatis源码之缓存模块分析

缓存这个东西在很多应用中都能看到它们的身影&#xff0c;这次就讲讲在Mybatis中的缓存是怎么应用的&#xff0c;虽然说吧Mybatis中的缓存基本不怎么用&#xff0c;用的更多是第三方组件redis、MongoDB、MemCache等等。 Mybatis的缓存是基于Map实现的&#xff0c;从缓存中读写…

Mybatis源码之核心流程分析

终于谈到了Mybatis最核心的东西了&#xff0c;最核心的就是通过配置XML文件或注解中的SQL&#xff0c;直接调用接口就能执行配置好的SQL语句并封装成对应的返回类型的数据。 先看一下Mybatis使用示例&#xff1a; //创建Builder对象 SqlSessionFactoryBuilder builder new S…

Mybatis源码之与Spring集成包

这次讲讲Mybatis与Spring的整合&#xff0c;作为两款优秀的开源框架&#xff0c;被大众广泛使用&#xff0c;自然是需要强强联合的。 使用示例 先看一下怎么使用&#xff0c;首先需要引用这两款框架的jar包&#xff1a; <dependency>//spring-webmvc会自动去引入其他S…

Mybatis源码之插件模块分析

总结完这个Mybatis的整体主要功能基本上就差不多完&#xff0c;还有一些细节的部分&#xff0c;后续都会记录补充。 插件这个东西一般用的比较少&#xff0c;就算用的多的插件也算是PageHelper分页插件&#xff1b; PageHelper官网&#xff1a;https://github.com/pagehelper…

AMD推出7nm高端显卡Radeon VII,直指英伟达RTX 2080

显卡战争已经发展到了2019年&#xff0c;并且变得比任何人预想的都要激烈。 CES 2019大会上&#xff0c;AMD发布了第一款消费级的 7nm GPU&#xff0c;取名&#xff1a;Radeon VII。据了解&#xff0c;这不是 AMD 的第一颗 7nm 处理器&#xff08;早期以 AI 运算为主的 Radeon …

Spring集成Mybatis多数据源配置

既然在整理Mybatis那就把经常用的这个多数据源的笔记也整一下吧。 Spring集成Mybatis在之前就已经提到了。Spring集成Mybatis 集成Mybatis多数据源有两种方式&#xff1a; 1、创建多个SqlSessionFactory&#xff0c;扫描每个SqlSessionFactoryBean对应的包&#xff0c;形成了…

Spring文件上传

2019独角兽企业重金招聘Python工程师标准>>> Spring文件上传 1、所需依赖包&#xff1a;commons-fileupload-1.3.1.jar2、Maven配置文件pom.xml文件中加入依赖Jar包<dependency><groupId>commons-fileupload</groupId><artifactId>commons-…

算法题学到的一些小语言细节

1.要学会用i&#xff1b;可以简化很多代码&#xff1a;i&#xff1b;copyFromMe(i)&#xff1b;可以写成&#xff1a;copyFromeMe(i) 2.StringBuffer也跟列表一样有append函数&#xff1b; 3.if语句是分支不能进行循环&#xff0c;要写成while才能替代循环里面的判断 4. 这里的…

Zookeeper基础常用操作以及ACL权限

这次将Zookeeper的一些基础用法以及权限这块的都补充一下在这篇博客中。 上篇博客介绍了基于ZooKeeper实现的分布式锁&#xff0c;也介绍了一些ZooKeeper的节点类型以及监听机制&#xff0c;今天这里就不作过多的介绍了&#xff0c;大家也可以自行的去官方文档上看看更具体的介…

[中医经络学习一]足阳明胃经

人体有六脏&#xff08;心、肝、脾、肺、肾五脏&#xff0c;再加心包&#xff09;六腑&#xff08;胃、小肠、大肠、膀胱、胆、三焦&#xff09;&#xff0c;每个脏腑都联接着一条经络&#xff0c;一共12条经络&#xff0c;称“十二正经”&#xff0c;经络的走向在四肢两侧基本…