用户会话信息在异步事件/线程池的传递
@author:shengfq
@date:2024-07-29
@version:1.0
背景:
同事写的一个代码功能,是在一个主线程中通过如下代码进行异步任务的执行,结果遇到了问题.
1.ThreadPool.execute(Runnable)启动一个子线程执行异步任务
2.applicationContext.publishEvent(applicationEvent)启动一个异步事件执行异步任务
问题:
由于异步任务线程内也需要用户的身份会话信息(UserDetail对象),他把该对象作为payload,从主线程通过方法参数直接传进去(硬编码),导致业务方法多了一个UserDetail参数。而且进入子线程后,虽然通过DetailHelper.setUserDetail(UserDetail)把会话数据放入了Context,但执行service方法时UserDetail对象却被意外清理了。为什么会出现这种情况?在这种场景下,用户会话信息应该如何传递才比较合适?
@Async(ThreadPoolName)
@EventListener
public void onEvent(PublishEvent event) {
    UserDetail userDetail = event.getPayload();
    DetailHelper.setUserDetail(userDetail);
    ...
    DetailHelper.getUserDetail(); // 结果是null
}
实现原理:
通过继承ThreadPoolExecutor定制线程池,重写其execute()/submit()方法对任务进行包装:提交任务时在主线程读取Context中的用户信息,随任务一起传入,待任务在子线程中执行时再写入子线程的ContextHolder.
代码如下:
1. ITask ICallback 这是执行异步任务的抽象,每个任务一个类一个回调
/**
 * A unit of asynchronous work. Each task is expected to be implemented as its own class.
 */
public interface ITask {

    /**
     * Runs the asynchronous task.
     */
    void executeTask();
}

/**
 * Completion hook that accompanies an asynchronous task.
 */
public interface ICallback {

    /**
     * Receives the outcome of the task.
     *
     * @param result whether the task executed successfully
     */
    void callback(boolean result);
}
2. EssContextHolder 线程变量上下文
package org.hzero.samples.core.context;/*** 线程变量上下文*/
/**
 * Thread-scoped context holder.
 *
 * <p>Stores four string values (sid, token, union code, union id) in separate
 * {@link ThreadLocal} slots, so every thread sees only the values it set itself.
 * Threads that are reused (for example pool workers) should call {@link #clear()}
 * once they are done, otherwise the values stay attached to the thread.
 */
public class EssContextHolder {

    /** sid of the current thread. */
    private static final ThreadLocal<String> SID = new ThreadLocal<>();

    /** token of the current thread. */
    private static final ThreadLocal<String> TOKEN = new ThreadLocal<>();

    /** Union code of the current thread. */
    private static final ThreadLocal<String> UNION_CODE = new ThreadLocal<>();

    /** Union id of the current thread. */
    private static final ThreadLocal<String> UNION_ID = new ThreadLocal<>();

    /** Static holder; not meant to be instantiated. */
    private EssContextHolder() {
    }

    /**
     * Sets the sid for the current thread.
     *
     * @param sid the sid to store; may be {@code null}
     */
    public static void setSID(String sid) {
        SID.set(sid);
    }

    /**
     * Returns the sid of the current thread.
     *
     * @return the stored sid, or {@code null} if none was set on this thread
     */
    public static String getSID() {
        return SID.get();
    }

    /**
     * Sets the token for the current thread.
     *
     * @param token the token to store; may be {@code null}
     */
    public static void setToken(String token) {
        TOKEN.set(token);
    }

    /**
     * Returns the token of the current thread.
     *
     * @return the stored token, or {@code null} if none was set on this thread
     */
    public static String getToken() {
        return TOKEN.get();
    }

    /**
     * Sets the union code for the current thread.
     *
     * @param unionCode the union code to store; may be {@code null}
     */
    public static void setUnionCode(String unionCode) {
        UNION_CODE.set(unionCode);
    }

