Keyed State实战

Flink提供了5种状态原语(state primitives):ValueState<T>ListState<T>ReducingState<T>AggregatingState<IN, OUT>MapState<UK, UV>。下面将通过样例讲解每个原语是如何使用的。

ValueState<T>

定义

This keeps a value that can be updated and retrieved (scoped to key of the input element as mentioned above, so there will possibly be one value for each key that the operation sees). The value can be set using update(T) and retrieved using T value().

说明

每个key会保留一个状态,可以通过update(T)更新,通过T value()获取状态的值。ValueState<T>接口只有这两个方法。

样例

业务场景

计算每个手机号在某个区域的驻留时长,一旦位置切换了,就重新开始计算驻留时长。

计算过程

每条数据的key为手机号,ValueState<Tuple2<String, Long>>保存这个手机最新的位置,以及第一次进入这个区域的时间。最新数据和状态中的区域进行比较,如果位置和上次一样,那么用当前时间剪去状态中的时间来计算这个手机号已经在这个区域驻留累多长时间。如果位置和状态中的不一样,说明位置发生了切换,那么更新状态值,驻留时间为0,重新开始计算。

代码传送门

public class StayTimeValueState {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        //f0:手机号,f0:当前位置,f1:上报位置的时间
        KeyedStream<Tuple3<String, String, Long>, Tuple> keyedStream = env.addSource(new StateDataSource()).keyBy(0);
        keyedStream.map(new RichMapFunction<Tuple3<String, String, Long>, Tuple3<String, String, Long>>() {
            //f0:位置,f1:首次进入的时间
            private ValueState<Tuple2<String, Long>> stayAreaTime;
            @Override
            public void open(Configuration parameters) throws Exception {
                ValueStateDescriptor<Tuple2<String, Long>> descriptor =
                        new ValueStateDescriptor<>(
                                "stayAreaTime",
                                TypeInformation.of(new TypeHint<Tuple2<String, Long>>() {
                                }));
                stayAreaTime = getRuntimeContext().getState(descriptor);
            }

            @Override
            public Tuple3<String, String, Long> map(Tuple3<String, String, Long> value) throws Exception {
                Tuple2<String, Long> currentAreaTime = stayAreaTime.value();
                Tuple3<String, String, Long> result = new Tuple3<>();

                if (currentAreaTime != null && value.f1.equals(currentAreaTime.f0)){
                    result.setFields(value.f0, value.f1, value.f2 - currentAreaTime.f1);
                }else {
                    result.setFields(value.f0, value.f1, 0L);
                    stayAreaTime.update(new Tuple2<>(value.f1, value.f2));
                }

                return result;
            }
        }).print();

        env.execute("StayTimeValueState demo");
    }
}

结果分析

Sand data:(19911111111,A,1581402583622)
(19911111111,A,0)初始化,将位置和时间存入状态中
Sand data:(19911111111,B,1581402585627)
(19911111111,B,0)//位置和状态中存储的上一次位置不一致,发生了切换,更新状态中的位置和时间,重新计算
Sand data:(19911111111,B,1581402587631)
(19911111111,B,2004)//位置和状态中存储的上一次位置一样,驻留时间2004=1581402587631-1581402585627
Sand data:(19911111111,B,1581402589631)
(19911111111,B,4004)//位置没变,驻留时间4004=1581402589631-1581402585627
Sand data:(19911111111,A,1581402595644)
(19911111111,A,0)//位置和状态中存储的上一次位置不一致,发生了切换,更新状态中的位置和时间,重新计算

ListState<T>

定义

This keeps a list of elements. You can append elements and retrieve an Iterable over all currently stored elements. Elements are added using add(T) or addAll(List), the Iterable can be retrieved using Iterable get(). You can also override the existing list with update(List)

说明

每个key会保留一个List作为状态,这个list只能通过add(T)或者 addAll(List)进行追加新的元素,或者通过update(List)更新整个list。所以是不能更新之前list中的指定元素的值。

样例

业务场景

