并行执行任务的Fork/Join框架

背书中(引用书上的话):Java7中提供了用于并行执行任务的Fork/Join框架, 可以把任务分成若干个分任务,最终汇总每个分任务的结果得到总任务的结果。这篇我们来看看Fork/Join框架。

先举个栗子

一个字符串数组,需要把每个元素中的*字符的索引返回,并求和(自己编了个栗子,没有撒实际意义),用Fork/Join框架来实现,可以定义一个处理字符串数组的总任务,然后把总任务拆分,把数组中每个字符串交给子任务去处理,然后等待子任务执行完毕,汇总结果,并返回:

package thread.ForkJoin;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveTask;

/**
 * @Description: .
 * @Author: ZhaoWeiNan .
 * @CreatedTime: 2017/6/21 .
 * @Version: 1.0 .
 */
public class StringTask extends RecursiveTask<Integer>{
    //要处理的字符串
    private String dest;

    public StringTask(String dest) {
        this.dest = dest;
    }
    //父类RecursiveTask是一个抽象类,所以需要实现compute方法
    @Override
    protected Integer compute() {
        if (dest == null || "".equals(dest))
            return 0;
        //判断字符串中 * 的索引,并返回
        return dest.indexOf("*");
    }
}

class ArrayTask extends RecursiveTask<Integer>{
    //需要处理的字符串数组
    private String[] array;

    public ArrayTask(String[] array) {
        this.array = array;
    }

    @Override
    protected Integer compute() {
        if (array == null || array.length < 1)
            return 0;

        //申明一个StringTask变量,作为子任务
        StringTask stringTask;
        //定义一个子任务队列,用于任务执行完毕后,获取子任务的执行结果
        List<StringTask> list = new ArrayList<>();
        int sum = 0;
        //把字符串数组的中每一个字符串分给多个StringTask子任务去处理
        for (String s : array){
            //创建一个变量,作为子任务去处理字符串
            stringTask = new StringTask(s);
            //执行子任务
            stringTask.fork();
            //加入子任务队列
            list.add(stringTask);
        }

        for (StringTask task : list){
            //等子任务执行完毕,获取子任务执行的结果,并累加
            sum += task.join();
        }

        return sum;
    }
}

class Demo{

    public static void main(String[] args){
        //初始化字符串数组
        String[] array = new String[]{"#####*####","##*########","###*#######","#*############"};
        //创建一个总任务,处理字符串数组
        ArrayTask arrayTask = new ArrayTask(array);
        //创建执行任务的线程池ForkJoinPool对象
        ForkJoinPool forkJoinPool = new ForkJoinPool();
        //执行总任务
        ForkJoinTask<Integer> forkJoinTask = forkJoinPool.submit(arrayTask);

        //返回任务的结果
        try {
            System.out.println(forkJoinTask.get());
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }
    }
}

代码放到了开源中国:http://git.oschina.net/zhaoweinan/forkjoin,有兴趣的小伙伴可以拿去
总的来说,Fork/Join框架就是一个用来并行执行任务的框架,可以把一个大任务,分成若干个子任务,等各个子任务执行完毕,可以把他们的执行结果获取到,并汇聚,起到了并行执行任务作用。

Fork/Join框架的构成

1.ForkJoinPool

ForkJoinPool类图

ForkJoinPool继承了AbstractExecutorService抽象类,AbstractExecutorService实现了ExecutorService接口,由此看来ForkJoinPool也是线程池家族的一员,


过滤了下方法,只显示了公共方法,并截取了一下

ForkJoinPool使用invoke、execute、submit用来执行任务。

2.ForkJoinTask

ForkJoinTask类图

ForkJoinTask是Fork/Join框架使用的任务类,实现了Future接口,我们一般使用它的两个子类RecursiveTask和RecursiveAction,


