书接前文Java线程池及其实现原理
常用线程池有:
CachedThreadPool
FixedThreadPool
SingleThreadExecutor
ScheduledThreadPool
SingleThreadScheduledExecutor
Executors .newCachedThreadPool(); | 创建一个可缓存线程池,如果线程池长度超过处理需要,可灵活回收空闲线程,若无可回收,则新建线程。 TimeUnit.SECONDS,new SynchronousQueue()); |
Executors .newFixedThreadPool(int); | 创建一个定长线程池,可控制线程最大并发数,超出的线程会在队列中等待。 内部实现:new ThreadPoolExecutor(nThreads, nThreads,0L,TimeUnit.MILLISECONDS,new LinkedBlockingQueue()); |
Executors .newSingleThreadExecutor(); | 创建一个单线程化的线程池,它只会用唯一的工作线程来执行任务,保证所有任务按照顺序执行。 内部实现:new ThreadPoolExecutor(1,1,0L,TimeUnit.MILLISECONDS,new LinkedBlockingQueue()) |
Executors .newScheduledThreadPool(int) | 创建一个定长线程池,支持定时及周期性任务执行。 内部实现:new ScheduledThreadPoolExecutor(corePoolSize) |
Executors .newSingleThreadScheduledExecutor() | 创建一个单线程的任务调度池(定时任务/延时任务) |
线程池使用场景
场景1:快速响应用户请求
场景2:快速处理批量任务
阿里巴巴Java开发手册中对线程池的使用规范
- 【强制】创建线程或线程池时请指定有意义的线程名称,方便出错时回溯
- 【强制】线程资源必须通过线程池提供,不允许在应用中自行显式创建线程。
说明: 使用线程池的好处是减少在创建和销毁线程上所花的时间以及系统资源的开销,解决资源不足的问题。如果不使用线程池,有可能造成系统创建大量同类线程而导致消耗完内存或者“过度切换”的问题。 - 【强制】线程池不允许使用 Executors 去创建,而是通过ThreadPoolExecutor的方式创建,这样的处理方式让写的Javer更加明确线程池的运行规则,规避资源耗尽的风险。
说明: Executors 返回的线程池对象的弊端如下:
1) FixedThreadPool 和 SingleThreadPool:
允许的请求队列长度为 Integer.MAX_VALUE,可能会堆积大量的请求,从而导致 OOM。
2) CachedThreadPool 和 ScheduledThreadPool:
允许的创建线程数量为 Integer.MAX_VALUE, 可能会创建大量的线程,从而导致 OOM。
综上所述,建议使用ThreadPoolExecutor executor = new ThreadPoolExecutor(...);
线程池关闭
如果线程池没有做好关闭,可能会导致oom,严重可能会导致服务挂掉。
手动线程池
手动线程池关闭分为:shutdown和shutdownNow。
executor.shutdown();executor.shutdownNow();
区别:
shutdown主要做两件事:
- 把线程池状态置为 SHUTDOWN 状态
- 中断空闲线程
shutdownNow主要做三件事:
- 把线程池状态置为 STOP 状态
- 中断工作线程
- 把线程池中的任务都 drain 出来并返回
综上,shutdown方法会等待把线程池中的任务都执行完,而shutdownNow会直接中断当前工作线程。因为,shutdown方法只是把空闲的 woker 置为中断,不影响正在运行的woker,并且会继续把待执行的任务给处理完。shutdonwNow方法则是把所有的 woker 都置为中断,待执行的任务全部抽出并返回。
日常工作中更多是使用 shutdown()。
如果要优雅的关闭线程池,则可以使用 awaitTermination() 和 JVM 的钩子来实现。
Thread shutdownHook = new Thread(() -> {executor.shutdown();try {executor.awaitTermination(3, TimeUnit.MINUTES);} catch (InterruptedException e) {log.info("等待超时,直接关闭");}
});
Runtime.getRuntime().addShutdownHook(shutdownHook);
自动关闭线程池
核心线程数为 0 并指定线程存活时间
@Test
public void test(){// 重点关注 corePoolSize 和 keepAliveTime,其他参数不重要ThreadPoolExecutor executor = new ThreadPoolExecutor(0,5,15L,TimeUnit.SECONDS,new LinkedBlockingQueue<>(15));for (int i = 0; i < 20; i++) {executor.execute(() -> {// 简单地打印当前线程名称System.out.println(Thread.currentThread().getName());});}
}
缺点:如果将corePoolSize设置为0的话,新到来的任务会永远优先被放入任务队列,然后等待被处理,这显然会影响程序的执行效率。
通过 allowCoreThreadTimeOut 控制核心线程存活时间
@Test
public void test(){// 这里把corePoolSize设为5,keepAliveTime保持不变ThreadPoolExecutor executor = new ThreadPoolExecutor(5,5,15L,TimeUnit.SECONDS,new LinkedBlockingQueue<>(15));// 允许核心线程超时销毁executor.allowCoreThreadTimeOut(true);for (int i = 0; i < 20; i++) {executor.execute(() -> {System.out.println(Thread.currentThread().getName());});}
}
线程池中的线程设置为守护线程
@Test
public void test(){// 这里把corePoolSize设为5,keepAliveTime保持不变ThreadPoolExecutor executor = new ThreadPoolExecutor(5,5,15L,TimeUnit.SECONDS,new LinkedBlockingQueue<>(15),new ThreadFactory() {@Overridepublic Thread newThread(@NotNull Runnable r) {Thread thread = new Thread(r, r.hashCode()+"");//设置成守护线程thread.setDaemon(true);return thread;}});// 允许核心线程超时销毁executor.allowCoreThreadTimeOut(true);for (int i = 0; i < 20; i++) {executor.execute(() -> {System.out.println(Thread.currentThread().getName());});}
}
PS:守护进程(Daemon)是运行在后台的一种特殊进程。它独立于控制终端并且周期性地执行某种任务或等待处理某些发生的事件。也就是说守护线程不依赖于终端,但是依赖于系统,与系统“同生共死”。
线程池大小设计
首先需要确定该应用的预估并发量,假设每秒钟需要处理 100 个请求。然后根据业务特点,可做如下设定:
- 确定线程池大小:为了充分利用 CPU 和内存资源,可以将线程池大小设置为 CPU 核心数的两倍。假设 CPU 核心数为 8,那么线程池大小可设置为 16。
- 确定任务队列容量:由于这里的任务是从数据库中读取数据并进行计算,因此任务执行时间相对较长,需要将任务队列容量设置得大一些。假设任务队列容量为 100。
也可通过Runtime.getRuntime().availableProcessors()获取当前CPU核心数。
局部使用
int size = Runtime.getRuntime().availableProcessors() * 2;
ThreadPoolExecutor executor = new ThreadPoolExecutor(size,size,100,TimeUnit.MILLISECONDS,new ArrayBlockingQueue<>(100),Executors.defaultThreadFactory(),new ThreadPoolExecutor.DiscardOldestPolicy());
//修改报价单状态->已转为销售订单
executor.execute(() -> xx.changeQuoteStatus(accessToken, thirdId, userId));
Thread shutdownHook = new Thread(() -> {executor.shutdown();try {executor.awaitTermination(3, TimeUnit.MINUTES);} catch (InterruptedException e) {log.info("等待超时,直接关闭");}
});
Runtime.getRuntime().addShutdownHook(shutdownHook);
parallelStream并行流
parallelStream并行流的底层是ForkJoinPool,如果要设置并行流线程参数,需要
private int parallelThreads = Runtime.getRuntime().availableProcessors() * 2;
ForkJoinPool pool = new ForkJoinPool(parallelThreads);
ForkJoinTask task = pool.submit(() -> {Lists.partition(list, 100).parallelStream().forEach(...);
});
task.join();
pool.shutdown();
全局配置使用
配置
/*** @author lyonardo* @createTime 2019年04月11日 10:14:23* @Description*/
@Configuration
@EnableAsync
public class TaskExecutorConfig {@Bean(name = "taskExecutor")public ThreadPoolTaskExecutor taskExecutor() {ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();//线程池大小executor.setCorePoolSize(16);//最大线程池大小executor.setMaxPoolSize(16);//任务队列容量executor.setQueueCapacity(100);//线程前缀名executor.setThreadNamePrefix("taskExecutor-");// 任务拒绝策略executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());executor.initialize();return executor;}
}
使用
使用线程池时,可以通过注解 @Async 将方法标记为异步方法,让方法在另一个线程中执行:
@Test
@Async("taskExecutor")
public void testExecute(){log.info("213");
}
在代码中,使用了 @Async 注解将 testExecute() 方法标记为异步方法,并指定了线程池名称为 "taskExecutor",表示该方法将在 "taskExecutor" 线程池中执行。这样就可以避免在主线程中等待查询和计算的结果,提高了应用的响应速度。
如果需要接收异步操作的结果,可以使用Future做如下操作:
@Async
public Future<Integer> calculateSum(int a, int b) {int sum = a + b;return new AsyncResult<Integer>(sum);
}@Test
public void test() throws ExecutionException, InterruptedException {// 使用方法,等待所有线程执行完,在同一处理结果Future<Integer> futureResult = calculateSum(1, 2);//执行其他操作Integer result = futureResult.get(); //阻塞等待异步操作完成并获取结果log.info("result::"+result);
}