计算每个手机号在某个区域的驻留时长,位置如果切换了在回到原来的区域,驻留时长需要累加计算。例如某个人在A区域驻留了1分钟后去了B区域5分钟,又回来A区域这时候这个人在A区域的时间是累计时间1,而不像上面的例子从0开始计算。

计算过程

由于要计算每一次切换位置的驻留时长,所以状态中要记录手机切换位置的路径。上面的例子只能计算当前位置的驻留时间,一旦位置切换,就无法保留之前位置的驻留时长,所以基于上面的例子,再把每个位置的驻留时间存在一个list里面,这个list就相当于存储了这个手机号运行轨迹。

代码传送门

public class StayTimeListState {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        //f0:手机号,f0:当前位置,f1:上报位置的时间
        KeyedStream<Tuple3<String, String, Long>, Tuple> keyedStream = env.addSource(new StateDataSource()).keyBy(0);

        keyedStream.map(new RichMapFunction<Tuple3<String, String, Long>, Tuple3<String, String, Long>>() {
            //当前区域的驻留时长, f0:上次位置,f1:上次的时间
            private ValueState<Tuple2<String, Long>> stayAreaTime;
            //历史轨迹每个区域的驻留时长,f0:位置,f1:驻留时间
            private ListState<Tuple2<String, Long>> stayAreaTimeHistory;

            @Override
            public void open(Configuration parameters) throws Exception {
                ValueStateDescriptor<Tuple2<String, Long>> descriptor =
                        new ValueStateDescriptor<>(
                                "stayAreaTime",
                                TypeInformation.of(new TypeHint<Tuple2<String, Long>>() {
                                }));
                stayAreaTime = getRuntimeContext().getState(descriptor);

                ListStateDescriptor<Tuple2<String, Long>> historyDescriptor =
                        new ListStateDescriptor<>(
                                "stayAreaTimeHistory",
                                TypeInformation.of(new TypeHint<Tuple2<String, Long>>() {
                                }));
                stayAreaTimeHistory = getRuntimeContext().getListState(historyDescriptor);
            }

            @Override
            public Tuple3<String, String, Long> map(Tuple3<String, String, Long> value) throws Exception {
                Tuple2<String, Long> currentAreaTime = stayAreaTime.value();
                List<Tuple2<String, Long>> currentStayAreaTimeHistory = Lists.newArrayList(stayAreaTimeHistory.get());
                long historyTime = 0L;

                //遍历历史数据计算所有当前区域的驻留时长
                for (Tuple2<String, Long> history : currentStayAreaTimeHistory) {
                    if (history.f0.equals(value.f1)){
                        historyTime = historyTime + history.f1;
                    }
                }

                Tuple3<String, String, Long> result = new Tuple3<>();

                if (currentAreaTime != null && value.f1.equals(currentAreaTime.f0)){
                    long sum = value.f2 - currentAreaTime.f1 + historyTime;
                    result.setFields(value.f0, value.f1, sum);
                    stayAreaTimeHistory.add(new Tuple2<>(value.f1, value.f2 - currentAreaTime.f1));
                }else {
                    result.setFields(value.f0, value.f1, historyTime);
                }

                stayAreaTime.update(new Tuple2<>(value.f1, value.f2));
                return result;
            }
        }).print();

        env.execute("StayTimeListState demo");
    }
}

结果分析

Sand data:(19911111111,A,1581475330555)
(19911111111,A,0)
Sand data:(19911111111,A,1581475332576)
(19911111111,A,2021)
Sand data:(19911111111,B,1581475334580)
(19911111111,B,0)
Sand data:(19911111111,B,1581475335582)
(19911111111,B,1002)
Sand data:(19911111111,B,1581475336586)
(19911111111,B,2006)
Sand data:(19911111111,B,1581475338589)
(19911111111,B,4009)
Sand data:(19911111111,B,1581475340592)
(19911111111,B,6012)
Sand data:(19911111111,B,1581475341596)
(19911111111,B,7016)
Sand data:(19911111111,B,1581475343597)
(19911111111,B,9017)
Sand data:(19911111111,A,1581475344600)
(19911111111,A,2021)//切换到A区域,累加曾经在A区域到驻留时长,而不是从0开始计算
Sand data:(19911111111,A,1581475345604)
(19911111111,A,3025)
Sand data:(19911111111,B,1581475347608)
(19911111111,B,9017)//切换到B区域从历史记录中累加
Sand data:(19911111111,A,1581475348612)
(19911111111,A,3025)

