项目构建
项目模板
Flink应用项目可以使用Maven或SBT来构建项目,Flink针对这些构建工具提供了相应项目模板。
Maven模板命令如下,我们只需要根据提示输入应用项目的groupId、artifactId、version和package路径即可。
mvn archetype:generate -DarchetypeGroupId=org.apache.flink -DarchetypeArtifactId=flink-quickstart-java -DarchetypeVersion=1.7.2
目录结构和我们使用IDEA创建的目录结构基本一样,只是它会帮我们引入Flink依赖和日志依赖。
<flink.version>1.7.2</flink.version>
<scala.binary.version>2.11</scala.binary.version>
...
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
flink-java和flink-streaming-java_2.11是我们使用Java开发Flink应用程序的必要依赖。
默认也帮我们引入maven-shade-plugin插件,所以在打包的时候记得将mainClass改成自己的主类。
SBT模板可以使用以下命令获取:
sbt new tillrohrmann/flink-project.g8
SBT版本需要大于等于0.13.13版本。
应用程序依赖
Flink应用程序开发依赖项可以分为两类:
Flink核心依赖(Flink core Dependencies):它是Flink运行系统所需的类和依赖项,也就是Flink项目的核心代码和所使用的依赖。比如实现的:调度、通信、checkpoint、API等。我们上面所引入的就是Flink的核心依赖,对于核心依赖我们只需要将依赖作用范围scope设置为provided即可,也就是不将依赖打入jar包。因为对于这些核心依赖,Flink运行集群能够为我们提供。
应用程序依赖(User Application Dependencies):这部分就是我们开发应用程序所需要的一些其它依赖项,比如连接器、格式化库、Flink CEP、Flink SQL、Flink ML等。在打包应用程序的时候,我们需要将这些依赖项与我们的应用程序代码一同打入到一个jar包中。
//Flink核心依赖
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>1.7.2</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.11</artifactId>
<version>1.7.2</version>
<scope>provided</scope>
</dependency>
//应用程序依赖
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka-0.10_2.11</artifactId>
<version>1.7.2</version>
</dependency>
Flink 开发
Flink应用程序一般先是通过数据源创建分布式数据集合,比如读取文件、Kafka或本地缓存。然后对分布式集合进行各种转换操作,比如过滤、聚合、映射、修改状态、定义窗口等。最后通过接收器(sink)返回结果,结果可以写入文件(分布式)、DB或标准化输出。
根据数据源的类型,也就是有界数据源(bounded)或无界数据源(unbounded),我们可以编写批处理程序(batch)和流处理程序(streaming)。Flink对于批处理程序和流处理程序提供了不同的API,其中DataSet API用于批处理,DataStream API用于流处理。尽管提供的API不同,但是Flink底层数据处理方式是一致的。
在Flink程序中DataSet和DataSteam用来表示程序中的数据集合,这些数据集是不可变的(immutable)。DataSet代表有限的数据集,而DataSteam代表无限的数据集合。
编写Flink应用程序
编写Flink应用程序基本可以分为以下5个步骤:
- 获取应用程序的执行环境(execution environment)。
- 加载/创建初始数据集合。
- 对数据集执行转换操作(transformation)。
- 指定计算结果输出。
- 触发程序执行。
DataSet API在org.apache.flink.api.java 包中;DataStream API在org.apache.flink.streaming.api包中。
批处理和流处理的程序步骤是一致的,下面给出流处理作业的编写步骤。
获取应用程序执行环境
获取执行环境是Flink程序的基础。对于流处理程序的执行环境为StreamExecutionEnvironment,我们可以通过StreamExecutionEnvironment静态方法获取。对于批处理作业执行环境为ExecutionEnvironment,同样需要使用ExecutionEnvironment获取。
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment(host,port,jarFile);
一般我们只需要通过getExecutionEnvironment()方法获取执行环境即可,因为他会根据当前环境自动创建合适的执行环境,比如我们在本地IDE执行程序,它将创建一个本地执行环境(local environment),该环境将在本地机器执行;如果我们将应用程序打成jar包交给Flink集群执行,getExecutionEnvironment()将返回一个集群执行环境。createLocalEnvironment()会创建一个本地执行环境,createRemoteEnvironment()会创建一个远程执行环境。
同理,批处理执行环境ExecutionEnvironment以同样方式创建执行环境。
加载/创建初始数据集合
加载/创建初始数据集合,一般主要是读取分布式文件、读取Kafka队列等。
//通过执行环境提供的方法读取外部数据集
DataStreamSource<String> textFile = env.readTextFile("/Users/yangjianzhang/development/flink-1.7.2/LICENSE");
读取数据集之后会返回一个DataStream(DataStreamSource是DataStream子类,用于获取数据源数据集的),我们之后的转换、输出存储操作都可以通过DataStream提供的API进行操作了。
执行转换操作
我们能够通过DataSteam提供的API进行各种转换(transformation)操作。转换函数我们可以通过三种方式实现:实现接口、匿名类和Lambda表达式。
//实现接口
class MyMap implements MapFunction<String,String> {
@Override
public String map(String s) throws Exception {
return s.toUpperCase();
}
}
data.map(new MyMap());
//通过匿名类方式
textFile.map(new MapFunction<String, String[]>() {
@Override
public String[] map(String s) throws Exception {
return s.split(" ");
}
});
//使用Lambda表达式
DataStream<String> filterLine = textFile.filter(line -> line.contains("flink"));
转换操作除了提供了基础接口(比如MapFunction),还提供了丰富函数(Rich Function)。Rich Function除了提供用户定义函数,还提供了其它四个方法:open、close、getRuntimeContext和setRuntimeContext。在一些场景,这些方法都是很有用的。
指定计算结果输出
我们可以将计算结果打印出来,也可以直接将结果写入到文件中。
filterLine.print();
filterLine.writeAsText("/Users/yangjianzhang/development/data/flink/contains.txt");
触发程序执行
Flink应用程序是懒执行的(lazy execution)。也就是说当程序main方法被执行时,数据的加载和转换并不会立即触发,而是会将每一步操作添加到执行计划中,当执行环境通过execute()方法显示触发时,才会进行具体的执行操作。
execute()方法会返回一个JobExecutionResult,它包含执行时间以及累加器(accumulator)结果。
//触发程序执行
JobExecutionResult result = env.execute();
通过懒执行评估(lazy evaluation)机制,我们可以构建复杂的数据处理程序,Flink会将整个执行计划作为一个执行单元来一起执行。
Demo
下面是根据上面编写程序步骤给出的完整Demo:
public class DemoStreamingJob {
public static void main(String[] args) throws Exception {
//Streaming process
//step1 获取/创建执行环境
//自动选择正确的执行环境
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//step2 加载/创建初始数据
DataStreamSource<String> textFile = env.readTextFile("/Users/yangjianzhang/development/flink-1.7.2/LICENSE");
//step3 对数据源数据进行转换操作
DataStream<String> filterLine = textFile.filter(line -> line.contains("flink"));
//step4 指定计算结果输出位置
filterLine.writeAsText("/Users/yangjianzhang/development/data/flink/contains.txt");
//step5 最后触发程序执行
JobExecutionResult result = env.execute();
}
}
指定key操作
Flink数据模型并不是基于键值对的,所以我们不需要将数据放到键值对中再传递给Flink。但是有时许多转换操作需要基于key值进行操作,比如join、groupBy、Reduce、Aggregator等等。
Flink针对上面这些问题,提出了“虚拟”key的概念。也就是说在传递过来的数据上,通过指定具体数据项为这个消息的key。
比如我们传递进来的数据是一个tuple元组:
("yjz",27,10234),
("yjz",27,21456)
("ls",28,12345)
我们可以指定元组中的第一个元素为key。
DataStream<Tuple3<String,Integer,Long>> streamInput = ...;
DataStream<...> windowed = streamInput.keyBy(0).window(...);
如果是DataSet可以通过groupBy来指定key:
DataSet<Tuple3<String,Integer,Long>> inputDataSetTuple = ...;
DataSet<...> reduced = inputDataSetTuple.groupBy(0).reduce(...);
上面只是针对tuple类型数据通过位置来简单的指定key,下面我们看一下Flink都支持了哪些指定key的方式。
定义Tuple类型中的key
上面我们已经讲了可以通过指定position来指定虚拟key,我们还可以使用的的更复杂一些。比如有些场景需要使用组合多个field的方式来指定key:
KeyedStream<Tuple3<String,Integer,Long>> = streamInput.keyBy(0,1);
上面使用元组中的第一个字段和第二个字段组合成一个key来使用。
使用字段表达式(Key Expression)指定key
使用字段表达式能够更加灵活的指定key,它可以用来指定POJO对象、Tuple元组中的key。
对于POJO对象可以通过a.b的形式来指定key。比如有以下POJO对象:
class Wc{
public User user;
public int count;
}
class User{
public String name;
public Tuple2<String,Integer> tuple;
public int age;
}
可以使用User作为key:words.keyBy("user")。
也可以使用User中的name作为key:words.keyBy("user.name")。
对于元组类型,我们即可以直接使用下标(从0开始),也可以使用"fx"来代表,比如第一个元素则用"f0"表示(感觉和直接使用下标没有区别)。当然可以和POJO对象组合使用:words.keyBy("user.tuple.f0")。
使用key选择器
使用key选择器能够以单个元素输入,并可以返回任意类型的元素key。
下面是返回字符串类型的key:
KeyStream<String> keyed = textFile.keyBy(new KeySelector<User,String>() {
@Override
public String getKey(User user) throws Exception {
return user.name;
}
});
Flink支持的数据类型
为了确保系统能够以确切有效的方式针对不同类型执行不同的策略,Flink提供了以下六种不同类别的数据类型。
- Java Tuple和Scala class类型。
- POJO对象类型。
- 原始数据类型(Primitive Type)。
- 常规类型(General Class Types)
- Values类型(自定义序列化类型)
- Hadoop Writable
Java Tuple和Scala class
元组类型是包含固定数量,具有各种类型字段的复合类型。Java API提供了Tuple1~Tuple25的元组类型,后面的数字代表元组中的元素个数,所以我们可以看出最多支持25个元素的元组。但是我们可以通过嵌套来存储更多的元素数据,每个字段都可以是Flink的任意类型数据(包括元组)。元组通过tuple.f0或tuple.getField(int position)来获取字段数据,索引从0开始。
DataStreamSource<Tuple2<String,Integer>> wordCounts = env.fromElements(new Tuple2<String,Integer>("hello",2),new Tuple2<String,Integer>("word",5));
DataStream<Integer> counts = wordCounts.map(word -> word.f1);
wordCounts.keyBy(0);
Scala中的Case class可以代替Java中的Tuple类型。
POJO对象类型
如果Java或Scala类满足以下情况,则可以作为POJO对象类型被Flink处理。
- 类必须是public访问类型。
- 必须有一个无参的公共构造函数。
- 所有字段字段都有public类型,或者有对应的setXxx()和getXxx()方法。
- 类中的字段类型必须是Flink所支持的数据类型。
自己一直理解Scala中的Case Class就是Java中的POJO,但是官方文档将Java Tuple和Scala Case Class放在一起,估计是以使用方式来进行划分的。
Flink中的POJO是使用Avro序列化框架进行序列化的。
public class UserCount{
public String name;
public int count;
public UserCount() {}
public UserCount(String name,int count) {
this.name = name;
this.count = count;
}
}
DataStreamSource<UserCount> userCount = env.fromElements(new UserCount("zhangsan",1),new UserCount("lisi",4));
userCount.keyBy("name");
原始数据类型(Primitive Types)
Flink支持Java或Scala中的所有原始数据类型,比如String、int、Double等。
常规类型(General Class Types)
除了POJO类型外,Flink能够支持Java和Scala大部分类。但是对于一些包含不能被序列化的字段类、I/O流类、或其它本地资源类是不支持的。
Flink会对常规类型以黑盒方式进行操作(无法访问其内容),使用Kryo进行常规类的序列化与反序列化。
上面POJO使用Avro序列化框架,这里的常规类型使用Kryo序列化框架,原因需要考证一下。
Values类型(自定义序列化类型)
值类型是使用手动序列化与反序列化来代替通用序列化框架。通过实现org.apache.flinktypes.Value
接口来自己实现序列化与反序列。当使用通用序列化框架效率比较低的时候,使用Value类型是非常合理的。比如对于一个数组,我们知道它大部分都是0,那么我们可以对非零元素进行特殊编码即可,而不是使用通用框架对所有元素进行编码。
Flink对于基本类型提供预定义的Value类型:ByteValue、ShortValue、IntValue、LongValue、FloatValue、DoubleValue、StringValue、CharValue、BooleanValue。
Hadoop Writable
我们还可以使用实现了org.apache.hadoop.Writable
接口的数据类型。其中使用write()方法和readFields()方法进行序列化和反序列化。
除了上面这六种类型数据,我们还可以使用一些特殊类型,比如Scala的Either、Option、Try等,Java中自定义的Either等。