0、前言
TransmittableThreadLocal,简称 TTL,是阿里巴巴开源的一个Java库,它能够实现ThreadLocal在多线程间的值传递,适用于使用线程池、异步调用等需要线程切换的场景,解决了ThreadLocal在使用父子线程、线程池时不能正确传递值的问题。
核心实现:捕获(capture)- 重放(replay)- 恢复(restore)
- 捕获:将父线程的 TTL/ThreadLocal 拷贝一份到子线程中存为快照;
private static class Snapshot {final HashMap<TransmittableThreadLocal<Object>, Object> ttl2Value;final HashMap<ThreadLocal<Object>, Object> threadLocal2Value;}
- 重放:将快照中的内容存入子线程的 TTL/ThreadLocal 中,并移除不存在快照中的子线程已经存在的 TTL/ThreadLocal;
- 恢复:清除子线程的 TTL/ThreadLocal。
1、上下文乱象
背景:为了实现在异步线程中也能正确进行通用字段的填充,引入了 TTL,将原先存储用户上下文信息的 ThreadLocal 换成了 TTL。(注:异步线程通过线程池进行管理)
乱象: 子线程在执行任务的过程中,用户上下文出现了两种状态:run() 执行前后 – 正确信息、run() 执行中 – null,如下图所示。
代码部分:如下所示。
功能逻辑
线程池装饰器
字段填充
2、没有使用 TtlRunnable
capture,replay,restore 本质是线程任务执行前后的增强方法,这些方法的调用发生于 TtlRunnable 的 run 方法中。
/*** wrap method {@link Runnable#run()}.*/@Overridepublic void run() {final Object captured = capturedRef.get();if (captured == null || releaseTtlValueReferenceAfterRun && !capturedRef.compareAndSet(captured, null)) {throw new IllegalStateException("TTL value reference is released after run!");}final Object backup = replay(captured);try {runnable.run();} finally {restore(backup);}}
使用方式:
- 直接调用 TtlRunnable.get(…) 对 Runnable 进行包装增强;
- 通过 TtlExecutors 工具类获取相应的包装类。
错误例子:
正确例子:
3、父子线程引用共享问题
TTL 默认的上下文复制方式是浅拷贝,这就会造成父子线程中的上下文信息出现共享问题。解决这一问题的方法为:重写 TTL 的 copy 方法,将浅拷贝换成深拷贝。
/*** Computes the value for this transmittable thread-local variable* as a function of the source thread's value at the time the task* Object is created.* <p>* This method is called from {@link TtlRunnable} or* {@link TtlCallable} when it create, before the task is started.* <p>* This method merely returns reference of its source thread value(the shadow copy),* and should be overridden if a different behavior is desired.** @since 1.0.0*/public T copy(T parentValue) {return parentValue;}
错误例子:
private final static ThreadLocal<Map<String, Integer>> transmittableThreadLocal = new TransmittableThreadLocal<Map<String, Integer>>() {@Overrideprotected Map<String, Integer> initialValue() {return new HashMap<>();}};private static int i = 0;public static void main(String[] args) {transmittableThreadLocal.get().put(String.format("key-%d", ++i), i);Executor ttlExecutor = TtlExecutors.getTtlExecutor(Executors.newFixedThreadPool(1));CompletableFuture.runAsync(()-> {try {Thread.sleep(3 * 1000);} catch (InterruptedException e) {}System.out.println(StrUtil.format("[{}]子线程:{}", LocalTime.now(), transmittableThreadLocal.get()));}, ttlExecutor);transmittableThreadLocal.get().put(String.format("key-%d", ++i), i);System.out.println(StrUtil.format("[{}]父线程:{}", LocalTime.now(), transmittableThreadLocal.get()));transmittableThreadLocal.remove();}
正确例子:
private final static ThreadLocal<Map<String, Integer>> transmittableThreadLocal = new TransmittableThreadLocal<Map<String, Integer>>() {@Overrideprotected Map<String, Integer> initialValue() {return new HashMap<>();}@Overridepublic Map<String, Integer> copy(Map<String, Integer> parentValue) {return parentValue != null ? new HashMap<>(parentValue) : null;}};private static int i = 0;public static void main(String[] args) {transmittableThreadLocal.get().put(StrUtil.format("key-{}", ++i), i);Executor ttlExecutor = TtlExecutors.getTtlExecutor(Executors.newFixedThreadPool(1));CompletableFuture.runAsync(()-> {try {Thread.sleep(3 * 1000);} catch (InterruptedException e) {}System.out.println(StrUtil.format("[{}]子线程:{}", LocalTime.now(), transmittableThreadLocal.get()));}, ttlExecutor);transmittableThreadLocal.get().put(String.format("key-%d", ++i), i);System.out.println(StrUtil.format("[{}]父线程:{}", LocalTime.now(), transmittableThreadLocal.get()));transmittableThreadLocal.remove();}
拓展:捕获、重放期间的线程切换和 ThreadLocal 变化。
捕获:
重放:
- 备份
- 重新设置