TransmittableThreadLocal线程间传递逻辑

上一篇文章我们知道了TTL利用了InheritableThreadLocal线程传递的特性进行扩展,也可以在使用线程池时线程复用的情况也可以正确的传递线程私有变量,现在我们就学习一下其设计

首先声明TTL重写了InheritableThreadLocal#childValue(T parentValue) 提供了一个以InheritableThreadLocal为基础的扩展。

InheritableThreadLocal 的线程传递只在当子线程为new的时候会调用,接下来分析代码

public class InheritableThreadLocal<T> extends ThreadLocal<T> {
    /**
     * Computes the child's initial value for this inheritable thread-local
     * variable as a function of the parent's value at the time the child
     * thread is created.  This method is called from within the parent
     * thread before the child is started.
     * <p>
     * This method merely returns its input argument, and should be overridden
     * if a different behavior is desired.
     *
     * @param parentValue the parent thread's value
     * @return the child thread's initial value
     */
// 这是ThreadLocal的执行逻辑,相当于一个模板方法,由子类实现,ThreadLocal不支持传递给子线程
    protected T childValue(T parentValue) {
        return parentValue;
    }
    /**
     * Get the map associated with a ThreadLocal.
     *
     * @param t the current thread
     */
    ThreadLocalMap getMap(Thread t) {
       return t.inheritableThreadLocals;
    }

    /**
     * Create the map associated with a ThreadLocal.
     *
     * @param t the current thread
     * @param firstValue value for the initial entry of the table.
     */
// 顾名思义,只有在线程new出来的时刻会调用当前方法,然后调用childValue
    void createMap(Thread t, T firstValue) {
        t.inheritableThreadLocals = new ThreadLocalMap(this, firstValue);
    }
}

然后看看 TTL的重写逻辑

    // Note about holder:
    // 1. The value of holder is type Map<TransmittableThreadLocal<?>, ?> (WeakHashMap implementation),
    //    but it is used as *set*.
    // 2. WeakHashMap support null value.
// 这是TTL的核心设计,组装为一个 以TTL对象为key的map返回,同同时这个map对象还是TTL对象的一个内部静态对象,一直跟随客户端使用的TTL对象。
    private static InheritableThreadLocal<Map<TransmittableThreadLocal<?>, ?>> holder =
            new InheritableThreadLocal<Map<TransmittableThreadLocal<?>, ?>>() {
// 只有子线程 为new时调用
                @Override
                protected Map<TransmittableThreadLocal<?>, ?> initialValue() {
                    return new WeakHashMap<TransmittableThreadLocal<?>, Object>();
                }
// 只有子线程 为new时调用,虽然做了拓展,通过一个跟随客户端使用的TTL对象内部构造了这个holder中转站,但是还是使用的引用传递,如果主子线程一边直接修改了引用的对象,另一边也会感知到。并且存在并发修改问题。因为是增强InheritableThreadLocal,并没有修改这里的引用传递逻辑。实际其它扩展有传递为不可变对象的逻辑
                @Override
                protected Map<TransmittableThreadLocal<?>, ?> childValue(Map<TransmittableThreadLocal<?>, ?> parentValue) {
                    return new WeakHashMap<TransmittableThreadLocal<?>, Object>(parentValue);
                }
            };
TTL也是使用的引用逻辑实际也有一些拓展是不可变对象的逻辑,例如
childValue在我的电脑里可以看到的子类

我们看一下CopyOnWriteSortedArrayThreadContextMap中的代码

    private ThreadLocal<StringMap> createThreadLocalMap() {
        return (ThreadLocal)(inheritableMap ? new InheritableThreadLocal<StringMap>() {
            protected StringMap childValue(StringMap parentValue) {
                if (parentValue == null) {
                    return null;
                } else {
// 主要看看这个接口
                    StringMap stringMap = CopyOnWriteSortedArrayThreadContextMap.this.createStringMap(parentValue);
                    stringMap.freeze();
                    return stringMap;
                }
            }
        } : new ThreadLocal());
    }