MapState<UK, UV>

定义

This keeps a list of mappings. You can put key-value pairs into the state and retrieve an Iterable over all currently stored mappings. Mappings are added using put(UK, UV) or putAll(Map<UK, UV>). The value associated with a user key can be retrieved using get(UK). The iterable views for mappings, keys and values can be retrieved using entries(), keys() and values() respectively.

说明

每个key会保留一个Map作为状态,重点是这个Map可以通过put(UK, UV)更新某个元素,通过get(UK)获取某个元素的值。

样例

业务场景

和上面的一样,计算累加驻留时间。

计算过程

和上面类似,只是将去过的每个区域和对应的累计驻留时间存在一个map中。

代码传送门

public class StayTimeMapState {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        //f0:手机号,f0:当前位置,f1:上报位置的时间
        KeyedStream<Tuple3<String, String, Long>, Tuple> keyedStream = env.addSource(new StateDataSource()).keyBy(0);
        keyedStream.map(new RichMapFunction<Tuple3<String, String, Long>, Tuple3<String, String, Long>>() {
            //当前区域的驻留时长, f0:上次位置,f1:上次的时间
            private ValueState<Tuple2<String, Long>> stayAreaTime;
            //历史轨迹每个区域的驻留时长,f0:位置,f1:累计驻留时间
            private MapState<String, Long> stayAreaTimeHistory;

            @Override
            public void open(Configuration parameters) throws Exception {
                ValueStateDescriptor<Tuple2<String, Long>> descriptor =
                        new ValueStateDescriptor<>(
                                "stayAreaTime",
                                TypeInformation.of(new TypeHint<Tuple2<String, Long>>() {
                                }));
                stayAreaTime = getRuntimeContext().getState(descriptor);

                MapStateDescriptor<String, Long> historyDescriptor =
                        new MapStateDescriptor<>(
                                "stayAreaTimeHistory",
                                TypeInformation.of(new TypeHint<String>() {
                                }),
                                TypeInformation.of(new TypeHint<Long>() {
                                }));
                stayAreaTimeHistory = getRuntimeContext().getMapState(historyDescriptor);
            }

            @Override
            public Tuple3<String, String, Long> map(Tuple3<String, String, Long> value) throws Exception {
                Tuple2<String, Long> currentAreaTime = stayAreaTime.value();
                long historyTime = stayAreaTimeHistory.contains(value.f1) ? stayAreaTimeHistory.get(value.f1) : 0L;

                Tuple3<String, String, Long> result = new Tuple3<>();

                if (currentAreaTime != null && value.f1.equals(currentAreaTime.f0)){
                    long sum = value.f2 - currentAreaTime.f1 + historyTime;
                    result.setFields(value.f0, value.f1, sum);
                    stayAreaTimeHistory.put(value.f1, sum);
                }else {
                    result.setFields(value.f0, value.f1, historyTime);
                }

                stayAreaTime.update(new Tuple2<>(value.f1, value.f2));

                return result;
            }
        }).print();

        env.execute("StayTimeMapState demo");
    }
}

结果分析

Sand data:(19911111111,A,1581477337568)
(19911111111,A,0)
Sand data:(19911111111,A,1581477338589)
(19911111111,A,1021)
Sand data:(19911111111,B,1581477340593)
(19911111111,B,0)
Sand data:(19911111111,B,1581477341595)
(19911111111,B,1002)
Sand data:(19911111111,B,1581477343600)
(19911111111,B,3007)
Sand data:(19911111111,B,1581477344602)
(19911111111,B,4009)
Sand data:(19911111111,A,1581477346604)
(19911111111,A,1021)
Sand data:(19911111111,B,1581477347609)
(19911111111,B,4009)
Sand data:(19911111111,A,1581477349613)
(19911111111,A,1021)

