1.1 Flink之数据源
1.1.1 source简介
source是程序的数据源输入,你可以通过StreamExecutionEnvironment.addSource(sourceFunction) 来为你的程序添加一个source。 flink提供了大量的已经实现好的source方法,你也可以自定义source:
- (1)通过实现sourceFunction接口来自定义无并行度的source
- (2)通过实现ParallelSourceFunction 接口 or 继承RichParallelSourceFunction 来自定义有并行度的 source 不过大多数情况下,我们使用自带的source即可。
获取source的方式
- (1)基于文件
readTextFile(path)
读取文本文件,文件遵循TextInputFormat 读取规则,逐行读取并返回。 - (2)基于socket
socketTextStream
从socker中读取数据,元素可以通过一个分隔符切开。 - (3)基于集合
fromCollection(Collection)
通过java 的collection集合创建一个数据流,集合中的所有元素必须是相同类型的。 - (4)自定义输入
addSource 可以实现读取第三方数据源的数据
系统内置提供了一批connectors,连接器会提供对应的source支持【kafka】
扩展的connectors
Apache Kafka (source/sink)
Apache Cassandra (sink)
Amazon Kinesis Streams (source/sink)
Elasticsearch (sink)
Hadoop FileSystem (sink)
RabbitMQ (source/sink)
Apache NiFi (source/sink)
Twitter Streaming API (source)
1.1.2 数据源之collection
StreamingSourceFromCollection.java
public class StreamingSourceFromCollection {
public static void main(String[] args) throws Exception {
//步骤一:获取环境变量
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//步骤二:模拟数据
ArrayList<String> data = new ArrayList<String>();
data.add("hadoop");
data.add("spark");
data.add("flink");
//步骤三:获取数据源
DataStreamSource<String> dataStream = env.fromCollection(data);
//步骤四:transformation操作
SingleOutputStreamOperator<String> addPreStream = dataStream.map(new MapFunction<String, String>() {
public String map(String word) throws Exception {
return "mi_" + word;
}
});
//步骤五:对结果进行处理(打印)
addPreStream.print().setParallelism(1);
//步骤六:启动程序
env.execute("StreamingSourceFromCollection");
}
}
1.1.3 自定义单并行度数据源
MyNoParalleSource.java
/**
*
* 我们数据输出的数据类型
*
* 代表我们的这个数据源只能支持一个并行度(单并行度)
*/
public class MyNoParalleSource implements SourceFunction<Long> {
private long number = 1L;
private boolean isRunning = true;
@Override
public void run(SourceContext<Long> sct) throws Exception {
while (isRunning){
//往下游发送数据
sct.collect(number);
number++;
//每秒生成一条数据
Thread.sleep(1000);
}
}
@Override
public void cancel() {
isRunning=false;
}
}
StreamingDemoWithMyNoPralalleSource.java
public class StreamingDemoWithMyNoPralalleSource {
public static void main(String[] args) throws Exception {
/**
* 1. 获取程序入口
*/
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());
/**
* 2 获取数据源
*/
DataStreamSource<Long> numberStream = env.addSource(new MyNoParalleSource()).setParallelism(1);
/**
* 3 数据的处理
*/
SingleOutputStreamOperator<Long> dataStream = numberStream.map(new MapFunction<Long, Long>() {
@Override
public Long map(Long value) throws Exception {
System.out.println("接受到了数据:"+value);
return value;
}
}).setParallelism(2);
SingleOutputStreamOperator<Long> filterDataStream = dataStream.filter(new FilterFunction<Long>() {
@Override
public boolean filter(Long number) throws Exception {
//过滤出来偶数
return number % 2 == 0;
}
}).setParallelism(2);
filterDataStream.print().setParallelism(1);
env.execute("StreamingDemoWithMyNoPralalleSource");
}
}
RichParallelSourceFunction是支持设置多并行度的,关于RichParallelSourceFunction
与RichSourceFunction
的区别,前者支持用户设置多并行度,后者不支持通过setParallelism()
方法设置并行度大于1,默认的并行度为1,否则会报如下错误:
bashException in thread "main" java.lang.IllegalArgumentException: The maximum parallelism of non parallel operator must be 1.
1.1.4 自定义多并行度数据源
MyParalleSource.java
/**
* 我们的这个source是支持多并行度的
*/
public class MyParalleSource implements ParallelSourceFunction<Long> {
private long number = 1L;
private boolean isRunning = true;
@Override
public void run(SourceContext<Long> sct) throws Exception {
while (isRunning){
sct.collect(number);
number++;
//每秒生成一条数据
Thread.sleep(1000);
}
}
@Override
public void cancel() {
isRunning=false;
}
}
StreamingDemoWithMyPralalleSource.java
public class StreamingDemoWithMyPralalleSource {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());
DataStreamSource<Long> numberStream = env.addSource(new MyParalleSource()).setParallelism(2);
SingleOutputStreamOperator<Long> dataStream = numberStream.map(new MapFunction<Long, Long>() {
@Override
public Long map(Long value) throws Exception {
System.out.println("接受到了数据:"+value);
return value;
}
});
SingleOutputStreamOperator<Long> filterDataStream = dataStream.filter(new FilterFunction<Long>() {
@Override
public boolean filter(Long number) throws Exception {
return number % 2 == 0;
}
});
filterDataStream.print().setParallelism(1);
env.execute("StreamingDemoWithMyNoPralalleSource");
}
}
1.2 常见Transformation操作
1.2.1 map和filter
/**
* 数据源:1 2 3 4 5.....源源不断过来
* 通过map打印一下接收到数据
* 通过filter过滤一下数据,我们只需要偶数
*/
public class MapDemo {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<Long> numberStream = env.addSource(new MyNoParalleSource()).setParallelism(1);
//flink FlatMap/map -> spark FlatMap/map -> Scala flatmap/Map
SingleOutputStreamOperator<Long> dataStream = numberStream.map(new MapFunction<Long, Long>() {
@Override
public Long map(Long value) throws Exception {
System.out.println("接受到了数据:"+value);
return value;
}
});
SingleOutputStreamOperator<Long> filterDataStream = dataStream.filter(new FilterFunction<Long>() {
@Override
public boolean filter(Long number) throws Exception {
return number % 2 == 0;//true
}
});
filterDataStream.print().setParallelism(1);
env.execute("StreamingDemoWithMyNoPralalleSource");
}
}
1.2.3 union
/**
* 合并多个流,新的流会包含所有流中的数据,但是union是一个限制,就是所有合并的流类型必须是一致的
* union timeWindowAll
*/
public class unionDemo {
public static void main(String[] args) throws Exception {
//获取Flink的运行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//获取数据源
DataStreamSource<Long> text1 = env.addSource(new MyNoParalleSource()).setParallelism(1);//注意:针对此source,并行度只能设置为1
DataStreamSource<Long> text2 = env.addSource(new MyNoParalleSource()).setParallelism(1);
//把text1和text2组装到一起
DataStream<Long> text = text1.union(text2);
DataStream<Long> num = text.map(new MapFunction<Long, Long>() {
@Override
public Long map(Long value) throws Exception {
System.out.println("原始接收到数据:" + value);
return value;
}
});
//每2秒钟处理一次数据
DataStream<Long> sum = num.timeWindowAll(Time.seconds(2)).sum(0);
//打印结果
sum.print().setParallelism(1);
String jobName = unionDemo.class.getSimpleName();
env.execute(jobName);
}
}
1.2.4 connect,conMap和conFlatMap
/**
* 和union类似,但是只能连接两个流,两个流的数据类型可以不同,会对两个流中的数据应用不同的处理方法
*/
public class ConnectionDemo {
public static void main(String[] args) throws Exception {
//获取Flink的运行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//获取数据源
DataStreamSource<Long> text1 = env.addSource(new MyNoParalleSource()).setParallelism(1);//注意:针对此source,并行度只能设置为1
DataStreamSource<Long> text2 = env.addSource(new MyNoParalleSource()).setParallelism(1);
SingleOutputStreamOperator<String> text2_str = text2.map(new MapFunction<Long, String>() {
@Override
public String map(Long value) throws Exception {
return "str_" + value;
}
});
//union
ConnectedStreams<Long, String> connectStream = text1.connect(text2_str);
SingleOutputStreamOperator<Object> result = connectStream.map(new CoMapFunction<Long, String, Object>() {
//这个方法处理的是数据源 1
@Override
public Object map1(Long value) throws Exception {
return value;
}
//这个方法处理的就是数据源 2
@Override
public Object map2(String value) throws Exception {
return value;
}
});
//打印结果
result.print().setParallelism(1);
String jobName = ConnectionDemo.class.getSimpleName();
env.execute(jobName);
}
}
1.2.5 Split和Select
/**
* 根据规则把一个数据流切分为多个流
应用场景:
* 可能在实际工作中,源数据流中混合了多种类似的数据,多种类型的数据处理规则不一样,所以就可以在根据一定的规则,
* 把一个数据流切分成多个数据流,这样每个数据流就可以使用不用的处理逻辑了
*/
public class SplitDemo {
public static void main(String[] args) throws Exception {
//获取Flink的运行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//获取数据源
DataStreamSource<Long> text = env.addSource(new MyNoParalleSource()).setParallelism(1);//注意:针对此source,并行度只能设置为1
//对流进行切分,按照数据的奇偶性进行区分
SplitStream<Long> splitStream = text.split(new OutputSelector<Long>() {
@Override
public Iterable<String> select(Long value) {
ArrayList<String> outPut = new ArrayList<>();
if (value % 2 == 0) {
outPut.add("even");//偶数
} else {
outPut.add("odd");//奇数
}
return outPut;
}
});
//选择一个或者多个切分后的流
DataStream<Long> evenStream = splitStream.select("even");
DataStream<Long> oddStream = splitStream.select("odd");
DataStream<Long> moreStream = splitStream.select("odd","even");
//打印结果
//打印偶数
evenStream.print().setParallelism(1);
//打印奇数
// oddStream.print().setParallelism(1);
//打印全部
// moreStream.print().setParallelism(1);
String jobName = SplitDemo.class.getSimpleName();
env.execute(jobName);
}
}
1.3常见sink操作
1.3.1 print() / printToErr()
打印每个元素的toString()方法的值到标准输出或者标准错误输出流中
1.3.2 writeAsText()
/**
* 数据源:1 2 3 4 5.....源源不断过来
* 通过map打印一下接收到数据
* 通过filter过滤一下数据,我们只需要偶数
*/
public class WriteTextDemo {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<Long> numberStream = env.addSource(new MyNoParalleSource()).setParallelism(1);
SingleOutputStreamOperator<Long> dataStream = numberStream.map(new MapFunction<Long, Long>() {
@Override
public Long map(Long value) throws Exception {
System.out.println("接受到了数据:"+value);
return value;
}
});
SingleOutputStreamOperator<Long> filterDataStream = dataStream.filter(new FilterFunction<Long>() {
@Override
public boolean filter(Long number) throws Exception {
return number % 2 == 0;
}
});
filterDataStream.writeAsText("D:\\flinkout\\value.txt").setParallelism(1);
filterDataStream.print();
env.execute("StreamingDemoWithMyNoPralalleSource");
}
}
1.3.3 Flink提供的sink
Apache Kafka (source/sink)
Apache Cassandra (sink)
Amazon Kinesis Streams (source/sink)
Elasticsearch (sink)
Hadoop FileSystem (sink)
RabbitMQ (source/sink)
Apache NiFi (source/sink)
Twitter Streaming API (source)
Google PubSub (source/sink)
1.3.4 自定义sink
/**
* 把数据写入redis
*/
public class SinkForRedisDemo {
public static void main(String[] args) throws Exception {
//程序入口
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//数据源
DataStreamSource<String> text = env.socketTextStream("bigdata02", 8888, "\n");
//lpsuh l_words word
//对数据进行组装,把string转化为tuple2<String,String>
DataStream<Tuple2<String, String>> l_wordsData = text.map(new MapFunction<String, Tuple2<String, String>>() {
@Override
public Tuple2<String, String> map(String value) throws Exception {
//k v
return new Tuple2<>("f", value);
}
});
// //创建redis的配置
FlinkJedisPoolConfig conf = new FlinkJedisPoolConfig.Builder().setHost("bigdata04").setPort(6379).setPassword("bigdata04").build();
//
// //创建redissink
RedisSink<Tuple2<String, String>> redisSink = new RedisSink<>(conf, new MyRedisMapper());
l_wordsData.addSink(redisSink);
env.execute("StreamingDemoToRedis");
}
/**
* 把数据插入到redis到逻辑
*/
public static class MyRedisMapper implements RedisMapper<Tuple2<String, String>> {
//表示从接收的数据中获取需要操作的redis key
@Override
public String getKeyFromData(Tuple2<String, String> data) {
return data.f0;
}
//表示从接收的数据中获取需要操作的redis value
@Override
public String getValueFromData(Tuple2<String, String> data) {
return data.f1;
}
@Override
public RedisCommandDescription getCommandDescription() {
return new RedisCommandDescription(RedisCommand.LPUSH);
}
}
}
1.4 【State】
1.4.1 state概述
Apache Flink® — Stateful Computations over Data Streams
回顾单词计数的例子
//实时统计单词出现次数
public class WordCount {
public static void main(String[] args) throws Exception{
//创建程序入口
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//数据的输入
DataStreamSource<String> myDataStream = env.socketTextStream("bigdata02", 1234);
//数据的处理
SingleOutputStreamOperator<Tuple2<String, Integer>> result = myDataStream.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
@Override
public void flatMap(String line, Collector<Tuple2<String, Integer>> out) throws Exception {
String[] fields = line.split(",");
for (String word : fields) {
out.collect(new Tuple2<>(word, 1));
//out.collect(Tuple2.of(word,1));
}
}
}).keyBy(0)
.sum(1);
//数据的输出
result.print();
//启动应用程序
env.execute("WordCount");
}
}
输入
hadoop,hadoop
hadoop
hive,hadoop
输出
4> (hadoop,1)
4> (hadoop,2)
4> (hadoop,3)
1> (hive,1)
4> (hadoop,4)
我们会发现,单词出现的次数有累计的效果。如果没有状态的管理,是不会有累计的效果的,所以Flink 里面还有state的概念。
state:一般指一个具体的task/operator的状态。State可以被记录,在失败的情况下数据还可以恢复, Flink中有两种基本类型的State:Keyed State,Operator State,他们两种都可以以两种形式存在:原 始状态(raw state)和托管状态(managed state)
托管状态:由Flink框架管理的状态,我们通常使用的就是这种。
原始状态:由用户自行管理状态具体的数据结构,框架在做checkpoint的时候,使用byte[]来读写状态 内容,对其内部数据结构一无所知。通常在DataStream上的状态推荐使用托管的状态,当实现一个用 户自定义的operator时,会使用到原始状态。但是我们工作中一般不常用,所以我们不考虑他。
1.4.2 State类型
Operator State(task级别的)
- operator state是task级别的state,说白了就是每个task对应一个state
- Kafka Connector source中的每个分区(task)都需要记录消费的topic的partition和offset等信息。
Keyed State(针对每一个key)
- keyed state 记录的是每个key的状态
-
- Keyed state托管状态有六种类型:
- ValueState
- ListState
- MapState
- ReducingState
- AggregatingState
-
FoldingState
-
state理解
数据源是Kafka
1.4.3 Keyed State的案例演示
ValueState
public class CountWindowAverageWithValueState extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Double>> {
/**
* 1.valueState 属于keyed state
* 2.valueState里面只能存储一条数据
*
* 思路:
* long1:当前key出现的次数
* long2:累加的value值
* if(long1=3){
* long2/long1 =avg
* }
*/
private ValueState<Tuple2<Long, Long>> countAndSum;
@Override
public void open(Configuration parameters) throws Exception {
ValueStateDescriptor<Tuple2<Long, Long>> average = new ValueStateDescriptor<>(
"average",
Types.TUPLE(Types.LONG, Types.LONG)
);
countAndSum = getRuntimeContext().getState(average);
}
@Override
public void flatMap(Tuple2<Long, Long> element, Collector<Tuple2<Long, Double>> out) throws Exception {
Tuple2<Long, Long> currentState = countAndSum.value();
if (currentState == null) {
currentState = Tuple2.of(0L, 0L);
}
//统计key出现的次数
currentState.f0 += 1;
//统计value总值
currentState.f1 += element.f1;
countAndSum.update(currentState);
if (currentState.f0 ==3){
double avg =(double)currentState.f1/currentState.f0;
out.collect(Tuple2.of(element.f0,avg));
//清空里面的数据
countAndSum.clear();
}
}
}
/**
* 需求:当接收到的相同 key 的元素个数等于 3 个
* 就计算这些元素的 value 的平均值。
* 计算 keyed stream 中每 3 个元素的 value 的平均值
*
* 1,3
* 1,7
*
* 1,5
*
* 1,5.0
*
* 2,4
*
* 2,2
* 2,5
*
* 2,3.666
*
* key,value
* 1 long,5 doulbe
*
*/
public class TestKeyedStateMain {
public static void main(String[] args) throws Exception{
//程序入口
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//数据源
DataStreamSource<Tuple2<Long, Long>> dataStreamSource =
env.fromElements(Tuple2.of(1L, 3L), Tuple2.of(1L, 7L),
Tuple2.of(2L, 4L), Tuple2.of(1L, 5L),Tuple2.of(2L, 2L), Tuple2.of(2L, 5L));
// 输出:
//(1,5.0)
//(2,3.6666666666666665)
dataStreamSource
.keyBy(0)
.flatMap(new CountWindowAverageWithValueState()) //flatMap,map + state = 自定义函数的感觉
.print();
env.execute("TestStatefulApi");
}
}
结果输出:
ListState
public class CountWindowAverageWithListState extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Double>> {
/**
* 1,3
* 1,7
* 1,5
*/
private ListState<Tuple2<Long, Long>> elementsByKey;
@Override
public void open(Configuration parameters) throws Exception {
ListStateDescriptor<Tuple2<Long, Long>> average = new ListStateDescriptor<>(
"average",
Types.TUPLE(Types.LONG, Types.LONG)
);
elementsByKey = getRuntimeContext().getListState(average);
}
@Override
public void flatMap(Tuple2<Long, Long> element, Collector<Tuple2<Long, Double>> out) throws Exception {
Iterable<Tuple2<Long, Long>> currentState = elementsByKey.get();
if (currentState ==null){
elementsByKey.addAll(Collections.emptyList());
}
elementsByKey.add(element);
ArrayList<Tuple2<Long, Long>> allElements = Lists.newArrayList(elementsByKey.get());
if (allElements.size() ==3){
long count =0;
long sum=0;
for(Tuple2<Long,Long> ele:allElements){
count++;
sum +=ele.f1;
}
double avg =(double)sum/count;
out.collect(Tuple2.of(element.f0,avg));
elementsByKey.clear();
}
}
}
public class TestKeyedStateMain {
public static void main(String[] args) throws Exception{
//程序入口
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//数据源
DataStreamSource<Tuple2<Long, Long>> dataStreamSource =
env.fromElements(Tuple2.of(1L, 3L), Tuple2.of(1L, 7L),
Tuple2.of(2L, 4L), Tuple2.of(1L, 5L),Tuple2.of(2L, 2L), Tuple2.of(2L, 5L));
// 输出:
//(1,5.0)
//(2,3.6666666666666665)
dataStreamSource
.keyBy(0)
.flatMap(new CountWindowAverageWithListState()) //flatMap,map + state = 自定义函数的感觉
.print();
env.execute("TestStatefulApi");
}
}
结果输出:
MapState
/**
* MapState<K, V> :这个状态为每一个 key 保存一个 Map 集合
* put() 将对应的 key 的键值对放到状态中
* values() 拿到 MapState 中所有的 value
* clear() 清除状态
*/
public class CountWindowAverageWithMapState
extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Double>> {
// managed keyed state
//1. MapState :key 是一个唯一的值,value 是接收到的相同的 key 对应的 value 的值
//我们开发过程当中声明的state其实我们可以理解为就是一个辅助变量。
//Map的数据类型:key相同 数据就覆盖了
/**
* 1,3
* 1,5
* 1,7
*
*/
private MapState<String, Long> mapState;
@Override
public void open(Configuration parameters) throws Exception {
// 注册状态
MapStateDescriptor<String, Long> descriptor =
new MapStateDescriptor<String, Long>(
"average", // 状态的名字
String.class, Long.class); // 状态存储的数据类型
mapState = getRuntimeContext().getMapState(descriptor);
}
/**
* 1,3
* 1,5
* 1,7
*
* dfsfsdafdsf,3
* dfsfxxxfdsf,5
* xxxx323123,7
*
*
* @param element
* @param out
* @throws Exception
*/
@Override
public void flatMap(Tuple2<Long, Long> element,
Collector<Tuple2<Long, Double>> out) throws Exception {
mapState.put(UUID.randomUUID().toString(), element.f1);
// 判断,如果当前的 key 出现了 3 次,则需要计算平均值,并且输出
List<Long> allElements = Lists.newArrayList(mapState.values());
if (allElements.size() == 3) {
long count = 0;
long sum = 0;
for (Long ele : allElements) {
count++;
sum += ele;
}
double avg = (double) sum / count;
//
out.collect(Tuple2.of(element.f0, avg));
// 清除状态
mapState.clear();
}
}
}
public class TestKeyedStateMain {
public static void main(String[] args) throws Exception{
//程序入口
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//数据源
DataStreamSource<Tuple2<Long, Long>> dataStreamSource =
env.fromElements(Tuple2.of(1L, 3L), Tuple2.of(1L, 7L),
Tuple2.of(2L, 4L), Tuple2.of(1L, 5L),Tuple2.of(2L, 2L), Tuple2.of(2L, 5L));
// 输出:
//(1,5.0)
//(2,3.6666666666666665)
dataStreamSource
.keyBy(0)
.flatMap(new CountWindowAverageWithMapState()) //flatMap,map + state = 自定义函数的感觉
.print();
env.execute("TestStatefulApi");
}
}
输出结果:
ReducingState
/**
* ReducingState<T> :这个状态为每一个 key 保存一个聚合之后的值
* get() 获取状态值
* add() 更新状态值,将数据放到状态中
* clear() 清除状态
*/
public class SumFunction
extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {
//sum = 最终累加的结果的数据类型
private ReducingState<Long> sumState;
@Override
public void open(Configuration parameters) throws Exception {
// 注册状态
ReducingStateDescriptor<Long> descriptor =
new ReducingStateDescriptor<Long>(
"sum", // 状态的名字
new ReduceFunction<Long>() { // 聚合函数
@Override
public Long reduce(Long value1, Long value2) throws Exception {
return value1 + value2;
}
}, Long.class); // 状态存储的数据类型
sumState = getRuntimeContext().getReducingState(descriptor);
}
/**
*
* 3
* 5
* 7
*
* @param element
* @param out
* @throws Exception
*/
@Override
public void flatMap(Tuple2<Long, Long> element,
Collector<Tuple2<Long, Long>> out) throws Exception {
// 将数据放到状态中
sumState.add(element.f1);
out.collect(Tuple2.of(element.f0, sumState.get()));
}
}
public class TestKeyedStateMain {
public static void main(String[] args) throws Exception{
//程序入口
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//数据源
DataStreamSource<Tuple2<Long, Long>> dataStreamSource =
env.fromElements(Tuple2.of(1L, 3L), Tuple2.of(1L, 7L),
Tuple2.of(2L, 4L), Tuple2.of(1L, 5L),Tuple2.of(2L, 2L), Tuple2.of(2L, 5L));
// 输出:
dataStreamSource
.keyBy(0)
.flatMap(new SumFunction()) //累加
.print();
env.execute("TestStatefulApi");
}
}
输出:
AggregatingState
public class ContainsValueFunction
extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, String>> {
/**
* 1, contains:3 and 5
*/
private AggregatingState<Long, String> totalStr;//辅助字段
@Override
public void open(Configuration parameters) throws Exception {
// 注册状态
AggregatingStateDescriptor<Long, String, String> descriptor =
new AggregatingStateDescriptor<Long, String, String>(
"totalStr", // 状态的名字
//SparkSQL 自定义聚合函数
new AggregateFunction<Long, String, String>() {
//初始化的操作,只运行一次哦
@Override
public String createAccumulator() {
return "Contains:";
}
@Override
public String add(Long value, String accumulator) {
if ("Contains:".equals(accumulator)) {
return accumulator + value;
}
return accumulator + " and " + value;
}
@Override
public String merge(String a, String b) {
return a + " and " + b;
}
@Override
public String getResult(String accumulator) {
//contains:1
//contains: 1 and 3 and
return accumulator;
}
}, String.class); // 状态存储的数据类型
totalStr = getRuntimeContext().getAggregatingState(descriptor);
}
@Override
public void flatMap(Tuple2<Long, Long> element,
Collector<Tuple2<Long, String>> out) throws Exception {
totalStr.add(element.f1);
out.collect(Tuple2.of(element.f0, totalStr.get()));
}
}
public class TestKeyedStateMain {
public static void main(String[] args) throws Exception{
//程序入口
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//数据源
DataStreamSource<Tuple2<Long, Long>> dataStreamSource =
env.fromElements(Tuple2.of(1L, 3L), Tuple2.of(1L, 7L),
Tuple2.of(2L, 4L), Tuple2.of(1L, 5L),Tuple2.of(2L, 2L), Tuple2.of(2L, 5L));
// 输出:
dataStreamSource
.keyBy(0)
.flatMap(new ContainsValueFunction()) //flatMap,map + state = 自定义函数的感觉
.print();
env.execute("TestStatefulApi");
}
}
输出:
1.5 State backend
1.5.1 概述
Flink支持的StateBackend:
MemoryStateBackend
FsStateBackend
RocksDBStateBackend
1.5.2 MemoryStateBackend
默认情况下,状态信息是存储在 TaskManager 的堆内存中的,checkpoint 的时候将状态保存到 JobManager 的堆内存中。
- 缺点:
只能保存数据量小的状态 状态数据有可能会丢失 - 优点:
开发测试很方便
1.5.3 FSStateBackend
状态信息存储在 TaskManager 的堆内存中的,checkpoint 的时候将状态保存到指定的文件中 (HDFS 等文件系统)
- 缺点:
状态大小受TaskManager内存限制(默认支持5M) - 优点:
状态访问速度很快
状态信息不会丢失
用于: 生产,也可存储状态数据量大的情况
1.5.4 RocksDBStateBackend
状态信息存储在 RocksDB 数据库 (key-value 的数据存储服务), 最终保存在本地文件中 checkpoint 的时候将状态保存到指定的文件中 (HDFS 等文件系统)
- 缺点:
状态访问速度有所下降 - 优点:
可以存储超大量的状态信息
状态信息不会丢失
用于: 生产,可以存储超大量的状态信息
1.5.5 StateBackend配置方式
(1)单任务调整
修改当前任务代码 env.setStateBackend(new FsStateBackend("hdfs://bigdata02:9000/flink/checkpoints"));
或者new MemoryStateBackend()
或者new RocksDBStateBackend(filebackend, true);【需要添加第三方依赖】
第三方依赖
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-statebackend-rocksdb_2.11</artifactId>
<version>${flink.version}</version>
</dependency>
(2)全局调整
修改flink-conf.yaml
state.backend: filesystem state.checkpoints.dir: hdfs://bigdata02:9000/flink/checkpoints
注意:state.backend的值可以是下面几种:jobmanager(MemoryStateBackend), filesystem(FsStateBackend), rocksdb(RocksDBStateBackend)
1.6 checkpoint(容错)
1.6.1 checkpoint概述
- (1)为了保证state的容错性,Flink需要对state进行checkpoint。
- (2)Checkpoint是Flink实现容错机制最核心的功能,它能够根据配置周期性地基于Stream中各个 Operator/task的状态来生成快照,从而将这些状态数据定期持久化存储下来,当Flink程序一旦意外崩 溃时,重新运行程序时可以有选择地从这些快照进行恢复,从而修正因为故障带来的程序数据异常
- (3)Flink的checkpoint机制可以与(stream和state)的持久化存储交互的前提: 持久化的source,它需要支持在一定时间内重放事件。这种sources的典型例子是持久化的消息队列 (比如Apache Kafka,RabbitMQ等)或文件系统(比如HDFS,S3,GFS等) 用于state的持久化存储,例如分布式文件系统(比如HDFS,S3,GFS等)
生成快照
恢复快照
1.6.2 checkpoint配置
默认checkpoint功能是disabled的,想要使用的时候需要先启用,checkpoint开启之后, checkPointMode有两种,Exactly-once和At-least-once,默认的checkPointMode是Exactly-once, Exactly-once对于大多数应用来说是最合适的。At-least-once可能用在某些延迟超低的应用程序(始终 延迟为几毫秒)。
/**
* state:
* keyed
* operator -> checkpoint
*/
public class WordCount4 {
public static void main(String[] args) throws Exception {
ParameterTool parameterTool = ParameterTool.fromArgs(args);
String hostname = parameterTool.get("hostname");
int port = parameterTool.getInt("port");
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//10s 15s
//如果数据量比较大,建议5分钟左右checkpoint的一次。
//阿里他们使用的时候 也是这样建议的。
env.enableCheckpointing(10000);//10s 15s state
FsStateBackend fsStateBackend = new FsStateBackend("hdfs://bigdata02:9000/flink_1/checkpoint");
MemoryStateBackend memoryStateBackend = new MemoryStateBackend();
env.setStateBackend(fsStateBackend);
env.setStateBackend(new RocksDBStateBackend("hdfs://bigdata02:9000/flink_2/checkpoint"));
//setCheckpointingMode---是否允许数据重复
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
//setMinPauseBetweenCheckpoints ---两个checkpoint之间间隔多久
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
//setCheckpointTimeout ---超时时间
env.getCheckpointConfig().setCheckpointTimeout(60000);
//enableExternalizedCheckpoints---cancel程序的时候保存checkpoint
env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
3, // 尝试重启的次数
Time.of(10, TimeUnit.SECONDS) // 间隔
));
DataStreamSource<String> dataStream = env.socketTextStream(hostname, port);
SingleOutputStreamOperator<Tuple2<String, Integer>> result = dataStream.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
@Override
public void flatMap(String line, Collector<Tuple2<String, Integer>> out) throws Exception {
String[] fields = line.split(",");
for (String word : fields) {
out.collect(Tuple2.of(word, 1));
}
}
}).keyBy(0)
.sum(1);
result.print();
env.execute("WordCount check point....");
}
}
1.7 恢复数据(容错)
1.7.1 重启策略概述(重试)
Flink支持不同的重启策略,以在故障发生时控制作业如何重启,集群在启动时会伴随一个默认的重启策 略,在没有定义具体重启策略时会使用该默认策略。 如果在工作提交时指定了一个重启策略,该策略会 覆盖集群的默认策略,默认的重启策略可以通过 Flink 的配置文件 flink-conf.yaml 指定。配置参数 restart-strategy 定义了哪个策略被使用。
常用的重启策略
- (1)固定间隔 (Fixed delay)
- (2)失败率 (Failure rate)
- (3)无重启 (No restart)
如果没有启用 checkpointing,则使用无重启 (no restart) 策略。
如果启用了 checkpointing,但没有配置重启策略,则使用固定间隔 (fixed-delay) 策略, 尝试重启次数 默认值是:Integer.MAX_VALUE,重启策略可以在flink-conf.yaml中配置,表示全局的配置。也可以在 应用代码中动态指定,会覆盖全局配置。
1.7.2 重启策略
固定间隔 (Fixed delay)
第一种:全局配置 flink-conf.yaml
restart-strategy: fixed-delay
restart-strategy.fixed-delay.attempts: 3
restart-strategy.fixed-delay.delay: 10 s
第二种:应用代码设置
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
3, // 尝试重启的次数
Time.of(10, TimeUnit.SECONDS) // 间隔
));
失败率 (Failure rate)
第一种:全局配置 flink-conf.yaml
restart-strategy: failure-rate
restart-strategy.failure-rate.max-failures-per-interval: 3
restart-strategy.failure-rate.failure-rate-interval: 5 min
restart-strategy.failure-rate.delay: 10 s
第二种:应用代码设置
env.setRestartStrategy(RestartStrategies.failureRateRestart(
3, // 一个时间段内的最大失败次数
Time.of(5, TimeUnit.MINUTES), // 衡量失败次数的是时间段
Time.of(10, TimeUnit.SECONDS) // 间隔
));
无重启 (No restart)
第一种:全局配置 flink-conf.yaml
restart-strategy: none
第二种:应用代码设置
env.setRestartStrategy(RestartStrategies.noRestart());
1.7.3 多checkpoint
默认情况下,如果设置了Checkpoint选项,则Flink只保留最近成功生成的1个Checkpoint,而当Flink 程序失败时,可以从最近的这个Checkpoint来进行恢复。但是,如果我们希望保留多个Checkpoint, 并能够根据实际需要选择其中一个进行恢复,这样会更加灵活,比如,我们发现最近4个小时数据记录 处理有问题,希望将整个状态还原到4小时之前Flink可以支持保留多个Checkpoint,需要在Flink的配置 文件conf/flink-conf.yaml中,添加如下配置,指定最多需要保存Checkpoint的个数:
state.checkpoints.num-retained: 20
这样设置以后就查看对应的Checkpoint在HDFS上存储的文件目录
hdfs dfs -ls hdfs://bigdata02:9000/flink/checkpoints
如果希望回退到某个Checkpoint点,只需要指定对应的某个Checkpoint路径即可实现