java8-流

2、java8: 流

reactor

流是JavaAPI的新成员,它允许你以声明性方式处理数据集合(通过查询语句来表达,而不是临时编写一个实现)。可以把它们看成遍历数据集的高级迭代器。此外,流还可以透明地并行处理。

示例

示例一:取热量小于400,并按卡路里排序

  1. Java7
    public static List<String> getLowCaloricDishesNamesInJava7(List<Dish> dishes){
        List<Dish> lowCaloricDishes = new ArrayList<>();
        for(Dish d: dishes){
            if(d.getCalories() < 400){
                lowCaloricDishes.add(d);
            }
        }
        List<String> lowCaloricDishesName = new ArrayList<>();
        Collections.sort(lowCaloricDishes, new Comparator<Dish>() {
            public int compare(Dish d1, Dish d2){
                return Integer.compare(d1.getCalories(), d2.getCalories());
            }
        });
        for(Dish d: lowCaloricDishes){
            lowCaloricDishesName.add(d.getName());
        }
        return lowCaloricDishesName;
    }
  1. java8实现
    public static List<String> getLowCaloricDishesNamesInJava8(List<Dish> dishes){
        return dishes.stream()
                .filter(d -> d.getCalories() < 400)
                .sorted(comparing(Dish::getCalories))
                .map(Dish::getName)
                .collect(toList());
    }

为了利用多核架构并行执行这段代码,你只需要把stream()换成parallelStream():

流概念

简短的定义就是“从支持数据处理操作的源生成的元素序列”。


流的构成.png

特点:
只遍历一次:一个流只能遍历一次
内部迭代:steams库使用内部迭代把迭代做了,不需要通过用户编写代码进行外部迭代

操作

中间操作和终端操作

中间操作和终端操作.png

操作分类

筛选:filter/distinct
截短流:limit
跳过元素:skip
映射:map、flatmap
查找:anyMatch/noneMatch/findAny/findFirst
归约:reduce/collect

map/flatmap的区别:

        List<Integer> mapResult = Stream.of(1, 2)
                .map(number -> number + 1).collect(toList());

        List<Integer> flatMapResult = Stream.of(Arrays.asList(1, 2), Arrays.asList(3, 4))
                .flatMap(numbers -> numbers.stream()).collect(toList());

map:
操作流中的每一个元素,(入参:出参=1:1)
flatmap:
仍操作流中的每一个元素,但流中的元素是列表,flatmap的Function入参是列表,出参是是流,将流合并作为执行结果是)(入参:出参=1:n)
一言以蔽之,flatmap方法让你把一个流中的每个值都换成另一个流,然后把所有的流连接起来成为一个流。

        Arrays.asList("Hello", "World").stream()
                .map(line -> line.split(""))
                .flatMap(line -> Arrays.stream(line))
                .distinct()
                .forEach(System.out::println);

执行过程如下图:


flatmap执行过程.png

归约求最大值、最小值、和

        //求和
        List<Integer> numbers = Arrays.asList(3, 4, 5, 1, 2);
        int sum = numbers.stream().reduce(0, (a, b) -> a + b);
        int sum1 = numbers.stream().reduce(0,  (a, b) -> Integer.sum(a, b));
        int sum2 = numbers.stream().reduce(0, Integer::sum);
          int sum3 = numbers.stream().mapToInt(Integer::intValue).sum();



        //最大值
        int max = numbers.stream().reduce(0, Integer::max);

        //最小值
        Optional<Integer> min = numbers.stream().reduce(Integer::min);

构建流

由值、数组、集合生成流水
由文件生成流
由函数生成流:创建无限流

用流收集数据

收集器

收集器用于将stream中的元素做汇总,传递给collect方法的参数是Collector接口的一个实现。

