Flink常见核心概念

分布式缓存

有时一些数据是通用的,就需要进行共享,可以放在文件、缓存、db中,可以放在文件中,先缓存到hadoop集群中,然后使用cache

public class CacheStream {
    public static void main(String[] args) throws Exception {
        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        //注册缓存文件
        env.registerCachedFile("test.txt", "distributeCache");
        DataSource<String> data = env.fromElements("Linea", "Lineb", "Linec");
        DataSet<String> result = data.map(new RichMapFunction<String, String>() {
            private ArrayList<String> dataList = new ArrayList<>();

            @Override
            public void open(Configuration parameters) throws Exception {
                super.open(parameters);
                //从缓存中获取文件
                File customFile = getRuntimeContext().getDistributedCache().getFile("distributeCache");
                List<String> lines = FileUtils.readLines(customFile);
                for (String line : lines) {
                    this.dataList.add(line);
                }
            }

            @Override
            public String map(String value) throws Exception {
                //执行数据处理
                return dataList + ":" + value;
            }
        });
        result.printToErr();
    }
}

故障恢复和重启策略

故障恢复

Flink支持不同级别的故障恢复故障,jobmanager.exection.failover-strategy的值为:region和full。
当值为full时,集群中的task发生故障,该任务的所有task都会发生重启。
当值为region时:

  • 发生错误的Task所在的Region需要重启
  • 如果当前Region的依赖数据出现损坏或者部分丢失,生产数据的Region也需要重启
  • 为了保证数据一致性,当前region的下游Region也需要重启。

重启策略

Flink提供的多种类型和级别的重启策略:

  • 固定延迟重启策略模式: 需要设置重启的次数和间隔。restart-strategy.fixed-delay.attempts(次数),restart-strategy.fixed-delay.delay(时间间隔)
  • 失败率重启策略模式: restart-strategy.failure-rate.delay(每次重试的时间间隔)、
    restart-strategy.failure-rate.failure-rate-interval(计算失败的事件间隔)、
    restart-strategy.failure-rate.max-failures-per-interval(在指定时间间隔内最大的失败次数)。
  • 无重启策略模式:作业发生错误,任务会直接退出。

如果用户配置了checkpoint,但没有设置重启策略,就会按照固定延迟重启策略模式进行重启,如果用户没有配置checkpoint,那么默认不会重启。

并行度

可以使用setParallelism(10)
配置的级别优先级:算子级别>执行环境级别>提交任务级别>系统配置级别。

窗口

Flink支持三种窗口:

  • 滚动窗口:窗口数据有固定的大小,窗口中的数据不会叠加;
  • 滑动窗口:窗口数据有固定的大小,并且有生成间隔。
  • 会话窗口:窗口数据没有固定的大小,根据用户传入的参数进行划分,窗口数据无叠加。

Flink中的时间分为三种:

  • 事件时间:事件实际发生的时间;
  • 摄入时间:事件进入流框架处理的时间;
  • 处理时间:事件被处理的时间。

水印的出现是为了解决实时计算中的数据乱序问题。水印是Flink判断迟到数据的标准,同时也是窗口触发的标记。

摄入时间可以方式Flink内部处理数据是发生乱序的情况,但无法解决数据到达Flink之前发生的乱序问题。如果需要处理此类问题,建议使用EventTime。

状态

Keyed State是经过分区后的流上状态,每个Key都有自己的状态,Operator State可以用在所有算子上,每个算子子任务或者说每个算子实例共享一个状态,
流入这个算子子任务的数据可以访问和更新这个状态。每个算子子任务上的数据共享自己的状态。
Keyed State和Operator State,Flink的状态都是基于本地的,即每个算子子任务维护着这个算子子任务对应的状态存储,算子子任务之间的状态不能相互访问。

Flink的状态会保存在taskmanager的内存中,Flink 提供了三种可用的状态后端(MemoryStateBackend、FsStateBackend、RocksDBStateBackend)
用于在不同情况下进行状态后端的保存。

public class FlinkValueState {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.fromElements(Tuple2.of(1L,3L),Tuple2.of(1L,5L),Tuple2.of(1L,7L),
                Tuple2.of(1L,5L),Tuple2.of(1L,2L))
                .keyBy(0)
                .flatMap(new CountWindowAverage())
                .printToErr();
        env.execute("submit job");
    }
}
public class CountWindowAverage extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {
    private transient ValueState<Tuple2<Long, Long>> sum;