// 看名字就知道了,是一个不可变对象,也就是不同于InheritableThreadLocal和TTL传递的对象引用,这里做了复制后变为不可变对象的逻辑,日后小伙伴们也可以借助TTL实现自己不可变对象的逻辑
public interface StringMap extends ReadOnlyStringMap {
    void clear();
    boolean equals(Object var1);
    void freeze();
    int hashCode();
    boolean isFrozen();
    void putAll(ReadOnlyStringMap var1);
    void putValue(String var1, Object var2);
    void remove(String var1);
}
接下来看装饰器

装饰器的引入,实际是对ExecutorService的执行Runnable,Callable等真正执行逻辑的拦截,做前,后的逻辑,而装饰器在不改变原有对象的逻辑包裹一层后,可以做到增强的目的,其实这个装饰器本身也是 Runnable,Callable的一个代理。
看看使用的接入

    @Override
    public Executor getAsyncExecutor() {
// 这里原本是设置一个 @Async的默认线程池
        ThreadPoolExecutor executor = new ThreadPoolExecutor(4, 8, 30, TimeUnit.SECONDS,
            new LinkedBlockingQueue<>(512), FACTORY);
        executor.setRejectedExecutionHandler(new CustomRejectedHandler());
// 最后我们装入了TTL的装饰器返回
        return TtlExecutors.getTtlExecutorService(executor);
    }

ExecutorServiceTtlWrapper作为ExecutorService的装饰器目的就是为了再进行真正执行的目标接口再封装一层装饰器。


装饰器们

如上图各种目标接口的装饰器,我们就看看 TtlCallable这个装饰器作为@Async线程池执行单元的增强

public final class TtlCallable<V> implements Callable<V>, TtlEnhanced {
//用于threadLocal中转的对象,通过Transmitter#capture()在装饰器初始化时就创建好,实际就是获取当前主线程的threadLocal
    private final AtomicReference<Object> capturedRef;
// 被装饰的目标对象接口
    private final Callable<V> callable;
// 是否释放TTL对象传递过来的业务对象引用,从代码看这里只决定了当前TtlCallable对象的引用是否释放,TtlCallable对象本身有一定生命周期,再者如果复用主线程传递过来的TTL对象引用也一直存在于主线程,目前都是false,子线程引用也会一直随着主线程传递而更新
    private final boolean releaseTtlValueReferenceAfterCall;

    private TtlCallable(@Nonnull Callable<V> callable, boolean releaseTtlValueReferenceAfterCall) {
// 在new这个对象时是在主线程,所以capture()方法拿到的是主线程的TTL对象最新的引用,包括业务对象也是最新的
        this.capturedRef = new AtomicReference<Object>(capture());
        this.callable = callable;
        this.releaseTtlValueReferenceAfterCall = releaseTtlValueReferenceAfterCall;
    }

    /**
     * wrap method {@link Callable#call()}.
     */
    @Override
    public V call() throws Exception {
// 获取主线程的 TTL对象map,就是通过Transmitter#capture()方法从 TTL对象中上面所说的TTL对象中的内部holder中转map获取到主线程的所有TTL及业务对象引用
        Object captured = capturedRef.get();
// 如果为空 或者 需要清理TTL对象引用,则进行一次原子操作对TTL对象引用置为空
        if (captured == null || releaseTtlValueReferenceAfterCall && !capturedRef.compareAndSet(captured, null)) {
            throw new IllegalStateException("TTL value reference is released after call!");
        }
  // 重放 captured为当前装饰器初始化时从主线程拿到的,这里对其进行重放替换
// 并返回当前子线程的 TTL对象作为还原
        Object backup = replay(captured);
        try {
//被增项的目标方法执行
            return callable.call();
        } finally {
// 再将当前子线程还原
            restore(backup);
        }
    }
// ----------------省略大部分代码--------------
}

Transmitter#capture()方法

        @Nonnull
        public static Object capture() {
            Map<TransmittableThreadLocal<?>, Object> captured = new HashMap<TransmittableThreadLocal<?>, Object>();
// 复制的核心 是从 holder中转对象中获取每个key的threadLocal中的业务对象引用
// 然后再用其TTL对象作为key 组装一个 TTL对象 -> 业务对象的map返回
            for (TransmittableThreadLocal<?> threadLocal : holder.get().keySet()) {
                captured.put(threadLocal, threadLocal.copyValue());
            }
            return captured;
        }
