Flink的API操作

1、Flink简介

官网

Apache Flink® - 数据流上的有状态计算


Apache Flink 是一个框架和分布式处理引擎,用于在无边界和有边界数据流上进行有状态的计算。Flink 能在所有常见集群环境中运行,并能以内存速度和任意规模进行计算。

1.1.1 处理无界和有界数据

任何类型的数据都可以形成一种事件流。信用卡交易、传感器测量、机器日志、网站或移动应用程序上 的用户交互记录,所有这些数据都形成一种流。
数据可以被作为无界或者有界流来处理。

    1. 无界流有定义流的开始,但没有定义流的结束。它们会无休止地产生数据。无界流的数据必须持 续处理,即数据被摄取后需要立刻处理。我们不能等到所有数据都到达再处理,因为输入是无限 的,在任何时候输入都不会完成。处理无界数据通常要求以特定顺序摄取事件,例如事件发生的顺 序,以便能够推断结果的完整性。
    1. 有界流有定义流的开始,也有定义流的结束。有界流可以在摄取所有数据后再进行计算。有界流 所有数据可以被排序,所以并不需要有序摄取。有界流处理通常被称为批处理

Apache Flink 擅长处理无界和有界数据集精确的时间控制和状态化使得 Flink 的运行时(runtime)能够 运行任何处理无界流的应用。有界流则由一些专为固定大小数据集特殊设计的算法和数据结构进行内部 处理,产生了出色的性能。

1.1.2 部署应用到任意地方

Apache Flink 是一个分布式系统,它需要计算资源来执行应用程序。Flink 集成了所有常见的集群资源 管理器,例如Hadoop YARNApache MesosKubernetes,但同时也可以作为独立集群运行 (Standalone模式)。 Flink 被设计为能够很好地工作在上述每个资源管理器中,这是通过资源管理器特定(resourcemanager-specific)的部署模式实现的。Flink 可以采用与当前资源管理器相适应的方式进行交互。 部署 Flink 应用程序时,Flink 会根据应用程序配置的并行性自动标识所需的资源,并从资源管理器请求 这些资源。在发生故障的情况下,Flink 通过请求新资源来替换发生故障的容器。提交或控制应用程序 的所有通信都是通过 REST 调用进行的,这可以简化 Flink 与各种环境中的集成

1.1.3 运行任意规模应用

Flink 旨在任意规模上运行有状态流式应用。因此,应用程序被并行化为可能数千个任务,这些任务分 布在集群中并发执行。所以应用程序能够充分利用无尽的 CPU、内存、磁盘和网络 IO。而且 Flink 很容 易维护非常大的应用程序状态。其异步和增量的检查点算法对处理延迟产生小的影响,同时保证精确 一次状态的一致性。

  • Flink 用户报告了其生产环境中一些令人印象深刻的扩展性数字:
    • 每天处理数万亿的事件
    • 可以维护几TB大小的状态
    • 可以部署上千个节点的集群

1.1.4 利用内存性能

有状态的 Flink 程序针对本地状态访问进行了优化。任务的状态始终保留在内存中,如果状态大小超过 可用内存,则会保存在能高效访问的磁盘数据结构中。任务通过访问本地(通常在内存中)状态来进行 所有的计算,从而产生非常低的处理延迟。Flink 通过定期和异步地对本地状态进行持久化存储来保证 故障场景下精确一次的状态一致性。


2、Flink架构图

3、Flink API

WordCount业务

导入Maven依赖

pom.xml

  <dependencies>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-streaming-java_2.11</artifactId>
      <version>${flink.version}</version>
      <!--            <scope>provided</scope>-->
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-streaming-scala_2.11</artifactId>
      <version>${flink.version}</version>
      <!--            <scope>provided</scope>-->
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-runtime-web_2.11</artifactId>
      <version>${flink.version}</version>
      <!--            <scope>provided</scope>-->
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-statebackend-rocksdb_2.11</artifactId>
      <version>${flink.version}</version>
    </dependency>
    <dependency>
      <groupId>joda-time</groupId>
      <artifactId>joda-time</artifactId>
      <version>2.7</version>
    </dependency>
    <dependency>
      <groupId>org.apache.bahir</groupId>
      <artifactId>flink-connector-redis_2.11</artifactId>
      <version>1.0</version>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-connector-kafka-0.10_2.11</artifactId>
      <version>${flink.version}</version>
    </dependency>
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>4.12</version>
      <scope>compile</scope>
    </dependency>
  </dependencies>

程序开发步骤

  • 1、创建程序入口
  • 2、数据的输入
  • 3、数据的处理
  • 4、数据的输出
  • 5、启动应用程序

方法一:Tuple2

WordCount.java

//实时统计单词出现次数
public class WordCount {
    public static void main(String[] args) throws Exception{
        //创建程序入口
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //数据的输入
        DataStreamSource<String> myDataStream = env.socketTextStream("bigdata02", 1234);
        //数据的处理
        SingleOutputStreamOperator<Tuple2<String, Integer>> result = myDataStream.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public void flatMap(String line, Collector<Tuple2<String, Integer>> out) throws Exception {
                String[] fields = line.split(",");
                for (String word : fields) {
                    out.collect(new Tuple2<>(word, 1));
                    //out.collect(Tuple2.of(word,1));
                }
            }
        }).keyBy(0)
                .sum(1);
        //数据的输出
        result.print();
        //启动应用程序
        env.execute("WordCount");
    }
}
nc -l -p 1234

方法二:面向对象的方法

WordCount.java

