这里以集合、数组为源构建流,只讨论串行而先忽略并行方式,如果你对Lambda表达式和函数式接口不是很了解,可以参考这篇
//以集合为数据源
Collection.java
default Stream<E> stream() {
return StreamSupport.stream(spliterator(), false);
}
会被子类复写
default Spliterator<E> spliterator() {
return Spliterators.spliterator(this, 0);
}
以ArrayList为列,调用spliterator()方法生成一个可分割迭代器(splitable iterator),用于对数据源进行遍历和分区
ArrayList.java
public Spliterator<E> spliterator() {
return new ArrayListSpliterator<>(this, 0, -1, 0);
}
ArrayListSpliterator(ArrayList<E> list, int origin, int fence,int expectedModCount) {
this.list = list; // OK if null unless traversed
this.index = origin;
this.fence = fence;
this.expectedModCount = expectedModCount;
}
迭代器在串流使用到的主要方法就是遍历
//ArrayList.java
public void forEachRemaining(Consumer<? super E> action) {
int i, hi, mc; // hoist accesses and checks from loop
ArrayList<E> lst; Object[] a;
if (action == null)
throw new NullPointerException();
if ((lst = list) != null && (a = lst.elementData) != null) {
if ((hi = fence) < 0) {
mc = lst.modCount;
hi = lst.size;
}
else
mc = expectedModCount;
if ((i = index) >= 0 && (index = hi) <= a.length) {
for (; i < hi; ++i) {
@SuppressWarnings("unchecked") E e = (E) a[i];
action.accept(e);
}
if (lst.modCount == mc)
return;
}
}
throw new ConcurrentModificationException();
}
再看以可变长参数为数据源
Stream.java
public static<T> Stream<T> of(T... values) {
return Arrays.stream(values);
}
Arrays.java
public static <T> Stream<T> stream(T[] array, int startInclusive, int endExclusive) {
return StreamSupport.stream(spliterator(array, startInclusive, endExclusive), false);
}
public static <T> Spliterator<T> spliterator(T[] array, int startInclusive, int endExclusive) {
return Spliterators.spliterator(array, startInclusive, endExclusive,Spliterator.ORDERED | Spliterator.IMMUTABLE);
}
由Spliterators中方法实现
Spliterators.java
public static <T> Spliterator<T> spliterator(Object[] array, int fromIndex, int toIndex,
int additionalCharacteristics) {
...
return new ArraySpliterator<>(array, fromIndex, toIndex, additionalCharacteristics);
}
public ArraySpliterator(Object[] array, int origin, int fence, int additionalCharacteristics) {
this.array = array;
this.index = origin;
this.fence = fence;
this.characteristics = additionalCharacteristics | Spliterator.SIZED | Spliterator.SUBSIZED;
}
//ArraySpliterator的迭代方式
public void forEachRemaining(Consumer<? super T> action) {
Object[] a; int i, hi; // hoist accesses and checks from loop
if (action == null)
throw new NullPointerException();
if ((a = array).length >= (hi = fence) &&
(i = index) >= 0 && i < (index = hi)) {
//挨个发射数组中的数据
do { action.accept((T)a[i]); } while (++i < hi);
}
}
除了这两种,Stream还能使用iterate、generate方法构造无限流,暂不讨论。
上述都以返回的Spliterator作为参数,使用工具类StreamSupport的方法stream构建一个源头。
StreamSupport.java
public static <T> Stream<T> stream(Spliterator<T> spliterator, boolean parallel) {
...
//Head是ReferencePipeline的静态内部类,也是其子类,ReferencePipeline是AbstractPipeline子类
return new ReferencePipeline.Head<>(spliterator,StreamOpFlag.fromCharacteristics(spliterator),parallel);
}
看一下组织结构,参考https://www.cnblogs.com/CarpenterLee/p/6637118.html
Head构造方法调用父类AbstractPipeline的构造方法来初始化源头
AbstractPipeline(Spliterator<?> source,int sourceFlags, boolean parallel) {
this.previousStage = null;
//源头持用Spliterator引用
this.sourceSpliterator = source;
//sourceStage就是源头的引用,这里使用自己初始化
this.sourceStage = this;
//先忽略
this.sourceOrOpFlags = sourceFlags & StreamOpFlag.STREAM_MASK;
this.combinedFlags = (~(sourceOrOpFlags << 1)) & StreamOpFlag.INITIAL_OPS_VALUE;
this.depth = 0;
this.parallel = parallel;
}
再看一张全家福,出自//www.greatytc.com/p/ee5712de3927。和Stream同一级别的还有几个定制的基础类型,ReferencePipeline也是一样,
到此,数据源的迭代方式和源头的构建就完成了,下面来看看Stream 的中间操作(二)。