//-----------copyValue 方法----------
   private T copyValue() {
// 复制就是从当前主线程 的threadLocal get
        return copy(get());
    }
//------------copy 方法------
  protected T copy(T parentValue) {
// 复制的是对象的引用
        return parentValue;
    }

下面看Transmitter#replay(@Nonnull Object captured) 重放逻辑

    @Nonnull
        public static Object replay(@Nonnull Object captured) {
            @SuppressWarnings("unchecked")
// 主线程传递过来的引用
            Map<TransmittableThreadLocal<?>, Object> capturedMap = (Map<TransmittableThreadLocal<?>, Object>) captured;
// 当前子线程的TTL引用用于返回后 还原
            Map<TransmittableThreadLocal<?>, Object> backup = new HashMap<TransmittableThreadLocal<?>, Object>();
            for (Iterator<? extends Map.Entry<TransmittableThreadLocal<?>, ?>> iterator = holder.get().entrySet().iterator();
                 iterator.hasNext(); ) {
                Map.Entry<TransmittableThreadLocal<?>, ?> next = iterator.next();
                TransmittableThreadLocal<?> threadLocal = next.getKey();
                // backup
                backup.put(threadLocal, threadLocal.get());
                // clear the TTL values that is not in captured
                // avoid the extra TTL values after replay when run task
//清除掉可能失效和旧的子线程的TTL对象引用,为什么这么做,目前不太清楚
                if (!capturedMap.containsKey(threadLocal)) {
                    iterator.remove();
                    threadLocal.superRemove();
                }
            }
            // set values to captured TTL
// 我们上一篇文章以及当前文章上面提到,在thread new的时候调用initialValue和childValue 方法时,会将主线程的TTL对象引用传递给子线程,但是不同装饰器增强时,子线程里的TTL对象中的业务对象引用是一直不变的,一直是第一次传递过来的业务对象的值,而主线程的业务对象变更子线程感知不到,但是TTL对象也一直是一个引用这里将其旧的TTL引用
// 放入主线程新得 TTL中的业务对象引用,实际因为子线程的TTL对象引用和主线程的TTL对象是一样的,只不过主线程更新了业务对象引用子线程感知不到,因为java内存模型的原因,所以这里直接重新操作一次 子线程的TTL对象更新 *业务对象引用* 重复了一次主线程的操作
            setTtlValuesTo(capturedMap);
            // call beforeExecute callback
// 这里其实是一个模板方法,包括目标对象执行前也就是重放,及目标对象执行后,还原的实际的一个钩子
            doExecuteCallback(true);
            return backup;
        }

我们来看看 setTtlValuesTo(capturedMap); 实际就是重复了主线程的操作,使用相同的TTL对象引用对业务对象引用进行更新

     private static void setTtlValuesTo(@Nonnull Map<TransmittableThreadLocal<?>, Object> ttlValues) {
            for (Map.Entry<TransmittableThreadLocal<?>, Object> entry : ttlValues.entrySet()) {
                @SuppressWarnings("unchecked")
                TransmittableThreadLocal<Object> threadLocal = (TransmittableThreadLocal<Object>) entry.getKey();
                threadLocal.set(entry.getValue());
            }
        }

看看钩子方法,可以用于我们扩展TTL对象进行钩子回调

    private static void doExecuteCallback(boolean isBefore) {
        for (Map.Entry<TransmittableThreadLocal<?>, ?> entry : holder.get().entrySet()) {
            TransmittableThreadLocal<?> threadLocal = entry.getKey();

            try {
// 两个模板方法钩子
                if (isBefore) threadLocal.beforeExecute();
                else threadLocal.afterExecute();
            } catch (Throwable t) {
                if (logger.isLoggable(Level.WARNING)) {
                    logger.log(Level.WARNING, "TTL exception when " + (isBefore ? "beforeExecute" : "afterExecute") + ", cause: " + t.toString(), t);
                }
            }
        }
    }

doExecuteCallback的模板方法钩子

