Flink 基础 - 状态管理

Flink 中的状态

  • 算子状态 (Operator State)
  • 键控状态 (Keded State)
  • 状态后端 (State Nackends)
image.png
  1. 由一个任务维护,用来计算耨个结果的所有数据,都属于这个任务的状态
  2. 可以认为状态是一个本地变量

算子状态

image.png
  • 算子状态的作用作用于当前算子任务,同一个子任务所有数据,都可以访问到相同的状态 ,状态对于同一个子任务是共享的
  • 算子状态不能由另一个子任务访问

列表状态 - list state

将状态定义为一组数据的列表

联合列表状态 - union list state

也是将状态定义为列表,与列表状态不同的是,发生故障时或从保存点启动,如何恢复

广播状态

当前所有分区状态全都一样

下面做一个代码示例。使用 map 算子统计输入数据的数量

package com.lxs.flink.realtime.state;

import com.lxs.utils.KafkaUtils;
import javafx.collections.ListChangeListener;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;

import java.util.Collections;
import java.util.List;

/**
 * User: lixinsong
 * Date: 2021/1/20
 * Description:
 */

public class OperatorStateTest {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        DataStream<Tuple2<String, Long>> dataStream = env.addSource(KafkaUtils.initKafka("lxs")).map(new MapFunction<String, Tuple2<String, Long >>() {
            @Override
            public Tuple2<String, Long> map(String s) throws Exception {
                String[] arr = s.split(",");
                return Tuple2.of(arr[1], Long.parseLong(arr[2]));
            }
        });
        
        // 定义一个有状态的 map操作 ,统计当前当前分区的数据个数 
        SingleOutputStreamOperator<Integer> map = dataStream.map(new MyCountMapper());
        map.print("test aa");
        env.execute(" test");

    }

    public static class MyCountMapper implements MapFunction<Tuple2<String, Long>, Integer>, ListCheckpointed<Integer>  {

        // 定义一个本地变量,作为算子状态
        private Integer count = 0;

        @Override
        public Integer map(Tuple2<String, Long> s) throws Exception {
            count ++;
            return count;
        }

        @Override
        public List<Integer> snapshotState(long checkpointId, long timestamp) throws Exception {
            // 对状态做一个快照
            return Collections.singletonList(count);
        }

        @Override
        public void restoreState(List<Integer> state) throws Exception {
            // 恢复数据
            for (Integer s : state) {
                count += s;
            }
        }
    }
}

此段代码比较简单,不做过多描述,需要注意,实现了 ListCheckpointed 的接口是为了,保存状态和恢复状态的

键控状态

image.png
  • 键控状态是根据输入数据流中定义的键来维护和访问的
  • Flink 为每个 key 维护一个状态实例,并将具有相同键的所有数据,都分区到同一个算子任务中,这个任务会维护和处理这个key对应的状态
  • 当任务处理一条数据时,他会自动将状态的访问范围限定为当前数据的 key

值状态 (Vlaue state)

将状态表示为单个的值
代码示例, 依然是最简单的wordCount, 这次在map中实现

package com.lxs.flink.realtime.state;

import com.lxs.utils.KafkaUtils;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

import java.util.Collections;
import java.util.List;
import java.util.Objects;

/**
 * User: lixinsong
 * Date: 2021/1/21
 * Description:
 */

public class KeyedStateTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        // 输入数据  word Count 示例
        DataStream<Tuple2<String, Long>> dataStream = env.addSource(KafkaUtils.initKafka("lxs")).flatMap(new FlatMapFunction<String, Tuple2<String, Long>>() {
            @Override
            public void flatMap(String s, Collector<Tuple2<String, Long>> collector) throws Exception {
                String[] arr = s.split(",");
                for(String a : arr) {
                    collector.collect(Tuple2.of(a, 1L));
                }
            }
        });
        // 定义一个有状态的 map操作 ,统计当前当前课程的购买次数
        DataStream<Tuple2<String, Long>> map = dataStream.keyBy(0).map(new MyCountMapper1());
        map.print("test aa");
        env.execute(" test");

    }

    public static class MyCountMapper1 extends RichMapFunction<Tuple2<String, Long>, Tuple2<String, Long>> {

        private ValueState<Integer> keyCountState;

        @Override
        public void open(Configuration parameters) throws Exception {
            keyCountState = getRuntimeContext().getState(new ValueStateDescriptor<Integer>("key-count", Integer.class));

        }

        @Override
        public void close() throws Exception {
            super.close();
        }

        @Override
        public Tuple2<String, Long> map(Tuple2<String, Long> s) throws Exception {
            Integer count = keyCountState.value();   // 获取这个key的state
            if (Objects.isNull(count)) {
                count = 0;
            }
            count ++;     // 次数 + 1
            keyCountState.update(count);              // 更新状态
            return Tuple2.of(s.f0, (long)count);
        }
    }
}

