线程间交换数据的Exchanger

今天给大家介绍一个并发包中的线程工具Exchanger,他的主要作用是用来进行线程之间的数据交换的,一起来看看吧。

Exchanger

背书中:Exchanger是一个用来进行线程之间的数据交换的工具类,它提供了一个同步点,在这个同步点,两个线程可以互相交换数据,当一个线程执行exchange()方法时,会等待第二个线程执行exchange()方法,这个时刻就应该是书中所说的同步点,这两个线程就可以互相交换数据。

举个栗子

就拿今天晚上吃饭做例子吧,a去买鸡翅包饭,b去买了汉堡,然后互相替换了一个,上代码吧

package thread.exchanger;

import java.util.concurrent.Exchanger;

/**
 * @author ZhaoWeinan
 * @date 2018/10/16
 * @description
 */
public class ExchangerDemo {

    private static Exchanger<String> exchanger = new Exchanger<>();

    public static void main(String[] args){
        new Thread(new Runnable() {
            @Override
            public void run() {
                String a = "我买了鸡翅包饭!";
                System.out.println(Thread.currentThread().getName() + "说:" + a);
                try {
                    System.out.println(Thread.currentThread().getName() + "说:等着b买汉堡!");
                    exchanger.exchange(a);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }).start();

        new Thread(new Runnable() {
            @Override
            public void run() {
                String b = "我买了汉堡!";
                System.out.println(Thread.currentThread().getName() + "说:" + b);

                try {
                    System.out.println(Thread.currentThread().getName() + "说:现在我们来交换吧!");
                    String a = exchanger.exchange(b);
                    //把第一个线程中 a 变量拿了过来
                    System.out.println(Thread.currentThread().getName() + "说我拿到了鸡翅包饭!");
                    System.out.println(Thread.currentThread().getName() + "说:" + a);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }).start();
    }
}

这段代码很好的描述了这个场景,创建了两个线程,在线程一中定义了一个变量a,a的内容是“我买了鸡翅包饭!”,在线程二中定义了一个变量b,b的内容是“我买了汉堡!”,然后线程一调用了exchanger.exchange(a)方法,进行了阻塞,也就是书中所说的等待同步点,在线程二中,调用exchanger.exchange(b)方法,也就是说,这两个线程都到达了同步点,所以线程一获取到了线程二中的变量“我买了汉堡!”,线程二获取到了线程一中的变量“我买了鸡翅包饭!”,代码运行结果如下:


运行结果

栗子就先说到这,来看一下源码吧

看下源码

Exchanger类的结构

简单说一下,他主要有两个个内部类Node,Participant

Node类
@sun.misc.Contended static final class Node {
    // Arena的索引
    int index;
    //最后记录的Exchanger的bound属性值
    //Exchanger类中有个bound属性,用volatile修饰的
    int bound;
    //当前bound中原子操作的失败次数
    //可以推断 Exchanger是用volatile、原子操作来保证线程安全的
    int collides;
    //用于自旋的伪随机数
    int hash;
    //这个线程的数据对象吧,是用来交换的对象
    Object item;
    //用来释放线程的对象
    volatile Object match;
    //当挂起,设置为次线程,否则为空
    volatile Thread parked;
}

这个类看来主要是用来进行数据交换的类,Exchanger类中,有一个Node类型的slot属性,一个Node[]类型的arena属性,这两个就是所谓的单槽、多槽的两种模式,是在这两模式中用来交换的槽位,来看看Participant类

Participant类
 static final class Participant extends ThreadLocal<Node> {
        public Node initialValue() { return new Node(); }
    }

Participant类继承了ThreadLocal,主要作用就是初始化一个node对象
再来看看核心的交换方法

slotExchange单槽交换

为了方便解说,整段代码就不贴了,一段一段的说吧

 Node p = participant.get();
 Thread t = Thread.currentThread();

创建一个Node对象p,获取了当前线程t

for (Node q;;) {
            //先判断slot属性,
            //不为空,证明有线程在等待着数据交换
            if ((q = slot) != null) {
          .........
              }
        }

先判断slot属性,不为空,证明有线程在等待着数据交换

if (U.compareAndSwapObject(this, SLOT, q, null)) {
                    Object v = q.item;
                    q.match = item;
                    Thread w = q.parked;
                    if (w != null)
                        U.unpark(w);
                    return v;
                }

说说这段代码,compareAndSwapObject方法是sun.misc.Unsafe类的一个方法

/* 在obj的offset位置比较object field和期望的值,如果相同则更新。这个方法
     * 的操作应该是原子的,因此提供了一种不可中断的方式更新object field。
     * @param var1 包含要修改field的对象
     * @param var2 object中field的偏移量
     * @param var4 希望field中存在的值
     * @param var5 如果期望值var4与var1的field当前值相同,设置var1的field值为这个新值
     * @return true 如果field的值被更改,则返回true             
     */
    public final native boolean compareAndSwapObject(Object var1, long var2, Object var4, Object var5);

有机会给大家细讲,在这主要作用是,用于比较this对象,this对象就是Exchanger的实例,this对象的SLOT偏移量上的属性与是否与q相等,如果相等则把this对象的这个属性更新为null,返回true,反之不更新,返回false,看一眼SLOT怎么定义的

private static final long SLOT;
....
SLOT = U.objectFieldOffset
                (ek.getDeclaredField("slot"));

这个SLOT对应的是“slot”字段,也就是上面那个方法比较的是Exchanger实例中的slot属性

if (U.compareAndSwapObject(this, SLOT, q, null)) {
                    Object v = q.item;
                    q.match = item;
                    Thread w = q.parked;
                    if (w != null)
                        U.unpark(w);
                    return v;
                }

如果compareAndSwapObject执行成功,就把该线程挂起,然后唤起等待的线程,返回交换的结果,如果没有执行成功,那么我们来看看下面else块中的代码:

    for (Node q;;) {
            //先判断slot属性,
            //不为空,证明有线程在等待着数据交换
            if ((q = slot) != null) {
                if (U.compareAndSwapObject(this, SLOT, q, null)) {
                    Object v = q.item;
                    q.match = item;
                    Thread w = q.parked;
                    if (w != null)
                        U.unpark(w);
                    return v;
                }
                // 存在线程间cpu的竞争,构建一个多槽位arena为解决问题
                if (NCPU > 1 && bound == 0 &&
                        U.compareAndSwapInt(this, BOUND, 0, SEQ))
                    arena = new Node[(FULL + 2) << ASHIFT];
            }
            else if (arena != null)
                //如果构建多槽位,返回,使用多槽位模式解决问题
                return null;
            else {
                //继续使用compareAndSwapObject就行比较交换
               //这边入参变了,大家注意一下
              //总结下这段代码的意思就是用当前线程来占领槽位
                p.item = item;
                if (U.compareAndSwapObject(this, SLOT, null, p))
                    break;
                p.item = null;
            }
        }

如果存在线程间cpu的竞争,构建一个多槽位arena为解决问题,否则,判断当前操作是否为空,如果为空就跳出循环,否则,无线循环来走上面的流程

       //当前线程占领了槽位,等待其它线程来交换数据
        int h = p.hash;
        long end = timed ? System.nanoTime() + ns : 0L;
        int spins = (NCPU > 1) ? SPINS : 1;
        Object v;
        //循环,直到match为空
        while ((v = p.match) == null) {
            if (spins > 0) {
                //自旋
                h ^= h << 1; h ^= h >>> 3; h ^= h << 10;
                if (h == 0)
                    h = SPINS | (int)t.getId();
                else if (h < 0 && (--spins & ((SPINS >>> 1) - 1)) == 0)
                      //线程自旋等待中,为了避免别的线程都在等待,先让出cpu执行权
                      Thread.yield();
            }
           //其它线程来交换数据了,修改槽位
            else if (slot != p)
                spins = SPINS;
           //线程没有发生中断
           //还是单槽模式
           //而且还没有超时
            else if (!t.isInterrupted() && arena == null &&
                    (!timed || (ns = end - System.nanoTime()) > 0L)) {
                //设置BLOCKER
                U.putObject(t, BLOCKER, this);
                p.parked = t;
                if (slot == p)
                    //阻塞
                    U.park(false, ns);
                //清空 BLOCKER
                p.parked = null;
                U.putObject(t, BLOCKER, null);
            }
            else if (U.compareAndSwapObject(this, SLOT, p, null)) {
                //使用compareAndSwapObject进行比较交换,如果成功跳出循环
               //如果是超时了,或者线程被中断,则返回null
                v = timed && ns <= 0L && !t.isInterrupted() ? TIMED_OUT : null;
                break;
            }
        }
        U.putOrderedObject(p, MATCH, null);
        p.item = null;
        p.hash = h;
        //交换完毕,返回v
        //成功的话就是要交换的数据,如果超时、线程中断等情况返回null
        return v;

这段代码的主要意思是,当前线程占领了槽位,等待其它线程来交换数据,自旋进行等待,如果有别的线程来交换数据了,那么使用compareAndSwapObject进行比较交换,交换到数据,唤起被挂起的线程,返回交换结果,如果是超时或者线程被中断时,返回null。
这就是单槽位模式,多槽位,没有怎么研究过,相信原理差不多,文章有点冗长,就不说多槽位模式了。

应用场景

说说应用场景吧,书上说可以用用于同步任务队列,遗传算法(表示没有听过),数据校对(这个还感觉比较靠谱),说说我第一次见到他吧,是我继承别人的代码的时候,一个定时任务用到的,有很多张mysql表,需要计算汇总结果到一张表里面,使用了Exchanger,但是感觉实现方式很多,这一种并不是很惊艳,不知道大家有没有好的使用场景,可以说一下。

Exchanger就为大家简单的说到这,欢迎大家来交流,指出文中一些说错的地方,让我加深认识,愿大家没有bug,谢谢!

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 199,393评论 5 467
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 83,790评论 2 376
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 146,391评论 0 330
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 53,703评论 1 270
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 62,613评论 5 359
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,003评论 1 275
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,507评论 3 390
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,158评论 0 254
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,300评论 1 294
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,256评论 2 317
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,274评论 1 328
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 32,984评论 3 316
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,569评论 3 303
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,662评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 30,899评论 1 255
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,268评论 2 345
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 41,840评论 2 339

推荐阅读更多精彩内容