    /**
     * Returns the union code of the current thread.
     *
     * @return the stored union code, or {@code null} if none was set on this thread
     */
    public static String getUnionCode() {
        return UNION_CODE.get();
    }

    /**
     * Sets the union id for the current thread.
     *
     * @param unionId the union id to store; may be {@code null}
     */
    public static void setUnionId(String unionId) {
        UNION_ID.set(unionId);
    }

    /**
     * Returns the union id of the current thread.
     *
     * @return the stored union id, or {@code null} if none was set on this thread
     */
    public static String getUnionId() {
        return UNION_ID.get();
    }

    /**
     * Removes all four values from the current thread.
     *
     * <p>Uses {@link ThreadLocal#remove()} rather than {@code set(null)} so the
     * per-thread entries themselves are dropped.
     */
    public static void clear() {
        SID.remove();
        TOKEN.remove();
        UNION_CODE.remove();
        UNION_ID.remove();
    }
}
3. ThreadPoolExecutorMdcWrapper 线程池的个性化定制类
package org.hzero.samples.core.context;import org.slf4j.MDC;import java.util.concurrent.*;/*** @ClassName ThreadPoolExecutorMdcWrapper* @Description 线程池的个性化定制类* @Author shengfq* @Date 2021/5/28 0028 上午 10:53* @Version*/
/**
 * {@link ThreadPoolExecutor} that hands the submitting thread's context to the task.
 *
 * <p>When a task is handed to the pool, the sid, token and union id held by
 * {@code EssContextHolder} and a copy of the SLF4J MDC map are read on the
 * submitting thread and bound to the task through {@code ThreadMdcUtil.wrap}.
 *
 * <p>Only {@link #execute(Runnable)} is overridden. The inherited {@code submit},
 * {@code invokeAll} and {@code invokeAny} methods hand their tasks to
 * {@code execute} on the calling thread, so they are covered as well; wrapping in
 * {@code submit} too would wrap every submitted task twice.
 */
public class ThreadPoolExecutorMdcWrapper extends ThreadPoolExecutor {

    /**
     * Creates a pool from a configuration object.
     *
     * @param config        supplies core size, max size, keep-alive (seconds) and queue capacity
     * @param threadFactory factory used to create worker threads
     * @param handler       handler used when a task cannot be accepted
     */
    public ThreadPoolExecutorMdcWrapper(AsyncTaskThreadPoolConfig config, ThreadFactory threadFactory,
            RejectedExecutionHandler handler) {
        this(config.getCorePoolSize(), config.getMaxPoolSize(), config.getKeepAliveSecond(), TimeUnit.SECONDS,
                new ArrayBlockingQueue<Runnable>(config.getQueueCapacity()), threadFactory, handler);
    }

    /** Same parameters as the matching {@link ThreadPoolExecutor} constructor. */
    public ThreadPoolExecutorMdcWrapper(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
            BlockingQueue<Runnable> workQueue) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
    }

    /** Same parameters as the matching {@link ThreadPoolExecutor} constructor. */
    public ThreadPoolExecutorMdcWrapper(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
            BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);
    }

    /** Same parameters as the matching {@link ThreadPoolExecutor} constructor. */
    public ThreadPoolExecutorMdcWrapper(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
            BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler);
    }

    /** Same parameters as the matching {@link ThreadPoolExecutor} constructor. */
    public ThreadPoolExecutorMdcWrapper(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
            BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
    }

    /**
     * Captures the caller's context and executes the wrapped task.
     *
     * @param task the task to run
     * @throws NullPointerException if {@code task} is {@code null}
     */
    @Override
    public void execute(Runnable task) {
        // Check before wrapping: the wrapper is never null, so a null task would
        // otherwise slip past the superclass check and only fail on the worker thread.
        if (task == null) {
            throw new NullPointerException();
        }
        String sid = EssContextHolder.getSID();
        String token = EssContextHolder.getToken();
        String unionId = EssContextHolder.getUnionId();
        super.execute(ThreadMdcUtil.wrap(sid, token, unionId, task, MDC.getCopyOfContextMap()));
    }
}
4. ThreadMdcUtil.java Runnable和Callable的wrap包装,增加环境变量的绑定
package org.hzero.samples.core.context;import org.slf4j.MDC;import java.util.Map;
import java.util.concurrent.Callable;/*** @ClassName ThreadMdcUtil* @Description * @Author shengfq* @Date 2021/5/28 0028 上午 10:54* @Version*/
/**
 * Wraps {@link Runnable} and {@link Callable} tasks so that a given context
 * (sid, token, union id and an MDC map) is bound to the executing thread for the
 * duration of the task.
 *
 * <p>Before the task runs, the context already present on the executing thread is
 * remembered; it is put back once the task finishes. This keeps values from
 * lingering on a reused worker thread, and it keeps the caller's own context
 * intact when the task happens to run on the submitting thread (for example under
 * a caller-runs rejection policy).
 */
