Flink应用开发

项目构建

项目模板

Flink应用项目可以使用Maven或SBT来构建项目,Flink针对这些构建工具提供了相应项目模板。
Maven模板命令如下,我们只需要根据提示输入应用项目的groupId、artifactId、version和package路径即可。

mvn archetype:generate -DarchetypeGroupId=org.apache.flink -DarchetypeArtifactId=flink-quickstart-java -DarchetypeVersion=1.7.2

目录结构和我们使用IDEA创建的目录结构基本一样,只是它会帮我们引入Flink依赖和日志依赖。

<flink.version>1.7.2</flink.version>
<scala.binary.version>2.11</scala.binary.version>
...
<dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>

flink-java和flink-streaming-java_2.11是我们使用Java开发Flink应用程序的必要依赖。

默认也帮我们引入maven-shade-plugin插件,所以在打包的时候记得将mainClass改成自己的主类。

SBT模板可以使用以下命令获取:

sbt new tillrohrmann/flink-project.g8

SBT版本需要大于等于0.13.13版本。

应用程序依赖

Flink应用程序开发依赖项可以分为两类:

  • Flink核心依赖(Flink core Dependencies):它是Flink运行系统所需的类和依赖项,也就是Flink项目的核心代码和所使用的依赖。比如实现的:调度、通信、checkpoint、API等。我们上面所引入的就是Flink的核心依赖,对于核心依赖我们只需要将依赖作用范围scope设置为provided即可,也就是不将依赖打入jar包。因为对于这些核心依赖,Flink运行集群能够为我们提供。

  • 应用程序依赖(User Application Dependencies):这部分就是我们开发应用程序所需要的一些其它依赖项,比如连接器、格式化库、Flink CEP、Flink SQL、Flink ML等。在打包应用程序的时候,我们需要将这些依赖项与我们的应用程序代码一同打入到一个jar包中。

//Flink核心依赖
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-java</artifactId>
  <version>1.7.2</version>
  <scope>provided</scope>
</dependency>
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-streaming-java_2.11</artifactId>
  <version>1.7.2</version>
  <scope>provided</scope>
</dependency>
//应用程序依赖
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka-0.10_2.11</artifactId>
    <version>1.7.2</version>
</dependency>

Flink 开发

Flink应用程序一般先是通过数据源创建分布式数据集合,比如读取文件、Kafka或本地缓存。然后对分布式集合进行各种转换操作,比如过滤、聚合、映射、修改状态、定义窗口等。最后通过接收器(sink)返回结果,结果可以写入文件(分布式)、DB或标准化输出。

根据数据源的类型,也就是有界数据源(bounded)或无界数据源(unbounded),我们可以编写批处理程序(batch)和流处理程序(streaming)。Flink对于批处理程序和流处理程序提供了不同的API,其中DataSet API用于批处理,DataStream API用于流处理。尽管提供的API不同,但是Flink底层数据处理方式是一致的。

在Flink程序中DataSet和DataSteam用来表示程序中的数据集合,这些数据集是不可变的(immutable)。DataSet代表有限的数据集,而DataSteam代表无限的数据集合。

编写Flink应用程序

编写Flink应用程序基本可以分为以下5个步骤:

  1. 获取应用程序的执行环境(execution environment)。
  2. 加载/创建初始数据集合。
  3. 对数据集执行转换操作(transformation)。
  4. 指定计算结果输出。
  5. 触发程序执行。

DataSet API在org.apache.flink.api.java 包中;DataStream API在org.apache.flink.streaming.api包中。

批处理和流处理的程序步骤是一致的,下面给出流处理作业的编写步骤。

获取应用程序执行环境

获取执行环境是Flink程序的基础。对于流处理程序的执行环境为StreamExecutionEnvironment,我们可以通过StreamExecutionEnvironment静态方法获取。对于批处理作业执行环境为ExecutionEnvironment,同样需要使用ExecutionEnvironment获取。

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment(host,port,jarFile);

一般我们只需要通过getExecutionEnvironment()方法获取执行环境即可,因为他会根据当前环境自动创建合适的执行环境,比如我们在本地IDE执行程序,它将创建一个本地执行环境(local environment),该环境将在本地机器执行;如果我们将应用程序打成jar包交给Flink集群执行,getExecutionEnvironment()将返回一个集群执行环境。createLocalEnvironment()会创建一个本地执行环境,createRemoteEnvironment()会创建一个远程执行环境。
同理,批处理执行环境ExecutionEnvironment以同样方式创建执行环境。

加载/创建初始数据集合

加载/创建初始数据集合,一般主要是读取分布式文件、读取Kafka队列等。

