线程池优化之充分利用线程池资源

一、前言

  最近做了电子发票的需求,分省开票接口和发票下载接口都有一定的延迟。为了完成开票后自动将发票插入用户微信卡包,目前的解决方案是利用线程池,将开票后插入卡包的任务(轮询分省发票接口,直到获取到发票相关信息或者轮询次数用完,如果获取到发票信息,执行发票插入微信卡包,结束任务)放入线程池异步执行。仔细想一想,这种实现方案存在一个问题,线程池没有充分的利用。为什么没有充分的利用?下面详细的分析。

二、异步线程池和异步任务包装

  AsyncConfigurerSupport可以帮我们指定异步任务(注有@Async注解)对应的线程池。

@Configuration
public class MyAsyncConfigurer extends AsyncConfigurerSupport {private static Logger LOGGER = LoggerFactory.getLogger(MyAsyncConfigurer.class);@Overridepublic Executor getAsyncExecutor() {ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();taskExecutor.setCorePoolSize(2);taskExecutor.setMaxPoolSize(4);taskExecutor.setQueueCapacity(10);taskExecutor.setRejectedExecutionHandler((runnable, executor) -> LOGGER.error("异步线程池拒绝任务..." + runnable));taskExecutor.setThreadFactory(new MyAsyncThreadFactory());taskExecutor.initialize();return taskExecutor;}static class MyAsyncThreadFactory implements ThreadFactory {private static final AtomicInteger poolNumber = new AtomicInteger(1);private final ThreadGroup group;private final AtomicInteger threadNumber = new AtomicInteger(1);private final String namePrefix;MyAsyncThreadFactory() {SecurityManager s = System.getSecurityManager();group = (s != null) ? s.getThreadGroup() :Thread.currentThread().getThreadGroup();namePrefix = "myasync-pool-" +poolNumber.getAndIncrement() +"-thread-";}@Overridepublic Thread newThread(Runnable r) {Thread t = new Thread(group, r,namePrefix + threadNumber.getAndIncrement(),0);if (t.isDaemon())t.setDaemon(false);if (t.getPriority() != Thread.NORM_PRIORITY)t.setPriority(Thread.NORM_PRIORITY);return t;}}
}

  异步任务包装,除了异步,还加入了retry功能,实现指定次数的接口轮询。

@Component
public class AsyncWrapped {protected static Logger LOGGER = LoggerFactory.getLogger(AsyncWrapped.class);@Asyncpublic void asyncProcess(Runnable runnable, Callback callback, Retry retry) {try {if (retry == null) {retry = new Retry(1);}retry.execute(ctx -> {runnable.run();return null;}, ctx -> {if (callback != null) {callback.call();}return null;});} catch (Exception e) {LOGGER.error("异步调用异常...", e);}}
}

  业务代码大致逻辑如下。

asyncWrapped.asyncProcess(() -> {//调用分省接口获取发票信息//如果发票信息异常,抛出异常(进入下次重试)//否则,插入用户微信卡包}, () -> {//轮询次数用尽,用户插入卡包失败
    }, new Retry(2, 1000)
);

  这里说一下为什么线程池没有充分的利用。异步任务中包含轮询操作,轮询有一定的时间间隔,导致在这段时间间隔内,线程一直处于被闲置的状态。所以为了能更好的利用线程池资源,我们得想办法解决时间间隔的问题。假如有个延迟队列,队列里放着我们的异步任务(不包含重试机制),然后延迟(轮询的时间间隔)一定时间之后,将任务放入线程池中执行,任务执行完毕之后根据是否需要再次执行决定是否再次放入到延迟队列去,这样每个线程池中的线程都不会闲着,达到了充分利用的目的。

三、定时任务线程池和实现轮询机制

  @EnableScheduling 帮助开启@Scheduled注解解析。注册一个名字是ScheduledAnnotationBeanPostProcessor.DEFAULT_TASK_SCHEDULER_BEAN_NAME的定时任务线程池。