collect(Collectors.toList()):按顺序给每个元素生成一个列表;
collect(Collectors.groupingBy(Transaction::getCurrency)):生成一个map,它的键是货币,值是等于该货币的列表。
按币种对交易进行分组:

    //建立累积交易分组的Map
        Map<Currency, List<Transaction>> transactionsByCurrencies = new HashMap<>();
        //迭代Transaction的List
        for (Transaction transaction : transactions) {
            //提取Transaction的货币
            Currency currency = transaction.getCurrency();
            List<Transaction> transactionsForCurrency = transactionsByCurrencies.get(currency);
            //如果分组Map中没有这种货币的条目,就创建一个
            if (transactionsForCurrency == null) {
                transactionsForCurrency = new ArrayList<>();
                transactionsByCurrencies.put(currency, transactionsForCurrency);
            }
            //将当前遍历的Transaction加入同一货币的Transaction的List
            transactionsForCurrency.add(transaction);
        }
        Map<Currency, List<Transaction>> transactionsByCurrencies = transactions.stream().collect(groupingBy(Transaction::getCurrency));

预定义的收集器
从Collectors类提供的工厂方法(例如groupingBy)创建的收集器。
它们主要提供了三大功能:

  • 将流元素归约和汇总为一个值
  • 元素分组
  • 元素分区
    自定义收集器
    自已实现Collector接口实现收集器功能

归约和汇总

          //总和
        long howManyDishes = menu.stream().collect(counting());

        //平均
        double avgCalories = menu.stream().collect(averagingInt(Dish::getCalories));
        
          //最大值
        Comparator<Dish> dishCaloriesComparator = Comparator.comparingInt(Dish::getCalories);
        Optional<Dish> mostCaloricDish = menu.stream().collect(maxBy(dishCaloriesComparator));

        //汇总 最大、最小、平均、总和  IntSummaryStatistics{count=9, sum=4300, min=120, average=477.777778, max=800}
        IntSummaryStatistics intSummaryStatistics = menu.stream().collect(summarizingInt(Dish::getCalories));

连接字符串

        //连接字符串 1,2,3
        String str = Arrays.asList(1, 2, 3).stream().map(String::valueOf).collect(joining(","));
        int sum1 = menu.stream().collect(Collectors.summingInt(Dish::getCalories));
        int sum2 = menu.stream().collect(Collectors.reducing(0, Dish::getCalories, (i, j) -> i + j));
        int sum3 = menu.stream().collect(Collectors.reducing(0, Dish::getCalories, Integer::sum));
        int sum4 = menu.stream().map(Dish::getCalories).reduce(Integer::sum).get();
        int sum5 = menu.stream().mapToInt(Dish::getCalories).sum();

注意:stream接口的collect和reduce方法通常可以获取相同的结果,尽可能的为手头的问题探索不同的解决方案,但在通用的方案中,始终选择最专门化的一个。

分组

分组示例:

        //在鱼类型分类
        Map<Dish.Type, List<Dish>> dishesByType = menu.stream().collect(groupingBy(Dish::getType));
        
        //把热量不到400卡路里的菜划分为“低热量”(diet),热量400到700卡路里的菜划为“普通”(normal),高于700卡路里的划为“高热量”(fat)。
        Map<CaloricLevel, List<Dish>> dishesByCaloricLevel = menu.stream().collect(
                groupingBy(dish -> {
                    if (dish.getCalories() <= 400) return CaloricLevel.DIET;
                    else if (dish.getCalories() <= 700) return CaloricLevel.NORMAL;
                    else return CaloricLevel.FAT;
                }));

多级分组:

  Map<Dish.Type, Map<CaloricLevel, List<Dish>>> dishedByTypeAndCaloricLevel = menu.stream().collect(
                groupingBy(Dish::getType,
                        groupingBy((Dish dish) -> {
                            if (dish.getCalories() <= 400) return CaloricLevel.DIET;
                            else if (dish.getCalories() <= 700) return CaloricLevel.NORMAL;
                            else return CaloricLevel.FAT;
                        })
                )
        );

多级分组过程:


多级分组.png