//通过执行环境提供的方法读取外部数据集
DataStreamSource<String> textFile =  env.readTextFile("/Users/yangjianzhang/development/flink-1.7.2/LICENSE");

读取数据集之后会返回一个DataStream(DataStreamSource是DataStream子类,用于获取数据源数据集的),我们之后的转换、输出存储操作都可以通过DataStream提供的API进行操作了。

执行转换操作

我们能够通过DataSteam提供的API进行各种转换(transformation)操作。转换函数我们可以通过三种方式实现:实现接口、匿名类和Lambda表达式。

//实现接口
class MyMap implements MapFunction<String,String> {
            @Override
            public String map(String s) throws Exception {
                return s.toUpperCase();
            }
}
data.map(new MyMap());
//通过匿名类方式
textFile.map(new MapFunction<String, String[]>() {
            @Override
            public String[] map(String s) throws Exception {
                return s.split(" ");
            }
        });
//使用Lambda表达式
DataStream<String> filterLine  = textFile.filter(line -> line.contains("flink"));

转换操作除了提供了基础接口(比如MapFunction),还提供了丰富函数(Rich Function)。Rich Function除了提供用户定义函数,还提供了其它四个方法:open、close、getRuntimeContext和setRuntimeContext。在一些场景,这些方法都是很有用的。

指定计算结果输出

我们可以将计算结果打印出来,也可以直接将结果写入到文件中。

filterLine.print();     
filterLine.writeAsText("/Users/yangjianzhang/development/data/flink/contains.txt");

触发程序执行

Flink应用程序是懒执行的(lazy execution)。也就是说当程序main方法被执行时,数据的加载和转换并不会立即触发,而是会将每一步操作添加到执行计划中,当执行环境通过execute()方法显示触发时,才会进行具体的执行操作。
execute()方法会返回一个JobExecutionResult,它包含执行时间以及累加器(accumulator)结果。

//触发程序执行
JobExecutionResult result = env.execute();

通过懒执行评估(lazy evaluation)机制,我们可以构建复杂的数据处理程序,Flink会将整个执行计划作为一个执行单元来一起执行。

Demo

下面是根据上面编写程序步骤给出的完整Demo:

public class DemoStreamingJob {
    public static void main(String[] args) throws Exception {
        //Streaming process

        //step1 获取/创建执行环境
        //自动选择正确的执行环境
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //step2 加载/创建初始数据
        DataStreamSource<String> textFile =  env.readTextFile("/Users/yangjianzhang/development/flink-1.7.2/LICENSE");
        //step3  对数据源数据进行转换操作
        DataStream<String> filterLine  = textFile.filter(line -> line.contains("flink"));

        //step4 指定计算结果输出位置
 filterLine.writeAsText("/Users/yangjianzhang/development/data/flink/contains.txt");

        //step5 最后触发程序执行
        JobExecutionResult result = env.execute();
    }
}

指定key操作

Flink数据模型并不是基于键值对的,所以我们不需要将数据放到键值对中再传递给Flink。但是有时许多转换操作需要基于key值进行操作,比如join、groupBy、Reduce、Aggregator等等。
Flink针对上面这些问题,提出了“虚拟”key的概念。也就是说在传递过来的数据上,通过指定具体数据项为这个消息的key。
比如我们传递进来的数据是一个tuple元组:

("yjz",27,10234),
("yjz",27,21456)
("ls",28,12345)

我们可以指定元组中的第一个元素为key。

DataStream<Tuple3<String,Integer,Long>> streamInput = ...;
DataStream<...> windowed =  streamInput.keyBy(0).window(...);

如果是DataSet可以通过groupBy来指定key:

DataSet<Tuple3<String,Integer,Long>> inputDataSetTuple = ...;
DataSet<...> reduced =  inputDataSetTuple.groupBy(0).reduce(...);

上面只是针对tuple类型数据通过位置来简单的指定key,下面我们看一下Flink都支持了哪些指定key的方式。

定义Tuple类型中的key

上面我们已经讲了可以通过指定position来指定虚拟key,我们还可以使用的的更复杂一些。比如有些场景需要使用组合多个field的方式来指定key:

KeyedStream<Tuple3<String,Integer,Long>> =  streamInput.keyBy(0,1);

上面使用元组中的第一个字段和第二个字段组合成一个key来使用。

使用字段表达式(Key Expression)指定key

使用字段表达式能够更加灵活的指定key,它可以用来指定POJO对象、Tuple元组中的key。
对于POJO对象可以通过a.b的形式来指定key。比如有以下POJO对象:

class Wc{
    public User user;
    public int count; 
}
class User{
    public String name;
    public Tuple2<String,Integer> tuple;
    public int age;
}