@Configuration
@EnableScheduling
@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
public class TaskConfiguration {@Bean(name = ScheduledAnnotationBeanPostProcessor.DEFAULT_TASK_SCHEDULER_BEAN_NAME)@Role(BeanDefinition.ROLE_INFRASTRUCTURE)public ScheduledExecutorService scheduledAnnotationProcessor() {return Executors.newScheduledThreadPool(5, new DefaultThreadFactory());}private static class DefaultThreadFactory implements ThreadFactory {private static final AtomicInteger poolNumber = new AtomicInteger(1);private final ThreadGroup group;private final AtomicInteger threadNumber = new AtomicInteger(1);private final String namePrefix;DefaultThreadFactory() {SecurityManager s = System.getSecurityManager();group = (s != null) ? s.getThreadGroup() :Thread.currentThread().getThreadGroup();namePrefix = "pool-" +poolNumber.getAndIncrement() +"-schedule-";}@Overridepublic Thread newThread(Runnable r) {Thread t = new Thread(group, r,namePrefix + threadNumber.getAndIncrement(),0);if (t.isDaemon()) {t.setDaemon(false);}if (t.getPriority() != Thread.NORM_PRIORITY) {t.setPriority(Thread.NORM_PRIORITY);}return t;}}
}

   实现轮询任务,实现接口SchedulingConfigurer,获取ScheduledTaskRegistrar 并指定定时任务线程池。

@Override
public void configureTasks(ScheduledTaskRegistrar registrar) {this.registrar = registrar;this.registrar.setScheduler(this.applicationContext.getBean(DEFAULT_TASK_SCHEDULER_BEAN_NAME, ScheduledExecutorService.class));scheduledTaskRegistrarHelper = new ScheduledTaskRegistrarHelper();
}

  scheduledFutures提交定时任务时返回结果集,periodTasks 定时任务结果集。

private static final ConcurrentHashMap<String, ScheduledFuture<?>> scheduledFutures = new ConcurrentHashMap<>();
private static final ConcurrentHashMap<String, TimingTask> periodTasks = new ConcurrentHashMap<>();

  定时任务包装类,包含任务的执行次数(重试次数)、重试间隔、具体任务、重试次数用尽之后的回调等,以及自动结束定时任务、重试计数重置功能。

private static class TimingTask {//重试次数private Integer retry;//任务标识private String taskId;//重试间隔private Long period;//具体任务private ScheduledRunnable task;//结束回调private ScheduledCallback callback;//重试计数private AtomicInteger count = new AtomicInteger(0);//父线程MDCprivate Map<String, String> curContext;public TimingTask(Integer retry, String taskId, Long period, ScheduledRunnable task, ScheduledCallback callback) {this.retry = retry;this.taskId = taskId;this.period = period;this.task = task;this.callback = callback;this.curContext = MDC.getCopyOfContextMap();}public Long getPeriod() {return period;}public void setPeriod(Long period) {this.period = period;}public String getTaskId() {return taskId;}public void setTaskId(String taskId) {this.taskId = taskId;}public Integer getRetry() {return retry;}public void setRetry(Integer retry) {this.retry = retry;}public AtomicInteger getCount() {return count;}public boolean reset() {for (int cnt = this.count.intValue(); cnt < this.retry; cnt = this.count.intValue()) {if (this.count.compareAndSet(cnt, 0)) {return true;}}return false;}public void process() {Map<String, String> preContext = MDC.getCopyOfContextMap();try {if (this.curContext == null) {MDC.clear();} else {// 将父线程的MDC内容传给子线程MDC.setContextMap(this.curContext);}this.task.run();exitTask(false);} catch (Exception e) {LOGGER.error("定时任务异常..." + this, e);if (count.incrementAndGet() >= this.retry) {exitTask(true);}} finally {if (preContext == null) {MDC.clear();} else {MDC.setContextMap(preContext);}}}//定时任务退出private void exitTask(boolean execCallback) {scheduledFutures.get(this.taskId).cancel(false);scheduledFutures.remove(this.getTaskId());periodTasks.remove(this.getTaskId());LOGGER.info("结束定时任务: " + this);if (execCallback && callback != null) {callback.call();}}@Overridepublic String toString() {return ReflectionToStringBuilder.toString(this, ToStringStyle.JSON_STYLE, false, false, TimingTask.class);}
}

  注意上面定时任务是如何退出的,是在某一次任务执行成功之后(没有异常抛出)或者定时任务执行次数用尽才退出的。直接调用ScheduledFuture的cancel方法可以退出定时任务。还有就是定时任务中的日志需要父线程中的日志变量,所以需要对MDC进行一下处理。