RecursiveAction类图
public abstract class RecursiveAction extends ForkJoinTask<Void> {
    private static final long serialVersionUID = 5232453952276485070L;

RecursiveAction适用于没有返回结果的任务,

RecursiveTask类图
public abstract class RecursiveTask<V> extends ForkJoinTask<V> {
    private static final long serialVersionUID = 5232453952276485270L;

RecursiveTask适用于有返回值的任务。

Work-Stealing (工作窃取)

粗略说一下Work-Stealing,ForkJoinPool具有 Work-Stealing (工作窃取)的能力,什么意思呢?就拿文章开头的栗子来说,把处理字符串数组的大任务,分成了若干个处理字符串的子任务,这些子任务线程执行完毕后,不会闲着,回去执行别的子任务,通俗的来说,Work-Stealing (工作窃取)就是线程从其他队列里面获取任务来执行。

Work-Stealing的优点

充分利用了线程,提高了线程并行执行任务的效率,并减少了线程间竞争带来的系统开销。

Work-Stealing的缺点

存在竞争的情况,而且占用了更多的系统资源。

Fork/Join框架原理

ForkJoinPool分析

贴一张ForkJoinPool的类图


大小不好调整,就截取一般吧

注意箭头所指的两个属性,
ForkJoinTask<?>数组submissionQueue,存放程序加到ForkJoinPool的任务

    private ForkJoinTask<?>[] submissionQueue;

ForkJoinWorkerThread类继承了Thread,是一个线程类, ForkJoinWorkerThread[] workers就是一个线程数组,负责去执行submissionQueue中的任务

    ForkJoinWorkerThread[] workers;

    .....
    public class ForkJoinWorkerThread extends Thread

ForkJoinTask分析

fork方法

获取当前ForkJoinWorkerThread线程,调用ForkJoinWorkerThread的pushTask方法执行ForkJoinTask任务

   public final ForkJoinTask<V> fork() {
        //获取当前ForkJoinWorkerThread线程,调用ForkJoinWorkerThread的pushTask方法执行任务
        ((ForkJoinWorkerThread) Thread.currentThread())
                .pushTask(this);
        return this;
    }

再来看看ForkJoinWorkerThread的pushTask方法:

final void pushTask(ForkJoinTask<?> t) {
        ForkJoinTask<?>[] q; int s, m;
        if ((q = queue) != null) {    // ignore if queue removed
            long u = (((s = queueTop) & (m = q.length - 1)) << ASHIFT) + ABASE;
            UNSAFE.putOrderedObject(q, u, t);
            queueTop = s + 1;         // or use putOrderedInt
            if ((s -= queueBase) <= 2)
                //调用线程池ForkJoinPool的signalWork方法
                pool.signalWork();
            else if (s == m)
                growQueue();
        }
    }

ForkJoinWorkerThread的pushTask方法把任务ForkJoinTask加到了ForkJoinTask[]任务数组中,并调用了ForkJoinPool线程池的signalWork方法唤醒线程或者创建一个线程去执行任务,粗略的贴一下signalWork的关键代码:

private void addWorker() {
        Throwable ex = null;
        ForkJoinWorkerThread t = null;
        try {
            t = factory.newThread(this);
        } catch (Throwable e) {
            ex = e;
        }
        if (t == null) {  // null or exceptional factory return
            long c;       // adjust counts
            do {} while (!UNSAFE.compareAndSwapLong
                         (this, ctlOffset, c = ctl,
                          (((c - AC_UNIT) & AC_MASK) |
                           ((c - TC_UNIT) & TC_MASK) |
                           (c & ~(AC_MASK|TC_MASK)))));
            // Propagate exception if originating from an external caller
            if (!tryTerminate(false) && ex != null &&
                !(Thread.currentThread() instanceof ForkJoinWorkerThread))
                UNSAFE.throwException(ex);
        }
        else
            t.start();
    }

最终调用到了这里,来执行任务。

join方法

从文章开头的栗子来看,join方法会阻塞当前线程,等待获取任务执行的结果

    //百度了这四种状态的含义
    private static final int NORMAL      = -1;   //NORMAL已完成
    private static final int CANCELLED   = -2;  //CANCELLED已取消
    private static final int EXCEPTIONAL = -3;  //EXCEPTIONAL出现异常
    private static final int SIGNAL      =  1;  //SIGNAL信号

