Java并发编程之线程池ThreadPoolExecutor解析

线程池存在的意义

平常使用线程即new Thread()然后调用start()方法去启动这个线程,但是在频繁的业务情况下如果在生产环境大量的创建Thread对象是则会浪费资源,不仅增加GC回收压力,并且还浪费了时间,创建线程是需要花时间的;

线程池的存在就是降低频繁的创建线程,降低资源的消耗以及创建时间的浪费,并且可以同一管理。

ThreadPoolExecutor

在JDK中所有的线程池的父类就是ThreadPoolExecutor,以下是它的构造方法

    /*** Creates a new {@code ThreadPoolExecutor} with the given initial* parameters.** @param corePoolSize the number of threads to keep in the pool, even*        if they are idle, unless {@code allowCoreThreadTimeOut} is set* @param maximumPoolSize the maximum number of threads to allow in the*        pool* @param keepAliveTime when the number of threads is greater than*        the core, this is the maximum time that excess idle threads*        will wait for new tasks before terminating.* @param unit the time unit for the {@code keepAliveTime} argument* @param workQueue the queue to use for holding tasks before they are*        executed.  This queue will hold only the {@code Runnable}*        tasks submitted by the {@code execute} method.* @param threadFactory the factory to use when the executor*        creates a new thread* @param handler the handler to use when execution is blocked*        because the thread bounds and queue capacities are reached* @throws IllegalArgumentException if one of the following holds:<br>*         {@code corePoolSize < 0}<br>*         {@code keepAliveTime < 0}<br>*         {@code maximumPoolSize <= 0}<br>*         {@code maximumPoolSize < corePoolSize}* @throws NullPointerException if {@code workQueue}*         or {@code threadFactory} or {@code handler} is null*/public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler) {if (corePoolSize < 0 ||maximumPoolSize <= 0 ||maximumPoolSize < corePoolSize ||keepAliveTime < 0)throw new IllegalArgumentException();if (workQueue == null || threadFactory == null || handler == null)throw new NullPointerException();this.corePoolSize = corePoolSize;this.maximumPoolSize = maximumPoolSize;this.workQueue = workQueue;this.keepAliveTime = unit.toNanos(keepAliveTime);this.threadFactory = threadFactory;this.handler = handler;}

int corePoolSize :线程池中核心线程数,小于corePoolSize ,就会创建新线程,等于corePoolSize ,这个任务就会保存到BlockingQueue,如果调用prestartAllCoreThreads()方法就会一次性的启动corePoolSize个数的线程。

int maximumPoolSize: 允许的最大线程数,BlockingQueue也满了,小于maximumPoolSize时候就会再次创建新的线程

long keepAliveTime:线程空闲下来后,存活的时间,这个参数只在大于corePoolSize才有用

TimeUnit unit:存活时间的单位值

BlockingQueue<Runnable> workQueue:保存任务的阻塞队列

ThreadFactory threadFactory:创建线程的工厂,给新建的线程赋予名字

RejectedExecutionHandler handler:饱和策略

          AbortPolicy :直接抛出异常,默认;

         CallerRunsPolicy:用调用者所在的线程来执行任务

         DiscardOldestPolicy:丢弃阻塞队列里最老的任务,队列里最靠前的任务

         DiscardPolicy :当前任务直接丢弃

也可以实现自己的饱和策略,实现RejectedExecutionHandler接口即可

实现基本原理

主要是依赖BlockingQueue<Runnable>队列和HashSet<Worker>实现的,Worker继承了Runnable以及AQS的一个内部类,所以这个类具体等待并且开启线程的功能

在提交Runnable可执行的线程时,

当前线程数小于corePoolSize  的时候,仅仅是将Runnable添加到HashSet<Worker>当中,并且执行start()方法,调用的是runWorker()方法