目前TTL对象中是空实现。如果继承扩展TTL对象可能用到噢
然后是 还原方法Transmitter#restore(@Nonnull Object backup)逻辑和上面的replay方法基本相同不过逻辑是反过来的小伙伴可以自行看代码

总结
  1. 利用装饰器对ExectorService包装后一步步的继续利用装饰器一直装饰到要执行的目标对象接口例如Runnable,Callable等对初始化,执行前,执行后三个时机进行增强
  2. 重写了InheritableThreadLocal#childValue 方法来传递 TTL定义的一个中转map对象 key为 TTL对象
  3. 利用了主子线程传递 TTL对象的引用一致,同时用以TTL对象为key的map进行重放,直接对主线程传递过来的TTL对象业务对象引用进行更新,因为子对象的引用相同相当于对子线程的TTL的业务对象引用更新。感觉用其它集合也可以,但是看代码map可以在重放的同时更方便的清理子线程的多余的TTL对象,保证主子线程的TTL对应一致性。
  4. 提供了 一些模板方法提高了扩展性 例如beforeExecute ,afterExecute
  5. 提供了屏蔽ForkJoin工作线程屏蔽InheritableThreadLocal的传递,帮助开发期间及时发现threadLocal的问题
其它问题,java8提供的parallelStream 并行流和CompletableFuture 都是使用ForkJoin框架实现,使用TTL还是会有问题

在TTL源码没有看到关于forkJoin的增强,但是发现了TtlForkJoinPoolHelper类,提供了DisableInheritableForkJoinWorkerThreadFactory 的支持,为了屏蔽掉InheritableThreadLocal的传递防止开发测试时theadLocal错误传递的假象。

// ForkJoinWorkerThreadFactory 的装饰器
public interface DisableInheritableForkJoinWorkerThreadFactory extends ForkJoinWorkerThreadFactory {
    /**
     * Unwrap {@link DisableInheritableThreadFactory} to the original/underneath one.
     */
    @Nonnull
    ForkJoinWorkerThreadFactory unwrap();
}

ForkJoin的逻辑大家自行查询资料,因为存在工作窃取等逻辑理论上是无法避免的ThreadLocal错乱问题。所以TTL提供了屏蔽装饰器,但是forkJoin的工作线程也可能是主线程,所以使用TTL的屏蔽逻辑只能屏蔽掉ForkJoin的工作线程,无法避免ForkJoin直接使用主线程执行任务单元时还是有正确的threadLocal对象引用。但是这样也足够开发测试期间及时发现threadLocal的问题了。
经过我网上搜索我们可以替换掉ForkJoin默认的ForkJoinWorkerThreadFactory,增强线程创建逻辑。

  private static ForkJoinPool makeCommonPool() {
        int parallelism = -1;
        ForkJoinWorkerThreadFactory factory = null;
        UncaughtExceptionHandler handler = null;
        try {  // ignore exceptions in accessing/parsing properties
            String pp = System.getProperty
                ("java.util.concurrent.ForkJoinPool.common.parallelism");
// ForkJoin会有一个扩展逻辑,这里如果获取到指定的线程工厂类则不会使用默认的。但是当前makeCommonPool 方法在 static {} 代码块中执行,经过测试直接System.setProperty无法掌控好加载顺序,可能获取不到自定义的系统变量,索性直接通过jvm启动参数指定
            String fp = System.getProperty
                ("java.util.concurrent.ForkJoinPool.common.threadFactory");
            String hp = System.getProperty
                ("java.util.concurrent.ForkJoinPool.common.exceptionHandler");
            if (pp != null)
                parallelism = Integer.parseInt(pp);
            if (fp != null)
// 如果有自定义的线程工厂会初始化
                factory = ((ForkJoinWorkerThreadFactory)ClassLoader.
                           getSystemClassLoader().loadClass(fp).newInstance());
            if (hp != null)
                handler = ((UncaughtExceptionHandler)ClassLoader.
                           getSystemClassLoader().loadClass(hp).newInstance());
        } catch (Exception ignore) {
        }
        if (factory == null) {
            if (System.getSecurityManager() == null)
                factory = defaultForkJoinWorkerThreadFactory;
            else // use security-managed default
                factory = new InnocuousForkJoinWorkerThreadFactory();
        }
        if (parallelism < 0 && // default 1 less than #cores
            (parallelism = Runtime.getRuntime().availableProcessors() - 1) <= 0)
            parallelism = 1;
        if (parallelism > MAX_CAP)
            parallelism = MAX_CAP;
        return new ForkJoinPool(parallelism, factory, handler, LIFO_QUEUE,
                                "ForkJoinPool.commonPool-worker-");
    }