public class WordCount {
    public static void main(String[] args) throws Exception{
        //创建程序入口
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //数据的输入
        DataStreamSource<String> myDataStream = env.socketTextStream("bigdata02", 1234);
        //数据的处理
        SingleOutputStreamOperator<WordAndOne> result = myDataStream.flatMap(new FlatMapFunction<String, WordAndOne>() {
            @Override
            public void flatMap(String line, Collector<WordAndOne> out) throws Exception {
                String[] fields = line.split(",");
                for (String word : fields) {
                    out.collect(new WordAndOne(word, 1));
                }
            }
        }).keyBy("word")
                .sum("count");

        result.print();
        env.execute("wordcount");

    }

    public static class WordAndOne{
        private String word;
        private Integer count;

        @Override
        public String toString() {
            return "WordAndOne{" +
                    "word='" + word + '\'' +
                    ", count=" + count +
                    '}';
        }

        public WordAndOne() {

        }
        public WordAndOne(String word, Integer count) {
            this.word = word;
            this.count = count;
        }

        public String getWord() {
            return word;
        }

        public void setWord(String word) {
            this.word = word;
        }

        public Integer getCount() {
            return count;
        }

        public void setCount(Integer count) {
            this.count = count;
        }
    }
}

nc -l -p 1234

方法三:内部类

WordCount.java

public class WordCount {
    public static void main(String[] args) throws Exception{
        //创建程序入口
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //数据的输入
        DataStreamSource<String> myDataStream = env.socketTextStream("bigdata02", 1234);
        //数据的处理
        SingleOutputStreamOperator<WordAndOne> result = myDataStream.flatMap(new StringSplitTask()).keyBy("word")
                .sum("count");

        result.print();
        env.execute("wordcount");

    }

    //封装了业务逻辑的算子
    public static  class StringSplitTask implements  FlatMapFunction<String,WordAndOne>{

        @Override
        public void flatMap(String line, Collector<WordAndOne> out) throws Exception {
            String[] fields = line.split(",");
            for (String word : fields) {
                out.collect(new WordAndOne(word, 1));
            }
        }
    }


    public static class WordAndOne{
        private String word;
        private Integer count;

        @Override
        public String toString() {
            return "WordAndOne{" +
                    "word='" + word + '\'' +
                    ", count=" + count +
                    '}';
        }

        public WordAndOne() {

        }
        public WordAndOne(String word, Integer count) {
            this.word = word;
            this.count = count;
        }

        public String getWord() {
            return word;
        }

        public void setWord(String word) {
            this.word = word;
        }

        public Integer getCount() {
            return count;
        }

        public void setCount(Integer count) {
            this.count = count;
        }
    }
}

方法四:避免硬编码,从外部获取参数

WordCount.java

public class WordCount {
    public static void main(String[] args) throws Exception{
        //创建程序入口
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        ParameterTool parameterTool = ParameterTool.fromArgs(args);
        String hostname = parameterTool.get("hostname");
        int port = parameterTool.getInt("port");


        //数据的输入
        DataStreamSource<String> myDataStream = env.socketTextStream(hostname,port);
        //数据的处理
        SingleOutputStreamOperator<WordAndOne> result = myDataStream.flatMap(new StringSplitTask()).keyBy("word")
                .sum("count");

        result.print();
        env.execute("wordcount");

    }

    //封装了业务逻辑的算子
    public static  class StringSplitTask implements FlatMapFunction<String, WordAndOne> {

        @Override
        public void flatMap(String line, Collector<WordAndOne> out) throws Exception {
            String[] fields = line.split(",");
            for (String word : fields) {
                out.collect(new WordAndOne(word, 1));
                //out.collect(Tuple2.of(word,1));
            }
        }
    }


    public static class WordAndOne{
        private String word;
        private Integer count;

        @Override
        public String toString() {
            return "WordAndOne{" +
                    "word='" + word + '\'' +
                    ", count=" + count +
                    '}';
        }

        public WordAndOne() {

        }
        public WordAndOne(String word, Integer count) {
            this.word = word;
            this.count = count;
        }

        public String getWord() {
            return word;
        }

        public void setWord(String word) {
            this.word = word;
        }

        public Integer getCount() {
            return count;
        }

        public void setCount(Integer count) {
            this.count = count;
        }
    }
}

Scala版

ScalaWordCount.scala

object ScalaWordCount {
  def main(args: Array[String]): Unit = {
    //获取参数
    val hostname = ParameterTool.fromArgs(args).get("hostname")
    val port = ParameterTool.fromArgs(args).getInt("port")
    //TODO 导入隐式转换
    import org.apache.flink.api.scala._
    //步骤一:获取执行环境
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    //步骤二:获取数据源
    val textStream = env.socketTextStream(hostname,port)
    //步骤三:数据处理
    val wordCountStream = textStream.flatMap(line => line.split(","))
      .map((_, 1))
      .keyBy(0)
      .sum(1)
    //步骤四:数据结果处理
    wordCountStream.print()
    //步骤六:启动程序
    env.execute("WindowWordCountScala")
  }

}

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 219,110评论 6 508
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 93,443评论 3 395
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 165,474评论 0 356
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,881评论 1 295
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,902评论 6 392
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,698评论 1 305
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,418评论 3 419
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,332评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,796评论 1 316
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,968评论 3 337
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 40,110评论 1 351
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,792评论 5 346
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,455评论 3 331
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 32,003评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 33,130评论 1 272
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 48,348评论 3 373
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 45,047评论 2 355