Java - Lambda 表达式与 Stream 接口

Java - Lambda 表达式与 Stream 接口

sschrodinger

2019/10/28


引用

java 8 Stream 原理深度解析 - Dorae

深入理解 Java Stream 流水线 - CarpenterLee


Lambda 表达式


lamda 表达式提供了丰富的语法糖,简化自己的程序,并优化自己的程序速度。

Lambda 实例

Lambda 表达式仅支持函数式编程接口,所谓函数式接口,指的是只有一个抽象方法的接口。

函数式接口可以被隐式转换为 Lambda 表达式。函数式接口可以用 @FunctionalInterface 注解标识。

Java 中有大量的函数式编程接口,如 RunnableCallable 等。

JDK 1.8 之后,又添加了一个 Java 包存放函数式接口,如下:

// java.util.function.*

public interface Consumer<T>{
    /**
     * 函数式接口,无返回值,接受一个参数,代表一个消费者
     **/
    void accept(T t); 
    // ...
}

public interface Supplier<T>{
    /**
     * 函数式接口,有返回值,无参数,代表一个生产者
     **/
    T get();
}

public interface Function<T, R>{
    /**
     * 函数式接口,有返回值,有参数
     **/
    R apply(T t);
    // ...
}

// ...

举个例子,当我们需要创建一个线程时,标准的写法如下(使用匿名内部类):

public class Demo {
    public static void main(String[] args) {
        new Thread(new Runnable() {
            @Override
            public void run() {
               System.out.println("hello,world");
            }
        }).start();
    }
}

lambda 表达式可以更加简略的写代码,如上代码的等效 lambda 表达式如下:

public class Demo {
    public static void main(String[] args) {
        new Thread
        (
            () -> { System.out.println("hello,world");}
        ).start();
    }
}

Lambda 语法

在 Java 中,使用 (paramters) -> expression; 或者 (paramters) -> {statements;} 来定义 lambda 表达式。其中,有部分写法可以省略,规则如下:

note

  • 可选类型声明:不需要声明参数类型,编译器可以统一识别参数值。
  • 可选的参数圆括号:一个参数无需定义圆括号,但多个参数需要定义圆括号。
  • 可选的大括号:如果主体包含了一个语句,就不需要使用大括号。
  • 可选的返回关键字:如果主体只有一个表达式返回值则编译器会自动返回值,大括号需要指定明表达式返回了一个数值。

lambda 表达式在只有一条代码时还可以引用其他方法或构造器并自动调用,可以省略参数传递,代码更加简洁,引用方法的语法需要使用 :: 符号。lambda 表达式提供了四种引用方法和构造器的方式:

  1. 引用对象的方法 类::实例方法
  2. 引用类方法 类::类方法
  3. 引用特定对象的方法 特定对象::实例方法
  4. 引用类的构造器 类::new

最常见的方法,比如说遍历打印字符串或者排序,如下:

public class LambdaDemo {

    public static void main(String[] args) {
        List<String> list = new LinkedList<>();
        list.add("1");
        list.add("2");
        list.sort(String::compareTo);
        list.forEach(System.out::println);
    }

}

其中 forEach 的实现如下:

default void forEach(Consumer<? super T> action) {
    Objects.requireNonNull(action);
    for (T t : this) {
        action.accept(t);
    }
}

复合 lambda 表达式

实现原理

lambda 表达式是通过 MethodHandlerinvokeDynamic 实现的。


Stream 接口


stream 接口主要是为了快速处理一些聚合操作,如下一个实实例:

返回以字符 A 开头的字符串的最长长度。

最传统的写法如下:

public long maxLength(List<String> data) {
    List<String> list = new LinkedList<>();
    for (String s : data) {
        if (s.startsWith("A")) list.add(s);
    }
    long max = -1;
    for (String s: list) {
        if (max < s.length()) max = s.length();
    }
    return max;
}

通过两次循环获得最大的值,但是,这样写因为有两次循环,所以会有效率上的损失,改进写法如下:

public long maxLength(List<String> data) {
    long max = -1;
    for (String s: data) {
        if (s.startsWith("A")) {
            if (max < s.length()) max = s.length();
        }
    }
    return max;
}

Stream 接口等效于第二种方式,在一次迭代中尽可能多的执行用户指定的操作,避免多次循环导致的效率问题。如下:

public long maxLength(List<String> data) {
    return data.stream().filter(s -> s.startsWith("A")).mapToLong(String::length).max().getAsLong();
}

Stream 接口总共分为两种操作,一种为中间操作(Intermediate operations),一种为结束操作(Terminal operations)。

