Java Stream的终止操作(三)

Stream Head的构建(一)
Stream 的中间操作(二)
上两篇完成了源头和中间流的构造,这篇看看终止操作forEach是如何将整个过程串起来的。

List<String> names= Arrays.asList("one", "two", "three", "four");
        names.stream()
                .filter(s -> s.length() > 2)
                .map(String::toUpperCase)
                .forEach(System.out::println);

从上文可以看出forEach是在stream2上调用的

ReferencePipeline.java
public void forEach(Consumer<? super P_OUT> action) {
        evaluate(ForEachOps.makeRef(action, false));
    }

首先工具类ForEachOps使用Consumer构建一个终止操作作为evaluate参数

ForEachOps.java
public static <T> TerminalOp<T, Void> makeRef(Consumer<? super T> action,  boolean ordered) {
       ...
        //OfRef说明是引用类型,还有OfInt等基本类型,都是对ForEachOp在特定数据类型上定制
        return new ForEachOp.OfRef<>(action, ordered);
    }
//OfRef 是ForEachOp子类
 OfRef(Consumer<? super T> consumer, boolean ordered) {
                super(ordered);
                this.consumer = consumer;
      }

ForEachOp和OfRef都比较简洁,看几个主要方法

ForEachOp实现了TerminalOp的evaluateSequential和TerminalSink的get方法
 public <S> Void evaluateSequential(PipelineHelper<T> helper,Spliterator<S> spliterator){
          //this指的就是ForEachOp.OfRef
          //下文能看到helper.wrapAndCopyInto(this, spliterator)返回的就是this
          //因为不需要返回值,所以get方法返回的就是下面的null,。
            return helper.wrapAndCopyInto(this, spliterator).get();
        }

  public Void get() {
            return null;
        }
//OfRef 实现TerminalSink的accept
   public void accept(T t) {
        //此例consumer就是打印的方法
          consumer.accept(t);
        }

接下来是evaluate方法的调用

AbstractPipeline.java
final <R> R evaluate(TerminalOp<E_OUT, R> terminalOp) {
       ...
        return isParallel()
               ? terminalOp.evaluateParallel(this, sourceSpliterator(terminalOp.getOpFlags()))
              //非并行,走的是这个方法
              :terminalOp.evaluateSequential(this,sourceSpliterator(terminalOp.getOpFlags()));
    }

先看sourceSpliterator方法

private Spliterator<?> sourceSpliterator(int terminalFlags) {
        Spliterator<?> spliterator = null;
        if (sourceStage.sourceSpliterator != null) {
        // 上篇说为什么中间流要有源头的引用,作用在这里,通过源头的引用,获取迭代器.
            spliterator = sourceStage.sourceSpliterator;
        //置为空,下次再调用进入else抛出异常
            sourceStage.sourceSpliterator = null;
        }
        else if (sourceStage.sourceSupplier != null) {
           ...
        }
        else {
          //防止再次调用
            throw new IllegalStateException(MSG_CONSUMED);
        }

       ...
        return spliterator;
    }

接下来带着两个参数进入上文所说的evaluateSequential方法,真正实现方法是helper.wrapAndCopyInto(this, spliterator)

AbstractPipeline.java
final <P_IN, S extends Sink<E_OUT>> S wrapAndCopyInto(S sink, Spliterator<P_IN> spliterator) {
        copyInto(wrapSink(Objects.requireNonNull(sink)), spliterator);
        return sink;
    }

 public final <P_IN> Sink<P_IN> wrapSink(Sink<E_OUT> sink) {
       ...
        //此处AbstractPipeline类型的 p就是map方法返回的stream2,
        for ( AbstractPipeline p=AbstractPipeline.this; p.depth > 0; p=p.previousStage) {
        //参数sink就是上面的ForEachOp.OfRef
        //循环起来形成的就是单向链表
            sink = p.opWrapSink(..., sink);
        }
        return (Sink<P_IN>) sink;
    }