效果和上节中的一致

小结

通过上面3个例子,可以对比一下ValueStateListStateMapState的区别与联系。ValueState是最基本的一个接口,里面存的值可以是一些基本类型,但是同样也可以存储List或者Map,那么这样是不是就同样实现ListStateMapState的功能了呢?理论上是可以的,但是主要的问题在于更新操作,对于ValueState接口只能更新value,也就是如果这个value是list或者map,那么每次都要更新整个map或者list,查询也是需要先获取list或者map再遍历。而ListState可以直接追加新的状态,MapState就更加方便的可以更新某个之前的状态属性值。

对于上面的例子,使用ListState会不断追加新的区域和驻留时间,如果数据量很大,那么这个list会非常大,而且之前的值也没有保留的必要,变得浪费存储空间。当然这个接口也提供了一个update(List<T>)方法更新整个list。然而上面的场景使用MapState<UK, UV>就比较方便,可以直接更新某个区域的累计驻留时长。

所以在真实也业务场景中,到底使用哪种接口,还是需要根据业务选择一个操作比较方便并且节省存储空间的高效接口。

ReducingState<T>

定义

This keeps a single value that represents the aggregation of all values added to the state. The interface is similar to ListState but elements added using add(T) are reduced to an aggregate using a specified ReduceFunction.

说明

每添加一个状态值,都会调用ReduceFunction方法计算一次。

样例

业务场景

计算每个手机号累计驻留时长。

计算过程

输入数据中包含每个手机号在某个区域的时长,通过ReduceFunction进行一次累加。

代码传送门

public class StayTimeReducingState {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        //f0:手机号,f0:当前位置,f1:驻留时长
        DataStream<Tuple3<String, String, Long>> source = env.fromElements("19911111111,A,6", "19911111112,A,12", "19911111111,A,3"
                , "19911111112,A,6", "19911111111,A,12", "19911111112,A,9")
                .map(new MapFunction<String, Tuple3<String, String, Long>>() {
                    @Override
                    public Tuple3<String, String, Long> map(String value) throws Exception {
                        String[] v = value.split(",");
                        return new Tuple3(v[0], v[1], Long.valueOf(v[2]));
                    }
                });
        KeyedStream<Tuple3<String, String, Long>, Tuple> keyedStream = source.keyBy(0);

        keyedStream.map(new RichMapFunction<Tuple3<String, String, Long>, Tuple3<String, String, Long>>() {
            //累计驻留时长
            private ReducingState<Long> stayAreaTimeSum;
            @Override
            public void open(Configuration parameters) throws Exception {
                ReducingStateDescriptor<Long> sumDescriptor =
                        new ReducingStateDescriptor<>(
                                "stayAreaTimeSum",
                                new ReduceFunction<Long>() {
                                    @Override
                                    public Long reduce(Long value1, Long value2) throws Exception {
                                        return value1 + value2;
                                    }
                                },
                                TypeInformation.of(new TypeHint<Long>() {
                                }));
                stayAreaTimeSum = getRuntimeContext().getReducingState(sumDescriptor);
            }

            @Override
            public Tuple3<String, String, Long> map(Tuple3<String, String, Long> value) throws Exception {
                stayAreaTimeSum.add(value.f2);
                long timeSum = stayAreaTimeSum.get();
                return new Tuple3<>(value.f0, value.f1, timeSum);
            }
        }).print();

        env.execute("StayTimeReducingState demo");
    }
}

结果分析

(19911111111,A,6)
(19911111112,A,12)
(19911111111,A,9)//每个手机号累加计算
(19911111112,A,18)//每个手机号累加计算
(19911111111,A,21)
(19911111112,A,27)

AggregatingState<IN, OUT>

定义