中间操作具体分为两种,为不带状态的中间操作(Stateless)和带状态的中间操作(Stateful)。不带状态的中间操作指元素的处理不受到前面元素的影响,而带状态的中间操作必须要等到所有元素处理完之后才能知道结果。

结束操作也分为两种,为短路操作非短路操作,短路操作是指不用处理全部元素就可以返回结果,比如找到第一个满足条件的元素。非短路操作要找到所有满足条件的元素才能完成。

如下:

Stream 操作分类
    |
    |---中间操作(Intermediate operations)   
    |           |
    |           |---无状态(Stateless)
    |           |       |---unordered() // 返回一个无序的流
    |           |       |---filter() // 按照条件过滤
    |           |       |---map() // 处理流,产生新流
    |           |       |---mapToInt() // 获得一个整数流
    |           |       |---mapToLong() // 获得一个长整型流
    |           |       |---mapToDouble() // 获得一个双实数流
    |           |       |---flatMap() // 扁平化流
    |           |       |---flatMapToInt() // 扁平化流到整形
    |           |       |---flatMapToLong() // 扁平化流到长整形
    |           |       |---flatMapToDouble() // 扁平化流到双实数类型
    |           |       |---peek() // 没有返回值的 map
    |           |
    |           |---有状态(Stateful)   
    |                   |---distinct() // 实现非重复流
    |                   |---sorted() // 排序
    |                   |---limit() // 取前 x 元素
    |                   |---skip() // 跳过前 x 元素
    |
    |
    |---结束操作(Terminal operations)
                |
                |---非短路操作
                |       |---forEach() // 遍历(使用 parallelStream 不保证顺序)
                |       |---forEachOrdered() // 遍历(保证顺序)
                |       |---toArray() // 转换成数组
                |       |---reduce() // 合并流,比如说对流中所有元素求和
                |       |               data.stream().reduce((a, b) -> a + b);
                |       |---collect() // 收集流
                |       |---max() // 求最大值
                |       |---min() // 求最小值
                |       |---count() // 统计个数
                |
                |---短路操作(short-circuiting)  
                        |---anyMatch() // 任意一个匹配,返回 true
                        |---allMatch() // 所有匹配,返回 true
                        |---noneMatch() // 所有都不匹配,返回 true
                        |---findFirst() // 找到第一个出现的元素并返回
                        |---findAny() // 找到任意出现的元素并返回

Stream 接口实现原理

Stream 接口采用流水线的方式执行整个过程,要构建一个流水线,需要解决三个问题:

  • 流水线构建
  • 流水线方法保存
  • 流水线操作执行
  • 流水线结果保存

流水线构建 使用 pipeline,Java Stream 中,流水线的中间节点使用 PipelineHelper 来代表一个 stage,每个 stage 就是一个用户指定的操作,如 mapfilter 等。多个不同的 stage 连接成一个双向链表,这个双向链表就封装了用户的全部操作。

如下实例:

data.stream().filter(s -> s.startsWith("A")).mapToLong(String::length).max();

构成的流水线如下:

                     |---------| ----> |---------| -----> |---------| ------> |---------|
                     |   Head  |       |filter op|        | map  op |         | max  op |
                     |---------| <---- |---------| <----- |---------| <------ |---------|
                     
data source ---------> stage 0 ---------> stage 1 ---------> stage 2 ---------> stage 3
             stream()           filter()          mapToLong()           max()

流水线方法保存 采用的是 sink 接口,提供了最重要的四个函数,如下:

interface Sink<T> extends Consumer<T> {

    // 开始遍历元素之前调用该方法,通知Sink做好准备。
    default void begin(long size) {}

    // 所有元素遍历完成之后调用,通知Sink没有更多的元素了。
    default void end() {}

    // 是否可以结束操作,可以让短路操作尽早结束。
    default boolean cancellationRequested() {
        return false;
    }
    
    // 遍历元素时调用,接受一个待处理元素,并对元素进行处理。Stage把自己包含的操作和回调方法封装到该方法里,前一个Stage只需要调用当前Stage.accept(T t)方法就行了。
    default void accept(int value) {
        throw new IllegalStateException("called wrong accept method");
    }

    default void accept(long value) {
        throw new IllegalStateException("called wrong accept method");
    }

    default void accept(double value) {
        throw new IllegalStateException("called wrong accept method");
    }
    
    // ... 
}

一般来说,重写这些函数就可以实现聚合逻辑。例如 filter 实现如下:

@Override
public final Stream<P_OUT> filter(Predicate<? super P_OUT> predicate) {
    Objects.requireNonNull(predicate);
    return new StatelessOp<P_OUT, P_OUT>(this, StreamShape.REFERENCE,
                                 StreamOpFlag.NOT_SIZED) {
        @Override
        Sink<P_OUT> opWrapSink(int flags, Sink<P_OUT> sink) {
            return new Sink.ChainedReference<P_OUT, P_OUT>(sink) {
                // 自己开始的同时,激活下一链
                @Override
                public void begin(long size) {
                    downstream.begin(-1);
                }

                // 预测是否满足要求,满足要求则将其丢给下一链处理
                @Override
                public void accept(P_OUT u) {
                    if (predicate.test(u))
                        downstream.accept(u);
                }
            };
        }
    };
}

对于稍微复杂的有状态操作,如排序,如下:

private static final class RefSortingSink<T> extends AbstractRefSortingSink<T> {
    private ArrayList<T> list;

    // 创建一个临时的链表,保存结果
    @Override
    public void begin(long size) {
        if (size >= Nodes.MAX_ARRAY_SIZE)
            throw new IllegalArgumentException(Nodes.BAD_SIZE);
        list = (size >= 0) ? new ArrayList<T>((int) size) : new ArrayList<T>();
    }

    // 对临时表进行排序
    // 激活下一链处理
    // 发送数据到下一链
    @Override
    public void end() {
        list.sort(comparator);
        downstream.begin(list.size());
        if (!cancellationWasRequested) {
            list.forEach(downstream::accept);
        }
        else {
            for (T t : list) {
                if (downstream.cancellationRequested()) break;
                downstream.accept(t);
            }
        }
        downstream.end();
        list = null;
    }

    // 上一连将其所有的元素加入到临时表中
    @Override
    public void accept(T t) {
        list.add(t);
    }
        
    // ...
}

note

  • 从流水线的执行上来看,对于无状态的中间操作,是可以通过流水线进行并行的,但是对于有状态的操作,则必须等到所有的数据处理完之后才能够进行下一步,所以,每当有有状态的中间操作,就会多一次循环。

filter 函数为例,函数主题返回一个实现了 Stream 接口的 StatelessOp 对象,如下:

public final Stream<P_OUT> filter(Predicate<? super P_OUT> predicate) {
    Objects.requireNonNull(predicate);
    return new StatelessOp<P_OUT, P_OUT>(this, StreamShape.REFERENCE, StreamOpFlag.NOT_SIZED) // ... 匿名类实现
}

StatelessOp 是属于 ReferencePipeline 的一个静态内部类。构造函数如下:

// 将上一级 stage 引用加入到该 stage 中
StatelessOp(AbstractPipeline<?, E_IN, ?> upstream,
            StreamShape inputShape,
            int opFlags) {
    super(upstream, opFlags);
    assert upstream.getOutputShape() == inputShape;
}

那么实际上对于 filter 函数,是将上一级的 stage 引用(他自身)加入到了一个新的 stage 中。并返回了那个新的 stage,如下:

                                                     |--------------------------|
                 |---------|                         |  previousStage = stage n |
                 | stage n |                         |        stage n + 1       |
                 |---------|                         |--------------------------|
stream().func() ------------> stream.func().filter() ---------------------------->

流水线执行 使用结束操作唤醒。结束操作也是一个特殊的 stage。根据流水线的双向队列,不考虑结束操作时的执行,如下:

data.stream().filter(s -> s.startsWith("A")).mapToLong(String::length).max();
//   |------|>|----------------------------|>|-----------------------|>|------|
//   | Head | |        filter op           | |       map op          | |max op|
//   |------|<|----------------------------|<|-----------------------|<|------|

当执行 max 函数时,会执行如下的语句:

@Override
public final Optional<P_OUT> max(Comparator<? super P_OUT> comparator) {
    return reduce(BinaryOperator.maxBy(comparator));
}

maxBy 函数很简单,仅仅是返回一个比较器,如下:

public static <T> BinaryOperator<T> maxBy(Comparator<? super T> comparator) {
    Objects.requireNonNull(comparator);
    return (a, b) -> comparator.compare(a, b) >= 0 ? a : b;
}

reduce 函数才是一个短路操作,如下:

@Override
public final Optional<P_OUT> reduce(BinaryOperator<P_OUT> accumulator) {
    return evaluate(ReduceOps.makeRef(accumulator));
}

makeRef 函数本质上来说也是生成一个 sink,如下:

public static <T> TerminalOp<T, Optional<T>> makeRef(BinaryOperator<T> operator) {
        class ReducingSink implements AccumulatingSink<T, Optional<T>, ReducingSink> {
            private boolean empty;
            private T state;

            public void begin(long size) {
                empty = true;
                state = null;
            }

            @Override
            public void accept(T t) {
                if (empty) {
                    empty = false;
                    state = t;
                } else {
                    state = operator.apply(state, t);
                }
            }

            @Override
            public Optional<T> get() {
                return empty ? Optional.empty() : Optional.of(state);
            }

            @Override
            public void combine(ReducingSink other) {
                if (!other.empty)
                    accept(other.state);
            }
        }
        return new ReduceOp<T, Optional<T>, ReducingSink>(StreamShape.REFERENCE) {
            @Override
            public ReducingSink makeSink() {
                return new ReducingSink();
            }
        };
    }

所有的计算工作都是由 evaluate 函数完成的,如下:

final <R> R evaluate(TerminalOp<E_OUT, R> terminalOp) {
    assert getOutputShape() == terminalOp.inputShape();
    if (linkedOrConsumed)
        throw new IllegalStateException(MSG_STREAM_LINKED);
    linkedOrConsumed = true;

    return isParallel()
           ? terminalOp.evaluateParallel(this, sourceSpliterator(terminalOp.getOpFlags()))
           : terminalOp.evaluateSequential(this, sourceSpliterator(terminalOp.getOpFlags()));
}

实际上, stream 流计算有两种模式,一种是串行的方式,一种是并行的方式,首先看串行的方式 evaluateSequential,如下:

@Override
public <P_IN> R evaluateSequential(PipelineHelper<T> helper,
                                   Spliterator<P_IN> spliterator) {
    return helper.wrapAndCopyInto(makeSink(), spliterator).get();
}

helper 代表的实际上是除开 max 后的最后一个 stage,即 mapToLong,可以根据如下函数栈追踪到运行的函数,如下:

copyInto(wrapSink(Objects.requireNonNull(sink)), spliterator);
       /|\
        |
helper.wrapAndCopyInto(makeSink(), spliterator);

copyInto 为运行的核心函数,如下:

final <P_IN> void copyInto(Sink<P_IN> wrappedSink, Spliterator<P_IN> spliterator) {
    Objects.requireNonNull(wrappedSink);

    if (!StreamOpFlag.SHORT_CIRCUIT.isKnown(getStreamAndOpFlags())) {
        wrappedSink.begin(spliterator.getExactSizeIfKnown());
        spliterator.forEachRemaining(wrappedSink);
        wrappedSink.end();
    }
    else {
        copyIntoWithCancel(wrappedSink, spliterator);
    }
}

就是该函数启动了整个流水线的执行。

Stream 接口并行化

对于无状态的 stage,是可以开启并行来处理的,主要是用到了 Java 提供的 Fork/Join 框架

来看 evaluateParallel 函数,如下:

@Override
public <P_IN> R evaluateParallel(PipelineHelper<T> helper,
                                 Spliterator<P_IN> spliterator) {
    return new ReduceTask<>(this, helper, spliterator).invoke().get();
}

ReduceTaskForkJoinTask 的一个子类,invoke() 函数就是开启计算流程,get() 函数等待计算完成并返回结果。

ForkJoinTask 最重要的就是重写 compute 函数

@Override
public void compute() {
    Spliterator<P_IN> rs = spliterator, ls; // right, left spliterators
    long sizeEstimate = rs.estimateSize();
    long sizeThreshold = getTargetSize(sizeEstimate);
    boolean forkRight = false;
    @SuppressWarnings("unchecked") K task = (K) this;
    while (sizeEstimate > sizeThreshold && (ls = rs.trySplit()) != null) {
        K leftChild, rightChild, taskToFork;
        task.leftChild  = leftChild = task.makeChild(ls);
        task.rightChild = rightChild = task.makeChild(rs);
        task.setPendingCount(1);
        if (forkRight) {
            forkRight = false;
            rs = ls;
            task = leftChild;
            taskToFork = rightChild;
        }
        else {
            forkRight = true;
            task = rightChild;
            taskToFork = leftChild;
        }
        taskToFork.fork();
        sizeEstimate = rs.estimateSize();
    }
    task.setLocalResult(task.doLeaf());
    task.tryComplete();
}

使用 Spliterator 迭代器产生数据,并且交叉左右节点执行 fork 函数(即分叉计算),在 tryComplete 中执行结果的合并操作,流程图如下:

并行操作

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 206,602评论 6 481
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 88,442评论 2 382
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 152,878评论 0 344
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 55,306评论 1 279
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 64,330评论 5 373
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,071评论 1 285
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,382评论 3 400
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,006评论 0 259
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 43,512评论 1 300
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,965评论 2 325
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,094评论 1 333
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,732评论 4 323
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,283评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,286评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,512评论 1 262
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,536评论 2 354
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,828评论 2 345

推荐阅读更多精彩内容