    @Override
    public void flatMap(Tuple2<Long, Long> input, Collector<Tuple2<Long, Long>> out) throws Exception {
        Tuple2<Long, Long> currentSum;
        if (null == sum.value()) {
            currentSum = Tuple2.of(0L, 0L);
        } else {
            currentSum = sum.value();
        }
        currentSum.f0 += 1;
        currentSum.f1 += input.f1;
        sum.update(currentSum);
        if (currentSum.f0 >= 2){
            out.collect(new Tuple2<>(input.f0,currentSum.f1/currentSum.f0));
            sum.clear();
        }
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        ValueStateDescriptor<Tuple2<Long,Long>> descriptor = new ValueStateDescriptor<Tuple2<Long, Long>>("average", TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() {
            @Override
            public TypeInformation<Tuple2<Long, Long>> getTypeInfo() {
                return super.getTypeInfo();
            }
        }));
        StateTtlConfig ttlConfig = StateTtlConfig.newBuilder(Time.seconds(10))
                .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
                .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
                .build();
        descriptor.enableTimeToLive(ttlConfig);
        sum = getRuntimeContext().getState(descriptor);
    }
}

MeomoryStateBackend

MeomoryStateBackend将state数据存储在内存中,一般用来进行本地调试用,需要注意:

  • 每个独立状态默认限制大小为5MB,可以通过构造函数增加容量
  • 状态的大小不能超过akka的Framesize大小
  • 聚合后的状态必须能放进JobManager的内存中
  • 记录一些状态很小的Job状态信息

FsStateBackend

FsStateBackend 会把状态数据保存在TaskManager的内存中,CheckPoint时,将状态快找写入到配置的文件系统目录中,少量的元数据信息存储到
JobManager的内存中,适用于大作业、状态较大、全局高可用的哪些任务。

RocksDBStateBackend

RocksDBStateBackend将正在运行中的状态数据保存在RocksDB数据库中,RocksDB数据库默认将数据存储在TaskManaer运行节点的数据目录下。
RocksDBStateBackend是唯一支持增量快照的状态后端。

分流

分流可以用filter,需要多次遍历原始流。
分流使用split,将不同类型的数据进行标记,split算子切分过的流,是不能进行二次切分的,否则会抛出异常。

import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.collector.selector.OutputSelector;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SplitStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.ArrayList;
import java.util.List;


class StreamingDemoSplit {
    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //获取数据源
        List data = new ArrayList<Tuple3<Integer,Integer,Integer>>();
        data.add(new Tuple3<>(0,1,0));
        data.add(new Tuple3<>(0,1,1));
        data.add(new Tuple3<>(0,2,2));
        data.add(new Tuple3<>(0,1,3));
        data.add(new Tuple3<>(1,2,5));
        data.add(new Tuple3<>(1,2,9));
        data.add(new Tuple3<>(1,2,11));
        data.add(new Tuple3<>(1,2,13));
        DataStreamSource<Tuple3<Integer,Integer,Integer>> items = env.fromCollection(data);
        SplitStream<Tuple3<Integer, Integer, Integer>> splitStream = items.split(new OutputSelector<Tuple3<Integer, Integer, Integer>>() {
            @Override
            public Iterable<String> select(Tuple3<Integer, Integer, Integer> value) {
                List<String> tags = new ArrayList<>();
                if (value.f0 == 0) {
                    tags.add("zeroStream");
                } else if (value.f0 == 1) {
                    tags.add("oneStream");
                }
                return tags;
            }
        });

        splitStream.select("zeroStream").print();
        splitStream.select("oneStream").printToErr();

        //打印结果
        String jobName = "user defined streaming source";
        env.execute(jobName);
    }
}

SideOutput分类

SideOutput取代SplitStream,并且也是Flink推荐的分流方法,执行步骤:

  • 定义OutputTag
  • 调用特定函数进行数据拆分
  •  ProcessFunction
    
  •  KeyedProcessFunction
    
  •  CoProcessFunction
    
  •  KeyedProcessFunction
    
  •  ProcessWindowFunction
    
  •  ProcessAllWindowFunction
    
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.collector.selector.OutputSelector;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.SplitStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;
import java.util.ArrayList;
import java.util.List;
class StreamingDemoSideOutPut {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //获取数据源
        List data = new ArrayList<Tuple3<Integer,Integer,Integer>>();
        data.add(new Tuple3<>(0,1,0));
        data.add(new Tuple3<>(0,1,1));
        data.add(new Tuple3<>(0,2,2));
        data.add(new Tuple3<>(0,1,3));
        data.add(new Tuple3<>(1,2,5));
        data.add(new Tuple3<>(1,2,9));
        data.add(new Tuple3<>(1,2,11));
        data.add(new Tuple3<>(1,2,13));
        DataStreamSource<Tuple3<Integer,Integer,Integer>> items = env.fromCollection(data);
        OutputTag<Tuple3<Integer,Integer,Integer>> zeroStream = new OutputTag<Tuple3<Integer,Integer,Integer>>("zeroStream") {};
        OutputTag<Tuple3<Integer,Integer,Integer>> oneStream = new OutputTag<Tuple3<Integer,Integer,Integer>>("oneStream") {};
        SingleOutputStreamOperator<Tuple3<Integer, Integer, Integer>> processStream= items.process(new ProcessFunction<Tuple3<Integer, Integer, Integer>, Tuple3<Integer, Integer, Integer>>() {
            @Override
            public void processElement(Tuple3<Integer, Integer, Integer> value, Context ctx, Collector<Tuple3<Integer, Integer, Integer>> out) throws Exception {
                if (value.f0 == 0) {
                    ctx.output(zeroStream, value);
                } else if (value.f0 == 1) {
                    ctx.output(oneStream, value);
                }
            }
        });
        DataStream<Tuple3<Integer, Integer, Integer>> zeroSideOutput = processStream.getSideOutput(zeroStream);
        DataStream<Tuple3<Integer, Integer, Integer>> oneSideOutput = processStream.getSideOutput(oneStream);
        zeroSideOutput.print();
        oneSideOutput.printToErr();
        //打印结果
        String jobName = "user defined streaming source";
        env.execute(jobName);
    }

}

