Flink 使用之数据源

Flink 使用介绍相关文档目录

Flink 使用介绍相关文档目录

Flink内置数据源

Text file

读取磁盘或者HDFS中的文件作为数据源。
唯一的参数file path可以指定:

  • file:///path/to/file.txt
  • hdfs:///path/to/file.txt

注意:

  1. 如果不填写前缀file://或者hdfs://,默认为file://
  2. 使用Flink读取HDFS文件系统,需要去官网下载对应Pre-bundled Hadoop包。这里给出的链接是适用于Hadoop 2.8.3。之后将这个jar复制到flink安装位置的lib目录中。
val stream = env.readTextFile("/path/to/file.txt")

socketTextStream

使用socket作为数据源。但不推荐socket在生产环境中作为数据源。原因如下:

  • socket无状态,也不能replay。故无法保证数据精准投送。
  • socket数据源并行度只能是1,无法很好利用并发处理性能。

SocketTextStream适合用于debug或者是测试用途。

val stream = env.socketTextStream("localhost", 9000)

fromElements

将一系列元素作为数据源。

val env = StreamExecutionEnvironment.getExecutionEnvironment
val stream = env.fromElements(1, 2, 3);

fromCollection

和fromElements方法类似,不同的是该方法接收一个集合对象,而不是可变参数。如下所示:

val stream = env.fromCollection(Array(1, 2, 3))

Kafka 数据源

该数据源用于接收Kafka的数据。
使用Kafka数据源之前需要先确定Kafka的版本,引入对应的Kafka Connector以来。对应关系如下所示。

Kafka 版本 Maven 依赖
0.8.x flink-connector-kafka-0.8_2.11
0.9.x flink-connector-kafka-0.9_2.11
0.10.x flink-connector-kafka-0.10_2.11
0.11.x flink-connector-kafka-0.11_2.11
1.0 以上 flink-connector-kafka_2.11

引入Maven依赖。以flink-connector-kafka_2.11为例,添加以下依赖到pom.xml文件:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka_2.11</artifactId>
    <version>1.9.1</version>
</dependency>

在集群中运行时,为了减少提交jar包的大小,需要将该依赖设置为provided。然后把此依赖包复制到Flink各个节点安装位置的lib目录中。

一个简单的使用例子如下:

// 设置Kafka属性
val properties = new Properties()
properties.setProperty("bootstrap.servers", "192.168.100.128:9092")
properties.setProperty("group.id", "test")

// 创建Kafka数据源,其中test为topic名称
val consumer = new FlinkKafkaConsumer[String]("test", new SimpleStringSchema(), properties)

DeserializationSchema

DeserializationSchema用于将接收到的二进制数据转换为Java或Scala对象。Kafka Connector提供了如下4种DeserializationSchema:

  • TypeInformationSerializationSchema:使用Flink的TypeInformation反序列化。如果上游数据也是通过Flink TypeInformation序列化后写入的,这里使用此schema最为合适。
  • JsonDeserializationSchema :将获取的数据转换为JSON格式。这里有一个坑,如果发送过来的数据不是合法的JSON格式,数据源会抛出异常导致TaskManager重启。如果需要对不合法的JSON数据容错,需要实现自定义的DeserializationSchema。
  • AvroDeserializationSchema:读取Avro格式的数据。
  • SimpleStringSchema:转换接收到的数据为字符串。

自定义DeserializationSchema

所有的Schema需要实现DeserializationSchema。该接口源码如下所示:

@Public
public interface DeserializationSchema<T> extends Serializable, ResultTypeQueryable<T> {

    /**
     * Deserializes the byte message.
     *
     * @param message The message, as a byte array.
     *
     * @return The deserialized message as an object (null if the message cannot be deserialized).
     */
    T deserialize(byte[] message) throws IOException;

    /**
     * Method to decide whether the element signals the end of the stream. If
     * true is returned the element won't be emitted.
     *
     * @param nextElement The element to test for the end-of-stream signal.
     * @return True, if the element signals end of stream, false otherwise.
     */
    boolean isEndOfStream(T nextElement);
}

方法解释:

  • deserialize:将二进制消息转换为某类型消息。
  • isEndOfStream:表示是否是最后一条数据。

以SimpleStringSchema为例展示下怎么编写自定义的DeserializationSchema。
相关代码如下:

@PublicEvolving
public class SimpleStringSchema implements DeserializationSchema<String>, SerializationSchema<String> {
    // SerializationSchema接口的方法省略
    @Override
    public String deserialize(byte[] message) {
        return new String(message, charset);
    }

    @Override
    public boolean isEndOfStream(String nextElement) {
        return false;
    }
    // ...
}

起始位置属性配置

使用示例:

myConsumer.setStartFromEarliest()      // start from the earliest record possible
myConsumer.setStartFromLatest()        // start from the latest record
myConsumer.setStartFromTimestamp(...)  // start from specified epoch timestamp (milliseconds)
myConsumer.setStartFromGroupOffsets()  // the default behaviour

方法解释:

  • setStartFromEarliest:从最早儿元素开始消费
  • setStartFromLatest:从最近的元素开始消费
  • setStartFromTimestamp:从指定时间戳的数据开始消费
  • setStartFromGroupOffsets:这是默认的配置。从消费组的offset开始消费。必须配置group.id配置项。

Topic和分区感知

Topic感知