This keeps a single value that represents the aggregation of all values added to the state. Contrary to ReducingState, the aggregate type may be different from the type of elements that are added to the state. The interface is the same as for ListState but elements added using add(IN) are aggregated using a specified AggregateFunction.

说明

AggregatingState也是可以实现ReducingState的功能,但是比它更加灵活和丰富,而且这个计算的输入和输出可以是不同类型。需要实现AggregateFunction接口。

样例

业务场景

计算每个手机号平均每次的累计驻留时长。

计算过程

累加驻留时长和次数进行求平均值

代码传送门

public class StayTimeAggregatingState {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        //f0:手机号,f0:当前位置,f1:驻留时长
        DataStream<Tuple3<String, String, Long>> source = env.fromElements("19911111111,A,6", "19911111112,A,12", "19911111111,A,3"
                , "19911111112,A,6", "19911111111,A,12", "19911111112,A,9")
                .map(new MapFunction<String, Tuple3<String, String, Long>>() {
                    @Override
                    public Tuple3<String, String, Long> map(String value) throws Exception {
                        String[] v = value.split(",");
                        return new Tuple3(v[0], v[1], Long.valueOf(v[2]));
                    }
                });

        KeyedStream<Tuple3<String, String, Long>, Tuple> keyedStream = source.keyBy(0);

        keyedStream.map(new RichMapFunction<Tuple3<String, String, Long>, Tuple2<String, Double>>() {
            //输入为驻留时长,输出为平均驻留时长
            private AggregatingState<Long, Double> stayAreaTimeAgg;
            @Override
            public void open(Configuration parameters) throws Exception {
                AggregatingStateDescriptor<Long, AverageAccumulator, Double> aggDescriptor =
                        new AggregatingStateDescriptor<>(
                                "stayAreaTime",
                                new AggregateFunction<Long, AverageAccumulator, Double>() {

                                    @Override
                                    public AverageAccumulator createAccumulator() {
                                      //每个key创建一个
                                        return new AverageAccumulator();
                                    }

                                    @Override
                                    public AverageAccumulator add(Long value, AverageAccumulator acc) {
                                        acc.sum += value;
                                        acc.count++;
                                        return acc;
                                    }

                                    @Override
                                    public Double getResult(AverageAccumulator acc) {
                                        return acc.sum / (double) acc.count;
                                    }

                                    @Override
                                    public AverageAccumulator merge(AverageAccumulator a, AverageAccumulator b) {
                                        a.count += b.count;
                                        a.sum += b.sum;
                                        return a;
                                    }
                                },
                                TypeInformation.of(new TypeHint<AverageAccumulator>() {
                                }));
                stayAreaTimeAgg = getRuntimeContext().getAggregatingState(aggDescriptor);
            }

            @Override
            public Tuple2<String, Double> map(Tuple3<String, String, Long> value) throws Exception {
                stayAreaTimeAgg.add(value.f2);
                return new Tuple2<>(value.f0, stayAreaTimeAgg.get());
            }
        }).print();

        env.execute("StayTimeAggregatingState demo");
    }

    private static class AverageAccumulator {
        private long count;
        private long sum;
    }
}

结果分析

(19911111111,6.0)
(19911111112,12.0)
(19911111111,4.5)//在上例的基础上取平均值
(19911111112,9.0)
(19911111111,7.0)
(19911111112,9.0)

小结

AggregatingState和ReducingState比较类似,都是可以进行一些聚合计算,避免了把所有明细数据都保留在状态中,只保留最终聚合的结果,节省了存储空间。而AggregatingState可以实现更加复杂的聚合操作。

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 215,723评论 6 498
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,003评论 3 391
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 161,512评论 0 351
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,825评论 1 290
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,874评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,841评论 1 295
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,812评论 3 416
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,582评论 0 271
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,033评论 1 308
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,309评论 2 331
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,450评论 1 345
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,158评论 5 341
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,789评论 3 325
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,409评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,609评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,440评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,357评论 2 352

推荐阅读更多精彩内容