当前线程数大于或等于corePoolSize  的时候,会将Runnable添加到workerQueue队列中等待并且会添加一个null的Runnable到addWorker()方法当中。如果队列满了offer失败就会执相应的reject拒绝策略。

    public void execute(Runnable command) {if (command == null)throw new NullPointerException();/** Proceed in 3 steps:** 1. If fewer than corePoolSize threads are running, try to* start a new thread with the given command as its first* task.  The call to addWorker atomically checks runState and* workerCount, and so prevents false alarms that would add* threads when it shouldn't, by returning false.** 2. If a task can be successfully queued, then we still need* to double-check whether we should have added a thread* (because existing ones died since last checking) or that* the pool shut down since entry into this method. So we* recheck state and if necessary roll back the enqueuing if* stopped, or start a new thread if there are none.** 3. If we cannot queue task, then we try to add a new* thread.  If it fails, we know we are shut down or saturated* and so reject the task.*/int c = ctl.get();if (workerCountOf(c) < corePoolSize) {if (addWorker(command, true))return;c = ctl.get();}if (isRunning(c) && workQueue.offer(command)) {int recheck = ctl.get();if (! isRunning(recheck) && remove(command))reject(command);else if (workerCountOf(recheck) == 0)addWorker(null, false);}else if (!addWorker(command, false))reject(command);}private boolean addWorker(Runnable firstTask, boolean core) {retry:for (;;) {int c = ctl.get();int rs = runStateOf(c);// Check if queue empty only if necessary.if (rs >= SHUTDOWN &&! (rs == SHUTDOWN &&firstTask == null &&! workQueue.isEmpty()))return false;for (;;) {int wc = workerCountOf(c);if (wc >= CAPACITY ||wc >= (core ? corePoolSize : maximumPoolSize))return false;if (compareAndIncrementWorkerCount(c))break retry;c = ctl.get();  // Re-read ctlif (runStateOf(c) != rs)continue retry;// else CAS failed due to workerCount change; retry inner loop}}boolean workerStarted = false;boolean workerAdded = false;Worker w = null;try {w = new Worker(firstTask);final Thread t = w.thread;if (t != null) {final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {// Recheck while holding lock.// Back out on ThreadFactory failure or if// shut down before lock acquired.int rs = runStateOf(ctl.get());if (rs < SHUTDOWN ||(rs == SHUTDOWN && firstTask == null)) {if (t.isAlive()) // precheck that t is startablethrow new IllegalThreadStateException();workers.add(w);int s = workers.size();if (s > largestPoolSize)largestPoolSize = s;workerAdded = true;}} finally {mainLock.unlock();}if (workerAdded) {t.start();workerStarted = true;}}} finally {if (! workerStarted)addWorkerFailed(w);}return workerStarted;}

在addWorker()方法当中,如果Runnable为空的话,会直接返回false,否则将创建一个Worker对象并且启动它,在runWorker中,首先执行完后传输过来的Runnable对象中的run(),然后循环去workerQueue队列使用take方法拿等待队列中的Runnable对象,并且执行相应的run()方法。

final void runWorker(Worker w) {Thread wt = Thread.currentThread();Runnable task = w.firstTask;w.firstTask = null;w.unlock(); // allow interruptsboolean completedAbruptly = true;try {while (task != null || (task = getTask()) != null) {w.lock();// If pool is stopping, ensure thread is interrupted;// if not, ensure thread is not interrupted.  This// requires a recheck in second case to deal with// shutdownNow race while clearing interruptif ((runStateAtLeast(ctl.get(), STOP) ||(Thread.interrupted() &&runStateAtLeast(ctl.get(), STOP))) &&!wt.isInterrupted())wt.interrupt();try {beforeExecute(wt, task);Throwable thrown = null;try {task.run();} catch (RuntimeException x) {thrown = x; throw x;} catch (Error x) {thrown = x; throw x;} catch (Throwable x) {thrown = x; throw new Error(x);} finally {afterExecute(task, thrown);}} finally {task = null;w.completedTasks++;w.unlock();}}completedAbruptly = false;} finally {processWorkerExit(w, completedAbruptly);}}

关闭线程池的方法:

shutdownNow():设置线程池的状态,还会尝试停止正在运行或者暂停任务的线程

shutdown()设置线程池的状态,只会中断所有没有执行任务的线程

工作机制

合理配置线程池

根据任务的性质来:计算密集型(CPU),IO密集型,混合型

计算密集型:加密,大数分解,正则……., 线程数适当小一点,最大推荐:机器的Cpu核心数+1,为什么+1,防止页缺失,(机器的Cpu核心=Runtime.getRuntime().availableProcessors();)

IO密集型:读取文件,数据库连接,网络通讯, 线程数适当大一点,机器的Cpu核心数*2,

混合型:尽量拆分,IO密集型>>计算密集型,拆分意义不大,IO密集型~计算密集型

队列的选择上,应该使用有界,无界队列可能会导致内存溢出

Executors预定义的线程池

FixedThreadPool:创建固定线程数量的,适用于负载较重的服务器,使用了无界队列

SingleThreadPoolExecutor:创建单个线程,需要顺序保证执行任务,不会有多个线程活动,使用了无界队列

CachedThreadPool:会根据需要来创建新线程的,执行很多短期异步任务的程序,使用了SynchronousQueue
WorkStealingPool(JDK7以后): 基于ForkJoinPool实现

Executor框架

还有一个是定时器,待会儿再说吧

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/537159.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

面向过程的门面模式

{*******************************************************}{ }{ 业务逻辑一 }{ }{ 版权所有 (C) 2008 陈…

Java并发编程之线程定时器ScheduledThreadPoolExecutor解析

定时器 就是需要周期性的执行任务&#xff0c;也叫调度任务&#xff0c;在JDK中有个类Timer是支持周期性执行&#xff0c;但是这个类不建议使用了。 ScheduledThreadPoolExecutor 继承自ThreadPoolExecutor线程池&#xff0c;在Executors默认创建了两种&#xff1a; newSin…

python xml转换键值对_Python 提取dict转换为xml/json/table并输出

#!/usr/bin/python#-*- coding:gbk -*-#设置源文件输出格式import sysimport getoptimport jsonimport createDictimport myConToXMLimport myConToTabledef getRsDataToDict():#获取控制台中输入的参数&#xff0c;并根据参数找到源文件获取源数据csDict{}try:#通过getopt获取…

应用开发框架之——根据数据表中的存储的方法名称来调用方法

功用一&#xff1a;在框架里面根据存储在数据表中的方法名来动态调用执行方法。 unit Unit1; interface uses Windows, Messages, SysUtils, Variants, Classes, Graphics, Controls, Forms, Dialogs, StdCtrls; type TForm1 class(TForm) Button1: TButton; procedu…

Spring IOC容器组件注入的几种方式

整理一下之前Spring的学习笔记&#xff0c;大致有一下几种Spring注入到容器中的方法: 1&#xff09;、配置在xml的方式。 2&#xff09;、开启包扫描ComponentScan使用Component&#xff0c;Service&#xff0c;Controller&#xff0c;Repository&#xff08;其实后三个都继承…

我们是如何拿下Google和Facebook Offer的?

http://posts.careerengine.us/p/57c3a1c1a09633ee7e57803c 大家好&#xff0c;我是小高&#xff0c;CMU CS Master&#xff0c;来Offer第一期学员&#xff0c;2014年初在孙老师的带领下我在几个月的时间内进入了Yahoo&#xff0c;并工作了近2年。2016年初&#xff0c;Yahoo工作…

Spring中BeanFactory和FactoryBean的区别

先介绍一下Spring的IOC容器到底是个什么东西&#xff0c;都说是一个控制反转的容器&#xff0c;将对象的控制权交给IOC容器&#xff0c;其实在看了源代码之后&#xff0c;就会发现IOC容器只是一个存储单例的一个ConcurrentHashMap<String, BeanDefinition> BeanDefiniti…

python中数字和字符串可以直接相加_用c语言或者python将文件中特定字符串后面的数字相加...

匿名用户1级2014-08-31 回答代码应该不难吧。既然用爬虫爬下来了&#xff0c;为什么爬取数据的时候没做处理呢。之前用过Scrapy爬虫框架&#xff0c;挺好用的&#xff0c;你可研究下。代码&#xff1a;#!codingutf-8import osimport reimport random# 获取当前目录文件列表def …

Spring中Aware的用法以及实现

Aware 在Spring当中有一些内置的对象是未开放给我们使用的&#xff0c;例如Spring的上下文ApplicationContext、环境属性Environment&#xff0c;BeanFactory等等其他的一些内置对象&#xff0c;而在我们可以通过实现对应的Aware接口去拿到我们想要的一些属性&#xff0c;一般…

c#字符型转化为asc_C#字符串和ASCII码的转换

//字符转ASCII码&#xff1a;public static int Asc(string character){if (character.Length 1){System.Text.ASCIIEncoding asciiEncoding new System.Text.ASCIIEncoding();int intAsciiCode (int)asciiEncoding.GetBytes(character)[0];return (intAsciiCode);}else{thr…

topcoder srm 625 div1

problem1 link 假设第$i$种出现的次数为$n_{i}$&#xff0c;总个数为$m$&#xff0c;那么排列数为$T\frac{m!}{\prod_{i1}^{26}(n_{i}!)}$ 然后计算回文的个数&#xff0c;只需要考虑前一半&#xff0c;得到个数为$R$&#xff0c;那么答案为$\frac{R}{T}$. 为了防止数字太大导致…

Spring的组件赋值以及环境属性@PropertySource

PropertySource 将指定类路径下的.properties一些配置加载到Spring当中&#xff0c; 有个跟这个差不多的注解PropertySources Target(ElementType.TYPE) Retention(RetentionPolicy.RUNTIME) Documented public interface PropertySources {PropertySource[] value();} 使用…

python语音识别框架_横评:五款免费开源的语音识别工具

编者按&#xff1a;本文原作者 Cindi Thompson&#xff0c;美国德克萨斯大学奥斯汀分校(University of Texas at Austin)计算机科学博士&#xff0c;数据科学咨询公司硅谷数据科学(Silicon Valley Data Science&#xff0c;SVDS)首席科学家&#xff0c;在机器学习、自然语言处理…

csharp read excel file get sheetName list

1 /// <summary>2 /// 3 /// 塗聚文4 /// 201208035 /// Geovin Du6 ///找到EXCEL的工作表名称 要考慮打開的文件的進程問題7 /// </summary>8 /// <param name"filename">…

Spring Bean的生命周期以及IOC源码解析

IOC源码这一块太多只能讲个大概吧&#xff0c;建议还是去买本Spring IOC源码解析的书来看比较好&#xff0c;我也是自己看源代码以及视频整理的笔记 Bean的生命周期大概可以分为四个阶段&#xff0c;具体的等会再说&#xff0c;先看看IOC的源码吧 1、bean的创建 2、bean的属…

python3绘图_python3绘图示例2(基于matplotlib:柱状图、分布图、三角图等)

#!/usr/bin/env python# -*- coding:utf-8 -*-from matplotlib import pyplot as pltimport numpy as npimport pylabimport os,sys,time,math,random# 图1-给已有的图加上刻度filer‘D:\jmeter\jmeter3.2\data\Oracle数据库基础.png‘arrnp.array(file.getdata()).reshape(fil…

bzoj4152-[AMPPZ2014]The_Captain

Description 给定平面上的n个点&#xff0c;定义(x1,y1)到(x2,y2)的费用为min(|x1-x2|,|y1-y2|)&#xff0c;求从1号点走到n号点的最小费用。 Input 第一行包含一个正整数n(2<n<200000)&#xff0c;表示点数。 接下来n行&#xff0c;每行包含两个整数x[i],yi&#xff0c;…

python日志统计_python试用-日志统计

最近两天尝试用python代替bash写Linux Shell脚本来统计日志。发现python写起来比bash更简单和容易阅读&#xff0c;发现不少惊喜。所以写了一个粗糙的脚本来统计日志。目标1、通过简单命令和脚本统计事件发生数2、日志限定文本类型假定环境日志文件&#xff1a;1.logtest:aaa,1…

Spring AOP两种使用方式以及如何使用解析

AOP是一种面向切面编程思想&#xff0c;也是面向对象设计&#xff08;OOP&#xff09;的一种延伸。 在Spring实现AOP有两种实现方式&#xff0c;一种是采用JDK动态代理实现&#xff0c;另外一种就是采用CGLIB代理实现&#xff0c;Spring是如何实现的在上篇已经讲到了Spring Be…