[Flink on k8s] Key High-Availability Mechanism and ConfigMap Data Explained

1. The key high-availability mechanism

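Source walkthrough: DefaultCompletedCheckpointStore.addCheckpoint / tryRemoveCompletedCheckpoint
Step 1: Derive the checkpoint's name from its checkpoint ID (the source stores it in a variable called path).
Step 2: Write the checkpoint data to the S3 path, then update the checkpoint entry in the ConfigMap, i.e. the key checkpointID-0000000000000102688 in flink-161511ce1fe78368bc659597e472fb7d-jobmanager-leader.
Step 3: Append the checkpoint to the local queue, then remove the surplus completed checkpoints according to the number to retain (cluster option state.checkpoints.num-retained).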

public class DefaultCompletedCheckpointStore<R extends ResourceVersion<R>>
        implements CompletedCheckpointStore {
    
    // Local cache of the retained completed checkpoints
    private final ArrayDeque<CompletedCheckpoint> completedCheckpoints;

    @Override
    public void addCheckpoint(
            final CompletedCheckpoint checkpoint,
            CheckpointsCleaner checkpointsCleaner,
            Runnable postCleanup)
            throws Exception {
        // omitted...

        // 1. Derive the checkpoint name from the checkpoint ID
        final String path = completedCheckpointStoreUtil.checkpointIDToName(checkpoint.getCheckpointID());
        // 2. Write the checkpoint data to the S3 path, then update the checkpoint entry in the ConfigMap
        checkpointStateHandleStore.addAndLock(path, checkpoint);

        // 3. Append the checkpoint to the queue, then subsume the ones beyond the number of completed checkpoints to retain
        completedCheckpoints.addLast(checkpoint);
        CheckpointSubsumeHelper.subsume(
                completedCheckpoints,
                maxNumberOfCheckpointsToRetain,
                completedCheckpoint ->
                        tryRemoveCompletedCheckpoint(
                                completedCheckpoint,
                                completedCheckpoint.shouldBeDiscardedOnSubsume(),
                                checkpointsCleaner,
                                postCleanup));
        // omitted...
    }

    

    private void tryRemoveCompletedCheckpoint(
            CompletedCheckpoint completedCheckpoint,
            boolean shouldDiscard,
            CheckpointsCleaner checkpointsCleaner,
            Runnable postCleanup)
            throws Exception {
        if (tryRemove(completedCheckpoint.getCheckpointID())) {
            checkpointsCleaner.cleanCheckpoint(
                    completedCheckpoint, shouldDiscard, postCleanup, ioExecutor);
        }
    }
}
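
The three steps above can be modelled with plain in-memory collections. The following is only a simplified sketch, not Flink's implementation: the class CheckpointStoreSketch, the two maps standing in for S3 and the ConfigMap, and the file names under ha/ are all hypothetical. The 19-digit zero padding of the key is an assumption taken from the key checkpointID-0000000000000102688 shown in this article.

```java
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;
import java.util.TreeMap;

public class CheckpointStoreSketch {
    // Stand-in for the HA storage directory (S3 in this article): file path -> serialized checkpoint.
    final Map<String, byte[]> haStorage = new HashMap<>();
    // Stand-in for the job's leader ConfigMap: key -> pointer to the file in HA storage.
    final Map<String, String> configMap = new TreeMap<>();
    // Local cache of the retained checkpoint IDs, oldest first.
    final ArrayDeque<Long> completedCheckpoints = new ArrayDeque<>();
    final int maxRetained;
    private long fileCounter = 0; // hypothetical way of producing unique file names

    CheckpointStoreSketch(int maxRetained) {
        if (maxRetained < 1) {
            throw new IllegalArgumentException("must retain at least one checkpoint");
        }
        this.maxRetained = maxRetained;
    }

    // Step 1: derive the ConfigMap key from the checkpoint ID (assumed 19-digit zero padding).
    static String checkpointIdToName(long checkpointId) {
        return String.format(Locale.ROOT, "checkpointID-%019d", checkpointId);
    }

    void addCheckpoint(long checkpointId, byte[] serializedCheckpoint) {
        String key = checkpointIdToName(checkpointId);

        // Step 2: write the payload to HA storage first, then publish the pointer.
        String path = "ha/completedCheckpoint" + (fileCounter++);
        haStorage.put(path, serializedCheckpoint);
        if (configMap.putIfAbsent(key, path) != null) {
            haStorage.remove(path); // roll back the file that nobody points to
            throw new IllegalStateException(key + " already exists");
        }

        // Step 3: cache locally, then drop the oldest entries beyond the retention limit.
        completedCheckpoints.addLast(checkpointId);
        while (completedCheckpoints.size() > maxRetained) {
            long oldest = completedCheckpoints.removeFirst();
            // Remove the pointer first, then the file it referred to.
            haStorage.remove(configMap.remove(checkpointIdToName(oldest)));
        }
    }

    public static void main(String[] args) {
        CheckpointStoreSketch store = new CheckpointStoreSketch(2);
        for (long id = 102686; id <= 102688; id++) {
            store.addCheckpoint(id, new byte[] {1, 2, 3});
        }
        System.out.println(store.completedCheckpoints);
        System.out.println(store.configMap.keySet());
    }
}
```

Writing the payload before the pointer means a crash between the two writes leaves at most an orphaned file, never a ConfigMap entry that points to nothing.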

2. High-availability data in detail

2.1 High-availability configuration

① Use S3 as the storage for the state backend

Set S3 paths as the state backend's storage locations, i.e. s3://bucket01/flink/savepoints and s3://bucket01/flink/checkpoints, and configure access to the S3-compatible cluster with s3.endpoint, s3.access-key and s3.secret-key.

② Configure high availability on top of Kubernetes

Set high-availability to org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory.
kubernetes.namespace is the Kubernetes namespace, kubernetes.service-account is the Kubernetes service account, and high-availability.storageDir points to an S3 address. Finally, kubernetes.cluster-id sets the prefix of the high-availability ConfigMaps, for example flink-dispatcher-leader and flink-161511ce1fe78368bc659597e472fb7d-jobmanager-leader.

$kubectl get cm |grep flink
flink-config                                              5      4d19h
flink-161511ce1fe78368bc659597e472fb7d-jobmanager-leader   4      24d
flink-dispatcher-leader                                    4      28d
flink-resourcemanager-leader                               2      28d
flink-restserver-leader                                    2      28d
$kubectl describe cm flink-config
Name:         flink-config
Namespace:    default
Labels:       <none>
Annotations:  <none>

Data
====
flink-conf.yaml:
----
omitted...
# Shared file system: S3
s3.endpoint: http://service-minio:9000
s3.path.style.access: true
s3.access-key: admin
s3.secret-key: xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx
# State backend configuration
state.backend: filesystem
state.checkpoints.dir: s3://bucket01/flink/checkpoints
state.savepoints.dir: s3://bucket01/flink/savepoints
# HA and k8s parameters
kubernetes.namespace: default
kubernetes.cluster-id: flink
kubernetes.service-account: serviceaccount-flink
high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
high-availability.storageDir: s3://bucket01/flink/ha
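
To make the role of kubernetes.cluster-id concrete, here is a minimal sketch that rebuilds the ConfigMap names listed by kubectl get cm above from the cluster ID and a job ID. The class HaConfigMapNames and its methods are hypothetical helpers, not Flink APIs, and the suffixes are simply copied from the output of this cluster; other Flink versions may name their ConfigMaps differently.

```java
import java.util.List;

public class HaConfigMapNames {
    // Suffixes as they appear in the "kubectl get cm" output of this article.
    static String dispatcherLeader(String clusterId) {
        return clusterId + "-dispatcher-leader";
    }

    static String resourceManagerLeader(String clusterId) {
        return clusterId + "-resourcemanager-leader";
    }

    static String restServerLeader(String clusterId) {
        return clusterId + "-restserver-leader";
    }

    // One ConfigMap per job: <cluster-id>-<job-id>-jobmanager-leader.
    static String jobManagerLeader(String clusterId, String jobId) {
        return clusterId + "-" + jobId + "-jobmanager-leader";
    }

    public static void main(String[] args) {
        String clusterId = "flink"; // value of kubernetes.cluster-id in flink-conf.yaml
        String jobId = "161511ce1fe78368bc659597e472fb7d";
        List<String> names = List.of(
                dispatcherLeader(clusterId),
                resourceManagerLeader(clusterId),
                restServerLeader(clusterId),
                jobManagerLeader(clusterId, jobId));
        names.forEach(System.out::println);
    }
}
```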

2.2 HA data of the cluster dispatcher

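The dispatcher is the master component that manages jobs. Its high-availability data consists of the dispatcher leader's address, the status of jobs that have not yet finished, and the storage location of each job graph; the job graph location is stored Base64-encoded. As shown below, the dispatcher leader is akka.tcp://flink@10.244.0.246:8123/user/rpc/dispatcher_1, job 161511ce1fe78368bc659597e472fb7d is in state RUNNING, and its job graph jobGraph-161511ce1fe78368bc659597e472fb7d is stored at s3://bucket01/flink/ha/default/submittedJobGraph307e2d6a5be8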

Note: the operating system's Base64 tools can do the encoding and decoding. For example, encode with echo "mmsc" | openssl base64 -e and decode with echo "bW1zYwo=" | openssl base64 -d

$kubectl describe cm flink-dispatcher-leader
Name:         flink-dispatcher-leader
Namespace:    default
Labels:       app=flink
              configmap-type=high-availability
              type=flink-native-kubernetes
Annotations:  control-plane.alpha.kubernetes.io/leader:
                {"holderIdentity":"f7978fe5-962d-4037-aa23-19ff522afbff","leaseDuration":15.000000000,"acquireTime":"2022-05-19T15:09:39.272000Z","renewTi...

Data
====
runningJobsRegistry-161511ce1fe78368bc659597e472fb7d:
----
RUNNING
sessionId:
----
942d4a50-c31f-47fb-939b-94b14a1121fc
address:
----
akka.tcp://flink@10.244.0.246:8123/user/rpc/dispatcher_1
jobGraph-161511ce1fe78368bc659597e472fb7d:
----
rO0ABXNyADtvcmcuYXBhY2hlLmZsaW5rLnJ1bnRpbWUuc3RhdGUuUmV0cmlldmFibGVTdHJlYW1TdGF0ZUhhbmRsZQABHhjxVZcrAgABTAAYd3JhcHBlZFN0cmVhbVN0YXRlSGFuZGxldAAyTG9yZy9hcGFjaGUvZmxpbmsvcnVudGltZS9zdGF0ZS9TdHJlYW1TdGF0ZUhhbmRsZTt4cHNyADlvcmcuYXBhY2hlLmZsaW5rLnJ1bnRpbWUuc3RhdGUuZmlsZXN5c3RlbS5GaWxlU3RhdGVIYW5kbGUE3HXYYr0bswIAAkoACXN0YXRlU2l6ZUwACGZpbGVQYXRodAAfTG9yZy9hcGFjaGUvZmxpbmsvY29yZS9mcy9QYXRoO3hwAAAAAAADcrNzcgAdb3JnLmFwYWNoZS5mbGluay5jb3JlLmZzLlBhdGgAAAAAAAAAAQIAAUwAA3VyaXQADkxqYXZhL25ldC9VUkk7eHBzcgAMamF2YS5uZXQuVVJJrAF4LkOeSasDAAFMAAZzdHJpbmd0ABJMamF2YS9sYW5nL1N0cmluZzt4cHQAN3MzOi8vcnRhL2ZsaW5rL2hhL2RlZmF1bHQvc3VibWl0dGVkSm9iR3JhcGgzMDdlMmQ2YTViZTh4
Events:  <none>
$echo "rO0ABXNyADtvcmcuYXBhY2hlLmZsaW5rLnJ1bnRpbWUuc3RhdGUuUmV0cmlldmFibGVTdHJlYW1TdGF0ZUhhbmRsZQABHhjxVZcrAgABTAAYd3JhcHBlZFN0cmVhbVN0YXRlSGFuZGxldAAyTG9yZy9hcGFjaGUvZmxpbmsvcnVudGltZS9zdGF0ZS9TdHJlYW1TdGF0ZUhhbmRsZTt4cHNyADlvcmcuYXBhY2hlLmZsaW5rLnJ1bnRpbWUuc3RhdGUuZmlsZXN5c3RlbS5GaWxlU3RhdGVIYW5kbGUE3HXYYr0bswIAAkoACXN0YXRlU2l6ZUwACGZpbGVQYXRodAAfTG9yZy9hcGFjaGUvZmxpbmsvY29yZS9mcy9QYXRoO3hwAAAAAAADcrNzcgAdb3JnLmFwYWNoZS5mbGluay5jb3JlLmZzLlBhdGgAAAAAAAAAAQIAAUwAA3VyaXQADkxqYXZhL25ldC9VUkk7eHBzcgAMamF2YS5uZXQuVVJJrAF4LkOeSasDAAFMAAZzdHJpbmd0ABJMamF2YS9sYW5nL1N0cmluZzt4cHQAN3MzOi8vcnRhL2ZsaW5rL2hhL2RlZmF1bHQvc3VibWl0dGVkSm9iR3JhcGgzMDdlMmQ2YTViZTh4" | openssl base64 -d
▒▒sr;org.apache.flink.runtime.state.RetrievableStreamStateHandle▒U▒+LwrappedStreamStateHandlet2Lorg/apache/flink/runtime/state/StreamStateHandle;xpsr9org.apache.flink.runtime.state.filesystem.FileStateHandle▒u▒b▒J  stateSizefilePathtLorg/apache/flink/core/fs/Path;xpr▒srorg.apache.flink.core.fs.PathLuritLjava/net/URI;xpsr
java.net.URI▒x.C▒I▒LstringtLjava/lang/String;xpt7s3://bucket01/flink/ha/default/submittedJobGraph307e2d6a5be8
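
The decoded value is a Java-serialized object rather than plain text, which is why the terminal prints unreadable bytes around the path. As a rough sketch of how the path could be pulled out programmatically, the code below decodes the Base64 value and reads the last string that starts with the given scheme, using the 2-byte length that Java serialization writes in front of a string. The class HaPointerDecoder and its methods are hypothetical, and sampleValue only serializes a bare java.net.URI as test data; it is not the full RetrievableStreamStateHandle stored by Flink.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.UncheckedIOException;
import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.util.Base64;

public class HaPointerDecoder {
    // Extracts the last "<scheme>://..." string from a Base64-encoded Java-serialized value.
    static String extractUri(String base64Value, String scheme) {
        // The MIME decoder tolerates line breaks that may come from copy and paste.
        byte[] bytes = Base64.getMimeDecoder().decode(base64Value);
        // ISO-8859-1 maps every byte to exactly one char, so indices match the byte array.
        String text = new String(bytes, StandardCharsets.ISO_8859_1);
        int start = text.lastIndexOf(scheme + "://");
        if (start < 2) {
            throw new IllegalArgumentException("no " + scheme + ":// URI found");
        }
        // Java serialization stores a short string as: 't', 2-byte big-endian length, bytes.
        int length = ((bytes[start - 2] & 0xFF) << 8) | (bytes[start - 1] & 0xFF);
        if (start + length > bytes.length) {
            throw new IllegalArgumentException("unexpected serialized layout");
        }
        return new String(bytes, start, length, StandardCharsets.UTF_8);
    }

    // Builds test data: a Base64-encoded, Java-serialized java.net.URI.
    static String sampleValue(String uri) {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(buffer)) {
            out.writeObject(URI.create(uri));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return Base64.getEncoder().encodeToString(buffer.toByteArray());
    }

    public static void main(String[] args) {
        String encoded = sampleValue("s3://bucket01/flink/ha/default/submittedJobGraph307e2d6a5be8");
        System.out.println(extractUri(encoded, "s3"));
    }
}
```

Reading the length prefix instead of scanning for printable characters avoids picking up the marker byte that follows the string in the serialized stream.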

2.3 HA data of a job's jobmanager

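A job's high-availability data consists of the address of the job's managing node (the jobmanager) and the storage location of the job's latest checkpoint data; the checkpoint location is stored Base64-encoded. As shown below, the jobmanager is akka.tcp://flink@10.244.0.246:8123/user/rpc/jobmanager_2, the job's latest checkpoint is checkpointID-0000000000000102688, and it is stored at s3://bucket01/flink/ha/default/completedCheckpointf07724c0946a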

$kubectl describe cm flink-161511ce1fe78368bc659597e472fb7d-jobmanager-leader
Name:         flink-161511ce1fe78368bc659597e472fb7d-jobmanager-leader
Namespace:    default
Labels:       app=flink
              configmap-type=high-availability
              type=flink-native-kubernetes
Annotations:  control-plane.alpha.kubernetes.io/leader:
                {"holderIdentity":"f7978fe5-962d-4037-aa23-19ff522afbff","leaseDuration":15.000000000,"acquireTime":"2022-05-19T15:09:39.988000Z","renewTi...

Data
====
address:
----
akka.tcp://flink@10.244.0.246:8123/user/rpc/jobmanager_2
checkpointID-0000000000000102688:
----
rO0ABXNyADtvcmcuYXBhY2hlLmZsaW5rLnJ1bnRpbWUuc3RhdGUuUmV0cmlldmFibGVTdHJlYW1TdGF0ZUhhbmRsZQABHhjxVZcrAgABTAAYd3JhcHBlZFN0cmVhbVN0YXRlSGFuZGxldAAyTG9yZy9hcGFjaGUvZmxpbmsvcnVudGltZS9zdGF0ZS9TdHJlYW1TdGF0ZUhhbmRsZTt4cHNyADlvcmcuYXBhY2hlLmZsaW5rLnJ1bnRpbWUuc3RhdGUuZmlsZXN5c3RlbS5GaWxlU3RhdGVIYW5kbGUE3HXYYr0bswIAAkoACXN0YXRlU2l6ZUwACGZpbGVQYXRodAAfTG9yZy9hcGFjaGUvZmxpbmsvY29yZS9mcy9QYXRoO3hwAAAAAAAAFyhzcgAdb3JnLmFwYWNoZS5mbGluay5jb3JlLmZzLlBhdGgAAAAAAAAAAQIAAUwAA3VyaXQADkxqYXZhL25ldC9VUkk7eHBzcgAMamF2YS5uZXQuVVJJrAF4LkOeSasDAAFMAAZzdHJpbmd0ABJMamF2YS9sYW5nL1N0cmluZzt4cHQAOXMzOi8vcnRhL2ZsaW5rL2hhL2RlZmF1bHQvY29tcGxldGVkQ2hlY2twb2ludGYwNzcyNGMwOTQ2YXg=
counter:
----
102689
sessionId:
----
766ea025-af00-4b6b-8700-a80c9fa2a4e5
Events:  <none>
$echo "rO0ABXNyADtvcmcuYXBhY2hlLmZsaW5rLnJ1bnRpbWUuc3RhdGUuUmV0cmlldmFibGVTdHJlYW1TdGF0ZUhhbmRsZQABHhjxVZcrAgABTAAYd3JhcHBlZFN0cmVhbVN0YXRlSGFuZGxldAAyTG9yZy9hcGFjaGUvZmxpbmsvcnVudGltZS9zdGF0ZS9TdHJlYW1TdGF0ZUhhbmRsZTt4cHNyADlvcmcuYXBhY2hlLmZsaW5rLnJ1bnRpbWUuc3RhdGUuZmlsZXN5c3RlbS5GaWxlU3RhdGVIYW5kbGUE3HXYYr0bswIAAkoACXN0YXRlU2l6ZUwACGZpbGVQYXRodAAfTG9yZy9hcGFjaGUvZmxpbmsvY29yZS9mcy9QYXRoO3hwAAAAAAAAFyhzcgAdb3JnLmFwYWNoZS5mbGluay5jb3JlLmZzLlBhdGgAAAAAAAAAAQIAAUwAA3VyaXQADkxqYXZhL25ldC9VUkk7eHBzcgAMamF2YS5uZXQuVVJJrAF4LkOeSasDAAFMAAZzdHJpbmd0ABJMamF2YS9sYW5nL1N0cmluZzt4cHQAOXMzOi8vcnRhL2ZsaW5rL2hhL2RlZmF1bHQvY29tcGxldGVkQ2hlY2twb2ludGYwNzcyNGMwOTQ2YXg=" | openssl base64 -d
▒▒sr;org.apache.flink.runtime.state.RetrievableStreamStateHandle▒U▒+LwrappedStreamStateHandlet2Lorg/apache/flink/runtime/state/StreamStateHandle;xpsr9org.apache.flink.runtime.state.filesystem.FileStateHandle▒u▒b▒J  stateSizefilePathtLorg/apache/flink/core/fs/Path;xp(srorg.apache.flink.core.fs.PathLuritLjava/net/URI;xpsr
java.net.URI▒x.C▒I▒LstringtLjava/lang/String;xpt9s3://bucket01/flink/ha/default/completedCheckpointf07724c0946a
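
Given the data section of such a ConfigMap, the latest checkpoint can be found by parsing the numeric part of every checkpointID- key and taking the maximum. This is a minimal sketch under the assumption that all checkpoint keys follow the format shown above; the class LatestCheckpointKey is hypothetical and the map in main is a hand-written stand-in for the ConfigMap data, with the Base64 pointer abbreviated to a placeholder.

```java
import java.util.Map;
import java.util.OptionalLong;

public class LatestCheckpointKey {
    static final String PREFIX = "checkpointID-";

    // Returns the highest checkpoint ID found among the ConfigMap keys, if any.
    static OptionalLong latestCheckpointId(Map<String, String> configMapData) {
        return configMapData.keySet().stream()
                .filter(key -> key.startsWith(PREFIX))
                // Long.parseLong accepts the leading zeros of the padded ID.
                .mapToLong(key -> Long.parseLong(key.substring(PREFIX.length())))
                .max();
    }

    public static void main(String[] args) {
        Map<String, String> data = Map.of(
                "address", "akka.tcp://flink@10.244.0.246:8123/user/rpc/jobmanager_2",
                "checkpointID-0000000000000102688", "<Base64 pointer>",
                "counter", "102689",
                "sessionId", "766ea025-af00-4b6b-8700-a80c9fa2a4e5");
        System.out.println(latestCheckpointId(data));
    }
}
```

In the output above the counter value 102689 is exactly one higher than the latest checkpoint ID 102688, which suggests it holds the ID for the next checkpoint.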