
1 概述


阅读过Java ThreadLocal源码的都知道其实现原理,Java的每个Thread实例都有ThreadLocal.ThreadLocalMap类型的字段threadLocals,我们使用的ThreadLocal对象实例则作为key,实际的值为value,ThreadLocalMap使用数组保存key-value,具体的数据结构是哈希表,并使用线性探测再散列解决hash冲突。还有一点要注意的是,上述key-value组成的Entry是WeakReference的子类,所以在对ThreadLocal进行set、get操作时会清理key已被GC回收的无效Entry,但在使用不当时仍可能造成内存泄漏;另外,基于哈希表和线性探测实现的底层数据结构,在hash冲突较多时读写的时间开销也较高。



2 FTL相关数据结构和类结构

Netty为了采用FastThreadLocal(下文简称FTL),在Java Thread的基础上实现了自己的FastThreadLocalThread。为了理解FTL,我们需要关注FastThreadLocalThread的两个字段:cleanupFastThreadLocals和threadLocalMap。其中cleanupFastThreadLocals和FTL是否能主动清理有关,我们后面会介绍;threadLocalMap则类似Java Thread类的threadLocals,用于保存该FastThreadLocalThread持有的所有FTL数据。

2.1 InternalThreadLocalMap


private InternalThreadLocalMap() {
private static Object[] newIndexedVariableTable() {
    Object[] array = new Object[32];
    Arrays.fill(array, UNSET);
    return array;

从InternalThreadLocalMap源码可以发现,为了避免伪共享(false sharing)问题,其使用了缓存行填充技术,在类定义中声明了如下long字段进行填充。具体可以参考Disruptor相关文献,基本上所有介绍Disruptor的文章都会提到伪共享问题;在Java8中则可以使用@sun.misc.Contended注解避免伪共享问题。

// Cache line padding (must be public)
// With CompressedOops enabled, an instance of this class should occupy at least 128 bytes.
public long rp1, rp2, rp3, rp4, rp5, rp6, rp7, rp8, rp9;



static final AtomicInteger nextIndex = new AtomicInteger();

public static int nextVariableIndex() {
    int index = nextIndex.getAndIncrement();
    if (index < 0) {
        throw new IllegalStateException("too many thread-local indexed variables");
    return index;


// Grows the indexedVariables table so that slot `index` exists, then stores `value` there.
// The new capacity is a power of two strictly greater than `index`; every newly added slot
// is initialised to UNSET before the value is written.
private void expandIndexedVariableTableAndSet(int index, Object value) {
    Object[] oldArray = indexedVariables;
    final int oldCapacity = oldArray.length;
    // Start from the requested index and smear its highest set bit into all lower bits
    // (same bit trick as rounding up to a power of two), yielding 2^k - 1 >= index.
    int newCapacity = index;
    newCapacity |= newCapacity >>>  1;
    newCapacity |= newCapacity >>>  2;
    newCapacity |= newCapacity >>>  4;
    newCapacity |= newCapacity >>>  8;
    newCapacity |= newCapacity >>> 16;
    // +1 turns 2^k - 1 into 2^k, so newCapacity > index and newArray[index] is in bounds.
    // NOTE(review): for index >= 2^30 this increment overflows int to a negative value.
    newCapacity ++;

    // Copy the existing entries, mark only the appended tail [oldCapacity, newCapacity) as UNSET.
    Object[] newArray = Arrays.copyOf(oldArray, newCapacity);
    Arrays.fill(newArray, oldCapacity, newArray.length, UNSET);
    newArray[index] = value;
    // Replace the table reference only after the new array is fully populated.
    indexedVariables = newArray;

2.2 FTL类结构


* Returns the initial value for this thread-local variable.
protected V initialValue() throws Exception {
    return null;

* Invoked when this thread local variable is removed by {@link #remove()}.
protected void onRemoval(@SuppressWarnings("UnusedParameters") V value) throws Exception { }



private static final int variablesToRemoveIndex = InternalThreadLocalMap.nextVariableIndex();

private final int index;

private final int cleanerFlagIndex;

public FastThreadLocal() {
    index = InternalThreadLocalMap.nextVariableIndex();
    cleanerFlagIndex = InternalThreadLocalMap.nextVariableIndex();


* Returns the current value for the current thread
public final V get() {
    InternalThreadLocalMap threadLocalMap = InternalThreadLocalMap.get();
    //获取数组在该FTL index下标处的元素
    Object v = threadLocalMap.indexedVariable(index);
    if (v != InternalThreadLocalMap.UNSET) {
        return (V) v;

    V value = initialize(threadLocalMap);
    return value;

 private V initialize(InternalThreadLocalMap threadLocalMap) {
    V v = null;
    try {
        v = initialValue();
    } catch (Exception e) {
    threadLocalMap.setIndexedVariable(index, v);
    addToVariablesToRemove(threadLocalMap, this);
    return v;


public static InternalThreadLocalMap get() {
    Thread thread = Thread.currentThread();
    if (thread instanceof FastThreadLocalThread) {
        return fastGet((FastThreadLocalThread) thread);
    } else {
        return slowGet();

private static InternalThreadLocalMap fastGet(FastThreadLocalThread thread) {
    InternalThreadLocalMap threadLocalMap = thread.threadLocalMap();
    if (threadLocalMap == null) {
        thread.setThreadLocalMap(threadLocalMap = new InternalThreadLocalMap());
    return threadLocalMap;

private static InternalThreadLocalMap slowGet() {
    ThreadLocal<InternalThreadLocalMap> slowThreadLocalMap = UnpaddedInternalThreadLocalMap.slowThreadLocalMap;
    InternalThreadLocalMap ret = slowThreadLocalMap.get();
    if (ret == null) {
        ret = new InternalThreadLocalMap();
    return ret;

static final ThreadLocal<InternalThreadLocalMap> slowThreadLocalMap = new ThreadLocal<InternalThreadLocalMap>();

3 FTL数据设置或获取

3.1 数据放置


* Set the value for the current thread.
public final void set(V value) {
    if (value != InternalThreadLocalMap.UNSET) {
        InternalThreadLocalMap threadLocalMap = InternalThreadLocalMap.get();
        if (setKnownNotUnset(threadLocalMap, value)) {
    } else {

* @return see {@link InternalThreadLocalMap#setIndexedVariable(int, Object)}.
private boolean setKnownNotUnset(InternalThreadLocalMap threadLocalMap, V value) {
    if (threadLocalMap.setIndexedVariable(index, value)) {
        addToVariablesToRemove(threadLocalMap, this);
        return true;
    return false;

private static void addToVariablesToRemove(InternalThreadLocalMap threadLocalMap, FastThreadLocal<?> variable) {
    Object v = threadLocalMap.indexedVariable(variablesToRemoveIndex);
    Set<FastThreadLocal<?>> variablesToRemove;
    if (v == InternalThreadLocalMap.UNSET || v == null) {
        variablesToRemove = Collections.newSetFromMap(new IdentityHashMap<FastThreadLocal<?>, Boolean>());
        threadLocalMap.setIndexedVariable(variablesToRemoveIndex, variablesToRemove);
    } else {
        variablesToRemove = (Set<FastThreadLocal<?>>) v;

3.2 数据获取


4 清理

4.1 主动清理


public FastThreadLocalThread() {
    cleanupFastThreadLocals = false;

public FastThreadLocalThread(Runnable target) {
    cleanupFastThreadLocals = true;

public FastThreadLocalThread(ThreadGroup group, Runnable target) {
    super(group, FastThreadLocalRunnable.wrap(target));
    cleanupFastThreadLocals = true;

public FastThreadLocalThread(String name) {
    cleanupFastThreadLocals = false;

public FastThreadLocalThread(ThreadGroup group, String name) {
    super(group, name);
    cleanupFastThreadLocals = false;

public FastThreadLocalThread(Runnable target, String name) {
    super(FastThreadLocalRunnable.wrap(target), name);
    cleanupFastThreadLocals = true;

public FastThreadLocalThread(ThreadGroup group, Runnable target, String name) {
    super(group, FastThreadLocalRunnable.wrap(target), name);
    cleanupFastThreadLocals = true;

public FastThreadLocalThread(ThreadGroup group, Runnable target, String name, long stackSize) {
    super(group, FastThreadLocalRunnable.wrap(target), name, stackSize);
    cleanupFastThreadLocals = true;

final class FastThreadLocalRunnable implements Runnable {
    private final Runnable runnable;

    private FastThreadLocalRunnable(Runnable runnable) {
        this.runnable = ObjectUtil.checkNotNull(runnable, "runnable");

    public void run() {
        try {
        } finally {

    static Runnable wrap(Runnable runnable) {
        return runnable instanceof FastThreadLocalRunnable ? runnable : new FastThreadLocalRunnable(runnable);


* Removes all {@link FastThreadLocal} variables bound to the current thread.  This operation is useful when you
* are in a container environment, and you don't want to leave the thread local variables in the threads you do not
* manage.
public static void removeAll() {
    InternalThreadLocalMap threadLocalMap = InternalThreadLocalMap.getIfSet();
    if (threadLocalMap == null) {

    try {
        Object v = threadLocalMap.indexedVariable(variablesToRemoveIndex);
        if (v != null && v != InternalThreadLocalMap.UNSET) {
            Set<FastThreadLocal<?>> variablesToRemove = (Set<FastThreadLocal<?>>) v;
            FastThreadLocal<?>[] variablesToRemoveArray =
                    variablesToRemove.toArray(new FastThreadLocal[variablesToRemove.size()]);
            for (FastThreadLocal<?> tlv: variablesToRemoveArray) {
    } finally {

* Sets the value to uninitialized for the specified thread local map;
* a proceeding call to get() will trigger a call to initialValue().
* The specified thread local map must be for the current thread.
public final void remove(InternalThreadLocalMap threadLocalMap) {
    if (threadLocalMap == null) {
    Object v = threadLocalMap.removeIndexedVariable(index);
    removeFromVariablesToRemove(threadLocalMap, this);

    if (v != InternalThreadLocalMap.UNSET) {
        try {
            onRemoval((V) v);
        } catch (Exception e) {


4.2 后台线程清理

没能被FastThreadLocalRunnable.wrap方法修饰的Runnable任务,无法通过finally语句块做到主动清理,此时FastThreadLocalThread的cleanupFastThreadLocals字段为false。上面在介绍FastThreadLocal的set、get方法时,发现它们都调用了registerCleaner方法,这个方法就是将清理任务注册到专门负责清理工作的后台线程中,下面看其源码:

private void registerCleaner(final InternalThreadLocalMap threadLocalMap) {
    Thread current = Thread.currentThread();
    if (FastThreadLocalThread.willCleanupFastThreadLocals(current) ||
        threadLocalMap.indexedVariable(cleanerFlagIndex) != InternalThreadLocalMap.UNSET) {
    // removeIndexedVariable(cleanerFlagIndex) isn't necessary because the finally cleanup is tied to the lifetime
    // of the thread, and this Object will be discarded if the associated thread is GCed.
    threadLocalMap.setIndexedVariable(cleanerFlagIndex, Boolean.TRUE);

    // We will need to ensure we will trigger remove(InternalThreadLocalMap) so everything will be released
    // and FastThreadLocal.onRemoval(...) will be called.
    ObjectCleaner.register(current, new Runnable() {
        public void run() {

            // It's fine to not call InternalThreadLocalMap.remove() here as this will only be triggered once
            // the Thread is collected by GC. In this case the ThreadLocal will be gone away already.


* Register the given {@link Object} for which the {@link Runnable} will be executed once there are no references
* to the object anymore.
* This should only be used if there are no other ways to execute some cleanup once the Object is not reachable
* anymore because it is not a cheap way to handle the cleanup.
public static void register(Object object, Runnable cleanupTask) {
    AutomaticCleanerReference reference = new AutomaticCleanerReference(object,
            ObjectUtil.checkNotNull(cleanupTask, "cleanupTask"));
    // Its important to add the reference to the LIVE_SET before we access CLEANER_RUNNING to ensure correct
    // behavior in multi-threaded environments.

    // Check if there is already a cleaner running.
    if (CLEANER_RUNNING.compareAndSet(false, true)) {
        //该线程实际工作由Runnable CLEANER_TASK定义
        final Thread cleanupThread = new FastThreadLocalThread(CLEANER_TASK);
        // Set to null to ensure we not create classloader leaks by holding a strong reference to the inherited
        // classloader.
        // See:
        // -
        // -
        AccessController.doPrivileged(new PrivilegedAction<Void>() {
            public Void run() {
                return null;

        // Mark this as a daemon thread to ensure that we the JVM can exit if this is the only thread that is
        // running.


private static final Runnable CLEANER_TASK = new Runnable() {
    public void run() {
        boolean interrupted = false;
        for (;;) {
            // Keep on processing as long as the LIVE_SET is not empty and once it becomes empty
            // See if we can let this thread complete.
            while (!LIVE_SET.isEmpty()) {
                final AutomaticCleanerReference reference;
                try {
                    reference = (AutomaticCleanerReference) REFERENCE_QUEUE.remove(REFERENCE_QUEUE_POLL_TIMEOUT_MS);
                } catch (InterruptedException ex) {
                    // Just consume and move on
                    interrupted = true;
                if (reference != null) {
                    try {
                    } catch (Throwable ignored) {
                        // ignore exceptions, and don't log in case the logger throws an exception, blocks, or has
                        // other unexpected side effects.

            // Its important to first access the LIVE_SET and then CLEANER_RUNNING to ensure correct
            // behavior in multi-threaded environments.
            //下面compareAndSet(false, true)失败,
            if (LIVE_SET.isEmpty() || !CLEANER_RUNNING.compareAndSet(false, true)) {
                // There was nothing added after we set STARTED to false or some other cleanup Thread
                // was started already so its safe to let this Thread complete now.
        if (interrupted) {
            // As we caught the InterruptedException above we should mark the Thread as interrupted.

5 阅读源码发现的一个问题


// Stores `value` at slot `index` of the indexedVariables table, expanding the table when
// the index is beyond its current length.
// Returns true when the slot previously held UNSET or the table had to be expanded;
// returns false when an existing (non-UNSET) value was overwritten.
public boolean setIndexedVariable(int index, Object value) {
    Object[] lookup = indexedVariables;
    // Example from the walkthrough: index is 34 and lookup.length is 32,
    // so the else branch is taken and the table is expanded.
    if (index < lookup.length) {
        Object oldValue = lookup[index];
        lookup[index] = value;
        // Identity comparison against the UNSET sentinel tells whether the slot was empty.
        return oldValue == UNSET;
    } else {
        expandIndexedVariableTableAndSet(index, value);
        return true;
