上一篇文章我们知道了TTL利用了InheritableThreadLocal线程传递的特性进行扩展,也可以在使用线程池时线程复用的情况也可以正确的传递线程私有变量,现在我们就学习一下其设计
首先声明TTL重写了InheritableThreadLocal#childValue(T parentValue) 提供了一个以InheritableThreadLocal为基础的扩展。
InheritableThreadLocal 的线程传递只在当子线程为new的时候会调用,接下来分析代码
public class InheritableThreadLocal<T> extends ThreadLocal<T> {
/**
* Computes the child's initial value for this inheritable thread-local
* variable as a function of the parent's value at the time the child
* thread is created. This method is called from within the parent
* thread before the child is started.
* <p>
* This method merely returns its input argument, and should be overridden
* if a different behavior is desired.
*
* @param parentValue the parent thread's value
* @return the child thread's initial value
*/
// 这是ThreadLocal的执行逻辑,相当于一个模板方法,由子类实现,ThreadLocal不支持传递给子线程
protected T childValue(T parentValue) {
return parentValue;
}
/**
* Get the map associated with a ThreadLocal.
*
* @param t the current thread
*/
ThreadLocalMap getMap(Thread t) {
return t.inheritableThreadLocals;
}
/**
* Create the map associated with a ThreadLocal.
*
* @param t the current thread
* @param firstValue value for the initial entry of the table.
*/
// 顾名思义,只有在线程new出来的时刻会调用当前方法,然后调用childValue
void createMap(Thread t, T firstValue) {
t.inheritableThreadLocals = new ThreadLocalMap(this, firstValue);
}
}
然后看看 TTL的重写逻辑
// Note about holder:
// 1. The value of holder is type Map<TransmittableThreadLocal<?>, ?> (WeakHashMap implementation),
// but it is used as *set*.
// 2. WeakHashMap support null value.
// 这是TTL的核心设计,组装为一个 以TTL对象为key的map返回,同同时这个map对象还是TTL对象的一个内部静态对象,一直跟随客户端使用的TTL对象。
private static InheritableThreadLocal<Map<TransmittableThreadLocal<?>, ?>> holder =
new InheritableThreadLocal<Map<TransmittableThreadLocal<?>, ?>>() {
// 只有子线程 为new时调用
@Override
protected Map<TransmittableThreadLocal<?>, ?> initialValue() {
return new WeakHashMap<TransmittableThreadLocal<?>, Object>();
}
// 只有子线程 为new时调用,虽然做了拓展,通过一个跟随客户端使用的TTL对象内部构造了这个holder中转站,但是还是使用的引用传递,如果主子线程一边直接修改了引用的对象,另一边也会感知到。并且存在并发修改问题。因为是增强InheritableThreadLocal,并没有修改这里的引用传递逻辑。实际其它扩展有传递为不可变对象的逻辑
@Override
protected Map<TransmittableThreadLocal<?>, ?> childValue(Map<TransmittableThreadLocal<?>, ?> parentValue) {
return new WeakHashMap<TransmittableThreadLocal<?>, Object>(parentValue);
}
};
TTL也是使用的引用逻辑实际也有一些拓展是不可变对象的逻辑,例如
我们看一下CopyOnWriteSortedArrayThreadContextMap中的代码
private ThreadLocal<StringMap> createThreadLocalMap() {
return (ThreadLocal)(inheritableMap ? new InheritableThreadLocal<StringMap>() {
protected StringMap childValue(StringMap parentValue) {
if (parentValue == null) {
return null;
} else {
// 主要看看这个接口
StringMap stringMap = CopyOnWriteSortedArrayThreadContextMap.this.createStringMap(parentValue);
stringMap.freeze();
return stringMap;
}
}
} : new ThreadLocal());
}
// 看名字就知道了,是一个不可变对象,也就是不同于InheritableThreadLocal和TTL传递的对象引用,这里做了复制后变为不可变对象的逻辑,日后小伙伴们也可以借助TTL实现自己不可变对象的逻辑
public interface StringMap extends ReadOnlyStringMap {
void clear();
boolean equals(Object var1);
void freeze();
int hashCode();
boolean isFrozen();
void putAll(ReadOnlyStringMap var1);
void putValue(String var1, Object var2);
void remove(String var1);
}
接下来看装饰器
装饰器的引入,实际是对ExecutorService的执行Runnable,Callable等真正执行逻辑的拦截,做前,后的逻辑,而装饰器在不改变原有对象的逻辑包裹一层后,可以做到增强的目的,其实这个装饰器本身也是 Runnable,Callable的一个代理。
看看使用的接入
@Override
public Executor getAsyncExecutor() {
// 这里原本是设置一个 @Async的默认线程池
ThreadPoolExecutor executor = new ThreadPoolExecutor(4, 8, 30, TimeUnit.SECONDS,
new LinkedBlockingQueue<>(512), FACTORY);
executor.setRejectedExecutionHandler(new CustomRejectedHandler());
// 最后我们装入了TTL的装饰器返回
return TtlExecutors.getTtlExecutorService(executor);
}
ExecutorServiceTtlWrapper作为ExecutorService的装饰器目的就是为了再进行真正执行的目标接口再封装一层装饰器。
如上图各种目标接口的装饰器,我们就看看 TtlCallable这个装饰器作为@Async线程池执行单元的增强
public final class TtlCallable<V> implements Callable<V>, TtlEnhanced {
//用于threadLocal中转的对象,通过Transmitter#capture()在装饰器初始化时就创建好,实际就是获取当前主线程的threadLocal
private final AtomicReference<Object> capturedRef;
// 被装饰的目标对象接口
private final Callable<V> callable;
// 是否释放TTL对象传递过来的业务对象引用,从代码看这里只决定了当前TtlCallable对象的引用是否释放,TtlCallable对象本身有一定生命周期,再者如果复用主线程传递过来的TTL对象引用也一直存在于主线程,目前都是false,子线程引用也会一直随着主线程传递而更新
private final boolean releaseTtlValueReferenceAfterCall;
private TtlCallable(@Nonnull Callable<V> callable, boolean releaseTtlValueReferenceAfterCall) {
// 在new这个对象时是在主线程,所以capture()方法拿到的是主线程的TTL对象最新的引用,包括业务对象也是最新的
this.capturedRef = new AtomicReference<Object>(capture());
this.callable = callable;
this.releaseTtlValueReferenceAfterCall = releaseTtlValueReferenceAfterCall;
}
/**
* wrap method {@link Callable#call()}.
*/
@Override
public V call() throws Exception {
// 获取主线程的 TTL对象map,就是通过Transmitter#capture()方法从 TTL对象中上面所说的TTL对象中的内部holder中转map获取到主线程的所有TTL及业务对象引用
Object captured = capturedRef.get();
// 如果为空 或者 需要清理TTL对象引用,则进行一次原子操作对TTL对象引用置为空
if (captured == null || releaseTtlValueReferenceAfterCall && !capturedRef.compareAndSet(captured, null)) {
throw new IllegalStateException("TTL value reference is released after call!");
}
// 重放 captured为当前装饰器初始化时从主线程拿到的,这里对其进行重放替换
// 并返回当前子线程的 TTL对象作为还原
Object backup = replay(captured);
try {
//被增项的目标方法执行
return callable.call();
} finally {
// 再将当前子线程还原
restore(backup);
}
}
// ----------------省略大部分代码--------------
}
Transmitter#capture()方法
@Nonnull
public static Object capture() {
Map<TransmittableThreadLocal<?>, Object> captured = new HashMap<TransmittableThreadLocal<?>, Object>();
// 复制的核心 是从 holder中转对象中获取每个key的threadLocal中的业务对象引用
// 然后再用其TTL对象作为key 组装一个 TTL对象 -> 业务对象的map返回
for (TransmittableThreadLocal<?> threadLocal : holder.get().keySet()) {
captured.put(threadLocal, threadLocal.copyValue());
}
return captured;
}
//-----------copyValue 方法----------
private T copyValue() {
// 复制就是从当前主线程 的threadLocal get
return copy(get());
}
//------------copy 方法------
protected T copy(T parentValue) {
// 复制的是对象的引用
return parentValue;
}
下面看Transmitter#replay(@Nonnull Object captured) 重放逻辑
@Nonnull
public static Object replay(@Nonnull Object captured) {
@SuppressWarnings("unchecked")
// 主线程传递过来的引用
Map<TransmittableThreadLocal<?>, Object> capturedMap = (Map<TransmittableThreadLocal<?>, Object>) captured;
// 当前子线程的TTL引用用于返回后 还原
Map<TransmittableThreadLocal<?>, Object> backup = new HashMap<TransmittableThreadLocal<?>, Object>();
for (Iterator<? extends Map.Entry<TransmittableThreadLocal<?>, ?>> iterator = holder.get().entrySet().iterator();
iterator.hasNext(); ) {
Map.Entry<TransmittableThreadLocal<?>, ?> next = iterator.next();
TransmittableThreadLocal<?> threadLocal = next.getKey();
// backup
backup.put(threadLocal, threadLocal.get());
// clear the TTL values that is not in captured
// avoid the extra TTL values after replay when run task
//清除掉可能失效和旧的子线程的TTL对象引用,为什么这么做,目前不太清楚
if (!capturedMap.containsKey(threadLocal)) {
iterator.remove();
threadLocal.superRemove();
}
}
// set values to captured TTL
// 我们上一篇文章以及当前文章上面提到,在thread new的时候调用initialValue和childValue 方法时,会将主线程的TTL对象引用传递给子线程,但是不同装饰器增强时,子线程里的TTL对象中的业务对象引用是一直不变的,一直是第一次传递过来的业务对象的值,而主线程的业务对象变更子线程感知不到,但是TTL对象也一直是一个引用这里将其旧的TTL引用
// 放入主线程新得 TTL中的业务对象引用,实际因为子线程的TTL对象引用和主线程的TTL对象是一样的,只不过主线程更新了业务对象引用子线程感知不到,因为java内存模型的原因,所以这里直接重新操作一次 子线程的TTL对象更新 *业务对象引用* 重复了一次主线程的操作
setTtlValuesTo(capturedMap);
// call beforeExecute callback
// 这里其实是一个模板方法,包括目标对象执行前也就是重放,及目标对象执行后,还原的实际的一个钩子
doExecuteCallback(true);
return backup;
}
我们来看看 setTtlValuesTo(capturedMap); 实际就是重复了主线程的操作,使用相同的TTL对象引用对业务对象引用进行更新
private static void setTtlValuesTo(@Nonnull Map<TransmittableThreadLocal<?>, Object> ttlValues) {
for (Map.Entry<TransmittableThreadLocal<?>, Object> entry : ttlValues.entrySet()) {
@SuppressWarnings("unchecked")
TransmittableThreadLocal<Object> threadLocal = (TransmittableThreadLocal<Object>) entry.getKey();
threadLocal.set(entry.getValue());
}
}
看看钩子方法,可以用于我们扩展TTL对象进行钩子回调
private static void doExecuteCallback(boolean isBefore) {
for (Map.Entry<TransmittableThreadLocal<?>, ?> entry : holder.get().entrySet()) {
TransmittableThreadLocal<?> threadLocal = entry.getKey();
try {
// 两个模板方法钩子
if (isBefore) threadLocal.beforeExecute();
else threadLocal.afterExecute();
} catch (Throwable t) {
if (logger.isLoggable(Level.WARNING)) {
logger.log(Level.WARNING, "TTL exception when " + (isBefore ? "beforeExecute" : "afterExecute") + ", cause: " + t.toString(), t);
}
}
}
}
目前TTL对象中是空实现。如果继承扩展TTL对象可能用到噢
然后是 还原方法Transmitter#restore(@Nonnull Object backup)逻辑和上面的replay方法基本相同不过逻辑是反过来的小伙伴可以自行看代码
总结
- 利用装饰器对ExectorService包装后一步步的继续利用装饰器一直装饰到要执行的目标对象接口例如Runnable,Callable等对初始化,执行前,执行后三个时机进行增强
- 重写了InheritableThreadLocal#childValue 方法来传递 TTL定义的一个中转map对象 key为 TTL对象
- 利用了主子线程传递 TTL对象的引用一致,同时用以TTL对象为key的map进行重放,直接对主线程传递过来的TTL对象业务对象引用进行更新,因为子对象的引用相同相当于对子线程的TTL的业务对象引用更新。感觉用其它集合也可以,但是看代码map可以在重放的同时更方便的清理子线程的多余的TTL对象,保证主子线程的TTL对应一致性。
- 提供了 一些模板方法提高了扩展性 例如beforeExecute ,afterExecute
- 提供了屏蔽ForkJoin工作线程屏蔽InheritableThreadLocal的传递,帮助开发期间及时发现threadLocal的问题
其它问题,java8提供的parallelStream 并行流和CompletableFuture 都是使用ForkJoin框架实现,使用TTL还是会有问题
在TTL源码没有看到关于forkJoin的增强,但是发现了TtlForkJoinPoolHelper类,提供了DisableInheritableForkJoinWorkerThreadFactory 的支持,为了屏蔽掉InheritableThreadLocal的传递防止开发测试时theadLocal错误传递的假象。
// ForkJoinWorkerThreadFactory 的装饰器
public interface DisableInheritableForkJoinWorkerThreadFactory extends ForkJoinWorkerThreadFactory {
/**
* Unwrap {@link DisableInheritableThreadFactory} to the original/underneath one.
*/
@Nonnull
ForkJoinWorkerThreadFactory unwrap();
}
ForkJoin的逻辑大家自行查询资料,因为存在工作窃取等逻辑理论上是无法避免的ThreadLocal错乱问题。所以TTL提供了屏蔽装饰器,但是forkJoin的工作线程也可能是主线程,所以使用TTL的屏蔽逻辑只能屏蔽掉ForkJoin的工作线程,无法避免ForkJoin直接使用主线程执行任务单元时还是有正确的threadLocal对象引用。但是这样也足够开发测试期间及时发现threadLocal的问题了。
经过我网上搜索我们可以替换掉ForkJoin默认的ForkJoinWorkerThreadFactory,增强线程创建逻辑。
private static ForkJoinPool makeCommonPool() {
int parallelism = -1;
ForkJoinWorkerThreadFactory factory = null;
UncaughtExceptionHandler handler = null;
try { // ignore exceptions in accessing/parsing properties
String pp = System.getProperty
("java.util.concurrent.ForkJoinPool.common.parallelism");
// ForkJoin会有一个扩展逻辑,这里如果获取到指定的线程工厂类则不会使用默认的。但是当前makeCommonPool 方法在 static {} 代码块中执行,经过测试直接System.setProperty无法掌控好加载顺序,可能获取不到自定义的系统变量,索性直接通过jvm启动参数指定
String fp = System.getProperty
("java.util.concurrent.ForkJoinPool.common.threadFactory");
String hp = System.getProperty
("java.util.concurrent.ForkJoinPool.common.exceptionHandler");
if (pp != null)
parallelism = Integer.parseInt(pp);
if (fp != null)
// 如果有自定义的线程工厂会初始化
factory = ((ForkJoinWorkerThreadFactory)ClassLoader.
getSystemClassLoader().loadClass(fp).newInstance());
if (hp != null)
handler = ((UncaughtExceptionHandler)ClassLoader.
getSystemClassLoader().loadClass(hp).newInstance());
} catch (Exception ignore) {
}
if (factory == null) {
if (System.getSecurityManager() == null)
factory = defaultForkJoinWorkerThreadFactory;
else // use security-managed default
factory = new InnocuousForkJoinWorkerThreadFactory();
}
if (parallelism < 0 && // default 1 less than #cores
(parallelism = Runtime.getRuntime().availableProcessors() - 1) <= 0)
parallelism = 1;
if (parallelism > MAX_CAP)
parallelism = MAX_CAP;
return new ForkJoinPool(parallelism, factory, handler, LIFO_QUEUE,
"ForkJoinPool.commonPool-worker-");
}
但是看到TTL包内的DisableInheritableForkJoinWorkerThreadFactoryWrapper 线程工厂装饰器并没有构造方法,并且不是public不能继承,也就是直接指定这个类不能被正常加载后newInstance(),又不能继承,可能只是一个示例?那么我自定义一个类复制它的逻辑
class DisableInheritableForkJoinWorkerThreadFactoryWrapper implements DisableInheritableForkJoinWorkerThreadFactory {
final ForkJoinWorkerThreadFactory threadFactory;
public DisableInheritableForkJoinWorkerThreadFactoryWrapper(@Nonnull ForkJoinWorkerThreadFactory threadFactory) {
this.threadFactory = threadFactory;
}
@Override
public ForkJoinWorkerThread newThread(ForkJoinPool pool) {
// 看到这里在new thread时进行了 TTL对象的清理
//这个执行时机其实还是在主线程中,如果正常不执行这个代码子线程会拿到一个旧的主线程的TTL对象引用,但是这里清除了,就不会拿到了,方便开发测试阶段发现问题
final Object backup = TransmittableThreadLocal.Transmitter.clear();
try {
return threadFactory.newThread(pool);
} finally {
// 执行完后进行还原
TransmittableThreadLocal.Transmitter.restore(backup);
}
}
@Nonnull
@Override
public ForkJoinWorkerThreadFactory unwrap() {
return threadFactory;
}
}
我们自定义仿照上述类,直接复制的,区别是提供了构造方法,可以让ForkJoinPool#makeCommonPool方法可以加载扩展工厂,并且直接指定被增强的默认ForkJoinWorkerThreadFactory
public class CustomForkJoinThreadFactory implements DisableInheritableForkJoinWorkerThreadFactory {
// 被增强的默认的线程工厂
final ForkJoinWorkerThreadFactory threadFactory = ForkJoinPool.defaultForkJoinWorkerThreadFactory;
// 有无参构造才可以 加载成功噢
public CustomForkJoinThreadFactory() {
}
@Override
public ForkJoinWorkerThread newThread(ForkJoinPool pool) {
final Object backup = TransmittableThreadLocal.Transmitter.clear();
try {
return threadFactory.newThread(pool);
} finally {
TransmittableThreadLocal.Transmitter.restore(backup);
}
}
@Nonnull
@Override
public ForkJoinWorkerThreadFactory unwrap() {
return threadFactory;
}
}
jvm启动参数(如果有办法在ForkJoinPool的static加载前System.setProperty也可以)
-Djava.util.concurrent.ForkJoinPool.common.threadFactory=xxx.xxx.xxx.CustomForkJoinThreadFactory