@Scope("prototype")
@Bean
public AspectTimingTask aspectTimingTask() {return new AspectTimingTask();
}@Aspect
@Component
public static class ScheduledAspect {@Around("target(AspectTimingTask)")public Object executeScheduledWrapped(ProceedingJoinPoint proceedingJoinPoint) throws Throwable {if (proceedingJoinPoint instanceof MethodInvocationProceedingJoinPoint) {MethodInvocationProceedingJoinPoint methodJoinPoint = (MethodInvocationProceedingJoinPoint) proceedingJoinPoint;Method method = ((MethodSignature) methodJoinPoint.getSignature()).getMethod();if (AnnotatedElementUtils.isAnnotated(method, ScheduledTask.class)) {LOGGER.info("电子发票定时任务日志同步...");//其他处理
            }}return proceedingJoinPoint.proceed();}
}public static class AspectTimingTask implements Runnable {private TimingTask timingTask;@Override@ScheduledTaskpublic void run() {timingTask.process();}public void setTimingTask(TimingTask timingTask) {this.timingTask = timingTask;}
}

  AspectTimingTask 是对TimingTask 的包装类,实现了Runnable接口。主要是为了对run接口做一层切面,获取ProceedingJoinPoint 实例(公司中的日志调用链系统需要这个参数)。AspectTimingTask 的bean实例的scope是prototype,这个注意下。

