flink1.8 基于Zookeeper的HA源码分析

Zookeeper HA相关配置

## 使用zk做HA
high-availability: zookeeper
## zk地址
high-availability.zookeeper.quorum: node1:2181,node2:2181,node3:2181
## flink在zk下的工作路径
high-availability.zookeeper.path.root: /flink
## 任务所在的HA路径
high-availability.cluster-id: /default
## 保存元数据到文件系统
high-availability.storageDir: hdfs:///flink/recovery

## --任务运行在YARN上的配置--
## applicationMaster重试的次数,默认为1,当application master失败的时候,该任务不会重启。
## 设置一个比较大的值的话,yarn会尝试重启applicationMaster。
yarn.application-attempts: 10
## flink是否应该重新分配失败的taskmanager容器。默认是true。
yarn.reallocate-failed:true
## applicationMaster可以接受的容器最大失败次数,达到这个参数,就会认为yarn job失败。
## 默认这个次数和初始化请求的taskmanager数量相等(-n 参数指定的)。
yarn.maximum-failed-containers:1


flink使用Zookeeper做HA

flink的ResourceManager、Dispatcher、JobManager、WebServer组件都需要高可用保证,同时flink高可用还需要持久化checkpoint的元数据信息,保留最近一次已经完成的checkpoint等工作,其中最重要的就是组件的leader选举、leader状态跟踪。本次抽取出Flink使用zk实现leader选举、leader状态跟踪代码,学习下flink是如何使用curator的。类之间的关系如下:


ZooKeeperHaServices是HighAvailabilityServices基于zookeeper的实现,通过使用ZooKeeperUtils类来创建组件的LeaderRetrievalService以及LeaderElectionService。

LeaderRetrievalService用来跟踪leader的变化,当发现leader地址变化时,要通知依赖它的组件去依赖新的leader。比如getResourceManagerLeaderRetriever方法,flink会监听zk的/leader/resource_manager_lock节点内容变化,内容是rm的leader地址和leaderUUID,而taskmanger调用该服务的start方法传递了一个LeaderRetrievalListener。如果节点内容发生变化,意味着rm的leader地址发生变化,那么的LeaderRetrievalListener的notifyLeaderAddress就会通知taskmanger去新的ResourceManager地址进行注册。zk实现该功能使用的是curator的NodeCache并重写了nodeChanged方法。

LeaderElectionService用来进行leader选举工作,当节点成为leader后会调用LeaderContender的grantLeadership方法。以ResourceManagerLeaderElection为例,flink会在zk的/leaderlatch/resource_manager_lock路径下创建临时节点,创建成功的rm节点成为leader触发rm的grantLeadership,最终将当前地址和UUID写入/leader/resource_manager_lock中,这样就触发了LeaderRetrievalService服务。zk实现leader选举使用的是curator的LeaderLatch并重写了isLeader和notLeader方法。同时使用NodeCache监听/leader/resource_manager_lock内容变化,确保新leader地址和UUID成功写入节点。

LeaderRetrievalListener对LeaderRetrievalService的leader地址变化做出响应,通过notifyLeaderAddress传递新leader地址。

LeaderContender对LeaderElectionService的节点角色发生变化做出响应,通过grantLeadership和revokeLeadership进行leader的授权和撤销工作。

一个集群目录下的zk结构如下图所示:


zk文件系统目录

flink相关源码

简单的走一下流程,看看集群启动时是如何创建ZooKeeperHaServices的。

集群启动入口ClusterEntrypoint
  • 根据集群的部署模式session or perjob由对应的子类调用ClusterEntrypoint的startCluster方法启动集群,接着会先调用initializeServices方法,启动集群相关的组件信息。这里只看启动haServices部分。
public void startCluster() throws ClusterEntrypointException {

  SecurityContext securityContext = installSecurityContext(configuration);
  securityContext.runSecured((Callable<Void>) () -> {
        runCluster(configuration);
         return null;
  });
}

protected void initializeServices(Configuration configuration) {

    ioExecutor = Executors.newFixedThreadPool(
        Hardware.getNumberCPUCores(),
        new ExecutorThreadFactory("cluster-io"));
        
    haServices = createHaServices(configuration, ioExecutor);
    
    blobServer = new BlobServer(configuration, haServices.createBlobStore());
    blobServer.start();
        
    }
}
  • 根据high-availability配置创建ZooKeeperHaServices,默认情况下为NONE。
protected HighAvailabilityServices createHaServices(
    Configuration configuration,
    Executor executor) throws Exception {
    //创建HA服务时不需要地址解析
    return HighAvailabilityServicesUtils.createHighAvailabilityServices(
        configuration,
        executor,
        HighAvailabilityServicesUtils.AddressResolution.NO_ADDRESS_RESOLUTION);
}


