Stream Head的构建(一)
Stream 的中间操作(二)
上两篇完成了源头和中间流的构造,这篇看看终止操作forEach是如何将整个过程串起来的。
List<String> names= Arrays.asList("one", "two", "three", "four");
names.stream()
.filter(s -> s.length() > 2)
.map(String::toUpperCase)
.forEach(System.out::println);
从上文可以看出forEach是在stream2上调用的
ReferencePipeline.java
public void forEach(Consumer<? super P_OUT> action) {
evaluate(ForEachOps.makeRef(action, false));
}
首先工具类ForEachOps使用Consumer构建一个终止操作作为evaluate参数
ForEachOps.java
public static <T> TerminalOp<T, Void> makeRef(Consumer<? super T> action, boolean ordered) {
...
//OfRef说明是引用类型,还有OfInt等基本类型,都是对ForEachOp在特定数据类型上定制
return new ForEachOp.OfRef<>(action, ordered);
}
//OfRef 是ForEachOp子类
OfRef(Consumer<? super T> consumer, boolean ordered) {
super(ordered);
this.consumer = consumer;
}
ForEachOp和OfRef都比较简洁,看几个主要方法
ForEachOp实现了TerminalOp的evaluateSequential和TerminalSink的get方法
public <S> Void evaluateSequential(PipelineHelper<T> helper,Spliterator<S> spliterator){
//this指的就是ForEachOp.OfRef
//下文能看到helper.wrapAndCopyInto(this, spliterator)返回的就是this
//因为不需要返回值,所以get方法返回的就是下面的null,。
return helper.wrapAndCopyInto(this, spliterator).get();
}
public Void get() {
return null;
}
//OfRef 实现TerminalSink的accept
public void accept(T t) {
//此例consumer就是打印的方法
consumer.accept(t);
}
接下来是evaluate方法的调用
AbstractPipeline.java
final <R> R evaluate(TerminalOp<E_OUT, R> terminalOp) {
...
return isParallel()
? terminalOp.evaluateParallel(this, sourceSpliterator(terminalOp.getOpFlags()))
//非并行,走的是这个方法
:terminalOp.evaluateSequential(this,sourceSpliterator(terminalOp.getOpFlags()));
}
先看sourceSpliterator方法
private Spliterator<?> sourceSpliterator(int terminalFlags) {
Spliterator<?> spliterator = null;
if (sourceStage.sourceSpliterator != null) {
// 上篇说为什么中间流要有源头的引用,作用在这里,通过源头的引用,获取迭代器.
spliterator = sourceStage.sourceSpliterator;
//置为空,下次再调用进入else抛出异常
sourceStage.sourceSpliterator = null;
}
else if (sourceStage.sourceSupplier != null) {
...
}
else {
//防止再次调用
throw new IllegalStateException(MSG_CONSUMED);
}
...
return spliterator;
}
接下来带着两个参数进入上文所说的evaluateSequential方法,真正实现方法是helper.wrapAndCopyInto(this, spliterator)
AbstractPipeline.java
final <P_IN, S extends Sink<E_OUT>> S wrapAndCopyInto(S sink, Spliterator<P_IN> spliterator) {
copyInto(wrapSink(Objects.requireNonNull(sink)), spliterator);
return sink;
}
public final <P_IN> Sink<P_IN> wrapSink(Sink<E_OUT> sink) {
...
//此处AbstractPipeline类型的 p就是map方法返回的stream2,
for ( AbstractPipeline p=AbstractPipeline.this; p.depth > 0; p=p.previousStage) {
//参数sink就是上面的ForEachOp.OfRef
//循环起来形成的就是单向链表
sink = p.opWrapSink(..., sink);
}
return (Sink<P_IN>) sink;
}
再看一下双向链表的数据结构,Sink是这样串起来的
回到中间流的opWrapSink方法,发现返回的实例是ChainedReference
Sink.java
//Sink接口是Comsume的子类,ChainedReference实现了Sink接口,里面的方法先不用管
static abstract class ChainedReference<T, E_OUT> implements Sink<T> {
protected final Sink<? super E_OUT> downstream;
//以下游Sink为参数,并持有引用
public ChainedReference(Sink<? super E_OUT> downstream) {
this.downstream = Objects.requireNonNull(downstream);
}
@Override
public void begin(long size) {
downstream.begin(size);
}
@Override
public void end() {
downstream.end();
}
...
}
到此中间流对源元素的操作方法就从尾到头串起来了,只差最后一步copyInto(sink)
AbstractPipeline.java
@Override
//wrappedSink就是上面wrapSink返回值
final <P_IN> void copyInto(Sink<P_IN> wrappedSink, Spliterator<P_IN> spliterator) {
...
if (!StreamOpFlag.SHORT_CIRCUIT.isKnown(getStreamAndOpFlags())) {
//这里才开始所有的计算。begin 先通知各Sink开始准备
wrappedSink.begin(spliterator.getExactSizeIfKnown());
//如第一篇所述,调用迭代器此方法发射元素
spliterator.forEachRemaining(wrappedSink);
//通知结束
wrappedSink.end();
}
else {
...
}
}
//wrappedSink.begin 就是filter方法中Sink.ChainedReference复写的begin
public void begin(long size) {
downstream.begin(-1);
}
//然后调用map方法中Sink.ChainedReference的begin ,没有复写
public void begin(long size) {
downstream.begin(size);
}
//最后是OfRef继承的begin
default void begin(long size) {}
返回看看ArrayListSpliterator迭代器的遍历
ArrayList.java
public void forEachRemaining(Consumer<? super E> action) {
int i, hi, mc; // hoist accesses and checks from loop
ArrayList<E> lst; Object[] a;
if (action == null)
throw new NullPointerException();
if ((lst = list) != null && (a = lst.elementData) != null) {
if ((hi = fence) < 0) {
mc = lst.modCount;
hi = lst.size;
}
else
mc = expectedModCount;
if ((i = index) >= 0 && (index = hi) <= a.length) {
for (; i < hi; ++i) {
@SuppressWarnings("unchecked") E e = (E) a[i];
//把元素交给Sink
action.accept(e);
}
if (lst.modCount == mc)
return;
}
}
throw new ConcurrentModificationException();
}
如前面所述Sink是Consumer子类,实现了accept方法
//wrappedSink.accept就是filter方法中Sink.ChainedReference复写的accept
public void accept(P_OUT u) {
//如果满足条件就会调用下游map的同名方法
if (predicate.test(u))
downstream.accept(u);
}
//然后是map的accept ,先映射,再调用下游accept
public void accept(P_OUT u) {
downstream.accept(mapper.apply(u));
}
//最后是OfRef的accept,
public void accept(T t) {
//到此被打印出来
consumer.accept(t);
}
//虽然accept方法属于不同流,却能被一次调用,这是性能所在。
至此,Java 流式构建和操作的基本框架就显示出来了。