可以使用如下构造函数创建FlinkKafkaConsumer:

FlinkKafkaConsumer(Pattern subscriptionPattern, KafkaDeserializationSchema<T> deserializer, Properties props) 

和指定topic名称不同的是,这里传入的是一个正则表达式。所有名称匹配该正则表达式的topic都会被订阅。如果配置了分区感知(配置flink.partition-discovery.interval-millis为非负数),Job启动之后kafka新创建的topic如果匹配该正则,也会被订阅到。

分区感知

在Job运行过程中如果kafka新创建了partition,Flink可以动态感知到,然后对其中数据进行消费。整个过程仍然可以保证exactly once语义。

默认情况分区感知是禁用的。如果要开启分区感知,可以设置flink.partition-discovery.interval-millis,即分区感知触发时间间隔。

实现自定义数据源

自定义数据源需要实现Flink提供的SourceFunction接口。

SourceFunction接口的定义如下:

@Public
public interface SourceFunction<T> extends Function, Serializable {
    void run(SourceContext<T> ctx) throws Exception;
    void cancel();
}

run方法

run方法为数据源向下游发送数据的主要逻辑。编写套路为:

  • 不断调用循环发送数据。
  • 使用一个状态变量控制循环的执行。当cancel方法执行后必须能够跳出循环,停止发送数据。
  • 使用SourceContext的collect等方法将元素发送至下游。
  • 如果使用Checkpoint,在SourceContext collect数据的时候必须加锁。防止checkpoint操作和发送数据操作同时进行。

cancel方法:

cancel方法在数据源停止的时候调用。cancel方法必须能够控制run方法中的循环,停止循环的运行。并做一些状态清理操作。

SourceContext类

SourceContext在SourceFunction中使用,用于向下游发送数据,或者是发送watermark。
SourceContext的方法包括:

  • collect:向下游发送数据。有如下三种情况:
    • 如果使用ProcessingTime,该元素不携带timestamp。
    • 如果使用IngestionTime,元素使用系统当前时间作为timestamp。
    • 如果使用EventTime,元素不携带timestamp。需要在数据流后续为元素指定timestamp(assignTimestampAndWatermark)。
  • collectWithTimestamp:向下游发送带有timestamp的数据。和collect方法一样也有如下三种情况:
    • 如果使用ProcessingTime,timestamp会被忽略
    • 如果使用IngestionTime,使用系统时间覆盖timestamp
    • 如果使用EventTime,使用指定的timestamp
  • emitWatermark:向下游发送watermark。watermark也包含一个timestamp。向下游发送watermark意味着所有在watermark的timestamp之前的数据已经到齐。如果在watermark之后,收到了timestamp比该watermark的timestamp小的元素,该元素会被认为迟到,将会被系统忽略,或者进入到旁路输出(side output)。
  • markAsTemporarilyIdle:标记此数据源暂时闲置。该数据源暂时不会发送任何数据和watermark。仅对IngestionTime和EventTime生效。下游任务前移watermark的时候将不会再等待被标记为闲置的数据源的watermark。

CheckpointedFunction

如果数据源需要保存状态,那么就需要实现CheckpointedFunction中的相关方法。
CheckpointedFunction包含如下方法:

  • snapshotState:保存checkpoint的时候调用。需要在此方法中编写状态保存逻辑
  • initializeState:在数据源创建或者是从checkpoint恢复的时候调用。此方法包含数据源的状态恢复逻辑。

样例

Flink官方给出的样板Source。这个数据源会发送0-999到下游系统。代码如下所示:

public class ExampleCountSource implements SourceFunction<Long>, CheckpointedFunction {
    private long count = 0L;
    // 使用一个volatile类型变量控制run方法内循环的运行
    private volatile boolean isRunning = true;

    // 保存数据源状态的变量
    private transient ListState<Long> checkpointedCount;

    public void run(SourceContext<T> ctx) {
        while (isRunning && count < 1000) {
            // this synchronized block ensures that state checkpointing,
            // internal state updates and emission of elements are an atomic operation
            // 此处必须要加锁,防止在checkpoint过程中,仍然发送数据
            synchronized (ctx.getCheckpointLock()) {
                ctx.collect(count);
                count++;
            }
        }
    }

    public void cancel() {
        // 设置isRunning为false,终止run方法内循环的运行
        isRunning = false;
    }

    public void initializeState(FunctionInitializationContext context) {
        // 获取存储状态
        this.checkpointedCount = context
            .getOperatorStateStore()
            .getListState(new ListStateDescriptor<>("count", Long.class));

        // 如果数据源是从失败中恢复,则读取count的值,恢复数据源count状态
        if (context.isRestored()) {
            for (Long count : this.checkpointedCount.get()) {
                this.count = count;
            }
        }
    }

    public void snapshotState(FunctionSnapshotContext context) {
        // 保存数据到状态变量
        this.checkpointedCount.clear();
        this.checkpointedCount.add(count);
    }
}
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 216,744评论 6 502
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,505评论 3 392
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 163,105评论 0 353
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,242评论 1 292
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,269评论 6 389
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,215评论 1 299
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,096评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,939评论 0 274
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,354评论 1 311
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,573评论 2 333
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,745评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,448评论 5 344
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,048评论 3 327
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,683评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,838评论 1 269
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,776评论 2 369
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,652评论 2 354

推荐阅读更多精彩内容