学习Java 8 Stream Api (4) - Stream 终端操作之 collect

Stream API

上篇内容我们学习了Stream的大部分终端操作,我们这篇着重了解下Stream中重要的终端操作:collect。

collect 方法

序号 支持的类 方法定义 方法说明
1 Stream<T> <R> R collect(Supplier<R> supplier, BiConsumer<R, ? super T> accumulator, BiConsumer<R, R> combiner); 对此流的元素执行 mutable reduction操作。
2 Stream<T> <R, A> R collect(Collector<? super T, A, R> collector); 使用 Collector对此流的元素执行 mutable reduction Collector。

以下代码见 StreamTerminalOperationTransformTest。

实现3参数转换接口

序号1的方法,传递了3个参数,参数1为创建新结果容器的函数;参数2为累加器函数,将参数1和流内元素执行累加操作;参数3为组合器函数,并行执行时会使用该函数。

同步执行时,该方法相当于执行:

R result = supplier.get();
for (T element : this stream) {
  accumulator.accept(result, element);
}
return result;

我们编写如下代码,看下实际效果

// 使用collect方法实现字符串连接
log.info("拼接字符串为:{}",
        Stream.of("I", "love", "you", "too")
                .collect(StringBuilder::new, (b1, b2) -> {
                    log.info("累加执行:{} + {}", b1, b2);
                    b1.append(b2);
                }, (b1, b2) -> {
                    log.info("组合执行:{} ++ {}", b1, b2);
                    b1.append(b2);
                })
                .toString());

以上代码将输出如下日志:

[main] INFO top.todev.note.web.flux.stream.StreamTerminalOperationTransformTest - 累加执行: + I

[main] INFO top.todev.note.web.flux.stream.StreamTerminalOperationTransformTest - 累加执行:I + love

[main] INFO top.todev.note.web.flux.stream.StreamTerminalOperationTransformTest - 累加执行:Ilove + you

[main] INFO top.todev.note.web.flux.stream.StreamTerminalOperationTransformTest - 累加执行:Iloveyou + too

[main] INFO top.todev.note.web.flux.stream.StreamTerminalOperationTransformTest - 拼接字符串为:Iloveyoutoo

并行执行时,该方法相当于执行:

R result1 = supplier.get();
R result2 = supplier.get();
R result3 = supplier.get();
R result4 = supplier.get();

// 累加执行,此处为并发(多线程)执行,每行代表一个线程
accumulator.accept(result1, element1);
accumulator.accept(result2, element2);
accumulator.accept(result3, element3);
accumulator.accept(result4, element4);
// ...
// accumulator.accept(resultN, elementN);

// 开始组合,此处为并发(多线程)执行,每行代表一个线程
combiner.accept(result1, result2);
combiner.accept(result3, result4);
combiner.accept(result1, result3);
// combiner.accept(result1, resultN);

return result1;

将上述的代码改为.parallel()方式调用,将输出如下日志:

[main] INFO top.todev.note.web.flux.stream.StreamTerminalOperationTransformTest - 累加执行: + you

[ForkJoinPool.commonPool-worker-3] INFO top.todev.note.web.flux.stream.StreamTerminalOperationTransformTest - 累加执行: + I

[ForkJoinPool.commonPool-worker-2] INFO top.todev.note.web.flux.stream.StreamTerminalOperationTransformTest - 累加执行: + too

[ForkJoinPool.commonPool-worker-2] INFO top.todev.note.web.flux.stream.StreamTerminalOperationTransformTest - 组合执行:you ++ too

[ForkJoinPool.commonPool-worker-1] INFO top.todev.note.web.flux.stream.StreamTerminalOperationTransformTest - 累加执行: + love

[ForkJoinPool.commonPool-worker-1] INFO top.todev.note.web.flux.stream.StreamTerminalOperationTransformTest - 组合执行:I ++ love

[ForkJoinPool.commonPool-worker-1] INFO top.todev.note.web.flux.stream.StreamTerminalOperationTransformTest - 组合执行:Ilove ++ youtoo

[main] INFO top.todev.note.web.flux.stream.StreamTerminalOperationTransformTest - 拼接字符串为:Iloveyoutoo

注意:上述日志中出现的ForkJoinPool.commonPool-worker-N为并发(多线程)执行时的线程名。