//根据传递的high-availability配置,选择创建哪种HA服务,默认为NONE
public static HighAvailabilityServices createHighAvailabilityServices(
    Configuration configuration,
    Executor executor,
    AddressResolution addressResolution) throws Exception {
    //获取high-availability配置 如:zookeeper
    HighAvailabilityMode highAvailabilityMode = LeaderRetrievalUtils.getRecoveryMode(configuration);

    switch (highAvailabilityMode) {
        case NONE:
            final Tuple2<String, Integer> hostnamePort = getJobManagerAddress(configuration);

            final String jobManagerRpcUrl = AkkaRpcServiceUtils.getRpcUrl(
                hostnamePort.f0,
                hostnamePort.f1,
                JobMaster.JOB_MANAGER_NAME,
                addressResolution,
                configuration);
            final String resourceManagerRpcUrl = AkkaRpcServiceUtils.getRpcUrl(
                hostnamePort.f0,
                hostnamePort.f1,
                ResourceManager.RESOURCE_MANAGER_NAME,
                addressResolution,
                configuration);
            final String dispatcherRpcUrl = AkkaRpcServiceUtils.getRpcUrl(
                hostnamePort.f0,
                hostnamePort.f1,
                Dispatcher.DISPATCHER_NAME,
                addressResolution,
                configuration);

            final String address = checkNotNull(configuration.getString(RestOptions.ADDRESS),
                "%s must be set",
                RestOptions.ADDRESS.key());
            final int port = configuration.getInteger(RestOptions.PORT);
            final boolean enableSSL = SSLUtils.isRestSSLEnabled(configuration);
            final String protocol = enableSSL ? "https://" : "http://";

            return new StandaloneHaServices(
                resourceManagerRpcUrl,
                dispatcherRpcUrl,
                jobManagerRpcUrl,
                String.format("%s%s:%s", protocol, address, port));
        case ZOOKEEPER:
            //元数据存储服务  我们通常使用FileSystemBlobStore 路径就是 high-availability.storageDir: hdfs:///flink/recovery
            BlobStoreService blobStoreService = BlobUtils.createBlobStoreFromConfig(configuration);
            //  使用 ZooKeeper做HA服务
            return new ZooKeeperHaServices(
                ZooKeeperUtils.startCuratorFramework(configuration),
                executor,
                configuration,
                blobStoreService);

        case FACTORY_CLASS:
            return createCustomHAServices(configuration, executor);

        default:
            throw new Exception("Recovery mode " + highAvailabilityMode + " is not supported.");
    }
}
  • ZooKeeperHaServices主要提供了创建LeaderRetrievalService和LeaderElectionService方法,并给出了各个服务组件使用的ZK节点名称。别看是以_lock结尾,这个节点名称既在leaderlatcher做leader选举的分布式锁产生的路径,又在leader目录下用来存放leader的地址信息。

private static final String RESOURCE_MANAGER_LEADER_PATH = "/resource_manager_lock";
private static final String DISPATCHER_LEADER_PATH = "/dispatcher_lock";
private static final String JOB_MANAGER_LEADER_PATH = "/job_manager_lock";
//  web展示服务
private static final String REST_SERVER_LEADER_PATH = "/rest_server_lock";

// 创建ResourceManagerLeaderRetriever,对RM的leader地址变化进行跟踪
public LeaderRetrievalService getResourceManagerLeaderRetriever() {
    return ZooKeeperUtils.createLeaderRetrievalService(client, configuration, RESOURCE_MANAGER_LEADER_PATH);
}

// 创建ResourceManagerLeaderElectionService,对RMleader挂掉后重新进行选举
public LeaderElectionService getResourceManagerLeaderElectionService() {
    return ZooKeeperUtils.createLeaderElectionService(client, configuration, RESOURCE_MANAGER_LEADER_PATH);
}
  • ZooKeeperUtils创建LeaderRetrievalService流程。
  1. 接收curator客户端以及服务在zk下的节点路径,创建出ZooKeeperLeaderRetrievalService(ZKLRS)对象。
  2. ZKLRS这个对象就是对zk节点的内容进行了监听,当内容发生变化时,通知给通过start方法传递过来的LeaderRetrievalListener。
public void start(LeaderRetrievalListener listener) throws Exception {
    synchronized (lock) {
        //leader发生变化时,通知对应的LeaderRetrievalListener
        leaderListener = listener;
        // 异常时调用当前对象的unhandledError方法
        client.getUnhandledErrorListenable().addListener(this);
        // 使用NodeCache监听节点内容变化
        cache.getListenable().addListener(this);
        cache.start();
        
        // 对会话连接状态进行跟踪
        client.getConnectionStateListenable().addListener(connectionStateListener);

        running = true;
    }
}
  1. 通过重写nodeChanged方法,来获取Leader变更后的地址,并传递新的地址
public void nodeChanged() throws Exception {
    synchronized (lock) {
        if (running) {
            try {
                LOG.debug("Leader node has changed.");

                ChildData childData = cache.getCurrentData();

                String leaderAddress;
                UUID leaderSessionID;

                if (childData == null) {
                    leaderAddress = null;
                    leaderSessionID = null;
                } else {
                    byte[] data = childData.getData();

                    if (data == null || data.length == 0) {
                        leaderAddress = null;
                        leaderSessionID = null;
                    } else {
                        ByteArrayInputStream bais = new ByteArrayInputStream(data);
                        ObjectInputStream ois = new ObjectInputStream(bais);
                        // leader 地址
                        leaderAddress = ois.readUTF();
                        // leader uuid
                        leaderSessionID = (UUID) ois.readObject();
                    }
                }
                // leader 地址发生变化
                if (!(Objects.equals(leaderAddress, lastLeaderAddress) &&
                    Objects.equals(leaderSessionID, lastLeaderSessionID))) {

                    lastLeaderAddress = leaderAddress;
                    lastLeaderSessionID = leaderSessionID;
                    //  传递新的leaderAddress和leaderSessionID
                    leaderListener.notifyLeaderAddress(leaderAddress, leaderSessionID);
                }
            } catch (Exception e) {
                leaderListener.handleError(new Exception("Could not handle node changed event.", e));
                throw e;
            }
        } else {
            LOG.debug("Ignoring node change notification since the service has already been stopped.");
        }
    }
}
  • ZooKeeperUtils创建ZooKeeperLeaderElectionService流程。
  1. 传递leader所在的zk路径、选举时临时节点创建的zk路径。之所以要传递leader节点是要在新leader产生时,将新leader的地址和uuid写入。
public static ZooKeeperLeaderElectionService createLeaderElectionService(
        final CuratorFramework client,
        final Configuration configuration,
        final String pathSuffix) {
    // 在leaderlatch节点下进行选举
    final String latchPath = configuration.getString(
        HighAvailabilityOptions.HA_ZOOKEEPER_LATCH_PATH) + pathSuffix;
     // leader节点    
    final String leaderPath = configuration.getString(
        HighAvailabilityOptions.HA_ZOOKEEPER_LEADER_PATH) + pathSuffix;

    return new ZooKeeperLeaderElectionService(client, latchPath, leaderPath);
}
  1. 通过调用start方法传递LeaderContender,并开启leader选举。
public void start(LeaderContender contender) throws Exception {

    synchronized (lock) {
        // 绑定异常处理监听器
        client.getUnhandledErrorListenable().addListener(this);
        // 传递Contender竞争者
        leaderContender = contender;
        
        //开启leader选举服务,成为leader的节点会触发isleader
        leaderLatch.addListener(this);
        leaderLatch.start();
        
        //监听leader节点内容变化
        cache.getListenable().addListener(this);
        cache.start();

        client.getConnectionStateListenable().addListener(listener);

        running = true;
    }
}
  1. 当某一Contender成为leader后,会触发grantLeadership传递新leader的uuid进行授权,并调用LeaderElectionService的confirmLeaderSessionID,将新leader地址写入leader节点。
public void confirmLeaderSessionID(UUID leaderSessionID) {
    // 是Leader
    if (leaderLatch.hasLeadership()) {
        // check if this is an old confirmation call
        synchronized (lock) {
            if (running) {
                if (leaderSessionID.equals(this.issuedLeaderSessionID)) {
                    confirmedLeaderSessionID = leaderSessionID;
                    // 将confirmLeaderSessionID写到 leader目录下
                    writeLeaderInformation(confirmedLeaderSessionID);
                }
            }
        }
    } 
}
  1. 写入时会触发当前对象的nodeChanged方法,该方法用来确保新leader地址和uuid成功写入。
public void nodeChanged() throws Exception {
    try {
        // leaderSessionID is null if the leader contender has not yet confirmed the session ID
        if (leaderLatch.hasLeadership()) { // leader
            synchronized (lock) {
                if (running) {
                    // 当选为leader 已经被确认
                    if (confirmedLeaderSessionID != null) {
                        ChildData childData = cache.getCurrentData();
                        //  没写进去,再写一次
                        if (childData == null) {
                            if (LOG.isDebugEnabled()) {
                                LOG.debug(
                                    "Writing leader information into empty node by {}.",
                                    leaderContender.getAddress());
                            }
                            writeLeaderInformation(confirmedLeaderSessionID);
                        } else {
                            byte[] data = childData.getData();

                            if (data == null || data.length == 0) {
                                // the data field seems to be empty, rewrite information
                                writeLeaderInformation(confirmedLeaderSessionID);
                            } else {
                                ByteArrayInputStream bais = new ByteArrayInputStream(data);
                                ObjectInputStream ois = new ObjectInputStream(bais);

                                String leaderAddress = ois.readUTF();
                                UUID leaderSessionID = (UUID) ois.readObject();

                                if (!leaderAddress.equals(this.leaderContender.getAddress()) ||
                                    (leaderSessionID == null || !leaderSessionID.equals(confirmedLeaderSessionID))) {
                                    writeLeaderInformation(confirmedLeaderSessionID);
                                }
                            }
                        }
                    }
                } else {
                    // leader未确认confirmedLeaderSessionID
                    LOG.debug("Ignoring node change notification since the service has already been stopped.");
                }
            }
        }
    } catch (Exception e) {
            ...
    }
}

writeLeaderInformation用来写入leader地址和uuid,写入时先判断leader节点是否由当前leader会话创建的,如果不是则删除后重写创建。

protected void writeLeaderInformation(UUID leaderSessionID) {
    try {
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        ObjectOutputStream oos = new ObjectOutputStream(baos);
        //  leader 地址
        oos.writeUTF(leaderContender.getAddress());
        //  leader的 UUID
        oos.writeObject(leaderSessionID);

        oos.close();

        boolean dataWritten = false;

        while (!dataWritten && leaderLatch.hasLeadership()) {
            Stat stat = client.checkExists().forPath(leaderPath);

            if (stat != null) {
                long owner = stat.getEphemeralOwner();
                long sessionID = client.getZookeeperClient().getZooKeeper().getSessionId();
                //节点由当前会话创建
                if (owner == sessionID) {
                    try {
                        client.setData().forPath(leaderPath, baos.toByteArray());

                        dataWritten = true;
                    } catch (KeeperException.NoNodeException noNode) {
                        // node was deleted in the meantime
                    }
                } else {
                    try {
                        //  不是当前节点创建则先删除
                        client.delete().forPath(leaderPath);
                    } catch (KeeperException.NoNodeException noNode) {
                        // node was deleted in the meantime --> try again
                    }
                }
            } else {
                try {
                    client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath(
                            leaderPath,
                            baos.toByteArray());

                    dataWritten = true;
                } catch (KeeperException.NodeExistsException nodeExists) {
                    // node has been created in the meantime --> try again
                }
            }
        }
    } 
}

本次学习了flink如何使用curator来操作zk节点,实现leader选举和leader状态跟踪。LeaderRetrievalListener和LeaderContender两个接口更像是这一部分功能的输入和输出,来跟踪leader的变化情况。而中间部分对zk节点的操作和状态监听,则可以抽取出来在自己的项目中使用。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 216,402评论 6 499
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,377评论 3 392
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 162,483评论 0 353
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,165评论 1 292
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,176评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,146评论 1 297
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,032评论 3 417
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,896评论 0 274
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,311评论 1 310
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,536评论 2 332
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,696评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,413评论 5 343
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,008评论 3 325
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,659评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,815评论 1 269
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,698评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,592评论 2 353

推荐阅读更多精彩内容

  • 摘要 要学习系统构架,ZooKeeper (下文简称zk)是无法绕开的开源技术。大型网站后台成百上千的分布式服务节...
    老吴学技术阅读 1,329评论 0 4
  • ... 一、相关概念 中间件:为分布式系统提供协调服务的组件,如专门用于计算服务的机器就是一个计算型中间件,还有专...
    帅可儿妞阅读 473评论 0 0
  • 一、系统模型 1、数据模型 zk结构视图与unix的文件系统有点类似,但是没有目录和文件的相关概念。而是使用特有的...
    小manong阅读 2,128评论 0 3
  • 学习整理: zookeeper有哪些应用,HA热备、分布式锁、集中配置用到了zookeeper的什么? ZooKe...
    大佛爱读书阅读 230评论 0 0
  • 毕业一年多了,我们见面的次数屈指可数。匆匆一见,想说的话都没有说完,想时间过得慢一点。 我在哏都与...
    总有不期而遇的温暖阅读 251评论 0 0