再看一下双向链表的数据结构,Sink是这样串起来的


stream wrap_sink.png

回到中间流的opWrapSink方法,发现返回的实例是ChainedReference

Sink.java
//Sink接口是Comsume的子类,ChainedReference实现了Sink接口,里面的方法先不用管
static abstract class ChainedReference<T, E_OUT> implements Sink<T> {
        protected final Sink<? super E_OUT> downstream;
        //以下游Sink为参数,并持有引用
        public ChainedReference(Sink<? super E_OUT> downstream) {
            this.downstream = Objects.requireNonNull(downstream);
        }

        @Override
        public void begin(long size) {
            downstream.begin(size);
        }

        @Override
        public void end() {
            downstream.end();
        }
        ...
    }

到此中间流对源元素的操作方法就从尾到头串起来了,只差最后一步copyInto(sink)

AbstractPipeline.java
 @Override
          //wrappedSink就是上面wrapSink返回值
    final <P_IN> void copyInto(Sink<P_IN> wrappedSink, Spliterator<P_IN> spliterator) {
       ...
        if (!StreamOpFlag.SHORT_CIRCUIT.isKnown(getStreamAndOpFlags())) {
            //这里才开始所有的计算。begin 先通知各Sink开始准备
            wrappedSink.begin(spliterator.getExactSizeIfKnown());
            //如第一篇所述,调用迭代器此方法发射元素
            spliterator.forEachRemaining(wrappedSink);
            //通知结束
            wrappedSink.end();
        }
        else {
           ...
        }
    }

//wrappedSink.begin 就是filter方法中Sink.ChainedReference复写的begin
   public void begin(long size) {
         downstream.begin(-1);
    }
  //然后调用map方法中Sink.ChainedReference的begin ,没有复写
   public void begin(long size) {
            downstream.begin(size);
    }
//最后是OfRef继承的begin
    default void begin(long size) {}

返回看看ArrayListSpliterator迭代器的遍历

ArrayList.java
 public void forEachRemaining(Consumer<? super E> action) {
            int i, hi, mc; // hoist accesses and checks from loop
            ArrayList<E> lst; Object[] a;
            if (action == null)
                throw new NullPointerException();
            if ((lst = list) != null && (a = lst.elementData) != null) {
                if ((hi = fence) < 0) {
                    mc = lst.modCount;
                    hi = lst.size;
                }
                else
                    mc = expectedModCount;
                if ((i = index) >= 0 && (index = hi) <= a.length) {
                    for (; i < hi; ++i) {
                        @SuppressWarnings("unchecked") E e = (E) a[i];
                        //把元素交给Sink
                        action.accept(e);
                    }
                    if (lst.modCount == mc)
                        return;
                }
            }
            throw new ConcurrentModificationException();
        }

如前面所述Sink是Consumer子类,实现了accept方法

 //wrappedSink.accept就是filter方法中Sink.ChainedReference复写的accept
  public void accept(P_OUT u) {
    //如果满足条件就会调用下游map的同名方法
      if (predicate.test(u))
             downstream.accept(u);
      }
  //然后是map的accept ,先映射,再调用下游accept
   public void accept(P_OUT u) {
             downstream.accept(mapper.apply(u));
      }
//最后是OfRef的accept,
   public void accept(T t) {
            //到此被打印出来
             consumer.accept(t);
     }
//虽然accept方法属于不同流,却能被一次调用,这是性能所在。

至此,Java 流式构建和操作的基本框架就显示出来了。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 211,639评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,277评论 3 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 157,221评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,474评论 1 283
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,570评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,816评论 1 290
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,957评论 3 408
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,718评论 0 266
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,176评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,511评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,646评论 1 340
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,322评论 4 330
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,934评论 3 313
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,755评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,987评论 1 266
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,358评论 2 360
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,514评论 2 348