【JUC进阶】14. TransmittableThreadLocal

目录

1、前言

2、TransmittableThreadLocal

2.1、使用场景

2.2、基本使用

3、实现原理

4、小结


1、前言

书接上回《【JUC进阶】13. InheritableThreadLocal》,提到了InheritableThreadLocal虽然能进行父子线程的值传递,但是如果在线程池中,就无法达到预期的效果了。为了更好的解决该问题,TransmittableThreadLocal诞生了。

2、TransmittableThreadLocal

TransmittableThreadLocal 是Alibaba开源的、用于解决 “在使用线程池等会缓存线程的组件情况下传递ThreadLocal” 问题的 InheritableThreadLocal 扩展。既然是扩展,那么自然具备InheritableThreadLocal不同线程间值传递的能力。但是他也是专门为了解决InheritableThreadLocal在线程池中出现的问题的。

官网地址:https://github.com/alibaba/transmittable-thread-local

2.1、使用场景

  1. 分布式跟踪系统 或 全链路压测(即链路打标)
  2. 日志收集记录系统上下文
  3. Session级Cache
  4. 应用容器或上层框架跨应用代码给下层SDK传递信息

2.2、基本使用

我们拿《【JUC进阶】13. InheritableThreadLocal》文中最后的demo进行改造。这里需要配合TtlExecutors一起使用。这里先讲述使用方法,具体为什么下面细说。

首先,我们需要添加依赖:

<!-- https://mvnrepository.com/artifact/com.alibaba/transmittable-thread-local -->
<dependency><groupId>com.alibaba</groupId><artifactId>transmittable-thread-local</artifactId><version>2.14.2</version>
</dependency>

其次,ThreadLocal的实现改为TransmittableThreadLocal。

static ThreadLocal<String> threadLocal = new TransmittableThreadLocal<>();

最后创建线程池的时候,使用TTL装饰器:

static ExecutorService executorService = TtlExecutors.getTtlExecutorService(Executors.newSingleThreadExecutor());

完整代码如下:

// threadlocal改为TransmittableThreadLocal
static ThreadLocal<String> threadLocal = new TransmittableThreadLocal<>();// 线程池添加TtlExecutors
static ExecutorService executorService = TtlExecutors.getTtlExecutorService(Executors.newSingleThreadExecutor());public static void main(String[] args) throws InterruptedException {//threadLocal.set("我是主线程的threadlocal变量,变量值为:000000");// 线程池执行子线程executorService.submit(() -> {System.out.println("-----> 子线程" + Thread.currentThread() + " <----- 获取threadlocal变量:" + threadLocal.get());});// 主线程睡眠3s,模拟运行Thread.sleep(3000);// 将变量修改为11111,在InheritableThreadLocal中修改是无效的threadLocal.set("我是主线程的threadlocal变量,变量值为:11111");// 这里线程池重新执行线程任务executorService.submit(() -> {System.out.println("-----> 子线程" + Thread.currentThread() + " <----- 获取threadlocal变量:" + threadLocal.get());});// 线程池关闭executorService.shutdown();
}

执行看下效果:

已经成功获取到threadlocal变量。

该方式也解决了因为线程被重复利用,而threadlocal重新赋值失效的问题。

3、实现原理

首先可以看到TransmittableThreadLocal继承InheritableThreadLocal,同时实现了TtlCopier接口。TtlCopier接口只提供了一个方法copy()。看到这里,可能有人大概猜出来他的实现原理了,既然实现了copy()方法,那么大概率是将父线程的变量复制一份存起来,接着找个地方存起来,然后找个适当的时机再还回去。没错,其实就是这样。

public class TransmittableThreadLocal<T> extends InheritableThreadLocal<T> implements TtlCopier<T> {
}

知道了TransmittableThreadLocal类的定义之后,我们再来看一个重要的属性holder:

// Note about the holder:
// 1. holder self is a InheritableThreadLocal(a *ThreadLocal*).
// 2. The type of value in the holder is WeakHashMap<TransmittableThreadLocal<Object>, ?>.
//    2.1 but the WeakHashMap is used as a *Set*:
//        the value of WeakHashMap is *always* null, and never used.
//    2.2 WeakHashMap support *null* value.
private static final InheritableThreadLocal<WeakHashMap<TransmittableThreadLocal<Object>, ?>> holder =new InheritableThreadLocal<WeakHashMap<TransmittableThreadLocal<Object>, ?>>() {@Overrideprotected WeakHashMap<TransmittableThreadLocal<Object>, ?> initialValue() {return new WeakHashMap<>();}@Overrideprotected WeakHashMap<TransmittableThreadLocal<Object>, ?> childValue(WeakHashMap<TransmittableThreadLocal<Object>, ?> parentValue) {return new WeakHashMap<>(parentValue);}};

这里存放的是一个全局的WeakMap(同ThreadLocal一样,weakMap也是为了解决内存泄漏的问题),里面存放了TransmittableThreadLocal对象并且重写了initialValue和childValue方法,尤其是childValue,可以看到在即将异步时父线程的属性是直接作为初始化值赋值给子线程的本地变量对象。引入holder变量后,也就不必对外暴露Thread中的 inheritableThreadLocals,保持ThreadLocal.ThreadLocalMap的封装性。

而TransmittableThreadLocal中的get()和set()方法,都是从该holder中获取或添加该map。

重点来了,前面不是提到了需要借助于TtlExecutors.getTtlExecutorService()包装线程池才能达到效果吗?我们来看看这里做了什么事。

我们从TtlExecutors.getTtlExecutorService()方法跟进可以发现一个线程池的ttl包装类ExecutorServiceTtlWrapper。其中包含了我们执行线程的方法submit()和execute()。我们进入submit()方法:

@NonNull
@Override
public <T> Future<T> submit(@NonNull Callable<T> task) {return executorService.submit(TtlCallable.get(task, false, idempotent));
}

可以发现在线程池进行任务执行时,对我们提交的任务进行了一层预处理,TtlCallable.get()。TtlCallable也是Callable的装饰类,同样还有TtlRunnable,也是同样道理。我们跟进该方法偷瞄一眼:

@Nullable
@Contract(value = "null, _, _ -> null; !null, _, _ -> !null", pure = true)
public static <T> TtlCallable<T> get(@Nullable Callable<T> callable, boolean releaseTtlValueReferenceAfterCall, boolean idempotent) {if (callable == null) return null;if (callable instanceof TtlEnhanced) {// avoid redundant decoration, and ensure idempotencyif (idempotent) return (TtlCallable<T>) callable;else throw new IllegalStateException("Already TtlCallable!");}return new TtlCallable<>(callable, releaseTtlValueReferenceAfterCall);
}

上面判断下当前线程的类型是否已经是TtlEnhanced,如果是直接返回,否则创建一个TtlCallable。接着进入new TtlCallable()方法:

private TtlCallable(@NonNull Callable<V> callable, boolean releaseTtlValueReferenceAfterCall) {this.capturedRef = new AtomicReference<>(capture());this.callable = callable;this.releaseTtlValueReferenceAfterCall = releaseTtlValueReferenceAfterCall;
}

可以看到在初始化线程的时候,调用了一个capture()方法,并将该方法得到的值存放在capturedRef中。没错,这里就是上面我们提到的将父线程的本地变量复制一份快照,存放起来。跟进capture():

@NonNull
public static Object capture() {final HashMap<Transmittee<Object, Object>, Object> transmittee2Value = newHashMap(transmitteeSet.size());for (Transmittee<Object, Object> transmittee : transmitteeSet) {try {transmittee2Value.put(transmittee, transmittee.capture());} catch (Throwable t) {if (logger.isLoggable(Level.WARNING)) {logger.log(Level.WARNING, "exception when Transmitter.capture for transmittee " + transmittee +"(class " + transmittee.getClass().getName() + "), just ignored; cause: " + t, t);}}}return new Snapshot(transmittee2Value);
}

这里的transmitteeSet是一个存放Transmitteedede 集合,在初始化中会将我们 前面提到的holder注册进去:

private static final Set<Transmittee<Object, Object>> transmitteeSet = new CopyOnWriteArraySet<>();static {registerTransmittee(ttlTransmittee);registerTransmittee(threadLocalTransmittee);
}@SuppressWarnings("unchecked")
public static <C, B> boolean registerTransmittee(@NonNull Transmittee<C, B> transmittee) {return transmitteeSet.add((Transmittee<Object, Object>) transmittee);
}

跟进transmittee.capture()方法,该方法由静态内部类Transmitter实现并重写,com.alibaba.ttl.TransmittableThreadLocal.Transmitter.Transmittee#capture

private static final Transmittee<HashMap<TransmittableThreadLocal<Object>, Object>, HashMap<TransmittableThreadLocal<Object>, Object>> ttlTransmittee =new Transmittee<HashMap<TransmittableThreadLocal<Object>, Object>, HashMap<TransmittableThreadLocal<Object>, Object>>() {@NonNull@Overridepublic HashMap<TransmittableThreadLocal<Object>, Object> capture() {final HashMap<TransmittableThreadLocal<Object>, Object> ttl2Value = newHashMap(holder.get().size());for (TransmittableThreadLocal<Object> threadLocal : holder.get().keySet()) {ttl2Value.put(threadLocal, threadLocal.copyValue());}return ttl2Value;}
}

transmittee.capture()扫描holder里目前存放的k-v里的key,就是需要传给子线程的TTL对象,其中调用的threadLocal.copyValue()便是前面看到的TtlCopier接口提供的方法。

看到这里已经大致符合我们前面的猜想,将变量复制一份存起来。那么不出意外接下来应该就是要找个适当的机会还回去。我们接着看。

接下来我们看真正执行线程的时候,也就是call()方法。由于前面线程被TtlCallable包装过,以为这里的call()方法肯定是TtlCallable.call():

@Override
@SuppressFBWarnings("THROWS_METHOD_THROWS_CLAUSE_BASIC_EXCEPTION")
public V call() throws Exception {// 获取由之前捕获到的父线程变量集final Object captured = capturedRef.get();if (captured == null || releaseTtlValueReferenceAfterCall && !capturedRef.compareAndSet(captured, null)) {throw new IllegalStateException("TTL value reference is released after call!");}// 这里的backup是当前线程原有的变量,这里进行备份,等线程执行完毕后,会将该变量进行恢复final Object backup = replay(captured);try {// 任务执行return callable.call();} finally {// 恢复上述提到的backup原有变量restore(backup);}
}

果然,在执行线程时,先获取之前存放起来的变量。然后调用replay():

@NonNull
public static Object replay(@NonNull Object captured) {final Snapshot capturedSnapshot = (Snapshot) captured;final HashMap<Transmittee<Object, Object>, Object> transmittee2Value = newHashMap(capturedSnapshot.transmittee2Value.size());for (Map.Entry<Transmittee<Object, Object>, Object> entry : capturedSnapshot.transmittee2Value.entrySet()) {Transmittee<Object, Object> transmittee = entry.getKey();try {Object transmitteeCaptured = entry.getValue();transmittee2Value.put(transmittee, transmittee.replay(transmitteeCaptured));} catch (Throwable t) {if (logger.isLoggable(Level.WARNING)) {logger.log(Level.WARNING, "exception when Transmitter.replay for transmittee " + transmittee +"(class " + transmittee.getClass().getName() + "), just ignored; cause: " + t, t);}}}return new Snapshot(transmittee2Value);
}

继续跟进transmittee.replay(transmitteeCaptured):

@NonNull
@Override
public HashMap<TransmittableThreadLocal<Object>, Object> replay(@NonNull HashMap<TransmittableThreadLocal<Object>, Object> captured) {final HashMap<TransmittableThreadLocal<Object>, Object> backup = newHashMap(holder.get().size());for (final Iterator<TransmittableThreadLocal<Object>> iterator = holder.get().keySet().iterator(); iterator.hasNext(); ) {TransmittableThreadLocal<Object> threadLocal = iterator.next();// 这里便是所有原生的本地变量都暂时存储在backup里,用于之后恢复用backup.put(threadLocal, threadLocal.get());// clear the TTL values that is not in captured// avoid the extra TTL values after replay when run task// 这里检查如果当前变量不存在于捕获到的线程变量,那么就将他清除掉,对应线程的本地变量也清理掉// 为什么要清除?因为从使用这个子线程做异步那里,捕获到的本地变量并不包含原生的变量,当前线程// 在做任务时的首要目标,是将父线程里的变量完全传递给任务,如果不清除这个子线程原生的本地变量,// 意味着很可能会影响到任务里取值的准确性。这也就是为什么上面需要做备份的原因。if (!captured.containsKey(threadLocal)) {iterator.remove();threadLocal.superRemove();}}// set TTL values to capturedsetTtlValuesTo(captured);// call beforeExecute callbackdoExecuteCallback(true);return backup;
}

继续跟进setTtlValuesTo(captured),这里就是把父线程本地变量赋值给当前线程了:

private static void setTtlValuesTo(@NonNull HashMap<TransmittableThreadLocal<Object>, Object> ttlValues) {for (Map.Entry<TransmittableThreadLocal<Object>, Object> entry : ttlValues.entrySet()) {TransmittableThreadLocal<Object> threadLocal = entry.getKey();threadLocal.set(entry.getValue());}
}

到这里基本的实现原理也差不多了,基本和我们前面猜想的一致。但是这里还少了前面提到的backup变量如何恢复的步骤,既然到这里了,一起看一下,跟进restore(backup):

public static void restore(@NonNull Object backup) {for (Map.Entry<Transmittee<Object, Object>, Object> entry : ((Snapshot) backup).transmittee2Value.entrySet()) {Transmittee<Object, Object> transmittee = entry.getKey();try {Object transmitteeBackup = entry.getValue();transmittee.restore(transmitteeBackup);} catch (Throwable t) {if (logger.isLoggable(Level.WARNING)) {logger.log(Level.WARNING, "exception when Transmitter.restore for transmittee " + transmittee +"(class " + transmittee.getClass().getName() + "), just ignored; cause: " + t, t);}}}
}

继续看transmittee.restore(transmitteeBackup):

@Override
public void restore(@NonNull HashMap<TransmittableThreadLocal<Object>, Object> backup) {// call afterExecute callbackdoExecuteCallback(false);for (final Iterator<TransmittableThreadLocal<Object>> iterator = holder.get().keySet().iterator(); iterator.hasNext(); ) {TransmittableThreadLocal<Object> threadLocal = iterator.next();// clear the TTL values that is not in backup// avoid the extra TTL values after restoreif (!backup.containsKey(threadLocal)) {iterator.remove();threadLocal.superRemove();}}// restore TTL valuessetTtlValuesTo(backup);
}

与replay类似,只是重复进行了将backup赋给当前线程的步骤。到此基本结束。附上官网的时序图帮助理解:

4、小结

所以总结下来,TransmittableThreadLocal的实现原理主要就是依赖于TtlRunnable或TtlCallable装饰类的预处理方法,TtlExecutors是将普通线程转换成Ttl包装的线程,而ttl包装的线程会进行本地变量的预处理,也就是capture()拷贝一份快照到内存中,然后通过replay方法将父线程的变量赋值给当前线程。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/621930.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

spring-mvc(1):Hello World

虽然目前大多数都是使用springboot来开发java程序&#xff0c;或者使用其来为其他端提供接口&#xff0c;而为其他端提供接口&#xff0c;这些功能都是依靠springmvc实现的&#xff0c;所以有必要学习一下spring-mvc&#xff0c;这样才能更好的学习springboot。 一&#xff0c…

c语言题目之九九乘法表的打印

文章目录 题目一、题目分析二&#xff0c;代码编写三&#xff0c;拓展 题目 用c语言打印九九乘法表 提示&#xff1a;以下是本篇文章正文内容&#xff0c;下面案例可供参考 一、题目分析 在上面图我们假设一个乘法为一个单位&#xff0c;在这里我们可以看到第一行有一行一列&…

掌握WPF控件:熟练常用属性(二)

WPF布局常用控件&#xff08;二&#xff09; Calendar 用于日期选择的控件。它提供了一个可视化的界面&#xff0c;可以通过它来选择特定的日期。 常用属性描述DisplayMode用来设置Calendar的显示模式&#xff0c;有三种可选值&#xff1a;默认Month&#xff08;月&#xff…

自编C++题目——输入程序

预估难度 简单 题目描述 小明编了一个输入程序&#xff0c;当用户的输入之中有<时&#xff0c;光标移动到最右边&#xff1b;当输入有>时&#xff0c;光标移动到最左边&#xff0c;当输入有^时&#xff0c;光标移动到前一个字符&#xff0c;当输入为#时&#xff0c;清…

SLAM第十四讲

基础知识 四元数 先将三维空间的点p(x,y,z) 变成四元数的表示q(0,x,y,z) 其中0为四元数的实部&#xff0c;x,y,z为四元数的虚部。 实部为0的四元数也叫纯虚四元数。 通过 左乘四元数&#xff…

YOLOv8 Ultralytics:使用Ultralytics框架进行SAM图像分割

YOLOv8 Ultralytics&#xff1a;使用Ultralytics框架进行SAM图像分割 前言相关介绍前提条件实验环境安装环境项目地址LinuxWindows 使用Ultralytics框架进行SAM图像分割参考文献 前言 由于本人水平有限&#xff0c;难免出现错漏&#xff0c;敬请批评改正。更多精彩内容&#xf…

TypeScript进阶(四)声明文件

✨ 专栏介绍 TypeScript是一种由微软开发的开源编程语言&#xff0c;它是JavaScript的超集&#xff0c;意味着任何有效的JavaScript代码都是有效的TypeScript代码。TypeScript通过添加静态类型和其他特性来增强JavaScript&#xff0c;使其更适合大型项目和团队开发。 在TypeS…

长亭科技-雷池WAF的安装与使用

目录 1、安装雷池 2、登录雷池 3、简单配置 4、防护测试 5、其他补充 1、安装雷池 在Linux系统上执行如下命令 &#xff08;需要docker环境&#xff0c;提前把docker、docker-compose 装好&#xff09; bash -c "$(curl -fsSLk https://waf-ce.chaitin.cn/release…

【电源专题】案例:不同模块同一个管脚默认状态不一样会导致什么异常?

案例背景:在产品设计中,有时候会兼容两个不同供应商同一个方案的模块。比如两个供应商使用的内部方案都是一样的芯片,封装也是兼容的。但是由于专利、LAYOUT方便、软件开发方便等角度来看,可能会存在不同模块供应商的同一个PIN脚对应的芯片内部的管脚不一样。管脚不一样那么…

java基础知识点系列——分支语句(六)

java基础知识点系列——分支语句&#xff08;六&#xff09; 流程控制 流程控制语句分类 顺序结构分支结构循环结构 顺序结构 顺序结构是程序中最简单最基本的流程控制&#xff0c;没有特定的语法结构&#xff0c;按照代码的先后顺序&#xff0c;依次执行。 if语句 if语…

39岁学JAVA来得及吗?

39岁学JAVA来得及吗? 在开始前我有一些资料&#xff0c;是我根据网友给的问题精心整理了一份「Java的资料从专业入门到高级教程」&#xff0c; 点个关注在评论区回复“888”之后私信回复“888”&#xff0c;全部无偿共享给大家&#xff01;&#xff01;&#xff01;学习Java编…

五种嵌入式经典通信总线协议

一.先前知识 1.并行与串行 并行通信和串行通信是两种不同的数据传输方式&#xff1a; 并行通信&#xff1a;并行通信是指在同一时间使用多条并行传输的线路传输多个比特的数据。每个比特使用独立的线路进行传输&#xff0c;同时进行。这样可以在一个时钟周期内传输多个比特&…

螺纹钢负公差轧制中的测径仪应用

1、负公差轧制意义 为了满足生产使用要求&#xff0c;并根据轧制水平&#xff0c;在产品标准冲规定钢材尺寸的波动范围&#xff0c;允许钢材的实际尺寸与公称尺之间有一定的偏差&#xff0c;这个偏差一般称公差&#xff0c;公差分正、负公差&#xff0c;钢材按负公差轧制时&…

02.neuvector之Enforcer容器功能介绍

原文链接 一、功能介绍 Enforcer容器在neuvector中主要负责网络与DLP/WAF的规则策略的实现以及网络数据的采集上报&#xff1b; 以DaemonSet的方式运行&#xff0c;主要有三个进程monitor、agent、dp&#xff1b;进程分别主要职责如下&#xff1a; monitor&#xff1a;负责监…

[SpringAop + Logback +MDC] 现网必备全链路日志追踪

缘起&#xff1a;前几天有个粉丝私信&#xff0c;想了解现网环境如果出现问题&#xff0c;怎么快速定位。可能有些小伙伴这时候就会脱口而出&#xff0c;直接去看log 呗&#xff0c;有什么好说的。 但是&#xff0c;众所周知&#xff0c;后端服务面向的前端应用是多种多样的&am…

Shiro框架:Shiro登录认证流程源码解析

目录 1.用户登录认证流程 1.1 生成认证Token 1.2 用户登录认证 1.2.1 SecurityManager login流程解析 1.2.1.1 authenticate方法进行登录认证 1.2.1.1.1 单Realm认证 1.2.1.2 认证通过后创建登录用户对象 1.2.1.2.1 复制SubjectContext 1.2.1.2.2 对subjectContext设…

二、MySQL安装

目录 1、双击mysql8的安装向导 2、分为首次安装和再安装 1&#xff09;、首次安装 &#xff08;1&#xff09;如果是首次安装mysql系列的产品&#xff0c;需要先安装mysql产品的安装向导 &#xff08;2&#xff09;选择安装模式 2&#xff09;、不是首次安装 &#xff0…

学会这个技巧,制作电子杂志SOEASY

​电子杂志是一种非常流行的传播方式&#xff0c;它能够以更加生动、直观的方式展示你的品牌和产品。通过电子杂志&#xff0c;你可以将文字、图片、视频等多种元素有机地结合起来&#xff0c;创造出令人难忘的视觉效果。 如果你想制作一本电子杂志&#xff0c;但不知道从何入…

POSIX API与网络协议栈

本文介绍linux中与tcp网络通信相关的POSIX API&#xff0c;在每次调用的时候&#xff0c;网络协议栈会进行的操作与记录。 POSIX API Posix API&#xff0c;提供了统一的接口&#xff0c;使程序能得以在不同的系统上运行。简单来说不同的操作系统进行同一个活动&#xff0c;比…

QT上位机开发(进度条操作)

【 声明&#xff1a;版权所有&#xff0c;欢迎转载&#xff0c;请勿用于商业用途。 联系信箱&#xff1a;feixiaoxing 163.com】 进度条是一个比较常见的控件。如果某个操作需要很长的时间才能完成&#xff0c;那么这个时候最好有一个进度条提示&#xff0c;这样比较容易平复一…