慕课网Flink入门课程笔记
视频地址:https://www.imooc.com/video/17914
1 简介
Flink是一个面向分布式数据流处理和批量数据处理的开源计算平台,提供支持流处理和批处理两种类型的应用功能。
现有的开源计算方案中,将流处理和批处理作为两类不同的应用类型:流处理一般支持低延迟、Exactly-once保证(仅用一次),而批处理则是需要支持高吞吐、高效处理。
Flink完全支持流处理,作为流处理看待时输入数据流是无界的;批处理被作为一种特殊的流处理,其输入数据流有界。
Deployment层
涉及Flink的部署模式,本地模式、集群模式(Standalone/YARN)、云。生产环境中主要为YARN模式。RunTime层
提供了支持Flink计算的全部核心实现,例如:支持分布式的Stream处理、调度策略等,为上层API服务。API层
实现面向无界Stream的流处理和面向Batch的批处理API,分别为DataStream API和DataSet API。Libaries层
在API层之上构建的满足特定应用的实现计算框架,分别用于面向流处理和批处理。
CEP面向事件,Table用于类SQL,FlinkML用于机器学习,Gelly为图处理。
2 Flink的优势和对比
高吞吐、低延迟、高性能的流处理。
支持高度灵活的窗口操作。
支持有状态计算的Exactly-once语义。
提供DataStream和DataSet API。
3 基本概念和编程模型
基本概念
Flink程序的基础构建模块是流(Streams)和转换(transformations)。流就是数据的输入,转换是对数据的操作。每个流都起始于一个或多个source,并终止于一个sink。
流上的聚合需要由窗口来划定范围,比如时间区间和数量的序号范围等。窗口分为多个类型,滚动窗口(没有数据重叠),滑动窗口(有数据重叠),会话窗口(由不活动的间隙打断)。
上图中的sliding size 是滑动窗口在滑动过程中的时间间隔。
滚动窗口有滚动时间窗口和滚动数据窗口,前者划定一段时间内的数据,后者等待数据规模到达阈值。
基本架构
Flink是基于Master-Slave风格的架构,其启动时会启动JobManager进程和至少一个TaskManager进程。
JobManager
系统的协调者,负责接收Job,调度组成Job的多个Task的执行。
收集Job的状态信息,管理从节点TaskManager。TaskManager
实际负责执行计算的工作节点,在其上执行Task。
TaskManager负责管理其所在节点上的资源信息,并在启动时将这些信息向JobManager汇报。Client
用户提交Flink程序的时候,会先创建一个Client,该Client对提交的程序进行预处理后在提交到Flink集群。
Client会将用户提交的Flink程序组成一个JobGraph,并以这种形式提交。
4 安装与使用
通过Git获取源码进行安装:
或是下载编译好的二级制文件。
如果是为了部署生产环境,则需要下载与Hadoop兼容的版本。
解压后进入bin目录运行启动脚本:
windows下直接启动.bat文件。
其UI界面端口默认为8081:
5 示例程序
windows下通过flink run ../example ... *.jar --port *
可以运行一个flink自带的示例程序
通过curl来创建一个maven项目。
$ curl https://flink.apache.org/q/quickstart.sh | bash
wordcount程序如下:
package org.myorg.quickstart;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;
import scala.Int;
/**
* Created by Edwin_1993 on 2019/3/3.
*/
public class WordCount {
public static void main(String[] args) throws Exception {
//flink 上下文
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
DataSet<String> text = env.fromElements(
"To be or not to be,--that is a question--",
"hello everyone, my name is Lileilei",
"Fine,thank you and you?"
);
DataSet<Tuple2<String,Integer>> counts =
text.flatMap(new LineSplitter())
.groupBy(0) //0至数组中的第0号元素 eg:(tuntun,1)
.sum(1);
counts.print();
}
public static final class LineSplitter implements FlatMapFunction<String,Tuple2<String, Integer>>{
@Override
public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
// 表示匹配数字、字母、下划线和加号本身字符
String[] tokens = value.toLowerCase().split("\\W+");
for (String token:tokens){
if(token.length() > 0){
out.collect(new Tuple2<String ,Integer>(token,1));
}
}
}
}
}
部分输入日志如下:
6 Flink常用算子
算子是在Transformation过程中使用的,主要分为DatasetAPI 和DataStreamAPI
Stream中有类似的API
也有特有的API:
7 应用实例
添加依赖:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-wikiedits_2.11</artifactId>
<version>${flink.version}</version>
</dependency>
创建类:
package org.myorg.quickstart;
import org.apache.flink.api.common.functions.FoldFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.wikiedits.WikipediaEditEvent;
import org.apache.flink.streaming.connectors.wikiedits.WikipediaEditsSource;
/**
* Created by Edwin_1993 on 2019/3/3.
*/
public class WikipediaAnalysis {
public static void main(String[] args) throws Exception{
StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<WikipediaEditEvent> edits = see.addSource(new WikipediaEditsSource());
// keyedEdits为无边界的流输入
KeyedStream<WikipediaEditEvent,String> keyedEdits = edits
.keyBy((KeySelector<WikipediaEditEvent,String >) event->{
return event.getUser();
});
// 将输入进行Sink
DataStream<Tuple2<String,Long>> result = keyedEdits
.timeWindow(Time.seconds(5))
// fold的两个参数,第一个是初始值:new Tuple2<>("", 0L)
// 第二个是foldfunction,对fold功能进行编写
.fold(new Tuple2<>("", 0L), new FoldFunction<WikipediaEditEvent, Tuple2<String, Long>>() {
@Override
public Tuple2<String, Long> fold(Tuple2<String, Long> stringLongTuple2, WikipediaEditEvent event) throws Exception {
stringLongTuple2.f0 = event.getUser();
stringLongTuple2.f1 += event.getByteDiff();
return stringLongTuple2;
}
});
result.print();//可以落地到其它地方,kafka或是redis
see.execute();
}
}
部分结果: