一、三兄弟
ThreadLocal:线程本地变量,维护当前线程内变量,不可以其他线程共享
InheritableThreadLocal(可继承的):维护当前线程以及子线程变量,可共享,一个线程修改对象引用所有线程的值都发生改变,有线程安全问题,通过Thread.init()方法初始化,不适用线程池模式
TransmittableThreadLocal(可传输的):维护变量可在多线程间共享,适用于线程池
二、原理分析
1.Thread
//www.greatytc.com/p/b7d2958c5168
2.InheritableThreadLocal
使用
public class InheritableThreadLocalTest {
private static ThreadLocal<Integer> tl = new InheritableThreadLocal<>();
private static ThreadLocal<Hello> tl2 = new InheritableThreadLocal<>();
public static void main(String[] args) throws Exception {
tl.set(1);
Hello hello = new Hello();
hello.setName("init");
tl2.set(hello);
System.out.printf("当前线程名称: %s, main方法内获取线程内数据为: tl = %s,tl2.name = %s%n",
Thread.currentThread().getName(), tl.get(), tl2.get().getName());
fc();
new Thread(() -> {
Hello hello1 = tl2.get();
hello1.setName("init2");
fc();
}).start();
Thread.sleep(1000L); //保证下面fc执行一定在上面异步代码之后执行
fc(); //继续在主线程内执行,验证上面那一步是否对主线程上下文内容造成影响
}
private static void fc() {
System.out.printf("当前线程名称: %s, fc方法内获取线程内数据为: tl = %s,tl2.name = %s%n",
Thread.currentThread().getName(), tl.get(), tl2.get().getName());
}
}
@Data
class Hello {
private String name;
}
输出
当前线程名称: main, main方法内获取线程内数据为: tl = 1,tl2.name = init
当前线程名称: main, fc方法内获取线程内数据为: tl = 1,tl2.name = init
当前线程名称: Thread-0, fc方法内获取线程内数据为: tl = 1,tl2.name = init2
当前线程名称: main, fc方法内获取线程内数据为: tl = 1,tl2.name = init2
说明主线程和子线程维护的引用变量指向同一个对象
原理
InheritableThreadLocal是ThreadLocal子类
public class InheritableThreadLocal<T> extends ThreadLocal<T> {
/**
* Computes the child's initial value for this inheritable thread-local
* variable as a function of the parent's value at the time the child
* thread is created. This method is called from within the parent
* thread before the child is started.
* <p>
* This method merely returns its input argument, and should be overridden
* if a different behavior is desired.
*
* @param parentValue the parent thread's value
* @return the child thread's initial value
*/
protected T childValue(T parentValue) {
return parentValue;
}
/**
* Get the map associated with a ThreadLocal.
*
* @param t the current thread
*/
ThreadLocalMap getMap(Thread t) {
return t.inheritableThreadLocals;
}
/**
* Create the map associated with a ThreadLocal.
*
* @param t the current thread
* @param firstValue value for the initial entry of the table.
*/
void createMap(Thread t, T firstValue) {
t.inheritableThreadLocals = new ThreadLocalMap(this, firstValue);
}
}
调用InheritableThreadLocal.set()方法是调用父类ThreadLocal.set()
public void set(T value) {
Thread t = Thread.currentThread();
ThreadLocalMap map = getMap(t);
if (map != null)
map.set(this, value);
else
createMap(t, value);
}
调用getMap()方法,调用的是InheritableThreadLocal.getMap()方法。变量inheritableThreadLocals是Thread的一个属性
ThreadLocalMap getMap(Thread t) {
return t.inheritableThreadLocals;
}
/*
* InheritableThreadLocal values pertaining to this thread. This map is
* maintained by the InheritableThreadLocal class.
*/
ThreadLocal.ThreadLocalMap inheritableThreadLocals = null;
inheritableThreadLocals属性在子线程初始化的时候默认会指向父线程的引用:this.inheritableThreadLocals =
ThreadLocal.createInheritedMap(parent.inheritableThreadLocals);
private void init(ThreadGroup g, Runnable target, String name,
long stackSize, AccessControlContext acc,
boolean inheritThreadLocals) {
if (name == null) {
throw new NullPointerException("name cannot be null");
}
this.name = name;
Thread parent = currentThread();
SecurityManager security = System.getSecurityManager();
if (g == null) {
/* Determine if it's an applet or not */
/* If there is a security manager, ask the security manager
what to do. */
if (security != null) {
g = security.getThreadGroup();
}
/* If the security doesn't have a strong opinion of the matter
use the parent thread group. */
if (g == null) {
g = parent.getThreadGroup();
}
}
/* checkAccess regardless of whether or not threadgroup is
explicitly passed in. */
g.checkAccess();
/*
* Do we have the required permissions?
*/
if (security != null) {
if (isCCLOverridden(getClass())) {
security.checkPermission(SUBCLASS_IMPLEMENTATION_PERMISSION);
}
}
g.addUnstarted();
this.group = g;
this.daemon = parent.isDaemon();
this.priority = parent.getPriority();
if (security == null || isCCLOverridden(parent.getClass()))
this.contextClassLoader = parent.getContextClassLoader();
else
this.contextClassLoader = parent.contextClassLoader;
this.inheritedAccessControlContext =
acc != null ? acc : AccessController.getContext();
this.target = target;
setPriority(priority);
if (inheritThreadLocals && parent.inheritableThreadLocals != null)
// 这一行代码将当前线程的引用指向父线程的引用
this.inheritableThreadLocals =
ThreadLocal.createInheritedMap(parent.inheritableThreadLocals);
/* Stash the specified stack size in case the VM cares */
this.stackSize = stackSize;
/* Set thread ID */
tid = nextThreadID();
}
所以在子线程初始化的时候就会拿到父线程的变量。但是这样会有一个问题,如果变量是只读的还可以,如果有修改操作就会出现线程安全问题。所以就引出了InheritableThreadLocal的子类TransmittableThreadLocal。
3.TransmittableThreadLocal
使用
public class TransmittableThreadLocalTest {
// 需要注意的是,使用TTL的时候,要想传递的值不出问题,线程池必须得用TTL加一层代理(下面会讲这样做的目的)
private static final ExecutorService executorService = TtlExecutors.getTtlExecutorService(Executors.newFixedThreadPool(2));
//这里采用TTL的实现
private static final ThreadLocal<Integer> tl = new TransmittableThreadLocal<>();
public static void main(String[] args) {
new Thread(() -> {
String mainThreadName = "main_01";
tl.set(1);
executorService.execute(() -> {
sleep(1L);
System.out.printf("本地变量改变之前(1), 父线程名称-%s, 子线程名称-%s, 变量值=%s%n", mainThreadName, Thread.currentThread().getName(), tl.get());
});
executorService.execute(() -> {
sleep(1L);
System.out.printf("本地变量改变之前(1), 父线程名称-%s, 子线程名称-%s, 变量值=%s%n", mainThreadName, Thread.currentThread().getName(), tl.get());
});
executorService.execute(() -> {
sleep(1L);
System.out.printf("本地变量改变之前(1), 父线程名称-%s, 子线程名称-%s, 变量值=%s%n", mainThreadName, Thread.currentThread().getName(), tl.get());
});
sleep(3L); //确保上面的会在tl.set执行之前执行
tl.set(2); // 等上面的线程池第一次启用完了,父线程再给自己赋值
executorService.execute(() -> {
sleep(1L);
System.out.printf("本地变量改变之后(2), 父线程名称-%s, 子线程名称-%s, 变量值=%s%n", mainThreadName, Thread.currentThread().getName(), tl.get());
});
executorService.execute(() -> {
sleep(1L);
System.out.printf("本地变量改变之后(2), 父线程名称-%s, 子线程名称-%s, 变量值=%s%n", mainThreadName, Thread.currentThread().getName(), tl.get());
});
executorService.execute(() -> {
sleep(1L);
System.out.printf("本地变量改变之后(2), 父线程名称-%s, 子线程名称-%s, 变量值=%s%n", mainThreadName, Thread.currentThread().getName(), tl.get());
});
System.out.printf("线程名称-%s, 变量值=%s%n", Thread.currentThread().getName(), tl.get());
}).start();
new Thread(() -> {
String mainThreadName = "main_02";
tl.set(3);
executorService.execute(() -> {
sleep(1L);
System.out.printf("本地变量改变之前(3), 父线程名称-%s, 子线程名称-%s, 变量值=%s%n", mainThreadName, Thread.currentThread().getName(), tl.get());
});
executorService.execute(() -> {
sleep(1L);
System.out.printf("本地变量改变之前(3), 父线程名称-%s, 子线程名称-%s, 变量值=%s%n", mainThreadName, Thread.currentThread().getName(), tl.get());
});
executorService.execute(() -> {
sleep(1L);
System.out.printf("本地变量改变之前(3), 父线程名称-%s, 子线程名称-%s, 变量值=%s%n", mainThreadName, Thread.currentThread().getName(), tl.get());
});
sleep(1L); //确保上面的会在tl.set执行之前执行
tl.set(4); // 等上面的线程池第一次启用完了,父线程再给自己赋值
executorService.execute(() -> {
sleep(1L);
System.out.printf("本地变量改变之后(4), 父线程名称-%s, 子线程名称-%s, 变量值=%s%n", mainThreadName, Thread.currentThread().getName(), tl.get());
});
executorService.execute(() -> {
sleep(1L);
System.out.printf("本地变量改变之后(4), 父线程名称-%s, 子线程名称-%s, 变量值=%s%n", mainThreadName, Thread.currentThread().getName(), tl.get());
});
executorService.execute(() -> {
sleep(1L);
System.out.printf("本地变量改变之后(4), 父线程名称-%s, 子线程名称-%s, 变量值=%s%n", mainThreadName, Thread.currentThread().getName(), tl.get());
});
System.out.printf("线程名称-%s, 变量值=%s%n", Thread.currentThread().getName(), tl.get());
}).start();
}
private static void sleep(long time) {
try {
Thread.sleep(time);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
运行结果
本地变量改变之前(3), 父线程名称-main_02, 子线程名称-pool-1-thread-2, 变量值=3
线程名称-Thread-1, 变量值=2
本地变量改变之前(1), 父线程名称-main_01, 子线程名称-pool-1-thread-1, 变量值=1
线程名称-Thread-2, 变量值=4
本地变量改变之前(1), 父线程名称-main_01, 子线程名称-pool-1-thread-2, 变量值=1
本地变量改变之前(3), 父线程名称-main_02, 子线程名称-pool-1-thread-1, 变量值=3
本地变量改变之前(1), 父线程名称-main_01, 子线程名称-pool-1-thread-2, 变量值=1
本地变量改变之前(3), 父线程名称-main_02, 子线程名称-pool-1-thread-1, 变量值=3
本地变量改变之后(4), 父线程名称-main_02, 子线程名称-pool-1-thread-2, 变量值=4
本地变量改变之后(4), 父线程名称-main_02, 子线程名称-pool-1-thread-1, 变量值=4
本地变量改变之后(4), 父线程名称-main_02, 子线程名称-pool-1-thread-2, 变量值=4
本地变量改变之后(2), 父线程名称-main_01, 子线程名称-pool-1-thread-1, 变量值=2
本地变量改变之后(2), 父线程名称-main_01, 子线程名称-pool-1-thread-2, 变量值=2
本地变量改变之后(2), 父线程名称-main_01, 子线程名称-pool-1-thread-1, 变量值=2
两个主线程里都使用线程池异步,而且值在主线程里还发生过改变,测试结果展示一切正常,由此可以知道TTL在使用线程池的情况下,也可以很好的完成传递,而且不会发生错乱。
原理分析
TransmittableThreadLocal的一个重要属性holder
/**
* 是一个ITL类型的对象,持有一个全局的WeakMap(weakMap的key是弱引用,同TL一样,也是为了解决内存泄漏的问题),里面存放了TTL对象
* 并且重写了initialValue和childValue方法,尤其是childValue,可以看到在即将异步时父线程的属性是直接作为初始化值赋值给子线程的本地变量对象(TTL)的
*/
private static InheritableThreadLocal<WeakHashMap<TransmittableThreadLocal<Object>, ?>> holder = new InheritableThreadLocal<WeakHashMap<TransmittableThreadLocal<Object>, ?>>() {
protected WeakHashMap<TransmittableThreadLocal<Object>, ?> initialValue() {
return new WeakHashMap();
}
protected WeakHashMap<TransmittableThreadLocal<Object>, ?> childValue(WeakHashMap<TransmittableThreadLocal<Object>, ?> parentValue) {
return new WeakHashMap(parentValue);
}
};
再看一下get()和set()
Override
public final void set(T value) {
super.set(value);
if (null == value) removeValue();
else addValue();
}
@Override
public final T get() {
T value = super.get();
if (null != value) addValue();
return value;
}
private void removeValue() {
holder.get().remove(this); //从holder持有的map对象中移除
}
private void addValue() {
if (!holder.get().containsKey(this)) {
holder.get().put(this, null); //从holder持有的map对象中添加
}
}
上边例子中使用TransmittableThreadLocal需要通过TtlExecutors.getTtlExecutorService包装一下线程池才可以,那么,下面就来看看在程序即将通过线程池异步的时候,TTL帮我们做了哪些操作(这一部分是TTL支持线程池传递的核心部分):
// 此方法属于线程池包装类ExecutorTtlWrapper
@Override
public void execute(@Nonnull Runnable command) {
//这里会把Rannable包装一层,这是关键,有些逻辑处理,需要在run之前执行
executor.execute(TtlRunnable.get(command));
}
// 对应上面的get方法,返回一个TtlRunnable对象,属于TtLRannable包装类
@Nullable
public static TtlRunnable get(@Nullable Runnable runnable) {
return get(runnable, false, false);
}
// 对应上面的get方法
@Nullable
public static TtlRunnable get(@Nullable Runnable runnable, boolean releaseTtlValueReferenceAfterRun, boolean idempotent) {
if (null == runnable) return null;
// 若发现已经是目标类型了(说明已经被包装过了)直接返回
if (runnable instanceof TtlEnhanced) {
// avoid redundant decoration, and ensure idempotency
if (idempotent) return (TtlRunnable) runnable;
else throw new IllegalStateException("Already TtlRunnable!");
}
return new TtlRunnable(runnable, releaseTtlValueReferenceAfterRun); //最终初始化
}
// 对应上面的TtlRunnable方法
private TtlRunnable(@Nonnull Runnable runnable, boolean releaseTtlValueReferenceAfterRun) {
//这里将捕获后的父线程本地变量存储在当前对象的capturedRef里
this.capturedRef = new AtomicReference<Object>(capture());
this.runnable = runnable;
this.releaseTtlValueReferenceAfterRun = releaseTtlValueReferenceAfterRun;
}
// 对应上面的capture方法,用于捕获当前线程(父线程)里的本地变量,此方法属于TTL的静态内部类Transmitter
@Nonnull
public static Object capture() {
Map<TransmittableThreadLocal<?>, Object> captured = new HashMap<TransmittableThreadLocal<?>, Object>();
for (TransmittableThreadLocal<?> threadLocal : holder.get().keySet()) { // holder里目前存放的k-v里的key,就是需要传给子线程的TTL对象
captured.put(threadLocal, threadLocal.copyValue());
}
return captured; // 这里返回的这个对象,就是当前将要使用线程池异步出来的子线程,所继承的本地变量合集
}
// 对应上面的copyValue,简单的将TTL对象里的值返回(结合之前的源码可以知道get方法其实就是获取当前线程(父线程)里的值,调用super.get方法)
private T copyValue() {
return copy(get());
}
protected T copy(T parentValue) {
return parentValue;
}
结合上述代码,其实就是把当前父线程里的本地变量取出来,然后赋值给Rannable包装类里的capturedRef属性,接下来大概率会在run方法里,将这些捕获到的值赋给子线程的holder赋对应的TTL值,那么我们继续往下看Rannable包装类里的run方法是怎么实现的:
//run方法属于Rannable的包装类TtlRunnable
@Override
public void run() {
Object captured = capturedRef.get(); // 获取由之前捕获到的父线程变量集
if (captured == null || releaseTtlValueReferenceAfterRun && !capturedRef.compareAndSet(captured, null)) {
throw new IllegalStateException("TTL value reference is released after run!");
}
/**
* 重点方法replay,此方法用来给当前子线程赋本地变量,返回的backup是此子线程原来就有的本地变量值(原生本地变量,下面会详细讲),
* backup用于恢复数据(如果任务执行完毕,意味着该子线程会归还线程池,那么需要将其原生本地变量属性恢复)
*/
Object backup = replay(captured);
try {
runnable.run(); // 执行异步逻辑
} finally {
// 结合上面对于replay的解释,不难理解,这个方法就是用来恢复原有值的
restore(backup);
}
}
TTL在异步任务执行前,会先进行赋值操作(就是拿着异步发生时捕获到的父线程的本地变量,赋给自己),当任务执行完,就恢复原生的自己本身的线程变量值。
下面来具体看这俩方法:
//下面的方法均属于TTL的静态内部类Transmittable
@Nonnull
public static Object replay(@Nonnull Object captured) {
@SuppressWarnings("unchecked")
Map<TransmittableThreadLocal<?>, Object> capturedMap = (Map<TransmittableThreadLocal<?>, Object>) captured; //使用此线程异步时捕获到的父线程里的本地变量值
Map<TransmittableThreadLocal<?>, Object> backup = new HashMap<TransmittableThreadLocal<?>, Object>(); //当前线程原生的本地变量,用于使用完线程后恢复用
//注意:这里循环的是当前子线程原生的本地变量集合,与本方法相反,restore方法里循环这个holder是指:该线程运行期间产生的变量+父线程继承来的变量
for (Iterator<? extends Map.Entry<TransmittableThreadLocal<?>, ?>> iterator = holder.get().entrySet().iterator();
iterator.hasNext(); ) {
Map.Entry<TransmittableThreadLocal<?>, ?> next = iterator.next();
TransmittableThreadLocal<?> threadLocal = next.getKey();
backup.put(threadLocal, threadLocal.get()); // 所有原生的本地变量都暂时存储在backup里,用于之后恢复用
/**
* 检查,如果捕获到的线程变量里,不包含当前原生变量值,则从当前原生变量里清除掉,对应的线程本地变量也清掉
* 这就是为什么会将原生变量保存在backup里的原因,为了恢复原生值使用
* 那么,为什么这里要清除掉呢?因为从使用这个子线程做异步那里,捕获到的本地变量并不包含原生的变量,当前线程
* 在做任务时的首要目标,是将父线程里的变量完全传递给任务,如果不清除这个子线程原生的本地变量,
* 意味着很可能会影响到任务里取值的准确性。
*
* 打个比方,有ttl对象tl,这个tl在线程池的某个子线程里存在对应的值2,当某个主线程使用该子线程做异步任务时
* tl这个对象在当前主线程里没有值,那么如果不进行下面这一步的操作,那么在使用该子线程做的任务里就可以通过
* 该tl对象取到值2,不符合预期
*/
if (!capturedMap.containsKey(threadLocal)) {
iterator.remove();
threadLocal.superRemove();
}
}
// 这一步就是直接把父线程本地变量赋值给当前线程了(这一步起就刷新了holder里的值了,具体往下看该方法,在异步线程运行期间,还可能产生别的本地变量,比如在真正的run方法内的业务代码,再用一个tl对象设置一个值)
setTtlValuesTo(capturedMap);
// 这个方法属于扩展方法,ttl本身支持重写异步任务执行前后的操作,这里不再具体赘述
doExecuteCallback(true);
return backup;
}
// 结合之前Rannable包装类的run方法来看,这个方法就是使用上面replay记录下的原生线程变量做恢复用的
public static void restore(@Nonnull Object backup) {
@SuppressWarnings("unchecked")
Map<TransmittableThreadLocal<?>, Object> backupMap = (Map<TransmittableThreadLocal<?>, Object>) backup;
// call afterExecute callback
doExecuteCallback(false);
// 注意,这里的holder取出来的,实际上是replay方法设置进去的关于父线程里的所有变量(结合上面来看,就是:该线程运行期间产生的变量+父线程继承来的变量)
for (Iterator<? extends Map.Entry<TransmittableThreadLocal<?>, ?>> iterator = holder.get().entrySet().iterator();
iterator.hasNext(); ) {
Map.Entry<TransmittableThreadLocal<?>, ?> next = iterator.next();
TransmittableThreadLocal<?> threadLocal = next.getKey();
/**
* 同样的,如果子线程原生变量不包含某个父线程传来的对象,那么就删除,可以思考下,这里的清除跟上面replay里的有什么不同?
* 这里会把不属于原生变量的对象给删除掉(这里被删除掉的可能是父线程继承下来的,也可能是异步任务在执行时产生的新值)
*/
if (!backupMap.containsKey(threadLocal)) {
iterator.remove();
threadLocal.superRemove();
}
}
// 同样调用这个方法,进行值的恢复
setTtlValuesTo(backupMap);
}
// 真正给当前子线程赋值的方法,对应上面的setTtlValuesTo方法
private static void setTtlValuesTo(@Nonnull Map<TransmittableThreadLocal<?>, Object> ttlValues) {
for (Map.Entry<TransmittableThreadLocal<?>, Object> entry : ttlValues.entrySet()) {
@SuppressWarnings("unchecked")
TransmittableThreadLocal<Object> threadLocal = (TransmittableThreadLocal<Object>) entry.getKey();
threadLocal.set(entry.getValue()); //赋值,注意,从这里开始,子线程的holder里的值会被重新赋值刷新,可以参照上面ttl的set方法的实现
}
}
所谓线程池内线程的本地原生变量,其实是第一次使用线程时被传递进去的值,TTL是继承至ITL的,线程池第一次启用时是会触发Thread的init方法的,也就是说,在第一次异步时拿到的主线程的变量会被传递给子线程,作为子线程的原生本地变量保存起来,后续是replay操作和restore操作也是围绕着这个原生变量(即原生holder里的值)来进行设置、恢复的,设置的是当前父线程捕获到的本地变量,恢复的是子线程原生本地变量。