

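import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.Exchanger;

public class UseExchanger {
    private static final Exchanger<Set<String>> exchange
            = new Exchanger<Set<String>>();

    public static void main(String[] args) {

        new Thread(new Runnable() {
            public void run() {
                Set<String> setA = new HashSet<String>(); // container holding this thread's data
                try {
                    // (the code that populates setA is not shown here)
                    for (String s : setA) {
                        System.out.print(s + " ");
                    }
                    setA = exchange.exchange(setA); // swap sets with the other thread
                    for (String s : setA) {
                        System.out.print(s + " ");
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt(); // restore the interrupt flag
                }
            }
        }).start();

        new Thread(new Runnable() {
            public void run() {
                Set<String> setB = new HashSet<String>(); // container holding this thread's data
                try {
                    // (the code that populates setB is not shown here)
                    for (String s : setB) {
                        System.out.print(s + " ");
                    }
                    setB = exchange.exchange(setB); // swap sets with the other thread
                    for (String s : setB) {
                        System.out.print(s + " ");
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt(); // restore the interrupt flag
                }
            }
        }).start();
    }
}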

Zhang Li 

Zhang Li 


A synchronization point at which threads can pair and swap 
elements within pairs. Each thread presents some object on entry to 
the exchange method, matches with a partner thread, and receives 
its partner's object on return. An Exchanger may be viewed as a 
bidirectional form of a SynchronousQueue. Exchangers may be 
useful in applications such as genetic algorithms and pipeline designs.

Sample Usage: Here are the highlights of a class that uses an 
Exchanger to swap buffers between threads so that the thread filling 
the buffer gets a freshly emptied one when it needs it, handing off the 
filled one to the thread emptying the buffer.



 class FillAndEmpty {
   Exchanger<DataBuffer> exchanger = new Exchanger<DataBuffer>();
   DataBuffer initialEmptyBuffer = ... a made-up type
   DataBuffer initialFullBuffer = ...

   class FillingLoop implements Runnable {
     public void run() {
       DataBuffer currentBuffer = initialEmptyBuffer;
       try {
         while (currentBuffer != null) {
           addToBuffer(currentBuffer);
           if (currentBuffer.isFull())
             currentBuffer = exchanger.exchange(currentBuffer);
         }
       } catch (InterruptedException ex) { ... handle ... }
     }
   }

   class EmptyingLoop implements Runnable {
     public void run() {
       DataBuffer currentBuffer = initialFullBuffer;
       try {
         while (currentBuffer != null) {
           takeFromBuffer(currentBuffer);
           if (currentBuffer.isEmpty())
             currentBuffer = exchanger.exchange(currentBuffer);
         }
       } catch (InterruptedException ex) { ... handle ...}
     }
   }

   void start() {
     new Thread(new FillingLoop()).start();
     new Thread(new EmptyingLoop()).start();
   }
 }

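
The Javadoc sample above is only an outline (DataBuffer is a made-up type). As a rough, runnable sketch of the same hand-off, assuming a plain `List<Integer>` as the buffer and hypothetical names `BufferSwapDemo`, `CAPACITY` and `ROUNDS`, something like the following could be used:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Exchanger;

// Hypothetical demo class; List<Integer> stands in for the made-up DataBuffer type.
class BufferSwapDemo {
    static final int CAPACITY = 4;  // assumed buffer capacity
    static final int ROUNDS = 3;    // assumed number of hand-offs

    /** Fills and empties ROUNDS buffers and returns the sum of everything consumed. */
    static int run() {
        Exchanger<List<Integer>> exchanger = new Exchanger<>();

        Thread filler = new Thread(() -> {
            List<Integer> buf = new ArrayList<>();
            int next = 0;
            try {
                for (int r = 0; r < ROUNDS; r++) {
                    while (buf.size() < CAPACITY)
                        buf.add(next++);              // fill the current buffer
                    buf = exchanger.exchange(buf);    // hand off full, receive empty
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        filler.start();

        int sum = 0;
        List<Integer> buf = new ArrayList<>();        // the emptier starts with an empty buffer
        try {
            for (int r = 0; r < ROUNDS; r++) {
                buf = exchanger.exchange(buf);        // hand off empty, receive full
                for (int v : buf)
                    sum += v;                         // consume the contents
                buf.clear();                          // empty it before handing it back
            }
            filler.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return sum;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

Unlike the Javadoc outline, this sketch lets the emptying side start with an empty buffer, so the two threads simply alternate; the two list instances are recycled between the threads rather than reallocated.
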
Overview: The core algorithm is, for an exchange "slot",
and a participant (caller) with an item:


for (;;) {
  if (slot is empty) {                       // offer
    place item in a Node;
    if (can CAS slot from empty to node) {
      wait for release;
      return matching item in node;
    }
  }
  else if (can CAS slot from node to empty) { // release
    get the item in node;
    set matching item in node;
    release waiting thread;
  }
  // else retry on CAS failure
}

This is among the simplest forms of a "dual data structure" --
see Scott and Scherer's DISC 04 paper.
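
To make the offer/release loop concrete, here is a minimal single-slot sketch in Java. It is not the JDK implementation: the class name `SingleSlotExchanger` and its `demo` helper are made up for illustration, a fresh node is allocated per offer (instead of a reused per-thread node), null items are rejected, and interrupts and timeouts are ignored.

```java
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.LockSupport;

// Hypothetical teaching sketch of the core algorithm; not java.util.concurrent.Exchanger.
class SingleSlotExchanger<V> {
    private static final class Node {
        final Object item;       // the offering thread's item
        final Thread waiter;     // the thread to wake on release
        volatile Object match;   // set by the releasing thread
        Node(Object item, Thread waiter) { this.item = item; this.waiter = waiter; }
    }

    private final AtomicReference<Node> slot = new AtomicReference<>();

    @SuppressWarnings("unchecked")
    V exchange(V x) {
        if (x == null)
            throw new NullPointerException("null items are not supported in this sketch");
        for (;;) {
            Node q = slot.get();
            if (q == null) {                              // offer
                Node p = new Node(x, Thread.currentThread());
                if (slot.compareAndSet(null, p)) {
                    while (p.match == null)               // wait for release
                        LockSupport.park(this);           // (an interrupted thread would spin here)
                    return (V) p.match;
                }
            } else if (slot.compareAndSet(q, null)) {     // release
                Object v = q.item;
                q.match = x;
                LockSupport.unpark(q.waiter);
                return (V) v;
            }
            // else retry on CAS failure
        }
    }

    /** Two threads swap strings; returns what the calling thread and the other thread received. */
    static String[] demo() {
        SingleSlotExchanger<String> ex = new SingleSlotExchanger<>();
        String[] got = new String[2];
        Thread other = new Thread(() -> got[1] = ex.exchange("from-B"));
        other.start();
        got[0] = ex.exchange("from-A");
        try {
            other.join();
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
        return got;
    }

    public static void main(String[] args) {
        String[] got = demo();
        System.out.println(got[0] + " " + got[1]);
    }
}
```

Every participant competes on the one `slot` reference, which is exactly the contention point discussed next.
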

This works great in principle. But in practice, like many
algorithms centered on atomic updates to a single location, it
scales horribly when there are more than a few participants
using the same Exchanger. So the implementation instead uses a
form of elimination arena, that spreads out this contention by
arranging that some threads typically use different slots,
while still ensuring that eventually, any two parties will be
able to exchange items. That is, we cannot completely partition
across threads, but instead give threads arena indices that
will on average grow under contention and shrink under lack of
contention. We approach this by defining the Nodes that we need
anyway as ThreadLocals, and include in them per-thread index
and related bookkeeping state. (We can safely reuse per-thread
nodes rather than creating them fresh each time because slots
alternate between pointing to a node vs null, so cannot
encounter ABA problems. However, we do need some care in
resetting them between uses.)

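In short: this is the simplest form of a "dual data structure"; see Scott and Scherer's DISC 04 paper.

It works well in principle. In practice, like many algorithms built around atomic updates to a single location, it scales badly once more than a few participants share the same Exchanger. The implementation therefore uses an elimination arena, which spreads the contention by having some threads use different slots while still guaranteeing that any two parties can eventually exchange items. Threads cannot be fully partitioned; instead each thread gets an arena index that grows under contention and shrinks when contention subsides. This is done by making the Nodes, which are needed anyway, ThreadLocals that also carry the per-thread index and related bookkeeping state. (Per-thread nodes can be reused safely instead of being created each time, because slots alternate between pointing to a node and null, so ABA problems cannot occur. They do need to be reset carefully between uses.)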

Implementing an effective arena requires allocating a bunch of
space, so we only do so upon detecting contention (except on
uniprocessors, where they wouldn't help, so aren't used).
Otherwise, exchanges use the single-slot slotExchange method.
On contention, not only must the slots be in different
locations, but the locations must not encounter memory
contention due to being on the same cache line (or more
generally, the same coherence unit).  Because, as of this
writing, there is no way to determine cacheline size, we define
a value that is enough for common platforms.  Additionally,
extra care elsewhere is taken to avoid other false/unintended
sharing and to enhance locality, including adding padding (via
sun.misc.Contended) to Nodes, embedding "bound" as an Exchanger
field, and reworking some park/unpark mechanics compared to
LockSupport versions.

The arena starts out with only one used slot. We expand the
effective arena size by tracking collisions; i.e., failed CASes
while trying to exchange. By nature of the above algorithm, the
only kinds of collision that reliably indicate contention are
when two attempted releases collide -- one of two attempted
offers can legitimately fail to CAS without indicating
contention by more than one other thread. (Note: it is possible
but not worthwhile to more precisely detect contention by
reading slot values after CAS failures.)  When a thread has
collided at each slot within the current arena bound, it tries
to expand the arena size by one. We track collisions within
bounds by using a version (sequence) number on the "bound"
field, and conservatively reset collision counts when a
participant notices that bound has been updated (in either
direction).
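
As a small illustration of the versioned "bound" field, the sketch below packs the largest valid arena index into the low bits and a sequence number into the high bits. The names `MMASK` and `SEQ` and the `b + SEQ + 1` / `b + SEQ - 1` updates appear in the listings in this article; the concrete values `0xff` and `MMASK + 1`, and the helper class `BoundEncoding`, are assumptions made for the example.

```java
// Hypothetical helper illustrating the bound encoding; constant values are assumed.
class BoundEncoding {
    static final int MMASK = 0xff;       // assumed mask for the max arena index (low 8 bits)
    static final int SEQ = MMASK + 1;    // assumed unit for the version number (high bits)

    static int maxIndex(int bound) { return bound & MMASK; }   // largest valid arena index
    static int version(int bound)  { return bound >>> 8; }     // number of updates so far
    static int grow(int bound)     { return bound + SEQ + 1; } // index + 1, version + 1
    static int shrink(int bound)   { return bound + SEQ - 1; } // index - 1, version + 1

    public static void main(String[] args) {
        int b = SEQ;                     // state right after the arena is created
        System.out.println(version(b) + " " + maxIndex(b));
        b = grow(b);
        System.out.println(version(b) + " " + maxIndex(b));
        b = shrink(b);
        System.out.println(version(b) + " " + maxIndex(b));
    }
}
```

Because every update bumps the version, a participant that cached an old bound value can detect that the bound changed even when it has grown and then shrunk back to the same index.
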



The effective arena size is reduced (when there is more than
one slot) by giving up on waiting after a while and trying to
decrement the arena size on expiration. The value of "a while"
is an empirical matter.  We implement by piggybacking on the
use of spin->yield->block that is essential for reasonable
waiting performance anyway -- in a busy exchanger, offers are
usually almost immediately released, in which case context
switching on multiprocessors is extremely slow/wasteful.  Arena
waits just omit the blocking part, and instead cancel. The spin
count is empirically chosen to be a value that avoids blocking
99% of the time under maximum sustained exchange rates on a
range of test machines. Spins and yields entail some limited
randomness (using a cheap xorshift) to avoid regular patterns
that can induce unproductive grow/shrink cycles. (Using a
pseudorandom also helps regularize spin cycle duration by
making branches unpredictable.)  Also, during an offer, a
waiter can "know" that it will be released when its slot has
changed, but cannot yet proceed until match is set.  In the
mean time it cannot cancel the offer, so instead spins/yields.
Note: It is possible to avoid this secondary check by changing
the linearization point to be a CAS of the match field (as done
in one case in the Scott & Scherer DISC paper), which also
increases asynchrony a bit, at the expense of poorer collision
detection and inability to always reuse per-thread nodes. So
the current scheme is typically a better tradeoff.

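In short: the effective arena size shrinks when a waiter gives up after a while and tries to decrement the arena size on expiration. How long "a while" should be is an empirical matter. The mechanism piggybacks on the spin->yield->block sequence that reasonable waiting performance needs anyway: in a busy exchanger, offers are usually released almost immediately, so context switching on multiprocessors would be slow and wasteful. Arena waits simply omit the blocking step and cancel instead. The spin count was chosen empirically so that blocking is avoided 99% of the time under maximum sustained exchange rates on a range of test machines. Spins and yields use some limited randomness (a cheap xorshift) to avoid regular patterns that could cause unproductive grow/shrink cycles; the pseudorandom values also make branches unpredictable, which helps even out spin-cycle duration. In addition, during an offer a waiter can tell from its slot changing that it will be released, but it cannot proceed until match is set. It cannot cancel the offer in the meantime, so it spins or yields.

Note: this secondary check could be avoided by moving the linearization point to a CAS of the match field (as done in one case in the Scott & Scherer DISC paper), which would also increase asynchrony slightly, at the cost of poorer collision detection and of not always being able to reuse per-thread nodes. The current scheme is therefore usually the better trade-off.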

On collisions, indices traverse the arena cyclically in reverse
order, restarting at the maximum index (which will tend to be
sparsest) when bounds change. (On expirations, indices instead
are halved until reaching 0.) It is possible (and has been
tried) to use randomized, prime-value-stepped, or double-hash
style traversal instead of simple cyclic traversal to reduce
bunching.  But empirically, whatever benefits these may have
don't overcome their added overhead: We are managing operations
that occur very quickly unless there is sustained contention,
so simpler/faster control policies work better than more
accurate but slower ones.
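
The two index-update rules can be sketched in isolation. The expressions mirror the `i = (i == 0) ? m : i - 1` and `p.index >>>= 1` lines of arenaExchange; the wrapper class `ArenaTraversal` and its method names are hypothetical.

```java
// Hypothetical helper isolating the two index-update rules.
class ArenaTraversal {
    /** On a collision: step backwards cyclically within [0, m]. */
    static int nextOnCollision(int i, int m) {
        return (i == 0) ? m : i - 1;
    }

    /** On an expired wait: halve the index until it reaches 0. */
    static int nextOnExpiration(int i) {
        return i >>> 1;
    }

    public static void main(String[] args) {
        int m = 3;                                  // assumed current max index
        StringBuilder sb = new StringBuilder();
        for (int i = 0, n = 0; n < 5; n++) {        // five collisions starting at index 0
            i = nextOnCollision(i, m);
            sb.append(i).append(' ');
        }
        System.out.println(sb.toString().trim());
        System.out.println(nextOnExpiration(5) + " " + nextOnExpiration(2) + " " + nextOnExpiration(1));
    }
}
```

Halving on expiration moves idle threads quickly toward slot 0, where blocking waits happen, while collisions only step one slot at a time.
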

Because we use expiration for arena size control, we cannot
throw TimeoutExceptions in the timed version of the public
exchange method until the arena size has shrunken to zero (or
the arena isn't enabled). This may delay response to timeout
but is still within spec.
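
From the caller's side, the timed variant looks like this. The sketch uses the public `exchange(V, long, TimeUnit)` method with no partner thread, so the call is expected to end in a TimeoutException; the class name `TimedExchangeDemo` and the 50 ms timeout are arbitrary choices for the example.

```java
import java.util.concurrent.Exchanger;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical demo of the timed exchange when no partner ever arrives.
class TimedExchangeDemo {
    /** Returns true if the timed exchange gave up with a TimeoutException. */
    static boolean timesOutWithoutPartner() {
        Exchanger<String> exchanger = new Exchanger<>();
        try {
            exchanger.exchange("lonely", 50, TimeUnit.MILLISECONDS);
            return false;                          // would mean a partner showed up
        } catch (TimeoutException e) {
            return true;                           // nobody arrived within the timeout
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();    // preserve the interrupt flag
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(timesOutWithoutPartner());
    }
}
```
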

Essentially all of the implementation is in methods
slotExchange and arenaExchange. These have similar overall
structure, but differ in too many details to combine. The
slotExchange method uses the single Exchanger field "slot"
rather than arena array elements. However, it still needs
minimal collision detection to trigger arena construction.
(The messiest part is making sure interrupt status and
InterruptedExceptions come out right during transitions when
both methods may be called. This is done by using null return
as a sentinel to recheck interrupt status.)



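In short: essentially all of the implementation lives in slotExchange and arenaExchange. The two have a similar overall structure but differ in too many details to be merged. slotExchange uses the single Exchanger field slot rather than arena array elements, yet it still needs minimal collision detection to trigger arena construction. (The messiest part is getting interrupt status and InterruptedExceptions right during the transition, when both methods may be called. This is done by using a null return as a sentinel to recheck interrupt status.)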

As is too common in this sort of code, methods are monolithic
because most of the logic relies on reads of fields that are
maintained as local variables so can't be nicely factored --
mainly, here, bulky spin->yield->block/cancel code), and
heavily dependent on intrinsics (Unsafe) to use inlined
embedded CAS and related memory access operations (that tend
not to be as readily inlined by dynamic compilers when they are
hidden behind other methods that would more nicely name and
encapsulate the intended effects). This includes the use of
putOrderedX to clear fields of the per-thread Nodes between
uses. Note that field Node.item is not declared as volatile
even though it is read by releasing threads, because they only
do so after CAS operations that must precede access, and all
uses by the owning thread are otherwise acceptably ordered by
other operations. (Because the actual points of atomicity are
slot CASes, it would also be legal for the write to Node.match
in a release to be weaker than a full volatile write. However,
this is not done because it could allow further postponement of
the write, delaying progress.)




    public Exchanger() {
        participant = new Participant();
    }

    /**
     * Per-thread state
     */
    private final Participant participant;

    /**
     * Elimination array; null until enabled (within slotExchange).
     * Element accesses use emulation of volatile gets and CAS.
     */
    private volatile Node[] arena;

    /**
     * Slot used until contention detected.
     */
    private volatile Node slot;

    /**
     * The index of the largest valid arena position, OR'ed with SEQ
     * number in high bits, incremented on each update.  The initial
     * update from 0 to SEQ is used to ensure that the arena array is
     * constructed only once.
     */
    private volatile int bound;

    /** The corresponding thread local class */
    static final class Participant extends ThreadLocal<Node> {
        public Node initialValue() { return new Node(); }
    }

    /**
     * Nodes hold partially exchanged data, plus other per-thread
     * bookkeeping. Padded via @sun.misc.Contended to reduce memory
     * contention.
     */
    @sun.misc.Contended static final class Node {
        int index;              // Arena index
        int bound;              // Last recorded value of Exchanger.bound
        int collides;           // Number of CAS failures at current bound
        int hash;               // Pseudo-random for spins
        Object item;            // This thread's current item
        volatile Object match;  // Item provided by releasing thread
        volatile Thread parked; // Set to this thread when parked, else null
    }
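
The per-thread reuse that Participant provides can be seen with a plain ThreadLocal. This is only a sketch: the simplified `Node` here and the class `PerThreadNodeDemo` are stand-ins invented for the example, and `ThreadLocal.withInitial` replaces the subclass used in the JDK source.

```java
// Hypothetical demo: each thread lazily gets one node and keeps reusing it.
class PerThreadNodeDemo {
    static final class Node {   // simplified stand-in for Exchanger.Node
        Object item;
        int index;
    }

    static final ThreadLocal<Node> PARTICIPANT = ThreadLocal.withInitial(Node::new);

    /** Returns {same thread sees the same node, another thread sees a different node}. */
    static boolean[] demo() {
        Node first = PARTICIPANT.get();
        Node second = PARTICIPANT.get();           // no new allocation for the same thread
        Node[] other = new Node[1];
        Thread t = new Thread(() -> other[0] = PARTICIPANT.get());
        t.start();
        try {
            t.join();
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
        return new boolean[] { first == second, first != other[0] };
    }

    public static void main(String[] args) {
        boolean[] r = demo();
        System.out.println(r[0] + " " + r[1]);
    }
}
```

Reusing one node per thread avoids an allocation per exchange, which is why the real code has to clear item, match and hash so carefully between uses.
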





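exchange(V x) blocks the calling thread until one of the following happens:

  • Another thread arrives at the exchange point
  • Another thread interrupts the current thread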
    public V exchange(V x) throws InterruptedException {
        Object v;
        Object item = (x == null) ? NULL_ITEM : x; // translate null args
        if ((arena != null ||
             (v = slotExchange(item, false, 0L)) == null) &&
            ((Thread.interrupted() || // disambiguates null return
              (v = arenaExchange(item, false, 0L)) == null)))
            throw new InterruptedException();
        return (v == NULL_ITEM) ? null : (V)v;
    }
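
One visible consequence of the NULL_ITEM translation is that null can be exchanged like any other value, even though a null return is used internally as a sentinel. A small sketch with the real java.util.concurrent.Exchanger (the class name `NullExchangeDemo` and the string values are made up):

```java
import java.util.concurrent.Exchanger;

// Hypothetical demo: one side offers null, the other offers a string.
class NullExchangeDemo {
    /** Returns {what the calling thread received, what the other thread received}. */
    static Object[] demo() {
        Exchanger<String> exchanger = new Exchanger<>();
        Object[] got = { "unset", "unset" };
        Thread other = new Thread(() -> {
            try {
                got[1] = exchanger.exchange("from-B");
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        other.start();
        try {
            got[0] = exchanger.exchange(null);     // null is translated to NULL_ITEM internally
            other.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return got;
    }

    public static void main(String[] args) {
        Object[] got = demo();
        System.out.println(got[0] + " " + got[1]);
    }
}
```
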

4.1 slotExchange

    /**
     * Exchange function used until arenas enabled. See above for explanation.
     *
     * @param item the item to exchange
     * @param timed true if the wait is timed
     * @param ns if timed, the maximum wait time, else 0L
     * @return the other thread's item; or null if either the arena
     * was enabled or the thread was interrupted before completion; or
     * TIMED_OUT if timed and timed out
     */
    private final Object slotExchange(Object item, boolean timed, long ns) {
        Node p = participant.get();
        Thread t = Thread.currentThread();
        if (t.isInterrupted()) // preserve interrupt status so caller can recheck
            return null;

        for (Node q;;) {
            if ((q = slot) != null) {
                if (U.compareAndSwapObject(this, SLOT, q, null)) {
                    Object v = q.item;
                    q.match = item;
                    Thread w = q.parked;
                    if (w != null)
                        U.unpark(w);
                    return v;
                }
                // create arena on contention, but continue until slot null
                if (NCPU > 1 && bound == 0 &&
                    U.compareAndSwapInt(this, BOUND, 0, SEQ))
                    arena = new Node[(FULL + 2) << ASHIFT];
            }
            else if (arena != null)
                return null; // caller must reroute to arenaExchange
            else {
                p.item = item;
                if (U.compareAndSwapObject(this, SLOT, null, p))
                    break;
                p.item = null;
            }
        }

        // await release
        int h = p.hash;
        long end = timed ? System.nanoTime() + ns : 0L;
        int spins = (NCPU > 1) ? SPINS : 1;
        Object v;
        while ((v = p.match) == null) {
            if (spins > 0) {
                h ^= h << 1; h ^= h >>> 3; h ^= h << 10;
                if (h == 0)
                    h = SPINS | (int)t.getId();
                else if (h < 0 && (--spins & ((SPINS >>> 1) - 1)) == 0)
                    Thread.yield();
            }
            else if (slot != p)
                spins = SPINS;
            else if (!t.isInterrupted() && arena == null &&
                     (!timed || (ns = end - System.nanoTime()) > 0L)) {
                U.putObject(t, BLOCKER, this);
                p.parked = t;
                if (slot == p)
                    U.park(false, ns);
                p.parked = null;
                U.putObject(t, BLOCKER, null);
            }
            else if (U.compareAndSwapObject(this, SLOT, p, null)) {
                v = timed && ns <= 0L && !t.isInterrupted() ? TIMED_OUT : null;
                break;
            }
        }
        U.putOrderedObject(p, MATCH, null);
        p.item = null;
        p.hash = h;
        return v;
    }


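When the first thread arrives and finds slot empty:

  • It sets Node.item = item on the Node p stored in participant, then CASes slot to p
  • It spins, yielding occasionally
  • It sets p.parked to the current thread and then blocks, much like LockSupport.park(this)

When the second thread arrives and finds slot occupied:

  • It reads the slot (q = slot) and CASes slot to null
  • It sets q.match to its own item and wakes the blocked thread, much like LockSupport.unpark(q.parked)
  • It then returns q.item
  • The previously blocked thread now wakes up:
    it takes the value of p.match, resets p.parked, p.match and p.item to null, and returns that value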

4.2 arenaExchange

    /**
     * Exchange function when arenas enabled. See above for explanation.
     *
     * @param item the (non-null) item to exchange
     * @param timed true if the wait is timed
     * @param ns if timed, the maximum wait time, else 0L
     * @return the other thread's item; or null if interrupted; or
     * TIMED_OUT if timed and timed out
     */
    private final Object arenaExchange(Object item, boolean timed, long ns) {
        Node[] a = arena;
        int alen = a.length;
        Node p = participant.get();
        for (int i = p.index;;) {                      // access slot at i
            int b, m, c;
            int j = (i << ASHIFT) + ((1 << ASHIFT) - 1);
            if (j < 0 || j >= alen)
                j = alen - 1;
            Node q = (Node)AA.getAcquire(a, j);
            if (q != null && AA.compareAndSet(a, j, q, null)) {
                Object v = q.item;                     // release
                q.match = item;
                Thread w = q.parked;
                if (w != null)
                    LockSupport.unpark(w);
                return v;
            }
            else if (i <= (m = (b = bound) & MMASK) && q == null) {
                p.item = item;                         // offer
                if (AA.compareAndSet(a, j, null, p)) {
                    long end = (timed && m == 0) ? System.nanoTime() + ns : 0L;
                    Thread t = Thread.currentThread(); // wait
                    for (int h = p.hash, spins = SPINS;;) {
                        Object v = p.match;
                        if (v != null) {
                            MATCH.setRelease(p, null);
                            p.item = null;             // clear for next use
                            p.hash = h;
                            return v;
                        }
                        else if (spins > 0) {
                            h ^= h << 1; h ^= h >>> 3; h ^= h << 10; // xorshift
                            if (h == 0)                // initialize hash
                                h = SPINS | (int)t.getId();
                            else if (h < 0 &&          // approx 50% true
                                     (--spins & ((SPINS >>> 1) - 1)) == 0)
                                Thread.yield();        // two yields per wait
                        }
                        else if (AA.getAcquire(a, j) != p)
                            spins = SPINS;       // releaser hasn't set match yet
                        else if (!t.isInterrupted() && m == 0 &&
                                 (!timed ||
                                  (ns = end - System.nanoTime()) > 0L)) {
                            p.parked = t;              // minimize window
                            if (AA.getAcquire(a, j) == p) {
                                if (ns == 0L)
                                    LockSupport.park(this);
                                else
                                    LockSupport.parkNanos(this, ns);
                            }
                            p.parked = null;
                        }
                        else if (AA.getAcquire(a, j) == p &&
                                 AA.compareAndSet(a, j, p, null)) {
                            if (m != 0)                // try to shrink
                                BOUND.compareAndSet(this, b, b + SEQ - 1);
                            p.item = null;
                            p.hash = h;
                            i = p.index >>>= 1;        // descend
                            if (Thread.interrupted())
                                return null;
                            if (timed && m == 0 && ns <= 0L)
                                return TIMED_OUT;
                            break;                     // expired; restart
                        }
                    }
                }
                else
                    p.item = null;                     // clear offer
            }
            else {
                if (p.bound != b) {                    // stale; reset
                    p.bound = b;
                    p.collides = 0;
                    i = (i != m || m == 0) ? m : m - 1;
                }
                else if ((c = p.collides) < m || m == FULL ||
                         !BOUND.compareAndSet(this, b, b + SEQ + 1)) {
                    p.collides = c + 1;
                    i = (i == 0) ? m : i - 1;          // cyclically traverse
                }
                else
                    i = m + 1;                         // grow
                p.index = i;
            }
        }
    }


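At run time, the xorshift value h is negative about 50% of the time (as the comment in arenaExchange notes). Because spins is decremented only when the random value is negative, the default 1024 spins actually take about 2048 loop iterations; that is, spins reaches 0 after roughly 2048 passes through the for loop, not 1024. The code also calls yield when spins & ((SPINS >>> 1) - 1) is 0, which the comment says happens twice. Why twice? Because the result of that bit operation is 0 only when spins is 1 << 9 or 0.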
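
The counting argument can be checked with a small simulation of the spin phase only. The loop body copies the xorshift and yield condition from the listing; the class `SpinYieldCount`, the `seed` parameter (standing in for the thread id) and the counters are assumptions added for the experiment, and no real Thread.yield() is issued.

```java
// Hypothetical simulation of one spin phase; counts iterations and would-be yields.
class SpinYieldCount {
    static final int SPINS = 1 << 10;   // 1024, the default mentioned above

    /** Returns {loop iterations, yields} for one full spin phase. */
    static int[] simulate(int seed) {
        int h = 0, spins = SPINS, iterations = 0, yields = 0;
        while (spins > 0) {
            iterations++;
            h ^= h << 1; h ^= h >>> 3; h ^= h << 10;        // xorshift
            if (h == 0)
                h = SPINS | seed;                           // initialize hash (seed replaces thread id)
            else if (h < 0 && (--spins & ((SPINS >>> 1) - 1)) == 0)
                yields++;                                   // the real code calls Thread.yield() here
        }
        return new int[] { iterations, yields };
    }

    public static void main(String[] args) {
        int[] r = simulate(1);
        System.out.println("iterations=" + r[0] + " yields=" + r[1]);
    }
}
```

The yield count is always 2, since spins passes through 512 and 0 exactly once each; the iteration count depends on the seed but should land near 2048.
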

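2) Handling false sharing:
Java 1.8 introduced the Contended annotation. When a class carries this annotation, 128 bytes (yes, bytes) of padding are placed before and after its contents. If you annotate a class this way, does an array of that class also avoid false sharing? No. The reason lies in the Java object memory layout: a Java array starts with an array header, followed by one pointer per element object, and with compressed pointers enabled by default in Java 1.8 each pointer takes 4 bytes. So a CAS on array elements still runs into false sharing, even if the element class is annotated with Contended. Remember that Contended only guarantees that the contents of consecutive objects do not falsely share; it does nothing for the consecutive object pointers. This is why Exchanger initializes its array like this: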

arena = new Node[(FULL + 2) << ASHIFT];
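
A quick calculation shows what the shift buys. In the sketch below, `physicalIndex` is the same expression as the `j` computation in the arenaExchange listing, and `physicalLength` mirrors the allocation above; the class `ArenaSpacing`, the sample shift values 5 and 7, and the 4-byte reference size (compressed pointers) are assumptions for illustration, not values read from a particular JDK build.

```java
// Hypothetical helper: how far apart logical arena slots end up in the physical array.
class ArenaSpacing {
    /** Physical array index of logical slot i (same expression as j in arenaExchange). */
    static int physicalIndex(int i, int ashift) {
        return (i << ashift) + ((1 << ashift) - 1);
    }

    /** Physical array length for a given FULL (same expression as the allocation). */
    static int physicalLength(int full, int ashift) {
        return (full + 2) << ashift;
    }

    /** Byte distance between two adjacent logical slots for a given reference size. */
    static int byteDistance(int ashift, int refBytes) {
        return (1 << ashift) * refBytes;
    }

    public static void main(String[] args) {
        int ashift = 5;                                   // assumed shift value
        System.out.println(physicalIndex(0, ashift) + " " + physicalIndex(1, ashift));
        System.out.println(physicalLength(4, ashift));    // assuming FULL = 4
        System.out.println(byteDistance(ashift, 4));      // assuming 4-byte references
    }
}
```

With a shift of 5 and 4-byte references, neighbouring logical slots are 32 elements (128 bytes) apart, so CASes on different slots do not touch the same cache line on common hardware; a shift of 7 would spread them 512 bytes apart.
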


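Exchanger uses thread-locals and an array rather than the linked list of the original dual data structure. With a linked list, thread-locals could not be used, because the nodes used for exchange would be accessed and modified by other threads (mainly their next field): their lifecycle would be out of the owner's control, and ABA problems could arise. On the other hand, a linked list would not need the elaborate grow and shrink policy that Exchanger has now, because a newly arrived thread that fails would simply create a new node and wait for another thread to visit it. Preferring thread-locals for the nodes that wait for a match is a trade-off. The grow and shrink policy is as follows: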

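  • The logical array size starts at 0
  • A thread that cannot exchange through slotExchange switches to arenaExchange; this is usually the third thread
  • While the logical size is 0, a CAS on bound increases the length by 1 and the version by 1. Because the array is fully allocated up front, it never grows physically, only logically
  • Exchanges on the array then proceed much like slotExchange
  • As more threads arrive, the logical size grows until it reaches the maximum, FULL
  • A thread that cannot find a partner retries at half its current array index, shrinks the logical size by 1, and increments the version; if its current index is 0, it parks
  • Once arenaExchange has been enabled, no later request goes through slotExchange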


