Flink常用API详解
概述:
Flink根据抽象程度分层,提供了3种不同的API和库,每一种API在简洁性和表达力上有着不同的侧重,并且针对不同的应用场景.
ProcessFunction
概述:
ProcessFunctino是Flink所提供最底层接口. ProcessFunction可以处理一或两条输入数据流中的单个事件或者归入一个特定窗口
内的多个事件,它提供了对于时间和状态的细粒度控制,开发者可以在其中任意地修改状态,也能够注册定时器用以在未来的某一时刻触发
回调函数.因此你可以利用ProcessFunctino实现许多有状态的事件驱动应用所需要的基于单个事件的复杂业务逻辑.
DataStream API
概述:
为许多通用的流处理操作提供了处理原语
这些操作包括窗口、逐条记录的的转换操作,在处理时间时进行外部数据库查询等. DataStream API支持Java 和 Scala语言,预先定义了
例如map() 、reduce() 、Aggregate()等函数,你可以通过扩展实现预定义接口或使用Java、SCala的lambda表达式实现自定义的函数
SQL& Table API:
概述:
Flink支持两种关系型的API,Table API和SQL,这两个API都是批处理,和流处理统一的API,这意味着在无边界的实时数据流和
有边界的历史记录数据流上,关系型API 会以相同的语义执行查询,并产生相同的结果.Table API 和SQL 接住了 Apache Calcite
来进行查询的解析,校验以及优化,它们可以与DataStream和DataSEt API无缝集成,并支持用户自定义的标量函数,聚合函数以及标志函数.
复杂事件处理-CEP
概述:
模式检测是事件流处理中的一个非常常见的用例. Flink 的CEP 库提供了API,使用户能够以例如正则表达式
或状态机的方式指定事件模式,CEP库与Flink的DataStream API集成,以便在DataStream上评估模式.
CEP库的应用包括网络入侵检测,业务流程监控和欺诈检测.
DataSet API
概述:
DataSet API 是Flink用于批处理应用程序的核心 API, DataSet API所提供的的基础算子 包括 map、reduce、(outer) join、co-group、
iterate等.
所有算子都有相应的算法和数据结构支持,对内存中的序列化数据进行操作. 如果数据大小超过预留内存,则过量数据将存储到磁盘.
Gallery 可扩展的图形处理和分析库.
DataStream 的编程模型,
概述:
DataStream的编程模型包括四个部分: Environment,DataSource,Trasnformation,Sink
初始化上下文环境-> 数据源-> 转换操作,=> 数据输出
Flink的DataSource数据源;
1. 基于文件的Source
读取HDFS文件系统的Source
// 首先需要配置Hadoop的依赖
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>2.7.2</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>2.7.2</version>
</dependency>
读取HDFS上的文件:
env.readTextFile($HDFS_PATH)
2. 基于集合的Source
env.fromCollection()
3. 基于Kafka的Source
首先需要配置Kafka连接器的依赖,另外更多的连接器可以查看官网
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_2.11</artifactId>
<version>1.9.1</version>
</dependency>
3. Flink的Sink数据目标
概述:
Flink针对DataStream提供了大量的已经实现的数据目标(Sink),包括文件,Kafka,Redis,HDFS,Elasticsearch
1. 基于HDFS的Sink
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-filesystem_2.11</artifactId>
<version>1.9.1</version>
</dependency>
Streaming File Sink能把数据写入HDFS中,还可以支持分桶写入,每一个分桶就对应HDFS中的一个目录,
默认按照该小时来分桶,在一个桶内部,会进一步将输出基于滚动策略切分成更小的文件.这有助于防止桶文件变得过大,
滚动策略也是可以配置的,默认策略会更具文件大小和超时时间来滚动文件,超时时间是指没有新数据写入部分文件(part file)的时间
2. 基于Redis的Sink
3. 基于Kafka的Sink
4. 自定义的Sink
4. DataStream 转换算子
概述:
即通过从一个或多个DataStream生成新的DataStream的过程被称为Transformation操作. 在转换过程中,每种操作类型被
定义为不同的Operator,Flink程序都能够将多个Transformation组成一个DataFlow的拓扑.
1. Map[DataStream->DataStream]
调用用户自定义的MapFunction对DataStream[T]数据进行处理,形成新的DataStream[T],其中数据格式可能会发生变化,
常用作对数据集内数据的清洗和转换,例如将输入数据集中的每个数值全部加1处理,并且将数据输出到下游数据集
2. FlatMap[DataStream->DataStream]
该算子主要应用处理输入一个元素产生一个或者多个元素的计算场景,比较常见的是在经典例子WordCount中,将每一行的
文本数据切割,生成单词序列
3. Filter[DataStream->DataStream]
该算子将按照条件对输入数据集进行筛选操作,将符合条件的数据集输出,将不符合条件的数据过滤掉.
3.1 // 通过通配符
val filter:DataStream[Int] = dataStream.filter{_%2==0}
3.2 // 或者指定运算表达式
val filter:DataStream[Int] = dataStream.filter{x=> x%2==0}
4. KeyBy[DataStream->KeyedStream]
该算子根据指定的key将输入的DataStream[T]数据格式转换为KeyedStream[T],也就是在数据集中执行Partition操作,将
相同的Key值的数据放置在相同的分区中.
例如WordCount-> 将数据集中第一个参数作为Key,对数据集进行KeyBy函数操作,形成根据Id分区的KeyedStream数据集.
eg:
val dataStream = env.fromElements((1,5),(2,2),(2,4),(1,3))
//指定第一个字段为分区key
val keyedStream: KeyedStream [(String,Int),Tuple] = dataStream.keyBy(0)
5. Reduce[KeyedStream->DataStream]
该算子和MapReduce中Reduce原理基本一致,主要目的是将输入的KeyedStream通过传入的用户自定义地ReduceFunction
滚动地进行数据聚合处理,其中定义ReduceFunction必须满足运算结合律和交换律,
eg:
对传入的KeyedStream数据集中相同key值的数据独立进行求和运算,得到每个key所对应的求和值.
val dataStream = env.fromElements(("a",3),("d",4),("c",2),("a",5))
//指定第一个字段为分区key`
val keyedStream:KeyedStream[(String,Int),Tuple] = dataStream.keyBy(0)
// 滚动对第二个字段进行reduce相加求和
val reduceStream = keyedStream.reduce{(x1,x2)=>(x1._1,x1._2+x2._2)
6. Aggregations[KeyedStream->DataStream]
Aggregations是KeyedDataStream接口提供的聚合算子,根据指定的字段进行聚合操作,滚动地产生一系列数据聚合结果.
其实是将Reduce算子中的函数进行了封装,封装的聚合操作有sum、min、minBy、max、maxBy等,这样就不需要用户自己定义
Reduce函数.
eg:
指定数据集中第一个字段作为key,用第二个字段作为累加字段,然后滚动地对第二个字段的数值进行累加并输出
//指定第一个字段为分区key
val keyedStream:KeyedStream[(Int,Int),Tuple] = dataStream.keyBy(0)
// 对对第二个字段进行sum统计
val sumStream:DataStream[(Int,Int)] = keyedStream.sum(1)
// 输出计算结果
sumStream.print()
7. Union[DataStream->DataStream]
union算子主要是将两个或者多个输入的数据集合并成一个数据集,需要保证两个数据集的格式一致,输出的数据集的格式和
输入的数据集格式保持一致,
code:
//获取flink实时流处理的环境
val env = ExecutionEnvironment.getExecutionEnvironment
// 创建不同的数据集
val dataStream = env.fromElements(("a", 3), ("d", 4), ("c", 2), ("a", 5))
val dataStream2 = env.fromElements(("a", 1), ("d", 1), ("c", 1), ("a", 1))
// 调用union算子进行不同的数据集合并
dataStream.union(dataStream2).print()
8. Connect,CoMap,CoFlatMap[DataStream->ConnectedStream->DataStream](只能在Stream才可以用)
connect算子主要是为了合并两种或者多种不同数据类型的数据集,合并后会保留原来原来数据集的数据类型.
例如: dataStream1数据集为(String,Int) 元组类型,dataStream2数据集为Int类型,通过connect连接算子将两个不同数据
类型的流结合在一起,形成格式为ConnectedStreams的数据集,其内部数据为[(String,Int),Int]的混合数据类型,保留了两个
原始数据集的数据类型
eg:
//获取flink实时流处理的环境
val env = StreamExecutionEnvironment.getExecutionEnvironment
// 创建不同的数据集
val dataStream = env.fromElements(("a", 3), ("d", 4), ("c", 2), ("a", 5))
val dataStream2 = env.fromElements(1, 2, 4, 5)
// 连接两个DataStream数据集
val connectedStream = dataStream.connect(dataStream2)
val result = connectedStream.map(
//第一个处理函数
t1 => {
(t1._1, t1._2)
},
//第二个处理函数
t2 => {
(t2, 0)
})
result.print()
env.execute("h")
注意:Union和Connect区别
1. Union之间两个流的类型必须是一样,Connect可以不一样,在之后的coMap中在去调整成为一样的.
2. Connect只能操作两个流,Union可以操作多个
9. Split 和 select [DataStream -> SplitStream->DataStream]
Split算子是将一个DataStream数据集按照条件进行拆分,形成两个数据集的过程,也是Union算子的逆向实现,每个接入的数据
都会被路由到一个或者多个输出数据集中,
在使用Splict函数中,需要定义split函数中的切分逻辑,通过调用split函数,然后指定条件判断函数,
例如: 如下代码所示,将根据第二个字段的奇偶性将数据集标记出来,如果是偶数则标记为even,如果是奇数则标记为odd,然后通过集合将标记返回,最终生成格式SplitStream的数据集
code:
//获取flink实时流处理的环境
val env = StreamExecutionEnvironment.getExecutionEnvironment
// 导入Flink隐式转换
import org.apache.flink.streaming.api.scala._
// 创建不同的数据集
val dataStream = env.fromElements(("a", 3), ("d", 4), ("c", 2), ("a", 5))
val splitedStream = dataStream.split(t => if (t._2 % 2 == 0) Seq("even") else Seq("odd"))
// Split函数本身只是对输入数据集进行标记,并没有将数据集真正的实现拆分,因此需要借助Select函数根据标记将数据切分成不同的数据集,
//筛选出偶数数据集
val evenStream = splitedStream.select("even").print()
//筛选出偶数数据集
val oddStream = splitedStream.select("odd")
env.execute("l ")
函数类和富函数类
概述:
前面学过的所有算子集合都可以自定义一个函数类,富函数类作为参数,因为Flink暴露了这两种函数类的接口,常见的函数接口:
1. MapFunction
2. FlatMapFunction
3. ReduceFunction
富函数接口它与其他常规函数接口的不同在于:可以获取运行环境的上下文,在上下文环境中可以管理状态(State),并拥有一些生命周期方法,
所以可以实现更复杂的功能.富函数的接口有:
1. RichMapFunction
2. RichFlatMapFunction
3. RichFilterFunction
1. 普通函数类举例:
按照指定的时间格式输出每个通话的拨号时间和结束时间
code:
import java.text.SimpleDateFormat
import java.util.Date
import org.apache.flink.api.common.functions.MapFunction
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
object FunctionClassTransformation {
case class StationLog(sid: String, var callOut: String, var callInt: String, callType: String, callTime: Long, duration: Long)
def main(args: Array[String]): Unit = {
//获取flink实时流处理的环境
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
// 导入Flink隐式转换
import org.apache.flink.streaming.api.scala._
val data = env.readTextFile(getClass.getResource("station.log").getPath)
.map { line =>
var arr = line.split(",")
StationLog(arr(0).trim, arr(1).trim, arr(2).trim, arr(3).trim, arr(4).trim.toLong, arr(5).trim.toLong)
}
// 定义时间输出格式
val format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
// 过滤那些通话成功的
data.filter(_.callType.equals("success"))
.map(new CallMapFunction(format))
.print()
env.execute("l ")
}
class CallMapFunction(format: SimpleDateFormat) extends
MapFunction[StationLog, String] {
override def map(t: StationLog): String = {
var startTime = t.callTime
val endTime = t.callTime + t.duration * 1000
"主叫号码: " + t.callOut + " , 被叫号码: " + t.callInt + ", 呼叫起始时间: " + format.format(new Date(startTime)) + ",呼叫结束时间: " + format.format(new Date(endTime))
}
}}
result:
主叫号码: 18600003186 , 被叫号码: 18900002113, 呼叫起始时间: 2019-12-23 13:54:13,呼叫结束时间: 2019-12-23 13:54:45
主叫号码: 18600003794 , 被叫号码: 18900009608, 呼叫起始时间: 2019-12-23 13:54:13,呼叫结束时间: 2019-12-23 13:54:17
主叫号码: 18600000005 , 被叫号码: 18900007729, 呼叫起始时间: 2019-12-23 13:56:43,呼叫结束时间: 2019-12-23 14:02:32
主叫号码: 18600005404 , 被叫号码: 18900000558, 呼叫起始时间: 2019-12-23 13:54:17,呼叫结束时间: 2019-12-23 13:54:22
主叫号码: 18600003532 , 被叫号码: 18900008128, 呼叫起始时间: 2019-12-23 13:54:19,呼叫结束时间: 2019-12-23 13:54:29
主叫号码: 18600003532 , 被叫号码: 18900008128, 呼叫起始时间: 2019-12-23 13:54:26,呼叫结束时间: 2019-12-23 13:54:41
主叫号码: 18600003502 , 被叫号码: 18900009859, 呼叫起始时间: 2019-12-23 13:54:28,呼叫结束时间: 2019-12-23 13:54:28
主叫号码: 18600003502 , 被叫号码: 18900009859, 呼叫起始时间: 2019-12-23 13:54:28,呼叫结束时间: 2019-12-23 13:54:28
2. 富函数类举例:
把呼叫成功的通话信息转换成真实的用户姓名,通话用户对应的用户表(在MySql数据库中),
由于需要从数据库中查询数据,就需要创建连接,创建连接的代码必须写在生命周期的open方法中,所以需要使用富函数类.
Rich Function有一个生命周期的概念,典型的生命周期方法有:
open()方法是rich function的初始化方法,当一个算子例如map或者filter被调用之前open()会被调用.
clsose()方法是生命周期中的最后一个调用的方法,做一些清理工作
getRuntimeContext()方法提供了函数的RuntimeContext的一些信息,例如函数执行的并行度,任务的名字,以及state状态
code:
package com.bjsxt.flink.transformation
import java.sql.{Connection, DriverManager, PreparedStatement, ResultSet}
import com.bjsxt.flink.source.StationLog
import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
object TestRichFunctionClass {
/**
* 把通话成功的电话号码转换成真是用户姓名,用户姓名保存在Mysql表中
* @param args
*/
def main(args: Array[String]): Unit = {
val streamEnv: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
import org.apache.flink.streaming.api.scala._
//读取数据源
var filePath =getClass.getResource("/station.log").getPath
val stream: DataStream[StationLog] = streamEnv.readTextFile(filePath)
.map(line=>{
var arr=line.split(",")
new StationLog(arr(0).trim,arr(1).trim,arr(2).trim,arr(3).trim,arr(4).trim.toLong,arr(5).trim.toLong)
})
//计算:把电话号码变成用户姓名
val result: DataStream[StationLog] = stream.filter(_.callType.equals("success"))
.map(new MyRichMapFunction)
result.print()
streamEnv.execute()
}
//自定义一个富函数类
class MyRichMapFunction extends RichMapFunction[StationLog,StationLog]{
var conn:Connection=_
var pst:PreparedStatement=_
override def open(parameters: Configuration): Unit = {
conn =DriverManager.getConnection("jdbc:mysql://localhost/test","root","123123")
pst =conn.prepareStatement("select name from t_phone where phone_number=?")
}
override def close(): Unit = {
pst.close()
conn.close()
}
override def map(value: StationLog): StationLog = {
println(getRuntimeContext.getTaskNameWithSubtasks)
//查询主叫号码对应的姓名
pst.setString(1,value.callOut)
val result: ResultSet = pst.executeQuery()
if(result.next()){
value.callOut=result.getString(1)
}
//查询被叫号码对应的姓名
pst.setString(1,value.callInt)
val result2: ResultSet = pst.executeQuery()
if(result2.next()){
value.callInt=result2.getString(1)
}
value
}
}
}
6. 底层 ProcessFunctionAPI
概述:
ProcessFunction是一个低层次的流处理操作,允许所有返回Stream的基础构建模块:
访问Event本身数据(比如:Event的时间, Event的当前Key)
管理状态 State(仅在keyed Stream中)
管理定时器Timer( 包括: 注册定时器,删除定时器等)
总而言之,ProcessFunction是Flink最底层的API,也是功能最强大的.
例如:
监控每一个手机,如果在5s内呼叫它的通话都是失败的,发出警告信息.
注意:
这个案例中会使用到状态编程,请同学们只要知道状态的意思,不需要掌握,
code:
package com.bjsxt.flink.transformation
import com.bjsxt.flink.source.StationLog
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.util.Collector
object TestProcessFunction {
//监控每一个手机号码,如果这个号码在5秒内,所有呼叫它的日志都是失败的,则发出告警信息
//如果在5秒内只要有一个呼叫不是fail则不用告警
def main(args: Array[String]): Unit = {
val streamEnv: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
import org.apache.flink.streaming.api.scala._
//读取数据源
val stream: DataStream[StationLog] = streamEnv.socketTextStream("hadoop101",8888)
.map(line=>{
var arr=line.split(",")
new StationLog(arr(0).trim,arr(1).trim,arr(2).trim,arr(3).trim,arr(4).trim.toLong,arr(5).trim.toLong)
})
//计算
val result: DataStream[String] = stream.keyBy(_.callInt)
.process(new MonitorCallFail)
result.print()
streamEnv.execute()
}
//自定义一个底层的类
class MonitorCallFail extends KeyedProcessFunction[String,StationLog,String]{
//使用一个状态对象记录时间
lazy val timeState :ValueState[Long] =getRuntimeContext.getState(new ValueStateDescriptor[Long]("time",classOf[Long]))
override def processElement(value: StationLog, ctx: KeyedProcessFunction[String, StationLog, String]#Context, out: Collector[String]): Unit = {
//从状态中取得时间
var time =timeState.value()
if(time==0 && value.callType.equals("fail") ){ //表示第一次发现呼叫失败,记录当前的时间
//获取当前系统时间,并注册定时器
var nowTime =ctx.timerService().currentProcessingTime()
//定时器在5秒后触发
var onTime =nowTime+8*1000L
ctx.timerService().registerProcessingTimeTimer(onTime)
//把触发时间保存到状态中
timeState.update(onTime)
}
if (time!=0 && !value.callType.equals("fail")){ //表示有一次成功的呼叫,必须要删除定时器
ctx.timerService().deleteProcessingTimeTimer(time)
timeState.clear() //清空状态中的时间
}
}
//时间到了,定时器执行,
override def onTimer(timestamp: Long, ctx: KeyedProcessFunction[String, StationLog, String]#OnTimerContext, out: Collector[String]): Unit = {
var warnStr="触发的时间:"+timestamp +" 手机号 :"+ctx.getCurrentKey
out.collect(warnStr)
timeState.clear()
}
}
}
7. 侧输出流 Side Output
概述:
在Flink处理数据流时,我们经常会遇到这样的情况: 在处理一个数据源时,往往需要将该源中的不同类型的数据做分割处理,如果使用filter
算子对数据源进行筛选分割的话,势必会造成数据流的多次复制,造成不必要的性能浪费;flink中的侧输出就是将数据流进行分割,而不对流
进行复制的一种分流机制,
Flink的侧输出的另一个作用就是对延时迟到的数据进行处理,这样就可以不必丢弃迟到的数据.
案例: (根据基站的日志,请把呼叫成功的Stream(主流)和不成功的Stream(侧流分别输出)
code:
package FlinkDemo.functions
9
import FlinkDemo.functions.FunctionClassTransformation.StationLog
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.util.Collector
object TestSideOutpurStream {
// 导入Flink隐式转换
import org.apache.flink.streaming.api.scala._
// 侧输出流首先需要定义一个流的标签
val notSuccessTag = new OutputTag[StationLog]("not_success")
def main(args: Array[String]): Unit = {
//获取flink实时流处理的环境
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
val data = env.readTextFile(getClass.getResource("station.log").getPath)
.map { line =>
var arr = line.split(",")
StationLog(arr(0).trim, arr(1).trim, arr(2).trim, arr(3).trim, arr(4).trim.toLong, arr(5).trim.toLong)
}
val mainStream = data
.process(new CreateSideOutputStream(notSuccessTag))
// 得到测流
val sideStream = mainStream.getSideOutput(notSuccessTag)
mainStream.print("main")
sideStream.print("sideOutput")
env.execute()
}
class CreateSideOutputStream(tag: OutputTag[StationLog]) extends ProcessFunction[StationLog, StationLog] {
override def processElement(value: StationLog, context: ProcessFunction[StationLog, StationLog]#Context, collector: Collector[StationLog]): Unit = {
//输出主流
if (value.callType.equals("success")) {
collector.collect(value)
}
else {
//输出侧流
context.output(tag, value)}}}}