public class ThreadMdcUtil {

    /**
     * Wraps a {@link Callable} with the given context.
     *
     * @param sid      sid to expose through {@code EssContextHolder} while the task runs
     * @param token    token to expose while the task runs
     * @param unionId  union id to expose while the task runs
     * @param callable the task to run
     * @param context  MDC map to install while the task runs; {@code null} means an empty MDC
     * @param <T>      result type of the task
     * @return a callable that binds the context, delegates to {@code callable}, then restores
     *         the executing thread's previous context
     */
    public static <T> Callable<T> wrap(final String sid, final String token, final String unionId,
            final Callable<T> callable, final Map<String, String> context) {
        return () -> {
            final String previousSid = EssContextHolder.getSID();
            final String previousToken = EssContextHolder.getToken();
            final String previousUnionId = EssContextHolder.getUnionId();
            final Map<String, String> previousMdc = MDC.getCopyOfContextMap();
            bind(sid, token, unionId, context);
            try {
                return callable.call();
            } finally {
                bind(previousSid, previousToken, previousUnionId, previousMdc);
            }
        };
    }

    /**
     * Wraps a {@link Runnable} with the given context.
     *
     * @param sid      sid to expose through {@code EssContextHolder} while the task runs
     * @param token    token to expose while the task runs
     * @param unionId  union id to expose while the task runs
     * @param runnable the task to run
     * @param context  MDC map to install while the task runs; {@code null} means an empty MDC
     * @return a runnable that binds the context, delegates to {@code runnable}, then restores
     *         the executing thread's previous context
     */
    public static Runnable wrap(final String sid, final String token, final String unionId,
            final Runnable runnable, final Map<String, String> context) {
        return () -> {
            final String previousSid = EssContextHolder.getSID();
            final String previousToken = EssContextHolder.getToken();
            final String previousUnionId = EssContextHolder.getUnionId();
            final Map<String, String> previousMdc = MDC.getCopyOfContextMap();
            bind(sid, token, unionId, context);
            try {
                runnable.run();
            } finally {
                bind(previousSid, previousToken, previousUnionId, previousMdc);
            }
        };
    }