实现Collector接口

实现Collector需要实现如下4个接口:

// 一个创建并返回一个新的可变结果容器的函数。
Supplier<A> supplier();
// 将值折叠成可变结果容器的函数。
BiConsumer<A, T> accumulator();
// 一个接受两个部分结果并将其合并的函数。 
BinaryOperator<A> combiner();
// 执行从中间累积类型 A到最终结果类型 R的最终 R 。 
Function<A, R> finisher();
// 返回一个 Collector.Characteristics 类型的Set, 表示该收集容器的特征。
Set<Characteristics> characteristics();

collect方法执行时,他们的调用流程如下:

  1. 创建新的结果容器(supplier())
  2. 将新的数据元素并入结果容器(accumulator())
  3. 将两个结果容器组合成一个(combiner())
  4. 在容器上执行可选的最终变换(finisher())

简单来讲,生成容器A,通过accumulator针对A及流元素T执行累加,(如果并行存在的话)对多个A执行组合combiner,最终执行finisher后由A转换为R。对于使用者来说,A为中间变量,无关其实现细节。

我们实现一个计算整数流的平均数的Collector,代码如下:

// 使用collector实现求ping均值
log.info("[1, 2, 3, 4, 5, 6]的平均值:{}",
        Stream.of(1, 2, 3, 4, 5, 6)
                .parallel()
                .collect(new Collector<Integer, long[], Double>() {
                             @Override
                             public Supplier<long[]> supplier() {
                                 return () -> new long[2];
                             }

                             @Override
                             public BiConsumer<long[], Integer> accumulator() {
                                 return (a, t) -> {
                                     log.info("{}累加{}", a, t);
                                     a[0] += t;
                                     a[1]++;
                                 };
                             }

                             @Override
                             public BinaryOperator<long[]> combiner() {
                                 return (a, b) -> {
                                     log.info("{}组合{}", a, b);
                                     a[0] += b[0];
                                     a[1] += b[1];
                                     return a;
                                 };
                             }

                             @Override
                             public Function<long[], Double> finisher() {
                                 return (a) -> a[1] == 0 ? 0 : new Long(a[0]).doubleValue() / a[1];
                             }

                             @Override
                             public Set<Characteristics> characteristics() {
                                 Set<Characteristics> set = new HashSet<>();
                                 set.add(Characteristics.CONCURRENT);
                                 return set;
                             }
                         }
                )
);

常用Collector

通过上面的示例,我们实现了一个自定义的Collector,我们发现实现一个自定义的Collector还是比较麻烦的,需要实现5个接口。
Java 开发者们已经想到了这个问题,他们额外提供了一个 of 方法,可以通过lambda的方式创建 collector,类似 collect 中传递几个参数:提供者、累加器、组合器、完成器以及特征配置,此处我们就不细讲了。
Java 开发者们更为贴心的为我们创建了一些常用的 Collector ,让我们可以直接使用。这些常用的 Collector 实现放在 Collectors 类下,我们来了解下。

统计平均值 averagingXxx 的使用

Collectors 提供了 averagingDouble、averagingLong、averagingInt 3种统计平均值的 Collector 实现类,以下代码以 averagingInt 为例,由于使用方式相似,我们就不举例了。

// 使用collector实现求均值
log.info("[1, 2, 3, 4, 5, 6]的平均值:{}",
        Stream.of(1, 2, 3, 4, 5, 6)
        .collect(Collectors.averagingInt(n -> n))
);

统计元素个数 counting 的使用

该方法和 Stream 中的 count 方法一样。

// 使用collector获取元素数量
log.info("[1, 2, 3, 4, 5, 6]的个数:{}",
        Stream.of(1, 2, 3, 4, 5, 6)
                .collect(Collectors.counting())
);

统计总和 summingXxx 的使用

Collectors 提供了 summingDouble、summingLong、summingInt 3种统计求和值的 Collecto r实现类,同时还提供了 summarizingDouble 、 summarizingLong 、summarizingInt 3种统计对象的 Collector 实现类,以下代码以 summingInt 为例,由于使用方式相似,我们就不举例了。