public static void register(Integer retry, Long period, String taskId, ScheduledRunnable task, ScheduledCallback callback) {scheduledTaskRegistrarHelper.register(retry, taskId, period, task, callback);
}private class ScheduledTaskRegistrarHelper {public void register(Integer retry, String taskId, Long period, ScheduledRunnable task, ScheduledCallback callback) {//是否可以重置定时任务TimingTask preTask = periodTasks.get(taskId);if (null != preTask&& preTask.reset()&& existTask(taskId)) {return;}TimingTask curTask = new TimingTask(retry, taskId, period, task, callback);AspectTimingTask aspectTimingTask = applicationContext.getBean(AspectTimingTask.class);aspectTimingTask.setTimingTask(curTask);ScheduledFuture<?> scheduledFuture = registrar.getScheduler().scheduleAtFixedRate(aspectTimingTask, period);scheduledFutures.put(taskId, scheduledFuture);periodTasks.put(taskId, curTask);LOGGER.info("注册定时任务: " + curTask);}private boolean existTask(String taskId) {return scheduledFutures.containsKey(taskId) && periodTasks.containsKey(taskId);}
}

  如果taskId的定时任务已经存在则重置定时任务,否则注册新的定时任务。AspectTimingTask 实例通过ApplicationContext获取,每次获取都是一个新的实例。

  由 异步轮询任务 优化成 定时任务,充分利用了线程池。修改之后的业务代码如下。

ScheduledTaskRegistrarHelper.register(10, 5*1000L, "taskId", () -> {//调用分省接口获取发票信息//如果发票信息异常,抛出异常(进入下次重试)//否则,插入用户微信卡包
    }() -> {//轮询次数用尽,用户插入卡包失败
    }
);

  针对电子发票插入微信卡包定时任务,重试执行次数10次,每隔5秒执行一次。任务完成之后结束定时任务,执行次数用尽之后触发插入卡包失败动作。

四、参考  

     Spring异步调用原理及SpringAop拦截器链原理

     Springboot定时任务原理及如何动态创建定时任务

转载于:https://www.cnblogs.com/hujunzheng/p/10660479.html

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/531197.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Spring MVC源码——Root WebApplicationContext

Spring MVC源码——Root WebApplicationContext 打算开始读一些框架的源码,先拿 Spring MVC 练练手,欢迎点击这里访问我的源码注释, SpringMVC官方文档一开始就给出了这样的两段示例: WebApplicationInitializer示例: public class MyWebApplicationInitializer implements Web…

Spring MVC源码——Servlet WebApplicationContext

上一篇笔记(Spring MVC源码——Root WebApplicationContext)中记录了下 Root WebApplicationContext 的初始化代码.这一篇来看 Servlet WebApplicationContext 的初始化代码 DispatcherServlet 是另一个需要在 web.xml 中配置的类, Servlet WebApplicationContext 就由它来创建…

Springboot源码——应用程序上下文分析

前两篇(Spring MVC源码——Root WebApplicationContext 和 Spring MVC源码——Servlet WebApplicationContext)讲述了springmvc项目创建上下文的过程&#xff0c;这一篇带大家了解一下springboot项目创建上下文的过程。 SpringApplication引导类 SpringApplication类用于启动或…

基于zookeeper实现分布式配置中心(一)

最近在学习zookeeper&#xff0c;发现zk真的是一个优秀的中间件。在分布式环境下&#xff0c;可以高效解决数据管理问题。在学习的过程中&#xff0c;要深入zk的工作原理&#xff0c;并根据其特性做一些简单的分布式环境下数据管理工具。本文首先对zk的工作原理和相关概念做一下…

基于zookeeper实现分布式配置中心(二)

上一篇&#xff08;基于zookeeper实现分布式配置中心&#xff08;一&#xff09;&#xff09;讲述了zookeeper相关概念和工作原理。接下来根据zookeeper的特性&#xff0c;简单实现一个分布式配置中心。 配置中心的优势 1、各环境配置集中管理。 2、配置更改&#xff0c;实时推…

Redis分布式锁实战

背景 目前开发过程中&#xff0c;按照公司规范&#xff0c;需要依赖框架中的缓存组件。不得不说&#xff0c;做组件的大牛对CRUD操作的封装&#xff0c;连接池、缓存路由、缓存安全性的管控都处理的无可挑剔。但是有一个小问题&#xff0c;该组件没有对分布式锁做实现&#xff…

基于RobotFramework实现自动化测试

Java robotframework seleniumlibrary 使用Robot Framework Maven Plugin&#xff08;http://robotframework.org/MavenPlugin/&#xff09;执行自动化测试chromedriver下载&#xff1a; http://chromedriver.storage.googleapis.com/index.htmlchromedriver和chrome版本对应…

Springboot国际化信息(i18n)解析

国际化信息理解 国际化信息也称为本地化信息 。 Java 通过 java.util.Locale 类来表示本地化对象&#xff0c;它通过 “语言类型” 和 “国家/地区” 来创建一个确定的本地化对象 。举个例子吧&#xff0c;比如在发送一个具体的请求的时候&#xff0c;在header中设置一个键值对…

看了就知道为什么别人C语言学习效率那么高了

谈及C语言&#xff0c;我想C语言功能强大都应该知道、应用广泛&#xff0c;一旦掌握了后&#xff0c;你就可以理直气壮地对他人说“我是电脑高手&#xff01;”&#xff0c;而且以后若是再自学其他语言就显得轻而易举了。忧虑的是&#xff0c;C语言般博大精深&#xff0c;太难学…

C语言一看就能上手的干货!你确定你不来看吗?

本地环境设置 如果您想要设置 C 语言环境&#xff0c;您需要确保电脑上有以下两款可用的软件&#xff0c;文本编辑器和 C 编译器。 文本编辑器 这将用于输入您的程序。文本编辑器包括 Windows Notepad、OS Edit command、Brief、Epsilon、EMACS 和 vim/vi。文本编辑器的名称…

C语言爆炸干货,小白你还不来看看嘛!

①&#xff1a;数据类型 int(整型)&#xff0c;short int(短整型)&#xff0c;long int(长整型)&#xff0c; char(字符型)&#xff0c;float&#xff08;单精度浮点型&#xff09; double&#xff08;双精度浮点型&#xff09; C语言编程交流群815393895 ②&#xff1a;逻…

10万码农五年的C语言笔记!你现在知道别人为什么这么优秀了吗?

c语言对许多同学来说确实是一门比较难学的课程&#xff0c;不仅抽象&#xff0c;而且繁琐&#xff0c;但这又是一门不得不学的课程。前两节可能还有兴致听一听&#xff0c;然而&#xff0c;再过几节课就是一脸蒙比。凭空要想出一道题的算法和程序&#xff0c;根本无从下手。 所…

C语言从来都没有过时,你大爷终究是你大爷

直到今天&#xff0c;有人在喊C语言过时的语言&#xff0c;还有什么值得学习的&#xff0c;现在看Python&#xff0c;PHP等语言现在都很容易用&#xff0c;谁还在学习老C语言&#xff0c;其实这是真的吗&#xff1f;作者下载了两种语言的源代码作为下载器。由于空间的限制&…

C语言超级玛丽菜单模块源码

C语言是面向过程的&#xff0c;而C&#xff0b;&#xff0b;是面向对象的 C和C的区别&#xff1a; C是一个结构化语言&#xff0c;它的重点在于算法和…

C语言使用函数必须知道的3点注意事项!

C语言是面向过程的&#xff0c;而C&#xff0b;&#xff0b;是面向对象的 C和C的区别&#xff1a; C是一个结构化语言&#xff0c;它的重点在于算法和数据结构。C程序的设计首要考虑的是如何通过一个过程&#xff0c;对输入&#xff08;或环境条件&#xff09;进行运算处理得…

C语言/C++编程学习:C语言环境设置!

C语言是面向过程的&#xff0c;而C&#xff0b;&#xff0b;是面向对象的 C和C的区别&#xff1a; C是一个结构化语言&#xff0c;它的重点在于算法和数据结构。C程序的设计首要考虑的是如何通过一个过程&#xff0c;对输入&#xff08;或环境条件&#xff09;进行运算处理得…

C语言指针原来也可以这么的通俗易懂!

C语言是面向过程的&#xff0c;而C&#xff0b;&#xff0b;是面向对象的 C和C的区别&#xff1a; C是一个结构化语言&#xff0c;它的重点在于算法和数据结构。C程序的设计首要考虑的是如何通过一个过程&#xff0c;对输入&#xff08;或环境条件&#xff09;进行运算处理得…

C语言过时了?你在做梦?

为什么要使用C语言&#xff1f; 在过去的四十年里&#xff0c;C语言已经成为世界上最流行、最重要的一种编程语言。 C是一种融合了控制特性的现代语言&#xff0c;而我们已发现在计算机科学的理论和实践中&#xff0c;控制特性是很重要的。其设计使得用户可以自然地采用自顶向…

C/C++的转义字符详解

所有的ASCII码都可以用“\”加数字&#xff08;一般是8进制数字&#xff09;来表示。而C中定义了一些字母前加"\"来表示常见的那些不能显示的ASCII字符&#xff0c;如\0,\t,\n等&#xff0c;就称为转义字符&#xff0c;因为后面的字符&#xff0c;都不是它本来的ASCI…

C语言深入理解!助你向大佬迈进!

Dennis Ritchie 过世了&#xff0c;他发明了C语言&#xff0c;一个影响深远并彻底改变世界的计算机语言。一门经历40多年的到今天还长盛不衰的语言&#xff0c;今天很多语言都受到C的影响&#xff0c;C&#xff0c;Java&#xff0c;C#&#xff0c;Perl&#xff0c; PHP&#xf…