但是看到TTL包内的DisableInheritableForkJoinWorkerThreadFactoryWrapper 线程工厂装饰器并没有构造方法,并且不是public不能继承,也就是直接指定这个类不能被正常加载后newInstance(),又不能继承,可能只是一个示例?那么我自定义一个类复制它的逻辑

class DisableInheritableForkJoinWorkerThreadFactoryWrapper implements DisableInheritableForkJoinWorkerThreadFactory {
    final ForkJoinWorkerThreadFactory threadFactory;
    public DisableInheritableForkJoinWorkerThreadFactoryWrapper(@Nonnull ForkJoinWorkerThreadFactory threadFactory) {
        this.threadFactory = threadFactory;
    }
    @Override
    public ForkJoinWorkerThread newThread(ForkJoinPool pool) {
// 看到这里在new thread时进行了 TTL对象的清理
//这个执行时机其实还是在主线程中,如果正常不执行这个代码子线程会拿到一个旧的主线程的TTL对象引用,但是这里清除了,就不会拿到了,方便开发测试阶段发现问题
        final Object backup = TransmittableThreadLocal.Transmitter.clear();
        try {
            return threadFactory.newThread(pool);
        } finally {
// 执行完后进行还原
            TransmittableThreadLocal.Transmitter.restore(backup);
        }
    }
    @Nonnull
    @Override
    public ForkJoinWorkerThreadFactory unwrap() {
        return threadFactory;
    }
}

我们自定义仿照上述类,直接复制的,区别是提供了构造方法,可以让ForkJoinPool#makeCommonPool方法可以加载扩展工厂,并且直接指定被增强的默认ForkJoinWorkerThreadFactory

public  class CustomForkJoinThreadFactory implements DisableInheritableForkJoinWorkerThreadFactory {
// 被增强的默认的线程工厂
    final ForkJoinWorkerThreadFactory threadFactory = ForkJoinPool.defaultForkJoinWorkerThreadFactory;
// 有无参构造才可以 加载成功噢
    public CustomForkJoinThreadFactory() {
    }
    @Override
    public ForkJoinWorkerThread newThread(ForkJoinPool pool) {
        final Object backup = TransmittableThreadLocal.Transmitter.clear();
        try {
            return threadFactory.newThread(pool);
        } finally {
            TransmittableThreadLocal.Transmitter.restore(backup);
        }
    }

    @Nonnull
    @Override
    public ForkJoinWorkerThreadFactory unwrap() {
        return threadFactory;
    }
}

jvm启动参数(如果有办法在ForkJoinPool的static加载前System.setProperty也可以)

-Djava.util.concurrent.ForkJoinPool.common.threadFactory=xxx.xxx.xxx.CustomForkJoinThreadFactory

其实TTL是支持 forkJoin的线程间传递的,由于我没有看官方文档,也没有仔细研究一下源码中agent目录,看来是大意了,感谢TTL作者,阅读了一下文档再回来试了试果然可以使用。下一篇文章去研究一下作者如何通过java agent技术实现无感的装饰器,以及如何实现当前文章提到的 捕捉,重放,恢复动作。

TransmittableThreadLocal通过javaAgent技术实现线程传递(并且支持ForkJoin) - 简书 (jianshu.com)

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 212,657评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,662评论 3 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 158,143评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,732评论 1 284
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,837评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,036评论 1 291
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,126评论 3 410
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,868评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,315评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,641评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,773评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,470评论 4 333
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,126评论 3 317
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,859评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,095评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,584评论 2 362
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,676评论 2 351

推荐阅读更多精彩内容