函数式编程的概念
简单说,"函数式编程"是一种"编程范式"(programming paradigm),也就是如何编写程序的方法论。
它属于"结构化编程"的一种,主要思想是把运算过程尽量写成一系列嵌套的函数调用
为什么要使用函数式编程,和命令是编程有什么不同点和优点呢?
最重要的就是他们的关注点不一样,命令式编程里面我们关注的是怎么样做,而函数式编程里我们关注的是做什么,也就是说在命令式编程里你要告诉程序要怎么样做才能实现一个功能,而函数式编程里面你不需要告诉它怎么样做,你只告诉它你要实现什么样的功能,而不需要关注实现的细节,这样说还是有些抽象,我么们来以一个具体里例子来说明一下。
- 假设我们现在有个需求,需要在一堆数字里面找到最小的一个,你会怎么做呢?
命令式编程
public class LombokTest {
public static void main(String[] args) {
int[] nums = {5,52,99,68,3,-12};
int min = Integer.MAX_VALUE;
for(int i : nums){
if(i < min){
min = i;
}
}
System.out.println("最小值="+min);
}
}
结果
最小值=-12
这种解决思路就是命令式编程,我们要定义变量,我们要用 for 循环,要把每个数字跟最小值对比一下,如果符合还要把最小值在更新一下,这就是命令式编程,我们要告诉程序所有实现的细节。
函数式编程
// jdk 1.8
int asInt = IntStream.of(nums).min().getAsInt();
System.out.println("jdk1.8 函数式编程="+asInt);
结果
jdk1.8 函数式编程=-12
我们可以看到结果是一样
在比如这一堆数字比较大,可能有几亿条,在命令式编程里面,使用一个 for 循环性能可能不达标,我们可能要使用多线程,要定义线程池,要把数据拆分,最后得到一个最小值,这要做还是比较麻烦的我们所有的工作都需要自己去做,而在函数式编程里面在这种并行的环境下你不需要做这么多细节,只需要调用一个函数,就可以得到想要的结果
// 并行
int pInt = IntStream.of(nums).parallel().min().getAsInt();
System.out.println("parallel result = "+pInt);
结果
parallel result = -12
我们看到结果还是一样的 ,这样写是不是爽多了?
函数式编程还有一个比较明显的有点,使用 lombok 表达式可以使代码更加的简短,好读。
- 比如线程的创建
jdk1.8 之前
new Thread(new Runnable(){
@Override
public void run() {
System.out.println("jdk 1.8 之前创建线程");
}
}).start();
jdk1.8 之后
new Thread(() -> {
System.out.println("jdk 1.8 之后创建线程");
}).start();
函数式带输入输出参数的写法
所谓的函数式接口,是指只有一个抽象方法(这里注意不是只有一个方法,1.8以及后 interface中可以有默认实现的方法关键字是default,如果他有两个及以上抽象方法,就不是函数式接口)的接口,jdk8帮我们提供了一个注解,帮助我们检查编译时是否合格 @FunctionInterface,允许有端个 default 修饰的非抽象方法,default关键字在API中的解释为,意思是默认方法能够向库的接口添加新功能,并确保与为这些接口的旧版本编写的代码兼容。并且不用在其子类进行逐个实现。
@FunctionalInterface
interface NumsFunciton{
int numSums(int i);
// 默认实现的方法 不需要在子类中实现
default int bool(int b){
return b * 5;
}
default int boola(int b){
return b * 8;
}
}
public class LombokTest2 {
public static void main(String[] args) {
// 一
NumsFunciton n = (i) -> i * 5;
// 二 由于只有一个参数 括号可以省略 这种最常见
NumsFunciton n1 = i -> i * 6;
// 三
NumsFunciton n2 = (int i) -> i * 7;
// 四
NumsFunciton n4 = (int i) -> {
return i * 8;
};
System.out.println(n.bool(9));
}
}
注意函数式接口多层继承 default 默认方法覆盖问题
@FunctionalInterface
interface NumsFunciton{
int numSums(int i);
// 默认实现的方法 不需要在子类中实现
default int add(){
return 18;
}
}
@FunctionalInterface
interface NumsFunciton2{
int numSums(int i);
// 默认实现的方法 不需要在子类中实现
default int add(){
return 19;
}
}
@FunctionalInterface
interface NumsFunciton3 extends NumsFunciton,NumsFunciton2{
@Override
default int add() {
return NumsFunciton.super.add();
}
}
这里可以指定调用哪个函数接口的默认方法
java 8 提供的函数式接口
interface IMoneyFormat{
// 格式化
String format(int i);
}
class MyMoney{
private final int money;
public MyMoney(int money){
this.money = money;
}
// 打印
public void printMoney(IMoneyFormat moneyFormat){
System.out.println("我的小目标:"+moneyFormat.format(money));
}
}
public class LombokTest3 {
public static void main(String[] args) {
// 初始化
MyMoney myMoney = new MyMoney(999999999);
// 使用函数式
myMoney.printMoney(i -> new DecimalFormat("#,###").format(i));
}
}
Loamb 表达式其实不关心实现的那个接口,所以他不需要知道接口的名字,也不需要知道它的方法,它是需需要知道输入什么,输出是什么,这一输入的是 int 类型,输出的是 String 类型,所有我们不需要定义接口,只需要输入是什么输出是什么
java 8 提供的函数式接口优化
class MyMoney{
private final int money;
public MyMoney(int money){
this.money = money;
}
// 使用 jdk 1.8 函数式接口 Function
public void printMoney(Function<Integer,String> moneyFormat){
System.out.println("我的小目标:"+moneyFormat.apply(this.money));
}
}
public class LombokTest3 {
public static void main(String[] args) {
// 初始化
MyMoney myMoney = new MyMoney(999999999);
// 使用函数式
myMoney.printMoney(i -> new DecimalFormat("#,###").format(i));
}
}
其结果是一样的,除了不需要定义接口外,还可以支持连式编程
// 初始化
MyMoney myMoney = new MyMoney(999999999);
// 使用函数式
// myMoney.printMoney(i -> new DecimalFormat("#,###").format(i));
Function<Integer,String> moneyFormat = i -> new DecimalFormat("#,###").format(i);
// 函数式接口的链式编程
myMoney.printMoney(moneyFormat.andThen(s -> "人民币:"+s));
结果
我的小目标:人民币:999,999,999
常见的函数式接口
函数式接口简单示例
System.out.println("=================== 断言函数接口 ===================");
Predicate<Integer> predicate = i -> i > 10;
System.out.println(predicate.test(9));
System.out.println("=================== 消费函数接口 没有返回值 ===================");
Consumer<String> consumer = s -> System.out.println(s);
consumer.accept("哈哈哈");
System.out.println("=================== 该接口就一个抽象方法get方法,该接口在JAVA8之函数式接口返回实例篇中第一个示例就是利用的该接口 ===================");
System.out.println("=================== 不用传入任何参数,直接返回一个泛型T的实例.就如同无参构造一样 ===================");
Supplier<LombokTest4> supplier = LombokTest4 :: new;
LombokTest4 r1 = supplier.get();
LombokTest4 r2 = supplier.get();
System.out.println("实例1="+r1);
System.out.println("实例2="+r2);
System.out.println("实例2=实例2,"+(r1==r2));
System.out.println("=================== 消费函数接口 没有返回值 ===================");
BiFunction<Integer,Integer,String> biFunction = (x,y) -> {
return x+y+"";
};
System.out.println(biFunction.apply(5, 8));
结果
=================== 断言函数接口 ===================
false
=================== 消费函数接口 没有返回值 ===================
哈哈哈
=================== 该接口就一个抽象方法get方法,该接口在JAVA8之函数式接口返回实例篇中第一个示例就是利用的该接口 ===================
=================== 不用传入任何参数,直接返回一个泛型T的实例.就如同无参构造一样 ===================
实例1=com.lombok.LombokTest4@76fb509a
实例2=com.lombok.LombokTest4@300ffa5d
实例2=实例2,false
=================== 消费函数接口 没有返回值 ===================
13
方法的引用
class Dog{
private String name = "哮天犬";
// 默认十斤狗粮
private int food = 100;
public Dog(){}
public Dog(String name){
this.name = name;
}
/**
* 狗叫的静态方法
* @param dog
*/
public static void bark(Dog dog){
System.out.println(dog.name + "叫了");
}
/**
* 吃狗粮
* jdk 默认会把当前实例传入到非静态方法,参数名为 this,位置是第一个
* 所以子在使用类名引用的时候实际上是有两个参数
* 所以在费静态方法中可以使用 this 关键字
* @param num
* @return
*/
public int eat(Dog this,int num){
System.out.println("吃了"+num+"斤狗粮");
this.food -= num;
return this.food;
}
@Override
public String toString() {
return this.name;
}
}
public class LombokTest5 {
public static void main(String[] args) {
// 方法调用
Consumer<String> consumer = s -> System.out.println(s);
consumer.accept("-----");
// lombok 是一个匿名的函数,左边是方法参数,右边是执行体
// 当执行体的参数和箭头左边的类型是一样的时候就可以缩写,写成方法引用的形式
Consumer<String> consumer1 = System.out::println;
consumer1.accept("这是方法调用");
System.out.println("================= 静态方法的方法引用 类名+方法名 ================");
Consumer<Dog> dog = Dog::bark;
dog.accept(new Dog());
System.out.println("================= 非静态方法的方法引用 对象实例+方法名 ================");
Dog d = new Dog();
Function<Integer,Integer> function = d::eat;
System.out.println("还剩下"+function.apply(3)+"斤狗粮");
System.out.println("================= 可以看到方法的输入输出类型一样,可以使用一元函数式接口 对象实例+方法名 ================");
UnaryOperator<Integer> unaryOperator = d::eat;
System.out.println("还剩下"+unaryOperator.apply(5)+"斤狗粮");
System.out.println("================= 还可以使用基本函数式接口 对象实例+方法名 ================");
IntUnaryOperator intUnaryOperator = d::eat;
System.out.println("还剩下"+intUnaryOperator.applyAsInt(1)+"斤狗粮");
System.out.println("================= 非静态方法的方法引用 类名+方法名 ================");
System.out.println("================= jdk 默认会把当前实例传入到非静态方法,参数名为 this,位置是第一个 ================");
System.out.println("================= 所以子在使用类名引用的时候实际上是有两个参数 ================");
System.out.println("================= 所以在费静态方法中可以使用 this 关键字 ================");
System.out.println();
System.out.println(" public int eat(Dog this,int num){ ");
System.out.println(" System.out.println(\"吃了\"+num+\"斤狗粮\");");
System.out.println(" this.food -= num; ");
System.out.println(" this.food -= num;");
System.out.println(" return this.food;");
System.out.println(" } ");
BiFunction<Dog,Integer,Integer> biFunction = Dog::eat;
System.out.println("还剩下"+biFunction.apply(new Dog(), 8)+"斤狗粮");
System.out.println("================= 没有参数的构造函数的方法引用 ================");
Supplier<Dog> supplier = Dog::new;
System.out.println("创建新的对象:"+supplier.get());
System.out.println("================= 带参数的构造函数的方法引用 ================");
Function<String,Dog> func = Dog::new;
System.out.println("创建新的对象:"+func.apply("旺财"));
}
}
结果
-----
这是方法调用
================= 静态方法的方法引用 类名+方法名 ================
哮天犬叫了
================= 非静态方法的方法引用 对象实例+方法名 ================
吃了3斤狗粮
还剩下97斤狗粮
================= 可以看到方法的输入输出类型一样,可以使用一元函数式接口 对象实例+方法名 ================
吃了5斤狗粮
还剩下92斤狗粮
================= 还可以使用基本函数式接口 对象实例+方法名 ================
吃了1斤狗粮
还剩下91斤狗粮
================= 非静态方法的方法引用 类名+方法名 ================
================= jdk 默认会把当前实例传入到非静态方法,参数名为 this,位置是第一个 ================
================= 所以子在使用类名引用的时候实际上是有两个参数 ================
================= 所以在费静态方法中可以使用 this 关键字 ================
public int eat(Dog this,int num){
System.out.println("吃了"+num+"斤狗粮");
this.food -= num;
this.food -= num;
return this.food;
}
吃了8斤狗粮
还剩下92斤狗粮
================= 没有参数的构造函数的方法引用 ================
创建新的对象:哮天犬
================= 带参数的构造函数的方法引用 ================
创建新的对象:旺财
类型推断
lombok 表达式是一个匿名函数,它最终返回了一个实现了指定接口的对象,所以要告诉它实现了哪一个接口,否则就会报错,这就是类型推断
@FunctionalInterface
interface IMath{
int add(int x,int y);
}
@FunctionalInterface
interface IMath2{
int add(int x,int y);
}
public class LombokTeset6 {
public static void main(String[] args) {
// 变量类型定义
IMath iMath = (x,y) -> x+y;
System.out.println(iMath.add(1,3));
// 数组里
IMath[] iMaths = {(x,y)->x+y,(x,y)->x*y};
System.out.println(iMaths[0].add(1,2));
System.out.println(iMaths[1].add(3,4));
// 强转
Object obj = (IMath)(x,y) -> x+y;
// 通过类型返回
IMath iMath1 = createIMath();
// 通过方法传入 等价于 变量类型定义
LombokTeset6 lombokTeset6 = new LombokTeset6();
// 注意如果有方法重载需要指定调用哪一个方法,强转即可
lombokTeset6.test((IMath) (x,y) -> x+y);
lombokTeset6.test((IMath2) (x,y) -> x+y);
}
public void test(IMath math){}
public void test(IMath2 math){}
public static IMath createIMath(){
return (x,y) -> x / y;
}
}
变量引用
String str = "aaa";
Consumer<String> consumer = s -> System.out.println(s+str);
这样写是没有问题的,大家都知道 jdk 1.8 之前匿名类应用外部变量必须是 final 的类型,jdk 1.8 之后就不用了吗?实际上也是需要写的,只是在 jdk 1.8 默认可以不写,不加 final 关键字,是实际上变量也是不能修改的,试着改一下
提示已经很清晰了,必须是一个 final 或者实际上使 final 就是说初始化之后就不能最修改了。
那么为什么匿名类引用外部变量不能修改呢?实际上在 java 里面参数传参传的是值而不是引用,外部变量和引用的变量都指向了同一个对象,当匿名类应用外部外部变量的时候,如果外部变量改变了那么其实应用的变量和外部变量是没有任何关系了,那么就有可能导致程序的不准确性,那么这时引用外部变量又有什么意义呢?所以外部变量是不能修改的。
级联表达式和柯里化
/**
* 级联表达式
*/
public class LombokTest8 {
public static void main(String[] args) {
System.out.println("==================== 级联表达式 ==================");
/**
* 例如: x -> y -> x+y
* 我们知道 lombda 表达式 左边为参数,右边是函数,所有者个表倒是最终会返回一个这样的函数
* 这里暂且认为是 Integer 类型
* Function<Integer,Function<Integer,Integer>> function = x ->y -> x+y;
*/
// 这就是级联表达式
Function<Integer,Function<Integer,Integer>> function = x ->y -> x+y;
System.out.println(function.apply(1).apply(2));
System.out.println("==================== 柯里化 ==================");
System.out.println("==================== 柯里化:把多个参数的函数转换为只有一个参数的函数 ==================");
System.out.println("==================== 柯里化的目的:函数的标准化 ==================");
Function<Integer,Function<Integer,Function<Integer ,Integer>>> f = x -> y -> z -> x * y * z;
System.out.println(f.apply(1).apply(2).apply(3));
System.out.println("==================== 柯里化的循环调用 ==================");
int num[] = {4,5,6};
for (int i = 0; i < num.length; i++) {
if(f instanceof Function){
Object obj = f.apply(num[i]);
if(obj instanceof Function){
f =(Function) obj;
}else{
System.out.println("调用结果为:"+obj);
}
}
}
}
}
结果
==================== 级联表达式 ==================
3
==================== 柯里化 ==================
==================== 柯里化:把多个参数的函数转换为只有一个参数的函数 ==================
==================== 柯里化的目的:函数的标准化 ==================
6
==================== 柯里化的循环调用 ==================
调用结果为:120
stream 流编程
概念
首先它是一个高级的迭代器(Iterator),它不是一个数据结构,不是一个集合,它不会存放数据,stream 关注的是怎样把数据高效的处理,它其实就是把数据在流水线里面处理,它和普通的 Iterator 不同的是,它可以并行遍历,普通的 Iterator 只能是串行,在一个线程中执行
外部迭代和内部迭代
使用for等进行迭代我们叫做外部迭代,使用stream流迭代叫做内部迭代,内部迭代有什么好处,当数量很大是我们不需要对数据进行拆分
int[] nums = {4,5,6};
// 外部迭代
int sum = 0;
for (int num : nums) {
sum += num;
}
System.out.println("结果是:"+sum);
// 使用 stream 的内部迭代
// 这个 of 可以理解为 new
int sum2 = IntStream.of(nums).sum();
System.out.println("结果是:"+sum2);
结果都是一样的
中间操作/种植操作和惰性求值
中间操作:返回stream的操作
终止操作:得到特定的结果
惰性求值:终止没有调用的情况下,中间操作不会执行
public class StreamTest {
public static void main(String[] args) {
int[] nums = {4,5,6};
// 外部迭代
int sum = 0;
for (int num : nums) {
sum += num;
}
System.out.println("结果是:"+sum);
// 使用 stream 的内部迭代
// 这个 of 可以理解为 new
int sum2 = IntStream.of(nums).sum();
System.out.println("结果是:"+sum2);
// map 就是中间操作,返回 stream 的的操作
// sum 就是总之操作
int result = IntStream.of(nums).map(StreamTest::mulitplyNum).sum();
System.out.println("结果为:"+result);
System.out.println("===================== 惰性求值就是始终没有调用的情况下,中间操作就会执行 =====================");
// 这一没有调用 sum 终止操作不会执行
IntStream.of(nums).map(StreamTest::mulitplyNum);
}
public static int mulitplyNum(int i){
System.out.println("执行了乘以 8 的操作");
return i * 8;
}
}
流的创建
List<String> list = new ArrayList<String>();
list.add("a");
list.add("b");
list.add("c");
// 从集合创建
list.stream().forEach(System.out::print);
list.parallelStream();
// 从数组创建
Arrays.stream(new int[]{3,6,9});
// 创建数字流
IntStream.of(4,5,6);
IntStream.rangeClosed(1,10);
// 使用 random 创建一个无限流
new Random().ints().limit(10);
// 自己生产流
Random ran = new Random();
Stream.generate(() -> ran.nextInt()).limit(20);
中间操作
无状态,有状态操作
无状态操作就是表示当前的操作跟其他元素没有依赖关系,有状态就是表示当前的结果需要依赖有些其他的元素,好比一个排序操作,就需要依赖所有的元素都计算完毕,它才有一个最终的排序结果,这就是一个有状态的操作。
public static void main(String[] args) {
String str = "my name is 007 v";
System.out.println("================= 把每个单词的长度调出来 =================");
// filter 里是一个断言,比如只打印长度大于 1 的长度
Stream.of(str.split(" ")).filter(s -> s.length() > 1)
.map(s -> s+":"+s.length()+",").forEach(System.out::print);
System.out.println();
// flatMap A -> B 属性(是个集合),最终得到所有的 A 元素里面的所有 B 属性集合
// intStream / longStream 并不是 Stream 的子类,所以要进行装箱 boxed
Stream.of(str.split(" ")).flatMap(s -> s.chars().boxed()).forEach(i -> System.out.print(i+","));
System.out.println();
// 数组 char 类型
Stream.of(str.split(" ")).flatMap(s -> s.chars().boxed()).forEach(i -> System.out.print((char)i.intValue()+","));
System.out.println();
System.out.println("================= peek 用户 debug ,是个中间操作,和 forEach 是终止操作 =================");
// 这里这里会打印两次,peek 一次 ,forEach 一次
Stream.of(str.split(" ")).peek(System.out::println).forEach(System.out::println);
System.out.println("================= limit 使用,主要用于无限流 =================");
// 创建一个无限流,不出出错,但是不会停止
// new Random().ints().forEach(System.out::println);
// 创建 10 个 大于 100 小于 1000 的数
new Random().ints().filter(i -> i > 100 && i < 1000).limit(10)
.forEach(System.out::println);
}
结果
================= 把每个单词的长度调出来 =================
my:2,name:4,is:2,007:3,
109,121,110,97,109,101,105,115,48,48,55,118,
m,y,n,a,m,e,i,s,0,0,7,v,
================= peek 用户 debug ,是个中间操作,和 forEach 是终止操作 =================
my
my
name
name
is
is
007
007
v
v
================= limit 使用,主要用户无限流 =================
[681, 341, 299, 861, 934, 365, 277, 747, 759, 690]
终止操作
public static void main(String[] args) {
String str = "my name is 007";
System.out.println();
System.out.println("================= 使用并行 (顺序被打乱了) =================");
str.chars().parallel().forEach(i -> System.out.print((char)i));
System.out.println();
System.out.println("================= 使用forEachOrdered 保证顺序 =================");
str.chars().parallel().forEachOrdered(i -> System.out.print((char)i));
System.out.println();
System.out.println("================= 收集到 List =================");
List<String> collect = Stream.of(str.split(" ")).collect(Collectors.toList());
System.out.println(collect.toString());
System.out.println("================= 使用 reduce 拼接字符串 =================");
Optional<String> reduce = Stream.of(str.split(" ")).reduce((x, y) -> x + "-" + y);
// 这种如果返回空 会抛异常 throw new NoSuchElementException("No value present");
System.out.println(reduce.get());
// 如果没有返回空串
System.out.println(reduce.orElse(""));
// 带初始值化值得 reduce
String reduce2 = Stream.of(str.split(" ")).reduce("", (x, y) -> x + "|" + y);
System.out.println(reduce2);
// 获取单词总长度
Integer reduce1 = Stream.of(str.split(" ")).map(s -> s.length()).reduce(0, (x, y) -> x + y);
System.out.println("获取字符串长度:"+reduce1);
System.out.println("================= max 的使用 =================");
Optional<String> max = Stream.of(str.split(" ")).max((s1, s2) -> s1.length() - s2.length());
System.out.println("长度最长的单词= "+max.get());
System.out.println("================= 使用 findFirst 短路操作 =================");
// ints() 是无限流 是不会终止的 使用 短路操作会使其终止
OptionalInt first = new Random().ints().findFirst();
System.out.println(first.getAsInt());
}
结果
================= 使用并行 (顺序被打乱了) =================
is07 0namemy
================= 使用forEachOrdered 保证顺序 =================
my name is 007
================= 收集到 List =================
[my, name, is, 007]
================= 使用 reduce 拼接字符串 =================
my-name-is-007
my-name-is-007
|my|name|is|007
获取字符串长度:11
================= max 的使用 =================
长度最长的单词= name
================= 使用 findFirst 短路操作 =================
-555829999
并行流
public class StreamTest5 {
public static void main(String[] args) {
System.out.println("============ peek 串行流 单线程 ==============");
// 单线程
IntStream.range(1,5).peek(StreamTest5::debug).count();
System.out.println("============ parallel 并行流 多线程 ==============");
// 多线程并行调用 parallel
IntStream.range(1,5 ).parallel().peek(StreamTest5::debug).count();
System.out.println("============ 现在要实现一个这样的效果,先并行,在串行 ==============");
System.out.println("============ 多次调用 parallel / sequential ,以最后此一次调用为准 ==============");
IntStream.range(1,5)
//调用 parallel 产生并行流
.parallel().peek(StreamTest5::debug)
// 调用 sequential 产生串行流
.sequential().peek(StreamTest5::debug)
.count();
System.out.println("============ 并行流使用的线程池是:ForkJoinPool.commonPool ==============");
System.out.println("============ 默认线程数是当前 CPU 的个数 ==============");
System.out.println("============ 使用 System.setProperty(\"java.util.concurrent.ForkJoinPoll.common.parallelism\",\"20\") 修改默认线程数 ==============");
System.setProperty("java.util.concurrent.ForkJoinPoll.common.parallelism","20");
IntStream.range(1,5).parallel().peek(StreamTest5::debug2).count();
System.out.println("============ 使用自己的线程池,不使用默认的线程池,防止任务被阻塞 ==============");
ForkJoinPool pool = new ForkJoinPool(20);
pool.submit(() -> IntStream.range(1,5).parallel()
.peek(StreamTest5::debug2).count());
// 关闭线程池
pool.shutdown();
// 让守护线程等待,防止主线程运行完 看不到效果
synchronized (pool){
try {
pool.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public static void debug(int i){
System.out.println("debug:"+i);
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public static void debug2(int i){
System.out.println(Thread.currentThread().getName()+" debug:"+i);
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
结果
============ peek 串行流 单线程 ==============
debug:1
debug:2
debug:3
debug:4
============ parallel 并行流 多线程 ==============
debug:3
debug:2
debug:1
debug:4
============ 现在要实现一个这样的效果,先并行,在串行 ==============
============ 多次调用 parallel / sequential ,以最后此一次调用为准 ==============
debug:1
debug:1
debug:2
debug:2
debug:3
debug:3
debug:4
debug:4
============ 并行流使用的线程池是:ForkJoinPool.commonPool ==============
============ 默认线程数是当前 CPU 的个数 ==============
============ 使用 System.setProperty("java.util.concurrent.ForkJoinPoll.common.parallelism","20") 修改默认线程数 ==============
main debug:3
ForkJoinPool.commonPool-worker-0 debug:2
main debug:4
ForkJoinPool.commonPool-worker-0 debug:1
============ 使用自己的线程池,不使用默认的线程池,防止任务被阻塞 ==============
ForkJoinPool-1-worker-25 debug:3
ForkJoinPool-1-worker-18 debug:2
ForkJoinPool-1-worker-4 debug:1
ForkJoinPool-1-worker-11 debug:4
收集器
class Student{
private int id; // 编号
private String name; // 年龄
private int grade; // 班级
public int getId() {
return id;
}
public void setId(int id) {
this.id = id;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public int getGrade() {
return grade;
}
public void setGrade(int grade) {
this.grade = grade;
}
@Override
public String toString() {
return "Student{" +
"id=" + id +
", name='" + name + '\'' +
", grade=" + grade +
'}';
}
public Student(int id, String name, int grade) {
this.id = id;
this.name = name;
this.grade = grade;
}
}
public class StreamTest6 {
public StreamTest6() {
}
public static void main(String[] args) {
List<Student> students = Arrays.asList(
new Student(1,"张三",1),
new Student(2,"李四",2),
new Student(3,"王五",3),
new Student(4,"赵六",3),
new Student(5,"张三",2));
System.out.println("===================== 得到所有学生的 id ====================");
System.out.println("===================== s -> s.getAge() == Student::getAge ====================");
System.out.println("===================== Student::getAge 不会多生成一个类似 lambda$0 这样的函数,推荐使用 ====================");
// 转 List
List<Integer> ids = students.stream().map(Student::getId).collect(Collectors.toList());
System.out.println("id列表:"+ids);
System.out.println("===================== 得到所有学生的 姓名 ====================");
// 转 Set
Set<String> names = students.stream().map(Student::getName).collect(Collectors.toSet());
System.out.println("姓名列表:"+names);
// 转 TreeSet
students.stream().map(Student::getName).collect(Collectors.toCollection(TreeSet::new));
System.out.println("姓名列表:"+names);
System.out.println("===================== 统计汇总信息 ====================");
IntSummaryStatistics summaryStatistics = students.stream().collect(Collectors.summarizingInt(Student::getId));
System.out.println("年龄汇总信息:"+summaryStatistics);
System.out.println("===================== 分块(根据条件分成,true/false 两块) ====================");
Map<Boolean, List<Student>> collect = students.stream().collect(Collectors.partitioningBy(s -> s.getName() == "赵六"));
System.out.println(collect);
System.out.println("===================== 分组 ====================");
Map<Integer, List<Student>> group = students.stream().collect(Collectors.groupingBy(Student::getGrade));
System.out.println(group);
System.out.println("===================== 分组 得到每个班级学生的列表 ====================");
Map<Integer, Long> groupCount = students.stream().collect(Collectors.groupingBy(Student::getGrade, Collectors.counting()));
System.out.println("每个班级学生的人数:"+groupCount);
}
}
结果
===================== 得到所有学生的 id ====================
===================== s -> s.getAge() == Student::getAge ====================
===================== Student::getAge 不会多生成一个类似 lambda$0 这样的函数,推荐使用 ====================
id列表:[1, 2, 3, 4, 5]
===================== 得到所有学生的 姓名 ====================
姓名列表:[李四, 张三, 王五, 赵六]
姓名列表:[李四, 张三, 王五, 赵六]
===================== 统计汇总信息 ====================
年龄汇总信息:IntSummaryStatistics{count=5, sum=15, min=1, average=3.000000, max=5}
===================== 分块(根据条件分成,true/false 两块) ====================
{false=[Student{id=1, name='张三', grade=1}, Student{id=2, name='李四', grade=2}, Student{id=3, name='王五', grade=3}, Student{id=5, name='张三', grade=2}], true=[Student{id=4, name='赵六', grade=3}]}
===================== 分组 ====================
{1=[Student{id=1, name='张三', grade=1}], 2=[Student{id=2, name='李四', grade=2}, Student{id=5, name='张三', grade=2}], 3=[Student{id=3, name='王五', grade=3}, Student{id=4, name='赵六', grade=3}]}
===================== 分组 得到每个班级学生的列表 ====================
每个班级学生的人数:{1=1, 2=2, 3=2}
JDK9 Reactive Stream
概念
Reactive Stream (响应式流/反应流) 是JDK9引入的一套标准,是一套基于发布/订阅模式的数据处理规范。响应式流从2013年开始,作为提供非阻塞背压的异步流处理标准的倡议。 它旨在解决处理元素流的问题——如何将元素流从发布者传递到订阅者,而不需要发布者阻塞,或订阅者有无限制的缓冲区或丢弃。更确切地说,Reactive流目的是“找到最小的一组接口,方法和协议,用来描述必要的操作和实体以实现这样的目标:以非阻塞背压方式实现数据的异步流”。
被压
“背压(反压)back pressure”概念很关键。首先异步消费者会向生产者订阅接收消息,然后当有新的信息可用时,消费者会通过之前订阅时提供的回调函数被再次激活调用。如果生产者发出的信息比消费者能够处理消息最大量还要多,消费者可能会被迫一直在抓消息,耗费越来越多的资源,埋下潜在的崩溃风险。为了防止这一点,需要有一种机制使消费者可以通知生产者,降低消息的生成速度。生产者可以采用多种策略来实现这一要求,这种机制称为背压。
响应式流模型非常简单——订阅者向发布者发送多个元素的异步请求,发布者向订阅者异步发送多个或稍少的元素。响应式流会在pull模型和push模型流处理机制之间动态切换。 当订阅者较慢时,它使用pull模型,当订阅者更快时使用push模型。
简单来说,在响应式流下订阅者可以与发布者沟通,如果使用JMS就应该知道,订阅者只能被动接收发布者所产生的消息数据。这就好比没有水龙头的水管一样,我只能被动接收水管里流过来的水,无法关闭也无法减少。而响应式流就相当于给水管加了个水龙头,在消费者这边可以控制水流的增加、减少及关闭。
Reactive Stream 主要接口
JDK9 通过java.util.concurrent.Flow 和java.util.concurrent.SubmissionPublisher 类来实现响应式流。在JDK9里Reactive Stream的主要接口声明在Flow类里,Flow 类中定义了四个嵌套的静态接口,用于建立流量控制的组件,发布者在其中生成一个或多个供订阅者使用的数据项:
- Publisher:数据项发布者、生产者
- Subscriber:数据项订阅者、消费者
- Subscription:发布者与订阅者之间的关系纽带,订阅令牌
- Processor:数据处理器只能被动接收水管里流过来的水,无法关闭也无法减少。而响应式流就相当于给水管加了个水龙头,在消费者这边可以控制水流的增加、减少及关闭。
具体案例
public class Jdk9Test {
public static void main(String[] args) throws InterruptedException {
// 1. 定义发布者,发布的数据类型是 Integer
// 直接使用 jdk 自带的 SubmissionPublisher,他实现了 Publisher 接口
SubmissionPublisher<Integer> publisher = new SubmissionPublisher<Integer>();
// 2. 定义订阅者
Flow.Subscriber<Integer> subscriber = new Flow.Subscriber<Integer>(){
private Flow.Subscription subscription;
@Override
public void onSubscribe(Flow.Subscription subscription) {
// 保存订阅关系,需要用它来给发布者响应
this.subscription = subscription;
// 请求一个数据
this.subscription.request(1);
}
@Override
public void onNext(Integer item) {
// 接受一个数据,处理
System.out.println("接受到数据:"+item);
try {
TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException e) {
e.printStackTrace();
}
// 处理完调用 request 在请求一个数据
this.subscription.request(1);
// 或者已经达到了目标,调用 cancel 告诉发布者不要在接受数据了
//this.subscription.cancel();
}
@Override
public void onError(Throwable throwable) {
// 出现了异常(例如处理数据的时候产生了异常)
throwable.printStackTrace();
// 我们也可以告诉发布者,后面不需要在接收数据了
this.subscription.cancel();
}
@Override
public void onComplete() {
// 全部数据处理完了(发布者关闭了)
System.out.println("处理完了!");
}
};
// 3. 发布者和订阅者建立订阅关系
publisher.subscribe(subscriber);
// 4. 产生数据,并发布
// 这里忽略数据产生的过程
for (int i = 0; i < 2; i++) {
System.out.println("生成数据:"+i);
// submit 是个 block 方法
publisher.submit(i);
}
// 5. 结束后 关闭发布者
// 正式环境,应该放 finally 或者 try-resouce 确保关闭
publisher.close();
// 主线程延迟停止,否则数据没有消费就退出了
Thread.currentThread().join(1000);
// debug 的时候,下面这行需要有断点
// 否则主线程结束无法 debug
System.out.println();
}
}
结果
生成数据:0
生成数据:1
接受到数据:0
接受到数据:1
处理完了!
/**
* 带 process 的 flow demo
*/
class MyPocessor extends SubmissionPublisher<String> implements Flow.Processor<Integer,String> {
private Flow.Subscription subscription;
@Override
public void onSubscribe(Flow.Subscription subscription) {
// 保存订阅关系,需要用他们来给发布者效应
this.subscription = subscription;
// 请求一个数据
this.subscription.request(1);
}
@Override
public void onNext(Integer item) {
// 接受到一个数据,处理
System.out.println("处理器接收到了数据:"+item);
// 过滤掉小于 0 的数据,然后发布出去
if(item > 0){
this.submit("转换后的数据:"+item);
}
// 处理完调用 request 在请求一个数据
this.subscription.request(1);
// 或者已经达到了目标,调用 cancel 告诉发布者不要在接受数据了
// this.subscription.cancel();
}
@Override
public void onError(Throwable throwable) {
// 出现了异常(例如处理数据的时候产生了异常)
throwable.printStackTrace();
// 我们可以告诉发布者,后面不在接收数据了
this.subscription.cancel();
}
@Override
public void onComplete() {
// 全部数据处理完了(发布者关闭了)
System.out.println("处理器处理完了!");
// 关不发布者
this.close();
}
}
public class Jdk9Test2 {
public static void main(String[] args) throws InterruptedException {
// 1. 定义发布者,发布数据类型是 Integer
// 直接使用 jdk 自带的 SubmissionPublisher
SubmissionPublisher<Integer> publisher = new SubmissionPublisher<Integer>();
// 2. 定义处理器,对数据进行过滤,并转换为 String 类型
MyPocessor pocessor = new MyPocessor();
// 3. 发布者和处理器建立订阅关系
publisher.subscribe(pocessor);
// 4. 定于最终订阅者,消费 String 数据类型
Flow.Subscriber<String> subscriber = new Flow.Subscriber<String>(){
private Flow.Subscription subscription;
@Override
public void onSubscribe(Flow.Subscription subscription) {
// 保存订阅关系,需要用它来给发布者发布响应
this.subscription = subscription;
// 请求一个数据
this.subscription.request(1);
}
@Override
public void onNext(String item) {
// 接收到一个数据,处理
System.out.println("接收到数据:"+item);
// 处理完调用 request 在请求一个数据
this.subscription.request(1);
// 或者 已经达到了目标,调用 cancel 告诉发布者不要在接收数据了
// this.subscription.cancel();
}
@Override
public void onError(Throwable throwable) {
// 出现了异常(例如处理数据的时候产生了异常)
throwable.printStackTrace();
// 我们可以告诉发布者,后面不在接收数据了
this.subscription.cancel();
}
@Override
public void onComplete() {
// 全部数据处理完(发布者关闭了)
System.out.println("处理完了!");
}
};
// 5. 处理器和最终订阅者建立订阅关系
pocessor.subscribe(subscriber);
// 6. 产生数据,并发布
// 这里忽略数据产生过程
publisher.submit(999);
publisher.submit(666);
// 7. 结束后,关闭发布者
// 正式环境应该放 finally 或者 try-resouce 确保关闭
publisher.close();
// 主线程延迟停止,否则数据没有消费就退出
Thread.currentThread().join(1000);
System.out.println();
}
}
结果
处理器接收到了数据:999
处理器接收到了数据:666
处理器处理完了!
接收到数据:转换后的数据:999
接收到数据:转换后的数据:666
处理完了!
Spring WebFlux
概念
它是 spring5 提出的一种新的开发 web 的技术站,它是非阻塞的开发模式,它运行的 netty 或者说 servlet3.1 的容器里面,它可以支持非常多的并发量,也就是说我们现在开发 web 服务多了一个选择,可以使用以前的 mvc 开发模式,也可以使用现在新的 webFlux 开发模式。
WebFlux 和 Spring MVC 关系
- 首先WebFlux 是 non-blocking 非阻塞的开发模式而传统的 mvc 是同步的阻塞开发模式,非阻塞就是我们可以在一个线程里面处理更加多的请求,而以前老的模式是一个请求对应容器里的一个线程,这是最大的不同点
- 第二个不通点就是运行的环境不一样,老的开发模式是基于 Servlet API 所以它必须运行在 Servlet 容器里面,而 WebFlux 开发模式是基于Reactive Streams 响应式流,它可以运行在 Servlet3.1 之后的容器,也就是支持一部 Servlet 的容器或者说运行在 netty 上面,其实 Spring 5 默认的容器就是 netty,
- 第三个不同点,现在新的 WebFlux 都是不支持关系型数据库的,我们以前用的 mysql ,oracle等,这些数据库都暂时无法使用
优势
最大的优势就是可以支持非常高的并发量,我们的应用随着发展并发量可能会越来越大,并发量高了之后,以前的应用可能就承受不了了,这个时候我们就要进行扩展,扩展分为两种,一种叫水平扩展,一种叫垂直扩展,简单来说水平扩展就是加人,垂直扩展就是加班,家人当人的比较容易的,好比你以前一个节点处理不了这个请求,那么我们就加几个节点,在处理不了就在加,一个加上去,那么垂直扩展呢,就是你人还是一个人,对应我们的技术就是线程还是这么多线程但是你要处理更加多的请求,那么我们就可以采用 WebFlux 这种异步的模式,可以让他在相同的线程下面支持更加多的一个请求就可以达到一个垂直扩展的目的,所以它是非常重要的一个,因为你水平扩展它对资源的要求会比较高,所以需要更多的机器,花更多的钱,但我们的垂直扩展它机器还是那么多机器,所以我们 SE 在设计框架的时候,当你的请求量上去了之后,我们第一步就可以把以前的同步 servlet ,就是以前的 mvc 这种模式改成我们现在新的 WebFlux 模式,然后在进行水平扩展,先垂直扩展在水平扩展,这就更加好的利用我们现有的资源,处理更加高的并发量。
异步 Servlet
为什么要使用异步 setvlet? 同步的 servlet 阻塞了什么?
- 同步 servlet
@WebServlet(urlPatterns = "/asynServlet")
public class SyncServelt extends HttpServlet {
@Override
protected void doGet(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
doPost(req, resp);
}
@Override
protected void doPost(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
long startTime = System.currentTimeMillis();
// 执行业务代码
doSomeThing(req,resp);
long endTime = System.currentTimeMillis();
System.out.println("总耗时:"+(endTime-startTime)+"毫秒");
}
// 业务代码
private void doSomeThing(HttpServletRequest request,HttpServletResponse response){
try {
// 睡眠 5 秒
TimeUnit.SECONDS.sleep(5);
} catch (InterruptedException e) {
e.printStackTrace();
}
try {
response.getWriter().append("done");
} catch (IOException e) {
e.printStackTrace();
}
}
}
结果
总耗时:5007毫秒
可以看到同步的 servlet 从请求开始到请求结束总共耗时了 5 秒左右,从这里我们就可知道同步 servlet 到底阻塞了什么?其实就是阻塞了 tomcat 容器的 servlet 线程,当一个请求到达 tomcat 容器之后,tomcat 容器会给每一个请求开启一个线程去处理,而线程里面会调用具体的 servlet 去处理,当你使用同步 servlet 的时候,你的业务代码话费多长时间,servlet 线程就要等待多长时间,这就是阻塞
- 异步 servlet
@WebServlet(asyncSupported = true,urlPatterns = "/asyncServlet")
public class AsyncServlet extends HttpServlet {
@Override
protected void doGet(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
doPost(req,resp);
}
@Override
protected void doPost(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
long startTime = System.currentTimeMillis();
// 开启异步
AsyncContext asyncContext = req.startAsync();
// 执行业务代码
CompletableFuture.runAsync(() -> doSomeThing(asyncContext,asyncContext.getRequest(),asyncContext.getResponse()));
long endTime = System.currentTimeMillis();
System.out.println("总耗时:"+(endTime-startTime)+"毫秒");
}
// 业务代码
private void doSomeThing(AsyncContext asyncContext, ServletRequest request, ServletResponse response){
try {
// 睡眠 5 秒
TimeUnit.SECONDS.sleep(5);
} catch (InterruptedException e) {
e.printStackTrace();
}
try {
response.getWriter().append("done");
} catch (IOException e) {
e.printStackTrace();
}
// 业务代码处理完毕,通知结束
asyncContext.complete();
}
}
结果
总耗时:0毫秒
从这里可以看出,异步的 servlet 几乎没有好使,这就是异步 servlet 的作用,它主要作用就是不会阻塞 tomcat servlet 线程,它可以把一些耗时的业务放在一个独立的线程池里面,那么我们的 servlet 线程就会立马返回,可以去处理下一个请求,所以它就会使用比较少的线程来达到一个比较高的吞吐量,这就是异步 servlet 的工作机制
总结
同步和异步都是后台服务端的概念,服务器后台才有异步这个概念,对于浏览器来说所有都是同步,不管后台是同步 servlet 还是异步 servlet 它前台都要花费相同的时间,第二个就是同步 servlet 到底阻塞了什么,它其实是阻塞了 tomcat servlet 的线程,使用异步 servlet 之后 ,servlet 线程就会立马返回处理下一个请求,所以它就可以达到高并发。
异步 servlet 是怎样工作的
WebFlux 开发
reactor = jdk 8 stream + jdk 9 reactive stream
Mono 0 - 1 个元素
Flux 0 - N 个元素
例子
public static void main(String[] args) {
Subscriber<Integer> subscriber = new Subscriber<Integer>(){
private Subscription subscription;
@Override
public void onSubscribe(Subscription subscription) {
// 保存订阅关系,需要用它来给发布者响应
this.subscription = subscription;
// 请求一个数据
this.subscription.request(1);
}
@Override
public void onNext(Integer item) {
// 接受一个数据,处理
System.out.println("接受到数据:"+item);
try {
TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException e) {
e.printStackTrace();
}
// 处理完调用 request 在请求一个数据
this.subscription.request(1);
// 或者已经达到了目标,调用 cancel 告诉发布者不要在接受数据了
//this.subscription.cancel();
}
@Override
public void onError(Throwable throwable) {
// 出现了异常(例如处理数据的时候产生了异常)
throwable.printStackTrace();
// 我们也可以告诉发布者,后面不需要在接收数据了
this.subscription.cancel();
}
@Override
public void onComplete() {
// 全部数据处理完了(发布者关闭了)
System.out.println("处理完了!");
}
};
// reactor = jdk8 stream + jdk9 reactive stream
// Mono 0-1 个元素
// Flux 0-N 个元素
String[] strs = {"1","2","3"};
// 这里就是 jdk 8 的 Stream
Flux.fromArray(strs).map(s -> Integer.parseInt(s))
// 最终操作
// 这里就是 jdk9 的 reactive stream
.subscribe(subscriber);
}
reactor 是一个流,也是一个发布者,而它的最终操作就是订阅
下面这两个请求究竟有什么区别呢?
@RestController
@Slf4j
public class TestController {
@GetMapping("/1")
public String get1(){
log.info("get1 开始");
String str = createStr();
log.info("get1 结束");
return str;
}
@GetMapping("/2")
public Mono<String> get2(){
log.info("get2 开始");
Mono<String> str = Mono.fromSupplier(() -> createStr());
log.info("get2 结束");
return str;
}
private String createStr(){
try {
TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "create STR";
}
}
get1 请求
可以开到总共花的大概 3 秒钟时间
get2 请求
可以看到 新的 WebFlux 基本上是没有耗时的,从这里就能说明新的 WebFlux 返回的 Mono 实际上是一个流,由于它没有调用最终操作所以不会阻塞线程,而老的模式调用的操作花了多久,它在 Controller 里占了多久,而新的模式它其实是一个惰性求值所以它这个 Controller 不会占用那么久,这就是新的 WebFlux 返回 Mono 的意义。
SSE ( Server-Sent Events)
我们知道 Flux 可以返回多次数据,但是 http 协议是一问一答的形式,它是做到如何多次返回的呢?实际上它用的就是 Html5 SSE
示例
@WebServlet("/sse")
public class SSE extends HttpServlet {
@Override
protected void doGet(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
doPost(req,resp);
}
@Override
protected void doPost(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
// 必须
resp.setContentType("text/event-stream");
resp.setCharacterEncoding("UTF-8");
for (int i = 0; i < 5; i++) {
// 格式 : data: + 数据 + 2个回车
resp.getWriter().write("data:"+i+"\n\n");
resp.getWriter().flush();
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
结果
可以看到结果逐条返回
前端示例
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<title>Title</title>
</head>
<script type="text/javascript">
// 初始化,参数为 url
// 依赖 H5
var sse = new EventSource("sse");
sse.onmessage = function(e){
console.log("message",e.data,e);
}
</script>
<body>
<H1>SSE</H1>
</body>
</html>
结果
SSE 有个特点会自动的重连,所以会不停的输出
- 另一种写法
后端
// 另一种写法 指定 标识世间
resp.getWriter().write("event:me\n");
前段
sse.addEventListener("me", function (e) {
console.log("me event",e.data,e)
});
结果
结果也是一样,也会从新连接,那么如何关闭呢?
就可以加一个判断
sse.addEventListener("me", function (e) {
console.log("me event",e.data,e)
if(e.data == 3){
sse.close();
}
});
WebFlux 示例(使用 mongodb 数据库)
- 添加 mongodb 数据库依赖
<!-- mongodb -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-mongodb-reactive</artifactId>
</dependency>
- springboot 启动类添加 mongodb 支持
@EnableReactiveMongoRepositories // 开启mongodb
- 定义对象
@Document(collection = "user")
@Data
public class User {
@Id
private String id;
private String name;
private int age;
}
- 创建仓库
@Repository
public interface UserRepository extends ReactiveMongoRepository<User,String> {
}
- 编写 controller
@RestController
@RequestMapping("/user")
public class UserController {
// 以前 mvn 注入在 WebFlux 中不推荐,官方推荐使用构造方法注入
/*@Autowired
private UserRepository userRepository;*/
private final UserRepository repository;
// 官方推荐这种注入方式,这种和 spring 的耦合度会更加的底
public UserController(UserRepository repository){
this.repository = repository;
}
@GetMapping("/")
public Flux<User> getAll(){
return repository.findAll();
}
/**
* 这里推荐在返回 Flux ,就是返回多个数据的时候,我们都写两个方法
* 第一个就是一次性返回的
* 第二种就是 SSE 向流一样的
* @return
*/
@GetMapping(value = "/stream/all",produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<User> streamGetAll(){
return repository.findAll();
}
}
- 启动 mongodb
-
调试
可以看到是通的
完整的 CRUD
@RestController
@RequestMapping("/user")
public class UserController {
// 以前 mvn 注入在 WebFlux 中不推荐,官方推荐使用构造方法注入
/*@Autowired
private UserRepository userRepository;*/
private final UserRepository repository;
// 官方推荐这种注入方式,这种和 spring 的耦合度会更加的底
public UserController(UserRepository repository){
this.repository = repository;
}
/**
* 以数组形式 一次性返回数据
* @return
*/
@GetMapping("/")
public Flux<User> getAll(){
return repository.findAll();
}
/**
* 这里推荐在返回 Flux ,就是返回多个数据的时候,我们都写两个方法
* 第一个就是一次性返回的
* 第二种就是 SSE 向流一样的
* @return
*/
@GetMapping(value = "/stream/all",produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<User> streamGetAll(){
return repository.findAll();
}
/**
*
* 添加
* @param user
* @return
*/
@PostMapping("/")
public Mono<User> createUser(@RequestBody User user){
// 这里要注意 在 spring data jpa 里面
// 新增和修改都是 save 方法,有 id 是修改,没有 id 是新增
user.setId(null);
return this.repository.save(user);
}
/**
* 删除
* @param id
* @return
*/
@DeleteMapping("/{id}")
public Mono<ResponseEntity<Void>> deleteUser(@PathVariable("id")String id){
// deleteById 方法是没有返回值的,不能判断数据是否存在
// this.repository.deleteById(id)
return this.repository.findById(id)
// 当你要操作数据,并且要返回一个 Mono 这个时候使用 flatMap
// 如果不操作只是装换数据,使用 map
.flatMap(user -> this.repository.delete(user)
.then(Mono.just(new ResponseEntity<Void>(HttpStatus.OK))))
.defaultIfEmpty(new ResponseEntity<>(HttpStatus.NOT_FOUND));
}
/**
* 修改数据、
* 存在的时候返回 200 和修改的数据,不存在的时候返回 404
* @param id
* @param user
* @return
*/
@PutMapping("/{id}")
public Mono<ResponseEntity<User>> updateUser(@PathVariable("id")String id,
@RequestBody User user){
// 先根据用户 id 查询是否存在
return this.repository.findById(id)
// flatMap 操作数据
.flatMap(u -> {
u.setAge(user.getAge());
u.setName(user.getName());
return this.repository.save(u);
})
// map : 转换数据
.map(u -> new ResponseEntity<User>(u,HttpStatus.OK))
.defaultIfEmpty(new ResponseEntity<>(HttpStatus.NOT_FOUND));
}
/**
* 根据 id 查找用户
* 存在返回用户信息,不存在返回 404
* @param id
* @return
*/
@GetMapping("/{id}")
public Mono<ResponseEntity<User>> findUserById(@PathVariable("id")String id){
return this.repository.findById(id)
// 存在返回用户信息
.map(u -> new ResponseEntity<User>(u,HttpStatus.OK))
// 不存在返回 404
.defaultIfEmpty(new ResponseEntity<User>(HttpStatus.NOT_FOUND));
}
}
根据年龄区间查询用户信息
数据访问层 Repository
/**
* 根据年龄查找用户信息
* @param start
* @param end
* @return
*/
Flux<User> findByAgeBetween(int start,int end);
@Query("{'age':{'$gte':20,'$lte':30}}")
Flux<User> findUserBySQL();
controller 层
/**
* 根据年龄查找用户(SSE 以流的形式返回)
* @param start
* @param end
* @return
*/
@GetMapping(value="/age/stream/{start}/{end}",produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<User> streamFindUserByAge(@PathVariable("start")int start,
@PathVariable("end")int end){
return this.repository.findByAgeBetween(start,end);
}
/**
* 手写 sql
* @return
*/
@GetMapping(value="/age/sql")
public Flux<User> findUserBySQL(){
return this.repository.findUserBySQL();
}
参数校验
User 类加上,校验注解
@Document(collection = "user")
@Data
public class User {
@Id
private String id;
@NotBlank
private String name;
// 年龄最小为 10 ,最大为 100
@Range(min = 10,max = 100)
private int age;
}
为添加方法加上校验注解
/**
*
* 添加
* @param user
* @return
*/
@PostMapping("/")
public Mono<User> createUser(@RequestBody @Validated User user){
// 这里要注意 在 spring data jpa 里面
// 新增和修改都是 save 方法,有 id 是修改,没有 id 是新增
user.setId(null);
return this.repository.save(user);
}
编写异常处理类
/**
* 异常处理切面
*/
@ControllerAdvice
public class CheckAdvice {
@ExceptionHandler(MethodArgumentNotValidException.class)
public ResponseEntity<String> handleBindingException(MethodArgumentNotValidException e){
return new ResponseEntity<String>(toStr(e), HttpStatus.BAD_REQUEST);
}
/**
* 将异常转换为 字符串
* @param ex
* @return
*/
private String toStr(MethodArgumentNotValidException ex) {
return ex.getBindingResult().getFieldErrors().stream()
// 将异常转换为字符串调用 map
.map(e -> e.getField()+":"+e.getDefaultMessage())
// 将数组转换为字符串调用 reduce
.reduce("",(s1,s2) -> s1+"\n"+s2);
}
}
测试
{
"name":"",
"age":9
}
name:不能为空
age:需要在10和100之间
自定义异常(校验用户名是否合法)
编写校验逻辑类
public class CheckUtil {
// 定义不允许的常量
private static final String[] INVALID_NAMES = {"admin","administrator"};
/**
* 校验用户名是否合法,不合法抛出异常
* @param value
*/
public static void checkName(String value) {
Stream.of(INVALID_NAMES).filter(name -> name.equalsIgnoreCase(value))
// findAny 找到任何一个, ifPresent 如果存在就抛出异常
.findAny().ifPresent(v -> {
throw new CheckException("name", v);
});
}
}
为添加用户方法添加校验逻辑
@PostMapping("/")
public Mono<User> createUser(@RequestBody @Validated User user){
// 这里要注意 在 spring data jpa 里面
// 新增和修改都是 save 方法,有 id 是修改,没有 id 是新增
user.setId(null);
// 检查用户名是否合法
CheckUtil.checkName(user.getName());
return this.repository.save(user);
}
自定义异常类
@Data
public class CheckException extends RuntimeException {
private String fieldName; // 字段名
private String fieldValue; // 错误内容
public CheckException(String fieldName, String fieldValue) {
super();
this.fieldName = fieldName;
this.fieldValue = fieldValue;
}
public CheckException() {
super();
}
public CheckException(String message) {
super(message);
}
public CheckException(String message, Throwable cause) {
super(message, cause);
}
public CheckException(Throwable cause) {
super(cause);
}
protected CheckException(String message, Throwable cause, boolean enableSuppression, boolean writableStackTrace) {
super(message, cause, enableSuppression, writableStackTrace);
}
}
异常处理类中捕获自定义异常
/**
* 用户名校验自定义异常
* @param e
* @return
*/
@ExceptionHandler(CheckException.class)
public ResponseEntity<String> handleBindingCheckNameException(CheckException e){
return new ResponseEntity<String>(toStr(e),HttpStatus.BAD_REQUEST);
}
private String toStr(CheckException ex) {
return ex.getFieldName()+":"+"错误的值"+ ex.getFieldValue();
}
测试
{
"name":"admin",
"age":25
}
name:错误的值admin
使用 Router Functions 开发
WebFlux 可以运行在以前老的 Servlet 容器,也可以运行在 Servlet 3.1 之后的容器,或者运行在 Netty 上面,那么他第一步要把这连个容器的一些共同点抽离出来
- ServletRequest 对应 HttpServletRequet
- ServletResponse 对应 HttpServletResponse
使用 Router Functions 开发一般需要以下几步
- 开发 HandlerFunction ,这个 HandlerFunction 是输入 ServletRequest 返回 ServletResponse
- 接着要开发 Router Funciton 它是把我们的请求 url 和 HandlerFunction 对应起来
- 接着我们会把 RouterFunction 包装成 HttpHandler
- 最后交给 Server 处理,这里的 Server 指的就是 Servlet3.1 之后的容器,或者 Netty
案例
- 实体类
@Document(collection = "user")
@Data
public class User {
@Id
private String id;
@NotBlank
private String name;
// 年龄最小为 10 ,最大为 100
@Range(min = 10,max = 100)
private int age;
}
- 数据访问层
@Repository
public interface UserRepository extends ReactiveMongoRepository<User,String> {
/**
* 根据年龄查找用户信息
* @param start
* @param end
* @return
*/
Flux<User> findByAgeBetween(int start, int end);
@Query("{'age':{'$gte':20,'$lte':30}}")
Flux<User> findUserBySQL();
}
- HandlerFunction
@Component
public class UserHandler {
private final UserRepository repository;
public UserHandler(UserRepository repository){
this.repository = repository;
}
/**
* 得到所有用户
* @param request
* @return
*/
public Mono<ServerResponse> getAllUser(ServerRequest request){
return ServerResponse.ok().contentType(MediaType.APPLICATION_JSON)
.body(this.repository.findAll(), User.class);
}
/**
* 创建用户
* @param request
* @return
*/
public Mono<ServerResponse> createUser(ServerRequest request){
// 获取用户提交的数据
Mono<User> userMono = request.bodyToMono(User.class);
return userMono.flatMap(u -> {
// 校验用户信息
CheckUtil.checkName(u.getName());
return ServerResponse.ok().contentType(MediaType.APPLICATION_JSON)
.body(this.repository.save(u), User.class);
});
}
public Mono<ServerResponse> deleteByUserId(ServerRequest request){
// 获取 rest 参数 id
// 相当于获取 url 中 @PathVariable("id)
String id = request.pathVariable("id");
// 查询是否存在
return this.repository.findById(id)
// 如果存在删除
.flatMap(user -> this.repository.delete(user)
// 返回
.then(ServerResponse.ok().build()))
// 不存在
.switchIfEmpty(ServerResponse.notFound().build());
}
}
- router Function 把 url 和 Handler Function 对应起来
@Configuration
public class AllRouters {
/***
* spring-boot-starter-web 不能和 spring-boot-starter-webflux 同时引入否则转发无效
* @param userHandler
* @return
*/
@Bean
RouterFunction<ServerResponse> userRouter(UserHandler userHandler){
return RouterFunctions.nest(
// 相当于类上面的 @RequestMapping()
RequestPredicates.path("/router/user"),
// 得到所有用户
RouterFunctions.route(RequestPredicates.GET("/"), userHandler::getAllUser)
// 创建用户
.andRoute(
RequestPredicates.POST("/").and(RequestPredicates.accept(MediaType.APPLICATION_JSON)),
userHandler::createUser)
// 删除用户
.andRoute(RequestPredicates.DELETE("/{id}"), userHandler::deleteByUserId)
);
}
}
自定义异常校验
- 用户名校验类
public class CheckUtil {
// 定义不允许的常量
private static final String[] INVALID_NAMES = {"admin","administrator"};
/**
* 校验用户名是否合法,不合法抛出异常
* @param value
*/
public static void checkName(String value) {
Stream.of(INVALID_NAMES).filter(name -> name.equalsIgnoreCase(value))
// findAny 找到任何一个, ifPresent 如果存在就抛出异常
.findAny().ifPresent(v -> {
throw new CheckException("name", v);
});
}
}
- 自定义异常类
@Data
public class CheckException extends RuntimeException {
private String fieldName; // 字段名
private String fieldValue; // 错误内容
public CheckException(String fieldName, String fieldValue) {
super();
this.fieldName = fieldName;
this.fieldValue = fieldValue;
}
public CheckException() {
super();
}
public CheckException(String message) {
super(message);
}
public CheckException(String message, Throwable cause) {
super(message, cause);
}
public CheckException(Throwable cause) {
super(cause);
}
protected CheckException(String message, Throwable cause, boolean enableSuppression, boolean writableStackTrace) {
super(message, cause, enableSuppression, writableStackTrace);
}
}
- 统一异常处理
/**
* 异常处理切面
*/
@Component
// 由于默认里有多个异常处理,所以我们要把我们的异常处理器优先级别调高
// 否则不会工作
@Order(-2)
public class CheckAdvice implements WebExceptionHandler {
@Override
public Mono<Void> handle(ServerWebExchange serverWebExchange, Throwable throwable) {
ServerHttpResponse response = serverWebExchange.getResponse();
// 设置响应头 400
response.setStatusCode(HttpStatus.BAD_REQUEST);
// 设置返回异常
response.getHeaders().setContentType(MediaType.TEXT_PLAIN);
// 异常信息
String errorMsg = toStr(throwable);
DataBuffer db = response.bufferFactory().wrap(errorMsg.getBytes());
return response.writeWith(Mono.just(db));
}
private String toStr(Throwable ex) {
// 已知异常
if(ex instanceof CheckException){
CheckException e = (CheckException) ex;
return e.getFieldName()+": invalid value "+e.getFieldValue();
}else{
// 未知异常
ex.printStackTrace();
return ex.toString();
}
}
}
WebFlux 客户端声明式 RestClient 框架开发
添加依赖
<!-- WebFlux -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
<!-- lombok -->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
创建框架基础类
创建注解,保存服务端地址信息
/**
* 服务器相关的信息
*/
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
public @interface ApiServer {
// 保存服务端请求接口
String value() default "";
}
创建方法调用信息类
/**
* 方法调用信息类
*/
@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
public class MethodInfo {
/**
* 请求url
*/
private String url;
/**
* 请求方法
*/
private HttpMethod method;
/**
* 请求参数 (url 上)
*/
private Map<String,Object> params;
/**
* 请求 body
*/
private Mono<?> body;
private Class<?> bodyElementType;
/**
* 返回是 flux 还是 mono
*/
private boolean returnFlux;
/**
* 返回对象的类型
*/
private Class<?> returnElementType;
}
创建服务器信息类
/**
* 服务器信息类
*/
@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
public class ServerInfo {
/**
* 服务器 url
*/
private String url;
}
创建动态代理类接口
/**
* 创建代理类接口
*/
public interface ProxyCreator {
/**
* 创建代理类
* @param type
* @return
*/
Object createProxy(Class<?> type);
}
创建请求类接口 Handler
/**
* 请求调用 handler
*/
public interface RestHandler {
/**
* 初始化服务器信息
* @param serverInfo
*/
void init(ServerInfo serverInfo);
/**
* 调用 rest 请求,返回接口
* @param methodInfo
*/
Object invokeRest(MethodInfo methodInfo);
}
创建 jdk 动态代理类实现动态代理接口类
/**
* 使用 jdk 动态代理实现代理类
*/
@Slf4j
public class JDKProxyCreator implements ProxyCreator {
/**
* 创建代理类
*
* @param type
* @return
*/
@Override
public Object createProxy(Class<?> type) {
log.info("createProxy:"+type);
// 根据接口获取得到 api 服务器信息
ServerInfo serverInfo = extractServerInfo(type);
log.info("serverInfo:"+serverInfo);
// 给每一个代理类一个实现
RestHandler restHandler = new WebClientRestHandler();
// 初始化服务器信息(初始化 webclient)
restHandler.init(serverInfo);
return Proxy.newProxyInstance(this.getClass().getClassLoader(), new Class[]{type}, new InvocationHandler() {
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
// 根据方法和参数得到调用信息
MethodInfo methodInfo = extractMethodInfo(method,args);
log.info("methodInfo:"+methodInfo);
// 调用 rest
return restHandler.invokeRest(methodInfo);
}
/**
* 根据方法定义和调用参数得到调用的相关信息
* @param method
* @param args
* @return
*/
private MethodInfo extractMethodInfo(Method method, Object[] args) {
MethodInfo methodInfo = new MethodInfo();
// 得到请求 url 和 方法
extractUrlAndMethod(method,methodInfo);
// 得到请求的 param 和 body
extractRequestParamAndBody(method,args,methodInfo);
// 提取返回对象信息
extractReturnInfo(method,methodInfo);
return methodInfo;
}
/**
* 提取返回对象信息
* @param method
* @param methodInfo
*/
private void extractReturnInfo(Method method, MethodInfo methodInfo) {
// 返回是 Flux 还是 Mono
// isAssignableFrom 判断类型是否是某个类的子类
// instanceof 判断实例是否是某个类的子类
boolean isFlux = method.getReturnType().isAssignableFrom(Flux.class);
methodInfo.setReturnFlux(isFlux);
// 得到返回对象的实例类型
Class<?> elementType = extractElementType(method.getGenericReturnType());
methodInfo.setReturnElementType(elementType);
}
/**
* 得到泛型类型的实际类型
* @param genericReturnType
* @return
*/
private Class<?> extractElementType(Type genericReturnType) {
Type[] actualTypeArguments = ((ParameterizedType) genericReturnType).getActualTypeArguments();
return (Class<?>) actualTypeArguments[0];
}
/**
* 得到请求的 param 和 body
* @param method
* @param args
* @param methodInfo
*/
private void extractRequestParamAndBody(Method method, Object[] args, MethodInfo methodInfo) {
// 参数和值对应的 map
Map<String,Object> params = new LinkedHashMap<String, Object>();
methodInfo.setParams(params);
// 得到调用的参数或者 body
Parameter[] parameters = method.getParameters();
for (int i = 0; i < parameters.length; i++) {
// 参数是否带 @PathVariabole 注解
PathVariable able = parameters[i].getAnnotation(PathVariable.class);
if(able != null){
params.put(able.value(), args[i]);
}
// 是否带了 requestBody
RequestBody requestBody = parameters[i].getAnnotation(RequestBody.class);
if(requestBody != null){
methodInfo.setBody((Mono<?>)args[i]);
// 得到对象的实际类型
methodInfo.setBodyElementType(extractElementType(parameters[i].getParameterizedType()));
}
}
}
/**
* 得到请求的 url 和方法
* @param method
* @param methodInfo
*/
private void extractUrlAndMethod(Method method,MethodInfo methodInfo){
// 得到请求 url 和请求方法
Annotation[] annotations = method.getAnnotations();
for (Annotation annotation : annotations){
// GET
if(annotation instanceof GetMapping){
GetMapping getMapping = (GetMapping) annotation;
methodInfo.setUrl(getMapping.value()[0]);
methodInfo.setMethod(HttpMethod.GET);
}else if(annotation instanceof PostMapping){
// POST
PostMapping postMapping = (PostMapping) annotation;
methodInfo.setUrl(postMapping.value()[0]);
methodInfo.setMethod(HttpMethod.POST);
}else if(annotation instanceof DeleteMapping){
// DELETE
DeleteMapping deleteMapping = (DeleteMapping) annotation;
methodInfo.setUrl(deleteMapping.value()[0]);
methodInfo.setMethod(HttpMethod.DELETE);
}
}
}
});
}
/**
* 提取服务器信息
* @param type
* @return
*/
private ServerInfo extractServerInfo(Class<?> type) {
ServerInfo serverInfo = new ServerInfo();
ApiServer apiServer = type.getAnnotation(ApiServer.class);
// 设置服务器 url
serverInfo.setUrl(apiServer.value());
return serverInfo;
}
}
创建用户类
@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
public class User {
private String id;
private String name;
private int age;
}
创建服务端请求接口类(地址是上面 WebFlux 完整 CRUD 项目)
/**
* 服务端请求接口
*/
@ApiServer("http://localhost:8080/user")
public interface IUserApi {
@GetMapping("/")
Flux<User> getAllUser();
@GetMapping("/{id}")
Mono<User> getUserById(@PathVariable("id")String id);
@DeleteMapping("/{id}")
Mono<Void> deleteUserById(@PathVariable("id")String id);
@PostMapping("/")
Mono<User> createUser(@RequestBody Mono<User> user);
}
创建请求调用接口类实现类,这里使用 WebClient
/**
* WebClientRestHandler
*/
public class WebClientRestHandler implements RestHandler {
private WebClient webClient;
/**
* 初始化 webClient
* @param serverInfo
*/
@Override
public void init(ServerInfo serverInfo) {
this.webClient = WebClient.create(serverInfo.getUrl());
}
/**
* 处理 rest 请求
* @param methodInfo
*/
@Override
public Object invokeRest(MethodInfo methodInfo) {
// 返回结果
Object result = null;
WebClient.RequestBodySpec request = this.webClient
// 请求方法类型
.method(methodInfo.getMethod())
// 请求 url
// 不带参数
//.uri(methodInfo.getUrl())
// 带参数
.uri(methodInfo.getUrl(), methodInfo.getParams())
// 请求类型
.accept(MediaType.APPLICATION_JSON);
WebClient.ResponseSpec retrieve = null;
// 发出请求
// 判断是否带了 body
if(methodInfo.getBody() != null){
retrieve = request.body(methodInfo.getBody(), methodInfo.getBodyElementType()).retrieve();
}else{
retrieve = request.retrieve();
}
// 异常处理
retrieve.onStatus(status -> status.value() == 404, response -> Mono.just(new RuntimeException("not found")));
// 处理 body
if(methodInfo.isReturnFlux()){
result = retrieve.bodyToFlux(methodInfo.getReturnElementType());
}else{
result = retrieve.bodyToMono(methodInfo.getReturnElementType());
}
return result;
}
}
创建配置类完成代理类初始化
@Configuration
public class ProxyConfiguration {
/**
* 创建jdk 工具类
* @return
*/
@Bean
ProxyCreator jdkProxyCreator(){
return new JDKProxyCreator();
}
@Bean
FactoryBean<IUserApi> userApi(ProxyCreator proxyCreator){
return new FactoryBean<IUserApi>() {
// 返回代理对象
@Override
public IUserApi getObject() throws Exception {
return (IUserApi) proxyCreator.createProxy(this.getObjectType());
}
@Override
public Class<?> getObjectType() {
return IUserApi.class;
}
};
}
}
创建测试类 ,测试 CRUD
@RestController
public class TestController {
@Autowired
private IUserApi userApi;
@GetMapping("/")
public void test(){
// 获取所有用户信息
//userApi.getAllUser().subscribe(System.out::println);
// 根据 id 查询用户信息
String id = "5f2a8077432a9634dba7b65c";
//userApi.getUserById(id).subscribe(System.out::println);
// 创建用户信息
//userApi.createUser(Mono.just(User.builder().name("张四丰").age(18).build()))
//.subscribe(System.out::println);
// 修改用户
userApi.updateUser(id, Mono.just(User.builder().name("张五峰").age(19).build())).subscribe(System.out::println);
userApi.getAllUser().subscribe(System.out::println);
}
}