可以使用User作为key:words.keyBy("user")。
也可以使用User中的name作为key:words.keyBy("user.name")。

对于元组类型,我们即可以直接使用下标(从0开始),也可以使用"fx"来代表,比如第一个元素则用"f0"表示(感觉和直接使用下标没有区别)。当然可以和POJO对象组合使用:words.keyBy("user.tuple.f0")。

使用key选择器

使用key选择器能够以单个元素输入,并可以返回任意类型的元素key。
下面是返回字符串类型的key:

KeyStream<String> keyed = textFile.keyBy(new KeySelector<User,String>() {
            @Override
            public String getKey(User user) throws Exception {
                return user.name;
            }
        });

Flink支持的数据类型

为了确保系统能够以确切有效的方式针对不同类型执行不同的策略,Flink提供了以下六种不同类别的数据类型。

  1. Java Tuple和Scala class类型。
  2. POJO对象类型。
  3. 原始数据类型(Primitive Type)。
  4. 常规类型(General Class Types)
  5. Values类型(自定义序列化类型)
  6. Hadoop Writable

Java Tuple和Scala class

元组类型是包含固定数量,具有各种类型字段的复合类型。Java API提供了Tuple1~Tuple25的元组类型,后面的数字代表元组中的元素个数,所以我们可以看出最多支持25个元素的元组。但是我们可以通过嵌套来存储更多的元素数据,每个字段都可以是Flink的任意类型数据(包括元组)。元组通过tuple.f0或tuple.getField(int position)来获取字段数据,索引从0开始。

DataStreamSource<Tuple2<String,Integer>> wordCounts = env.fromElements(new Tuple2<String,Integer>("hello",2),new Tuple2<String,Integer>("word",5));
DataStream<Integer> counts = wordCounts.map(word -> word.f1);
        wordCounts.keyBy(0);

Scala中的Case class可以代替Java中的Tuple类型。

POJO对象类型

如果Java或Scala类满足以下情况,则可以作为POJO对象类型被Flink处理。

  1. 类必须是public访问类型。
  2. 必须有一个无参的公共构造函数。
  3. 所有字段字段都有public类型,或者有对应的setXxx()和getXxx()方法。
  4. 类中的字段类型必须是Flink所支持的数据类型。

自己一直理解Scala中的Case Class就是Java中的POJO,但是官方文档将Java Tuple和Scala Case Class放在一起,估计是以使用方式来进行划分的。
Flink中的POJO是使用Avro序列化框架进行序列化的。

public class UserCount{
            public String name;
            public int count;

            public UserCount() {}
            public UserCount(String name,int count) {
                this.name = name;
                this.count = count;
            }
 }
DataStreamSource<UserCount> userCount = env.fromElements(new UserCount("zhangsan",1),new UserCount("lisi",4));
userCount.keyBy("name");

原始数据类型(Primitive Types)

Flink支持Java或Scala中的所有原始数据类型,比如String、int、Double等。

常规类型(General Class Types)

除了POJO类型外,Flink能够支持Java和Scala大部分类。但是对于一些包含不能被序列化的字段类、I/O流类、或其它本地资源类是不支持的。
Flink会对常规类型以黑盒方式进行操作(无法访问其内容),使用Kryo进行常规类的序列化与反序列化。

上面POJO使用Avro序列化框架,这里的常规类型使用Kryo序列化框架,原因需要考证一下。

Values类型(自定义序列化类型)

值类型是使用手动序列化与反序列化来代替通用序列化框架。通过实现org.apache.flinktypes.Value接口来自己实现序列化与反序列。当使用通用序列化框架效率比较低的时候,使用Value类型是非常合理的。比如对于一个数组,我们知道它大部分都是0,那么我们可以对非零元素进行特殊编码即可,而不是使用通用框架对所有元素进行编码。
Flink对于基本类型提供预定义的Value类型:ByteValue、ShortValue、IntValue、LongValue、FloatValue、DoubleValue、StringValue、CharValue、BooleanValue。

Hadoop Writable

我们还可以使用实现了org.apache.hadoop.Writable接口的数据类型。其中使用write()方法和readFields()方法进行序列化和反序列化。

除了上面这六种类型数据,我们还可以使用一些特殊类型,比如Scala的Either、Option、Try等,Java中自定义的Either等。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 206,214评论 6 481
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 88,307评论 2 382
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 152,543评论 0 341
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 55,221评论 1 279
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 64,224评论 5 371
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,007评论 1 284
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,313评论 3 399
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,956评论 0 259
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 43,441评论 1 300
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,925评论 2 323
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,018评论 1 333
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,685评论 4 322
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,234评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,240评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,464评论 1 261
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,467评论 2 352
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,762评论 2 345

推荐阅读更多精彩内容