测试结果

test aa> (li,1)
test aa> (xin,1)
test aa> (xin,2)
test aa> (song,1)

这次我们的wordcount 没有使用类似 sum() ,这样的聚合算子,只是在计算的时候,记录了每个key的中间值,每次累加,主要注意,这里一定要继承 RichMapFunction, 只有这样才可以使用 运行时上下文

列表状态

将状态表示为一组数据的列表
有时候我们需要一个列表来储存状态,代码其实和上面的值类型相似,不在说具体场景,介绍下代码如何使用

public static class MyCountMapper1 extends RichMapFunction<Tuple2<String, Long>, Tuple2<String, Long>> {

        // 值类型状态
        // private ValueState<Integer> keyCountState;

        // 列表类型
        private ListState<Integer> listState;

        @Override
        public void open(Configuration parameters) throws Exception {
            // keyCountState = getRuntimeContext().getState(new ValueStateDescriptor<Integer>("key-count", Integer.class));
            listState = getRuntimeContext().getListState(new ListStateDescriptor<Integer>("my-list", Integer.class));
        }

        @Override
        public void close() throws Exception {
            super.close();
        }

        @Override
        public Tuple2<String, Long> map(Tuple2<String, Long> s) throws Exception {
//            Integer count = keyCountState.value();   // 获取这个key的state
//            if (Objects.isNull(count)) {
//                count = 0;
//            }
//            count ++;     // 次数 + 1
//            keyCountState.update(count);              // 更新状态
//            return Tuple2.of(s.f0, (long)count);
            Iterable<Integer> integers = listState.get();  // 获取状态
            listState.add(1);   // 追加操作
            listState.update(Lists.newArrayList());    // 更新操作
            return null;
        }
    }

此段代码只是演示,没有实际意义

映射状态

将状态表示为一组 key -value 对
其实与上面的类似,不在介绍

聚合状态 (Reducing state & Aggregating state)

将状态表示为一个用于聚合操作的列表 ,不在详细介绍

状态后端 (State Backends)

状态的存储、访问以及维护,由一个可插入的组件决定,这个组件就是 状态后端
状态后端主要负责: 本地的状态管理,将检查点(checkpoint)的状态写入远程存储

状态后端类型

MemoryStateBackend

内存级的状态后端,会将键控状态作为内存中的对象进行管理

FsStateBackend

checkpoint 存到远侧还能管的持久化文件系统,而本地状态和 MemoryStateBackend 一致

RocksDBStatebackend

将所有状态序列化后,存入本地的RocksDB 中存储

编程使用

在初始化话环境是,手动设定

env.setStateBackend(new MemoryStateBackend())  
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容

  • [TOC] 一、前言 有状态的计算是流处理框架要实现的重要功能,因为稍复杂的流处理场景都需要记录状态,然后在新流入...
    w1992wishes阅读 6,844评论 0 6
  • 前言 在传统的批处理中,数据划分为一个个batch,然后每一个Task去处理一个batch。一个批次的数据通过计算...
    Rex_2013阅读 3,051评论 0 4
  • 1.流式计算分为无状态和有状态两种情况。无状态的计算观察每个独立事件,并根据最后一个事件输出结果。例如,流处理应用...
    安申阅读 2,147评论 0 1
  • 推荐指数: 6.0 书籍主旨关键词:特权、焦点、注意力、语言联想、情景联想 观点: 1.统计学现在叫数据分析,社会...
    Jenaral阅读 5,753评论 0 5
  • 昨天,在回家的路上,坐在车里悠哉悠哉地看着三毛的《撒哈拉沙漠的故事》,我被里面的内容深深吸引住了,尽管上学时...
    夜阑晓语阅读 3,833评论 2 9