按子组收集数据

        //每类菜有多少个
        Map<Dish.Type, Long> typesCount = menu.stream().collect(groupingBy(Dish::getType, counting()));

        //每类菜的总卡路里数
        Map<Dish.Type, Integer> sumCaloriesByType = menu.stream().collect(groupingBy(Dish::getType,
                summingInt(Dish::getCalories)));

        //每类菜的最高热量的菜
        Map<Dish.Type, Optional<Dish>> mostCaloriesByType = menu.stream().collect(groupingBy(Dish::getType,
                maxBy(Comparator.comparingInt(Dish::getCalories))));

        //每类菜的最高热量的菜-去除Optional
        Map<Dish.Type, Dish> mostCaloricDishesByTypeWithoutOptional = menu.stream().collect(
                groupingBy(Dish::getType,
                        collectingAndThen(
                                maxBy(Comparator.comparingInt(Dish::getCalories)),
                                Optional::get)));

分区

分区是分组的特殊情况:由一个谓词(返回一个布尔值的函数)作为分类函数,它称分区函数。分区函数返回一个布尔值,这意味着得到的分组Map的键类型是Boolean,于是它最多可以分为两组——true是一组,false是一组。

        Map<Boolean, List<Dish>> partitionByVegeterian = menu.stream().collect(partitioningBy(Dish::isVegetarian));

结果:{false=[pork, beef, chicken, prawns, salmon], true=[French fries, rice, season fruit, pizza]}

并行流

可以通过对收集源调用parallelStream方法来把集合转换为并行流。
并行流内部使用了默认的ForkJoinPool(分支/合并框架),它默认的线程数量就是你的处理器数量,这个值是由Runtime.getRuntime().availableProcessors()得到的。
// 使用这个属性可以修改默认的线程数
System.setProperty(“java.util.concurrent.ForkJoinPool.common.parallelism”,”20”);
这是一个全局设置,因此它将影响代码中所有的并行流。反过来说,目前还无法专为某个并行流指定这个值。一般而言,让ForkJoinPool的大小等于处理器数量是个不错的默认值,除非你有很好的理由,否则我们强烈建议你不要修改它。

public static long parallelSum(List<Long> list) {
    return list.stream().parallel().reduce(Long::sum).get();
}

使用并行流时注意事项:
1、要考虑流背后的数据结构是否易于分解。例如,ArrayList的拆分效率比LinkedList高
2、尽量不要有共享变更的修改,如果有,要注意线程安全(加锁的话,会出现中午执行)

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 207,113评论 6 481
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 88,644评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 153,340评论 0 344
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 55,449评论 1 279
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 64,445评论 5 374
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,166评论 1 284
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,442评论 3 401
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,105评论 0 261
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 43,601评论 1 300
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,066评论 2 325
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,161评论 1 334
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,792评论 4 323
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,351评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,352评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,584评论 1 261
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,618评论 2 355
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,916评论 2 344

推荐阅读更多精彩内容

  • Java8 in action 没有共享的可变数据,将方法和函数即代码传递给其他方法的能力就是我们平常所说的函数式...
    铁牛很铁阅读 1,213评论 1 2
  • 原文地址: 深蓝至尊 一. 流式处理简介 在我接触到java8流式处理的时候,我的第一感觉是流式处理让集合操作变得...
    咻咻咻i阅读 1,135评论 0 0
  • ORA-00001: 违反唯一约束条件 (.) 错误说明:当在唯一索引所对应的列上键入重复值时,会触发此异常。 O...
    我想起个好名字阅读 5,189评论 0 9
  • Java 8 数据流教程 原文:Java 8 Stream Tutorial 译者:飞龙 协议:CC BY-NC-...
    布客飞龙阅读 936评论 1 46
  • 生活其实就是日复一日,没有那么多的惊天动地,即使你使出一记重拳,可能依然还是那么无聊。 这个世界上,总有一些认真而...
    元初阅读 484评论 0 1