CEP

Flink CEP的程序结构主要分为两个步骤:定义模式、匹配结果。

public class StreamingCep {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        DataStreamSource source = env.fromElements(
                //浏览记录
                Tuple3.of("Marry", "外套", 1L),
                Tuple3.of("Marry", "帽子", 1L),
                Tuple3.of("Marry", "帽子", 2L),
                Tuple3.of("Marry", "帽子", 3L),
                Tuple3.of("Ming", "衣服", 1L),
                Tuple3.of("Marry", "鞋子", 1L),
                Tuple3.of("Marry", "鞋子", 2L),
                Tuple3.of("LiLei", "帽子", 1L),
                Tuple3.of("LiLei", "帽子", 2L),
                Tuple3.of("LiLei", "帽子", 3L)
        );
        //定义Pattern,寻找连续搜索帽子的用户
        Pattern<Tuple3<String, String, Long>, Tuple3<String, String, Long>> pattern = Pattern
                .<Tuple3<String, String, Long>>begin("start")
                .where(new SimpleCondition<Tuple3<String, String, Long>>() {
                    @Override
                    public boolean filter(Tuple3<String, String, Long> value) throws Exception {
                        return value.f1.equals("帽子");
                    }
                })
                .next("middle")
                .where(new SimpleCondition<Tuple3<String, String, Long>>() {
                    @Override
                    public boolean filter(Tuple3<String, String, Long> value) throws Exception {
                        return value.f1.equals("帽子");
                    }
                });
        KeyedStream keyedStream = source.keyBy(0);
        PatternStream patternStream = CEP.pattern(keyedStream, pattern);

        SingleOutputStreamOperator matchStream = patternStream.select(new PatternSelectFunction<Tuple3<String, String, Long>, String>() {
            @Override
            public String select(Map<String, List<Tuple3<String, String, Long>>> pattern) throws Exception {
                List<Tuple3<String, String, Long>> middle = pattern.get("middle");
                return middle.get(0).f0 + ":" + middle.get(0).f2 + ":" + "连续搜索两次帽子!";
            }
        });
        matchStream.printToErr();
        env.execute("execute cep");
    }
}

自定义Source

通过实现Flink的SourceFunction或者ParallelSourceFunction来实现单个或者多个并行度的Source。

public class CustomSource implements SourceFunction<Item> {
    private boolean isRunning = true;
    //重写run方法来产生一个源源不断的数据发送源
    @Override
    public void run(SourceContext<Item> ctx) throws Exception {
        while (isRunning) {
            Item item = generateItem();
            ctx.collect(item);
            Thread.sleep(1000);
        }
    }
    @Override
    public void cancel() {
        isRunning = false;
    }
    private Item generateItem() {
        int i = new Random().nextInt(100);
        Item item = new Item();
        item.setName("name " + i);
        item.setId(i);
        return item;
    }
}

异步I/O和可查询状态都是Flink提供的非常底层与外部系统交互的方式。其中异步I/O是为了解决Flink在实时计算中访问外部存储产生的延迟问题,需要使用
继承RichAsyncFunction来使用异步I/O。
ResultFuture的complete方法是异步的,不需要等待返回。

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 218,386评论 6 506
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 93,142评论 3 394
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 164,704评论 0 353
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,702评论 1 294
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,716评论 6 392
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,573评论 1 305
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,314评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,230评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,680评论 1 314
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,873评论 3 336
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,991评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,706评论 5 346
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,329评论 3 330
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,910评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 33,038评论 1 270
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 48,158评论 3 370
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,941评论 2 355

推荐阅读更多精彩内容