Flink(2) 简单上手

maven 依赖

  <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>1.10.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_2.12</artifactId>
            <version>1.10.1</version>
        </dependency>
    </dependencies>

Flink 批处理能力

/**
 * 批处理
 */
public class WordCount {

    public static void main(String[] args)  throws Exception{
        //1.创建执行环境
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        //2.从文件读取数据
        String inputPath = "/you/path/hello.txt";
        DataSet<String> inputDataSet = env.readTextFile(inputPath);
        //3.对数据集进行处理 ,拆成 单个单词,转换成 2元组(word ,1)进行统计
        DataSet<Tuple2<String,Integer>> resultSet = inputDataSet.flatMap(new MyFlatMapper())
                .groupBy(0) //按照第一位置的word分组
                .sum(1); //按照第二个位置上的数据求和

        resultSet.print(); //输出

    }

    //自定义类实现FlatMapFunction接口
    public static class MyFlatMapper implements FlatMapFunction<String, Tuple2<String, Integer>> {

        public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
            //按照空格分词
            String[] words = value.split(" ");
            //遍历所有的 word ,包成而元组输出
            for (String word: words ) {
                 out.collect(new Tuple2<String,Integer>(word ,1));
            }
        }
    }
    
}

Flink 流处理能力


/**
 * 流处理
 */
public class StreamWordCount {

    public static void main(String[] args) throws Exception{
        //env
       StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

       //默认的并行度,是按照的当前电脑的cpu
//        env.getParallelism();
//        env.setParallelism(4);
        //前面的数字 1 ,代表当前我们现在环境当前并行执行的线程的编号
        // 1> (hello,1)

        //2.从文件读取数据
//        String inputPath = "/path/to/hello.txt";
//        DataStreamSource<String> inputDataStream = env.readTextFile(inputPath);

        //用 parameter tool 工具从程序的启动参数中提取配置项
        ParameterTool parameterTool = ParameterTool.fromArgs(args);
        String host = parameterTool.get("host");
        int port = parameterTool.getInt("port");

        //从队列读取数据
        //从socket文本流读取数据
        DataStream<String> inputDataStream = env.socketTextStream(host, port);


        //基于数据流进行转换计算     // keyBey按照key的hash ,进行重分区,不做计算
        SingleOutputStreamOperator<Tuple2<String, Integer>> resultStream = inputDataStream.flatMap(new WordCount.MyFlatMapper())
                .keyBy(0)
                .sum(1);

        //打印输出
        resultStream.print();

        //执行任务
        env.execute();

    }
}

2.启动设置

# idea 设置
program arguments :  --host localhost --port 7777
  1. 启动nc
nc -lk 7777
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容