     public final V join() {
        //先调用doJoin方法判断上面定义的四个状态
        if (doJoin() != NORMAL)
            return reportResult();
        else
            return getRawResult();
    }

join方法先调用doJoin方法判断任务的状态,看看doJoin方法,

   private int doJoin() {
        Thread t; ForkJoinWorkerThread w; int s; boolean completed;
        //获取当前ForkJoinWorkerThread线程
        if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) {
            //如果状态是小于0 也就是 -1,-2,-3 分别代表已完成、已取消、出现异常
            //直接返回状态
            if ((s = status) < 0)
                return s;
            if ((w = (ForkJoinWorkerThread)t).unpushTask(this)) {
                //如果状态为1|SIGNAL|信号
                //执行任务
                try {
                    completed = exec();
                } catch (Throwable rex) {
                    //出现异常,把状态改为-3|EXCEPTIONAL|出现异常,返回
                    return setExceptionalCompletion(rex);
                }
                if (completed)
                    //执行成功,把状态改为-1|NORMAL|已完成,返回
                    return setCompletion(NORMAL);
            }
            return w.joinTask(this);
        }
        else
            return externalAwaitDone();
    }

doJoin查看任务的状态,如果状态是-1|NORMAL|已完成,-2|CANCELLED|已取消,-3|EXCEPTIONAL|出现异常,证明任务已经执行完毕,返回状态位,如果状态是 1|SIGNAL|信号,则去执行任务,如果执行成功返回-1|NORMAL|已完成,出现异常返回-3|EXCEPTIONAL|出现异常。
再来看看返回结果的reportResult方法和getRawResult方法:

private V reportResult() {
        int s; Throwable ex;
        //如果状态为-2|CANCELLED|已取消,抛出一个CancellationException异常
        if ((s = status) == CANCELLED)
            throw new CancellationException();
        if (s == EXCEPTIONAL && (ex = getThrowableException()) != null)
            UNSAFE.throwException(ex);
        //调用getRawResult方法返回结果
        return getRawResult();
    }

reportResult方法,先会判断状态,如果状态为-2|CANCELLED|已取消,则抛出一个CancellationException异常,否则调用getRawResult方法返回结果:

    public abstract V getRawResult();

getRawResult方法在ForkJoinTask类是抽象方法,具体实现在他的两子类中。
RecursiveAction子类:

    public final Void getRawResult() { return null; }

所以说RecursiveAction子类使用于没有返回值的任务。
RecursiveTask子类:

public final V getRawResult() {
        return result;
    }

RecursiveTask子类适用于有返回值的任务。

并行执行任务的Fork/Join框架是说完了。
欢迎大家来交流,指出文中一些说错的地方,让我加深认识。
谢谢大家!

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 211,948评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,371评论 3 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 157,490评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,521评论 1 284
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,627评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,842评论 1 290
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,997评论 3 408
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,741评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,203评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,534评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,673评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,339评论 4 330
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,955评论 3 313
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,770评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,000评论 1 266
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,394评论 2 360
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,562评论 2 349

推荐阅读更多精彩内容

  • 什么是Fork/Join框架 Fork/Join框架是Java7提供了的一个用于并行执行任务的框架,采用类似于分治...
    码农历险记阅读 2,208评论 0 2
  • 一、多线程 说明下线程的状态 java中的线程一共有 5 种状态。 NEW:这种情况指的是,通过 New 关键字创...
    Java旅行者阅读 4,665评论 0 44
  • 摘要 这篇论文描述了Fork/Join框架的设计、实现以及性能。这个框架通过(递归的)把问题划分为子任务,然后并行...
    itonyli阅读 1,157评论 0 5
  • 导读目录 线程组(ThreadGroup) 线程池(Thread Pool) Fork/Join框架和Execut...
    ql2012jz阅读 1,448评论 0 0
  • 安老师今天让我们制作小卡片,小卡片长是21厘米宽度7厘米。为了让我们练习五以内的加减法,妈妈给我找了许多烟盒,让我...
    张余蔚阅读 471评论 0 0