// 使用collector获取总和
log.info("[1, 2, 3, 4, 5, 6]的总和:{}",
        Stream.of(1, 2, 3, 4, 5, 6)
                .collect(Collectors.summingInt(n -> n))
);

统计最小元素 minBy 的使用

// 使用collector获取最小元素
log.info("[1, 2, 3, 4, 5, 6]的最小值:{}",
        Stream.of(1, 2, 3, 4, 5, 6)
                .collect(Collectors.minBy(Integer::min))
                .get()
);

统计最大元素 maxBy 的使用

// 使用collector获取最da元素
log.info("[1, 2, 3, 4, 5, 6]的最大值:{}",
        Stream.of(1, 2, 3, 4, 5, 6)
                .collect(Collectors.maxBy(Integer::max))
                .get()
);

统计累加处理 reducing 的使用

reducing 和 Stream 中的 reduce 操作方法类似,我们就不详述了。

// 使用collector实现求均值
log.info("[1, 2, 3, 4, 5, 6]的求和:{}",
        Stream.of(1, 2, 3, 4, 5, 6)
                .collect(Collectors.reducing(0, Integer::sum))
);

转换映射 mapping 的使用

mapping 支持将 第一个参数的结果再次执行转换,即向下游传递。

log.info("[1, 2, 3, 4, 5, 6]每个增加20后的平均值:{}",
        Stream.of(1, 2, 3, 4, 5, 6)
                .collect(Collectors.mapping(n -> n + 20, Collectors.averagingInt(n -> n)))
);

转换连接 joining 的使用

joining 提供了 3 种重载方法,支持传递 分隔符、前缀、后缀等。

// 使用collector连接字符串
log.info("连接字符串为:{}",
        Stream.of("I", "love", "you", "too")
                .collect(Collectors.joining(" ", "Java, ", "!"))
);

转换为集合 toList 的使用

log.info("[1, 2, 3, 4, 5, 6, 5, 3, 6]转换为集合:{}",
        Stream.of(1, 2, 3, 4, 5, 6, 5, 3, 6)
                .collect(Collectors.toList())
);

转换为Map toMap 的使用

toMap 提供了 3 种重载方法,除了指定 Key 和 Value 的生成器外,区别在于对于 Key 重复时, Value的处理方式;以及初始Map的生成器。

log.info("[1, 2, 3, 4, 5, 6, 5, 3, 6]转换为Map:{}",
        Stream.of(1, 2, 3, 4, 5, 6, 5, 3, 6)
                .collect(Collectors.toMap(Object::toString, n -> n, Integer::sum))
);

转换为Set toSet 的使用

log.info("[1, 2, 3, 4, 5, 6, 5, 3, 6]的转换为Set:{}",
        Stream.of(1, 2, 3, 4, 5, 6, 5, 3, 6)
                .collect(Collectors.toSet())
);

转换为分组 groupingBy 的使用

分组函数将流中元素按某种定义分组,也提供了 2 种重载方法,支持递归向下游分组。

log.info("[1, 2, 3, 4, 5, 6, 5, 3, 6]的分组数据:{}",
        Stream.of(1, 2, 3, 4, 5, 6, 5, 3, 6)
                .collect(Collectors.groupingBy(n -> n))
);

转换为分区 partitioningBy 的使用

分区函数将流中元素按条件分为2组,也提供了 2 种重载方法,支持递归向下游分组。

log.info("[1, 2, 3, 4, 5, 6]的奇偶分区数据:{}",
        Stream.of(1, 2, 3, 4, 5, 6)
                .collect(Collectors.partitioningBy(n -> n %2 == 0))
);

其他方法

Collectors 中还提供了 groupingByConcurrent 、 toCollection 、 toConcurrentMap 等几种支持并发的 Collector 实现,用法基本和非并发的相同,我们就不详述了。

源码详见:https://github.com/crystalxmumu/spring-web-flux-study-note

以上是本期笔记的内容,我们下期见。

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 216,039评论 6 498
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,223评论 3 392
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 161,916评论 0 351
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,009评论 1 291
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,030评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,011评论 1 295
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,934评论 3 416
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,754评论 0 271
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,202评论 1 309
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,433评论 2 331
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,590评论 1 346
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,321评论 5 342
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,917评论 3 325
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,568评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,738评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,583评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,482评论 2 352