2、java8: 流
reactor
流是JavaAPI的新成员,它允许你以声明性方式处理数据集合(通过查询语句来表达,而不是临时编写一个实现)。可以把它们看成遍历数据集的高级迭代器。此外,流还可以透明地并行处理。
示例
示例一:取热量小于400,并按卡路里排序
- Java7
public static List<String> getLowCaloricDishesNamesInJava7(List<Dish> dishes){
List<Dish> lowCaloricDishes = new ArrayList<>();
for(Dish d: dishes){
if(d.getCalories() < 400){
lowCaloricDishes.add(d);
}
}
List<String> lowCaloricDishesName = new ArrayList<>();
Collections.sort(lowCaloricDishes, new Comparator<Dish>() {
public int compare(Dish d1, Dish d2){
return Integer.compare(d1.getCalories(), d2.getCalories());
}
});
for(Dish d: lowCaloricDishes){
lowCaloricDishesName.add(d.getName());
}
return lowCaloricDishesName;
}
- java8实现
public static List<String> getLowCaloricDishesNamesInJava8(List<Dish> dishes){
return dishes.stream()
.filter(d -> d.getCalories() < 400)
.sorted(comparing(Dish::getCalories))
.map(Dish::getName)
.collect(toList());
}
为了利用多核架构并行执行这段代码,你只需要把stream()换成parallelStream():
流概念
简短的定义就是“从支持数据处理操作的源生成的元素序列”。
特点:
只遍历一次:一个流只能遍历一次
内部迭代:steams库使用内部迭代把迭代做了,不需要通过用户编写代码进行外部迭代
操作
中间操作和终端操作
操作分类
筛选:filter/distinct
截短流:limit
跳过元素:skip
映射:map、flatmap
查找:anyMatch/noneMatch/findAny/findFirst
归约:reduce/collect
map/flatmap的区别:
List<Integer> mapResult = Stream.of(1, 2)
.map(number -> number + 1).collect(toList());
List<Integer> flatMapResult = Stream.of(Arrays.asList(1, 2), Arrays.asList(3, 4))
.flatMap(numbers -> numbers.stream()).collect(toList());
map:
操作流中的每一个元素,(入参:出参=1:1)
flatmap:
仍操作流中的每一个元素,但流中的元素是列表,flatmap的Function入参是列表,出参是是流,将流合并作为执行结果是)(入参:出参=1:n)
一言以蔽之,flatmap方法让你把一个流中的每个值都换成另一个流,然后把所有的流连接起来成为一个流。
Arrays.asList("Hello", "World").stream()
.map(line -> line.split(""))
.flatMap(line -> Arrays.stream(line))
.distinct()
.forEach(System.out::println);
执行过程如下图:
归约求最大值、最小值、和
//求和
List<Integer> numbers = Arrays.asList(3, 4, 5, 1, 2);
int sum = numbers.stream().reduce(0, (a, b) -> a + b);
int sum1 = numbers.stream().reduce(0, (a, b) -> Integer.sum(a, b));
int sum2 = numbers.stream().reduce(0, Integer::sum);
int sum3 = numbers.stream().mapToInt(Integer::intValue).sum();
//最大值
int max = numbers.stream().reduce(0, Integer::max);
//最小值
Optional<Integer> min = numbers.stream().reduce(Integer::min);
构建流
由值、数组、集合生成流水
由文件生成流
由函数生成流:创建无限流
用流收集数据
收集器
收集器用于将stream中的元素做汇总,传递给collect方法的参数是Collector接口的一个实现。
collect(Collectors.toList()):按顺序给每个元素生成一个列表;
collect(Collectors.groupingBy(Transaction::getCurrency)):生成一个map,它的键是货币,值是等于该货币的列表。
按币种对交易进行分组:
//建立累积交易分组的Map
Map<Currency, List<Transaction>> transactionsByCurrencies = new HashMap<>();
//迭代Transaction的List
for (Transaction transaction : transactions) {
//提取Transaction的货币
Currency currency = transaction.getCurrency();
List<Transaction> transactionsForCurrency = transactionsByCurrencies.get(currency);
//如果分组Map中没有这种货币的条目,就创建一个
if (transactionsForCurrency == null) {
transactionsForCurrency = new ArrayList<>();
transactionsByCurrencies.put(currency, transactionsForCurrency);
}
//将当前遍历的Transaction加入同一货币的Transaction的List
transactionsForCurrency.add(transaction);
}
Map<Currency, List<Transaction>> transactionsByCurrencies = transactions.stream().collect(groupingBy(Transaction::getCurrency));
预定义的收集器
从Collectors类提供的工厂方法(例如groupingBy)创建的收集器。
它们主要提供了三大功能:
- 将流元素归约和汇总为一个值
- 元素分组
- 元素分区
自定义收集器
自已实现Collector接口实现收集器功能
归约和汇总
//总和
long howManyDishes = menu.stream().collect(counting());
//平均
double avgCalories = menu.stream().collect(averagingInt(Dish::getCalories));
//最大值
Comparator<Dish> dishCaloriesComparator = Comparator.comparingInt(Dish::getCalories);
Optional<Dish> mostCaloricDish = menu.stream().collect(maxBy(dishCaloriesComparator));
//汇总 最大、最小、平均、总和 IntSummaryStatistics{count=9, sum=4300, min=120, average=477.777778, max=800}
IntSummaryStatistics intSummaryStatistics = menu.stream().collect(summarizingInt(Dish::getCalories));
连接字符串
//连接字符串 1,2,3
String str = Arrays.asList(1, 2, 3).stream().map(String::valueOf).collect(joining(","));
int sum1 = menu.stream().collect(Collectors.summingInt(Dish::getCalories));
int sum2 = menu.stream().collect(Collectors.reducing(0, Dish::getCalories, (i, j) -> i + j));
int sum3 = menu.stream().collect(Collectors.reducing(0, Dish::getCalories, Integer::sum));
int sum4 = menu.stream().map(Dish::getCalories).reduce(Integer::sum).get();
int sum5 = menu.stream().mapToInt(Dish::getCalories).sum();
注意:stream接口的collect和reduce方法通常可以获取相同的结果,尽可能的为手头的问题探索不同的解决方案,但在通用的方案中,始终选择最专门化的一个。
分组
分组示例:
//在鱼类型分类
Map<Dish.Type, List<Dish>> dishesByType = menu.stream().collect(groupingBy(Dish::getType));
//把热量不到400卡路里的菜划分为“低热量”(diet),热量400到700卡路里的菜划为“普通”(normal),高于700卡路里的划为“高热量”(fat)。
Map<CaloricLevel, List<Dish>> dishesByCaloricLevel = menu.stream().collect(
groupingBy(dish -> {
if (dish.getCalories() <= 400) return CaloricLevel.DIET;
else if (dish.getCalories() <= 700) return CaloricLevel.NORMAL;
else return CaloricLevel.FAT;
}));
多级分组:
Map<Dish.Type, Map<CaloricLevel, List<Dish>>> dishedByTypeAndCaloricLevel = menu.stream().collect(
groupingBy(Dish::getType,
groupingBy((Dish dish) -> {
if (dish.getCalories() <= 400) return CaloricLevel.DIET;
else if (dish.getCalories() <= 700) return CaloricLevel.NORMAL;
else return CaloricLevel.FAT;
})
)
);
多级分组过程:
按子组收集数据
//每类菜有多少个
Map<Dish.Type, Long> typesCount = menu.stream().collect(groupingBy(Dish::getType, counting()));
//每类菜的总卡路里数
Map<Dish.Type, Integer> sumCaloriesByType = menu.stream().collect(groupingBy(Dish::getType,
summingInt(Dish::getCalories)));
//每类菜的最高热量的菜
Map<Dish.Type, Optional<Dish>> mostCaloriesByType = menu.stream().collect(groupingBy(Dish::getType,
maxBy(Comparator.comparingInt(Dish::getCalories))));
//每类菜的最高热量的菜-去除Optional
Map<Dish.Type, Dish> mostCaloricDishesByTypeWithoutOptional = menu.stream().collect(
groupingBy(Dish::getType,
collectingAndThen(
maxBy(Comparator.comparingInt(Dish::getCalories)),
Optional::get)));
分区
分区是分组的特殊情况:由一个谓词(返回一个布尔值的函数)作为分类函数,它称分区函数。分区函数返回一个布尔值,这意味着得到的分组Map的键类型是Boolean,于是它最多可以分为两组——true是一组,false是一组。
Map<Boolean, List<Dish>> partitionByVegeterian = menu.stream().collect(partitioningBy(Dish::isVegetarian));
结果:{false=[pork, beef, chicken, prawns, salmon], true=[French fries, rice, season fruit, pizza]}
并行流
可以通过对收集源调用parallelStream方法来把集合转换为并行流。
并行流内部使用了默认的ForkJoinPool(分支/合并框架),它默认的线程数量就是你的处理器数量,这个值是由Runtime.getRuntime().availableProcessors()得到的。
// 使用这个属性可以修改默认的线程数
System.setProperty(“java.util.concurrent.ForkJoinPool.common.parallelism”,”20”);
这是一个全局设置,因此它将影响代码中所有的并行流。反过来说,目前还无法专为某个并行流指定这个值。一般而言,让ForkJoinPool的大小等于处理器数量是个不错的默认值,除非你有很好的理由,否则我们强烈建议你不要修改它。
public static long parallelSum(List<Long> list) {
return list.stream().parallel().reduce(Long::sum).get();
}
使用并行流时注意事项:
1、要考虑流背后的数据结构是否易于分解。例如,ArrayList的拆分效率比LinkedList高
2、尽量不要有共享变更的修改,如果有,要注意线程安全(加锁的话,会出现中午执行)