    /** Installs the given values on the current thread; a {@code null} MDC map clears the MDC. */
    private static void bind(String sid, String token, String unionId, Map<String, String> context) {
        EssContextHolder.setSID(sid);
        EssContextHolder.setToken(token);
        EssContextHolder.setUnionId(unionId);
        if (context == null) {
            MDC.clear();
        } else {
            MDC.setContextMap(context);
        }
    }
}
5. ExecuteTaskUtils 异步任务执行器(对调用者开放)
package org.hzero.samples.core.context;import lombok.extern.slf4j.Slf4j;
import org.slf4j.MDC;import java.util.Map;/*** @ClassName ExecuteTaskUtils* @Description 执行异步线程工具类* @Author shengfq* @Date 2021/5/28 0028 上午 8:23* @Version*/
@Slf4j
public class ExecuteTaskUtils {private static ExecuteTaskService executeTaskService;/*** 获取单例对象** @return*/private static synchronized ExecuteTaskService getInstance() {if (executeTaskService == null) {executeTaskService = new ExecuteTaskService();}return executeTaskService;}/*** 提交异步任务** @return*/public static void submitTask(ITask task) {submitTask(task, null);}/*** 提交异步任务,执行结束后回调方法.** @param task* @param callback*/public static void submitTask(ITask task, ICallback callback) {Map<String, String> mdcMap = MDC.getCopyOfContextMap();String sid = EssContextHolder.getSID();String token = EssContextHolder.getToken();String unionId = EssContextHolder.getUnionId();getInstance().submitTask(() -> {try {// 设置if (mdcMap != null) {MDC.setContextMap(mdcMap);}EssContextHolder.setSID(sid);EssContextHolder.setToken(token);EssContextHolder.setUnionId(unionId);task.executeTask();if (callback != null) {callback.callback(true);}} catch (Exception e) {log.error("执行异步任务异常:", e);if (callback != null) {callback.callback(false);}}});}
}
6. ExecuteTaskService.java 对线程池对象ThreadPoolExecutor进行初始化,异步任务api封装
package org.hzero.samples.core.context;import com.google.common.util.concurrent.ThreadFactoryBuilder;
import lombok.extern.slf4j.Slf4j;import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;@Slf4j
public class ExecuteTaskService {
    /*
     * Creates the shared thread pool and exposes a small API for submitting tasks
     * to it. The pool is held in a static field, so all instances of this class
     * operate on the same executor; constructing a new instance replaces it.
     */

    /** The shared pool; replaced whenever a new instance is constructed. */
    protected static ThreadPoolExecutor pools = null;

    /**
     * Submits a task to the shared pool.
     *
     * @param task the task to run
     */
    public void submitTask(Runnable task) {
        pools.execute(task);
    }

    /**
     * Shuts the shared pool down.
     */
    public void shutdown() {
        pools.shutdown();
    }

    /**
     * Initializes the pool with a core size of 6 and a maximum size of 10.
     */
    public ExecuteTaskService() {
        initPool(6, 10);
    }

    /**
     * Initializes the pool with the given sizes.
     *
     * @param corePoolSize number of core threads
     * @param maxPoolSize  maximum number of threads
     */
    public ExecuteTaskService(int corePoolSize, int maxPoolSize) {
        initPool(corePoolSize, maxPoolSize);
    }

    /**
     * Builds a new pool and installs it as the shared pool.
     *
     * <p>If a pool was already installed it is shut down after being replaced, so its
     * worker threads are released once the tasks it had already accepted complete.
     * Without this, each additional instance would leave the previous pool's core
     * threads alive indefinitely.
     */
    private void initPool(int corePoolSize, int maxPoolSize) {
        ThreadFactory guavaThreadFactory = new ThreadFactoryBuilder().setNameFormat("ess-task-pool-%d").build();
        ThreadPoolExecutor replacement = new ThreadPoolExecutorMdcWrapper(corePoolSize, maxPoolSize, 10L,
                TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(10000), guavaThreadFactory,
                new ThreadPoolExecutor.CallerRunsPolicy());
        ThreadPoolExecutor previous;
        synchronized (ExecuteTaskService.class) {
            previous = pools;
            pools = replacement;
        }
        if (previous != null) {
            // Orderly shutdown: already-submitted tasks still run, new ones are refused.
            previous.shutdown();
        }
    }
}
总结
这是我几年前实现的环境变量传递方案:在线程池执行异步任务时,由公共组件统一完成会话信息的传递,调用方不必每次手动传递,同时也实现了线程池对象的定制化.对于通过@Async注解执行的异步事件,只要注解中指定了线程池名字,并且该线程池就是我们自己初始化的ThreadPoolExecutorMdcWrapper对象,原则上同样可以获得会话信息的传递.