A Look at Flink's JobManagerGateway

This article examines Flink's JobManagerGateway.

RestfulGateway

flink-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/RestfulGateway.java

public interface RestfulGateway extends RpcGateway {

    CompletableFuture<Acknowledge> cancelJob(JobID jobId, @RpcTimeout Time timeout);

    CompletableFuture<Acknowledge> stopJob(JobID jobId, @RpcTimeout Time timeout);

    CompletableFuture<String> requestRestAddress(@RpcTimeout  Time timeout);

    CompletableFuture<? extends AccessExecutionGraph> requestJob(JobID jobId, @RpcTimeout Time timeout);

    CompletableFuture<JobResult> requestJobResult(JobID jobId, @RpcTimeout Time timeout);

    CompletableFuture<MultipleJobsDetails> requestMultipleJobDetails(
        @RpcTimeout Time timeout);

    CompletableFuture<ClusterOverview> requestClusterOverview(@RpcTimeout Time timeout);

    CompletableFuture<Collection<String>> requestMetricQueryServicePaths(@RpcTimeout Time timeout);

    CompletableFuture<Collection<Tuple2<ResourceID, String>>> requestTaskManagerMetricQueryServicePaths(@RpcTimeout Time timeout);

    default CompletableFuture<String> triggerSavepoint(
            JobID jobId,
            String targetDirectory,
            boolean cancelJob,
            @RpcTimeout Time timeout) {
        throw new UnsupportedOperationException();
    }

    default CompletableFuture<Acknowledge> disposeSavepoint(
            final String savepointPath,
            @RpcTimeout final Time timeout) {
        throw new UnsupportedOperationException();
    }

    default CompletableFuture<JobStatus> requestJobStatus(
            JobID jobId,
            @RpcTimeout Time timeout) {
        throw new UnsupportedOperationException();
    }

    default CompletableFuture<OperatorBackPressureStatsResponse> requestOperatorBackPressureStats(
            JobID jobId,
            JobVertexID jobVertexId) {
        throw new UnsupportedOperationException();
    }

    default CompletableFuture<Acknowledge> rescaleJob(
            JobID jobId,
            int newParallelism,
            RescalingBehaviour rescalingBehaviour,
            @RpcTimeout Time timeout) {
        throw new UnsupportedOperationException();
    }

    default CompletableFuture<Acknowledge> shutDownCluster() {
        throw new UnsupportedOperationException();
    }
}
  • RestfulGateway extends the RpcGateway interface. It declares cancelJob, stopJob, requestRestAddress, requestJob, requestJobResult, requestMultipleJobDetails, requestClusterOverview, requestMetricQueryServicePaths and requestTaskManagerMetricQueryServicePaths. It also provides the default methods triggerSavepoint, disposeSavepoint, requestJobStatus, requestOperatorBackPressureStats, rescaleJob and shutDownCluster. Each of these defaults simply throws UnsupportedOperationException unless an implementation overrides it.
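The throwing defaults follow Java's "optional operation" convention: an implementation that does not support an operation just inherits the default. The sketch below shows that pattern and how a caller might degrade gracefully. It assumes a hypothetical, trimmed-down interface. `SimpleGateway`, `BasicGateway`, `FullGateway`, `statusOrFallback` and the placeholder REST address are illustrative names, not Flink's API. Because the default throws synchronously instead of returning a failed future, the guard has to wrap the call itself.

```java
import java.util.concurrent.CompletableFuture;

class OptionalOperationDemo {

    // Hypothetical gateway mirroring the shape of RestfulGateway's default methods.
    interface SimpleGateway {
        CompletableFuture<String> requestRestAddress();

        // Optional operation: implementations that don't support it inherit this default.
        default CompletableFuture<String> requestJobStatus(String jobId) {
            throw new UnsupportedOperationException();
        }
    }

    // Implements only the mandatory method (the address is a placeholder).
    static class BasicGateway implements SimpleGateway {
        @Override
        public CompletableFuture<String> requestRestAddress() {
            return CompletableFuture.completedFuture("http://localhost:8081");
        }
    }

    // Additionally overrides the optional method.
    static class FullGateway extends BasicGateway {
        @Override
        public CompletableFuture<String> requestJobStatus(String jobId) {
            return CompletableFuture.completedFuture("RUNNING");
        }
    }

    // The default throws before any future exists, so exceptionally() would not help here;
    // the call itself must be guarded.
    static String statusOrFallback(SimpleGateway gateway, String jobId) {
        try {
            return gateway.requestJobStatus(jobId).join();
        } catch (UnsupportedOperationException e) {
            return "UNKNOWN (operation not supported)";
        }
    }

    public static void main(String[] args) {
        System.out.println(statusOrFallback(new BasicGateway(), "job-1")); // UNKNOWN (operation not supported)
        System.out.println(statusOrFallback(new FullGateway(), "job-1"));  // RUNNING
    }
}
```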

JobManagerGateway

flink-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerGateway.java

public interface JobManagerGateway extends RestfulGateway {

    CompletableFuture<Integer> requestBlobServerPort(Time timeout);

    CompletableFuture<Acknowledge> submitJob(JobGraph jobGraph, ListeningBehaviour listeningBehaviour, Time timeout);

    CompletableFuture<String> cancelJobWithSavepoint(JobID jobId, @Nullable String savepointPath, Time timeout);

    CompletableFuture<Acknowledge> cancelJob(JobID jobId, Time timeout);

    CompletableFuture<Acknowledge> stopJob(JobID jobId, Time timeout);

    CompletableFuture<Optional<JobManagerMessages.ClassloadingProps>> requestClassloadingProps(JobID jobId, Time timeout);

    CompletableFuture<Optional<Instance>> requestTaskManagerInstance(ResourceID resourceId, Time timeout);

    CompletableFuture<Collection<Instance>> requestTaskManagerInstances(Time timeout);

    CompletableFuture<JobIdsWithStatusOverview> requestJobsOverview(Time timeout);
}
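  • JobManagerGateway extends RestfulGateway. It declares requestBlobServerPort, submitJob, cancelJobWithSavepoint, requestClassloadingProps, requestTaskManagerInstance, requestTaskManagerInstances and requestJobsOverview. It also redeclares cancelJob and stopJob, which RestfulGateway already declares, here without the @RpcTimeout annotation. AkkaJobManagerGateway is an implementation of this interface.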
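Callers depend only on the interface, so AkkaJobManagerGateway is one possible backend, not a requirement. The sketch below illustrates this with an in-memory fake of the kind one might write as a test double. It assumes a hypothetical, heavily trimmed interface, `MiniJobGateway`, with plain String job IDs and statuses. None of these names are Flink types. Like the Akka-based gateway, it reports failures through exceptionally completed futures rather than by throwing.

```java
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

class InMemoryGatewayDemo {

    // Hypothetical subset of JobManagerGateway; job IDs and statuses are plain Strings.
    interface MiniJobGateway {
        CompletableFuture<Void> submitJob(String jobId);

        CompletableFuture<Void> cancelJob(String jobId);

        CompletableFuture<Optional<String>> requestJobStatus(String jobId);
    }

    // In-memory fake: no actor system involved, every returned future is already complete.
    static class InMemoryJobGateway implements MiniJobGateway {
        private final Map<String, String> jobStatus = new ConcurrentHashMap<>();

        @Override
        public CompletableFuture<Void> submitJob(String jobId) {
            if (jobStatus.putIfAbsent(jobId, "RUNNING") != null) {
                return failed(new IllegalStateException("Job already submitted: " + jobId));
            }
            return CompletableFuture.completedFuture(null);
        }

        @Override
        public CompletableFuture<Void> cancelJob(String jobId) {
            // computeIfPresent returns null when the job is unknown.
            if (jobStatus.computeIfPresent(jobId, (id, status) -> "CANCELED") == null) {
                return failed(new IllegalArgumentException("Unknown job: " + jobId));
            }
            return CompletableFuture.completedFuture(null);
        }

        @Override
        public CompletableFuture<Optional<String>> requestJobStatus(String jobId) {
            return CompletableFuture.completedFuture(Optional.ofNullable(jobStatus.get(jobId)));
        }

        // Report failures through the future instead of throwing.
        private static <T> CompletableFuture<T> failed(Throwable cause) {
            CompletableFuture<T> future = new CompletableFuture<>();
            future.completeExceptionally(cause);
            return future;
        }
    }

    public static void main(String[] args) {
        MiniJobGateway gateway = new InMemoryJobGateway();
        gateway.submitJob("job-1").join();
        gateway.cancelJob("job-1").join();
        System.out.println(gateway.requestJobStatus("job-1").join()); // Optional[CANCELED]
        System.out.println(gateway.requestJobStatus("job-2").join()); // Optional.empty
        System.out.println(gateway.cancelJob("job-2").isCompletedExceptionally()); // true
    }
}
```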

AkkaJobManagerGateway

flink-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/akka/AkkaJobManagerGateway.java

public class AkkaJobManagerGateway implements JobManagerGateway {

    private final ActorGateway jobManagerGateway;
    private final String hostname;

    public AkkaJobManagerGateway(ActorGateway jobManagerGateway) {
        this.jobManagerGateway = Preconditions.checkNotNull(jobManagerGateway);

        final Option<String> optHostname = jobManagerGateway.actor().path().address().host();
        hostname = optHostname.isDefined() ? optHostname.get() : "localhost";
    }

    @Override
    public String getAddress() {
        return jobManagerGateway.path();
    }

    @Override
    public String getHostname() {
        return hostname;
    }

    @Override
    public CompletableFuture<Integer> requestBlobServerPort(Time timeout) {
        return FutureUtils.toJava(
            jobManagerGateway
                .ask(JobManagerMessages.getRequestBlobManagerPort(), FutureUtils.toFiniteDuration(timeout))
                .mapTo(ClassTag$.MODULE$.apply(Integer.class)));
    }

    //--------------------------------------------------------------------------------
    // Job control
    //--------------------------------------------------------------------------------

    @Override
    public CompletableFuture<Acknowledge> submitJob(JobGraph jobGraph, ListeningBehaviour listeningBehaviour, Time timeout) {
        return FutureUtils
            .toJava(
                jobManagerGateway.ask(
                    new JobManagerMessages.SubmitJob(
                        jobGraph,
                        listeningBehaviour),
                    FutureUtils.toFiniteDuration(timeout)))
            .thenApply(
                (Object response) -> {
                    if (response instanceof JobManagerMessages.JobSubmitSuccess) {
                        JobManagerMessages.JobSubmitSuccess success = ((JobManagerMessages.JobSubmitSuccess) response);

                        if (Objects.equals(success.jobId(), jobGraph.getJobID())) {
                            return Acknowledge.get();
                        } else {
                            throw new CompletionException(new FlinkException("JobManager responded for wrong Job. This Job: " +
                                jobGraph.getJobID() + ", response: " + success.jobId()));
                        }
                    } else if (response instanceof JobManagerMessages.JobResultFailure) {
                        JobManagerMessages.JobResultFailure failure = ((JobManagerMessages.JobResultFailure) response);

                        throw new CompletionException(new FlinkException("Job submission failed.", failure.cause()));
                    } else {
                        throw new CompletionException(new FlinkException("Unknown response to SubmitJob message: " + response + '.'));
                    }
                }
            );
    }

    @Override
    public CompletableFuture<String> cancelJobWithSavepoint(JobID jobId, String savepointPath, Time timeout) {
        CompletableFuture<JobManagerMessages.CancellationResponse> cancellationFuture = FutureUtils.toJava(
            jobManagerGateway
                .ask(new JobManagerMessages.CancelJobWithSavepoint(jobId, savepointPath), FutureUtils.toFiniteDuration(timeout))
                .mapTo(ClassTag$.MODULE$.apply(JobManagerMessages.CancellationResponse.class)));

        return cancellationFuture.thenApply(
            (JobManagerMessages.CancellationResponse response) -> {
                if (response instanceof JobManagerMessages.CancellationSuccess) {
                    return ((JobManagerMessages.CancellationSuccess) response).savepointPath();
                } else {
                    throw new CompletionException(new FlinkException("Cancel with savepoint failed.", ((JobManagerMessages.CancellationFailure) response).cause()));
                }
            });
    }

    @Override
    public CompletableFuture<Acknowledge> cancelJob(JobID jobId, Time timeout) {
        CompletableFuture<JobManagerMessages.CancellationResponse> responseFuture = FutureUtils.toJava(
            jobManagerGateway
                .ask(new JobManagerMessages.CancelJob(jobId), FutureUtils.toFiniteDuration(timeout))
                .mapTo(ClassTag$.MODULE$.apply(JobManagerMessages.CancellationResponse.class)));

        return responseFuture.thenApply(
            (JobManagerMessages.CancellationResponse response) -> {
                if (response instanceof JobManagerMessages.CancellationSuccess) {
                    return Acknowledge.get();
                } else {
                    throw new CompletionException(new FlinkException("Cancel job failed " + jobId + '.', ((JobManagerMessages.CancellationFailure) response).cause()));
                }
            });
    }

    @Override
    public CompletableFuture<Acknowledge> stopJob(JobID jobId, Time timeout) {
        CompletableFuture<JobManagerMessages.StoppingResponse> responseFuture = FutureUtils.toJava(
            jobManagerGateway
                .ask(new JobManagerMessages.StopJob(jobId), FutureUtils.toFiniteDuration(timeout))
                .mapTo(ClassTag$.MODULE$.apply(JobManagerMessages.StoppingResponse.class)));

        return responseFuture.thenApply(
            (JobManagerMessages.StoppingResponse response) -> {
                if (response instanceof JobManagerMessages.StoppingSuccess) {
                    return Acknowledge.get();
                } else {
                    throw new CompletionException(new FlinkException("Stop job failed " + jobId + '.', ((JobManagerMessages.StoppingFailure) response).cause()));
                }
            });
    }

    //--------------------------------------------------------------------------------
    // JobManager information
    //--------------------------------------------------------------------------------

    @Override
    public CompletableFuture<Optional<Instance>> requestTaskManagerInstance(ResourceID resourceId, Time timeout) {
        return FutureUtils.toJava(
            jobManagerGateway
                .ask(new JobManagerMessages.RequestTaskManagerInstance(resourceId), FutureUtils.toFiniteDuration(timeout))
                .mapTo(ClassTag$.MODULE$.<JobManagerMessages.TaskManagerInstance>apply(JobManagerMessages.TaskManagerInstance.class)))
            .thenApply(
                (JobManagerMessages.TaskManagerInstance taskManagerResponse) -> {
                    if (taskManagerResponse.instance().isDefined()) {
                        return Optional.of(taskManagerResponse.instance().get());
                    } else {
                        return Optional.empty();
                    }
                });
    }

    @Override
    public CompletableFuture<Collection<Instance>> requestTaskManagerInstances(Time timeout) {
        CompletableFuture<JobManagerMessages.RegisteredTaskManagers> taskManagersFuture = FutureUtils.toJava(
            jobManagerGateway
                .ask(JobManagerMessages.getRequestRegisteredTaskManagers(), FutureUtils.toFiniteDuration(timeout))
                .mapTo(ClassTag$.MODULE$.apply(JobManagerMessages.RegisteredTaskManagers.class)));

        return taskManagersFuture.thenApply(
            JobManagerMessages.RegisteredTaskManagers::asJavaCollection);
    }

    @Override
    public CompletableFuture<Optional<JobManagerMessages.ClassloadingProps>> requestClassloadingProps(JobID jobId, Time timeout) {
        return FutureUtils
            .toJava(jobManagerGateway
                .ask(
                    new JobManagerMessages.RequestClassloadingProps(jobId),
                    FutureUtils.toFiniteDuration(timeout)))
            .thenApply(
                (Object response) -> {
                    if (response instanceof JobManagerMessages.ClassloadingProps) {
                        return Optional.of(((JobManagerMessages.ClassloadingProps) response));
                    } else if (response instanceof JobManagerMessages.JobNotFound) {
                        return Optional.empty();
                    } else {
                        throw new CompletionException(new FlinkException("Unknown response: " + response + '.'));
                    }
                });
    }

    @Override
    public CompletableFuture<MultipleJobsDetails> requestMultipleJobDetails(Time timeout) {
        return FutureUtils.toJava(
            jobManagerGateway
                .ask(new RequestJobDetails(true, true), FutureUtils.toFiniteDuration(timeout))
                .mapTo(ClassTag$.MODULE$.apply(MultipleJobsDetails.class)));
    }

    @Override
    public CompletableFuture<AccessExecutionGraph> requestJob(JobID jobId, Time timeout) {
        CompletableFuture<JobManagerMessages.JobResponse> jobResponseFuture = FutureUtils.toJava(
            jobManagerGateway
                .ask(new JobManagerMessages.RequestJob(jobId), FutureUtils.toFiniteDuration(timeout))
                .mapTo(ClassTag$.MODULE$.apply(JobManagerMessages.JobResponse.class)));

        return jobResponseFuture.thenApply(
            (JobManagerMessages.JobResponse jobResponse) -> {
                if (jobResponse instanceof JobManagerMessages.JobFound) {
                    return ((JobManagerMessages.JobFound) jobResponse).executionGraph();
                } else {
                    throw new CompletionException(new FlinkJobNotFoundException(jobId));
                }
            });
    }

    @Override
    public CompletableFuture<JobResult> requestJobResult(JobID jobId, Time timeout) {
        return requestJob(jobId, timeout).thenApply(JobResult::createFrom);
    }

    @Override
    public CompletableFuture<ClusterOverview> requestClusterOverview(Time timeout) {
        return FutureUtils.toJava(
            jobManagerGateway
                .ask(RequestStatusOverview.getInstance(), FutureUtils.toFiniteDuration(timeout))
                .mapTo(ClassTag$.MODULE$.apply(ClusterOverview.class)));
    }

    @Override
    public CompletableFuture<Collection<String>> requestMetricQueryServicePaths(Time timeout) {
        final String jobManagerPath = getAddress();
        final String jobManagerMetricQueryServicePath = jobManagerPath.substring(0, jobManagerPath.lastIndexOf('/') + 1) + MetricQueryService.METRIC_QUERY_SERVICE_NAME;

        return CompletableFuture.completedFuture(
            Collections.singleton(jobManagerMetricQueryServicePath));
    }

    @Override
    public CompletableFuture<Collection<Tuple2<ResourceID, String>>> requestTaskManagerMetricQueryServicePaths(Time timeout) {
        return requestTaskManagerInstances(timeout)
            .thenApply(
                (Collection<Instance> instances) ->
                    instances
                        .stream()
                        .map(
                            (Instance instance) -> {
                                final String taskManagerAddress = instance.getTaskManagerGateway().getAddress();
                                final String taskManagerMetricQuerServicePath = taskManagerAddress.substring(0, taskManagerAddress.lastIndexOf('/') + 1) +
                                    MetricQueryService.METRIC_QUERY_SERVICE_NAME + '_' + instance.getTaskManagerID().getResourceIdString();

                                return Tuple2.of(instance.getTaskManagerID(), taskManagerMetricQuerServicePath);
                            })
                        .collect(Collectors.toList()));
    }

    @Override
    public CompletableFuture<JobIdsWithStatusOverview> requestJobsOverview(Time timeout) {
        return FutureUtils.toJava(
            jobManagerGateway
                .ask(RequestJobsWithIDsOverview.getInstance(), FutureUtils.toFiniteDuration(timeout))
                .mapTo(ClassTag$.MODULE$.apply(JobIdsWithStatusOverview.class)));
    }

    @Override
    public CompletableFuture<String> requestRestAddress(Time timeout) {
        return FutureUtils.toJava(
            jobManagerGateway
                .ask(JobManagerMessages.getRequestRestAddress(), FutureUtils.toFiniteDuration(timeout))
                .mapTo(ClassTag$.MODULE$.apply(String.class)));
    }
}
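Most methods above share the same shape. They send a message with `ask`, convert the Scala future with `FutureUtils.toJava`, and translate the untyped reply inside `thenApply`, throwing a `CompletionException` for error replies. The sketch below reproduces that shape without Akka, modeled on `submitJob`. It relies on several hypothetical stand-ins:

- `ask` and the `Function`-based "actor" replace `ActorGateway.ask`.
- `SubmitSuccess` and `SubmitFailure` replace `JobManagerMessages.JobSubmitSuccess` and `JobManagerMessages.JobResultFailure`.
- `IllegalStateException` replaces `FlinkException`.

It also shows how the wrapped failure surfaces: `get()` unwraps the `CompletionException`, while `join()` rethrows it.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;
import java.util.function.Function;

class AskPatternDemo {

    // Hypothetical reply messages (stand-ins for JobSubmitSuccess / JobResultFailure).
    static final class SubmitSuccess {
        final String jobId;
        SubmitSuccess(String jobId) { this.jobId = jobId; }
    }

    static final class SubmitFailure {
        final Throwable cause;
        SubmitFailure(Throwable cause) { this.cause = cause; }
    }

    // Hypothetical "actor": answers every message asynchronously with an untyped Object.
    static CompletableFuture<Object> ask(Function<Object, Object> actor, Object message) {
        return CompletableFuture.supplyAsync(() -> actor.apply(message));
    }

    // Same structure as AkkaJobManagerGateway#submitJob; IllegalStateException stands in for FlinkException.
    static CompletableFuture<String> submitJob(Function<Object, Object> actor, String jobId) {
        return ask(actor, "SubmitJob:" + jobId).thenApply((Object response) -> {
            if (response instanceof SubmitSuccess) {
                SubmitSuccess success = (SubmitSuccess) response;
                if (success.jobId.equals(jobId)) {
                    return "ACK";
                }
                throw new CompletionException(new IllegalStateException(
                    "Responded for wrong job. This job: " + jobId + ", response: " + success.jobId));
            } else if (response instanceof SubmitFailure) {
                throw new CompletionException(new IllegalStateException(
                    "Job submission failed.", ((SubmitFailure) response).cause));
            } else {
                throw new CompletionException(new IllegalStateException(
                    "Unknown response to SubmitJob message: " + response + '.'));
            }
        });
    }

    public static void main(String[] args) throws InterruptedException {
        Function<Object, Object> happyActor = msg -> new SubmitSuccess("job-1");
        Function<Object, Object> failingActor = msg -> new SubmitFailure(new RuntimeException("no slots"));

        System.out.println(submitJob(happyActor, "job-1").join()); // ACK

        try {
            submitJob(failingActor, "job-1").get();
        } catch (ExecutionException e) {
            // get() unwraps the CompletionException, so the cause is the IllegalStateException itself.
            System.out.println(e.getCause().getMessage()); // Job submission failed.
        }
    }
}
```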
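  • AkkaJobManagerGateway implements JobManagerGateway. Its constructor takes an ActorGateway, jobManagerGateway, and derives hostname from the host part of the actor's path address, falling back to "localhost" when no host is defined.
  • Job control messages:
    - requestBlobServerPort sends a RequestBlobManagerPort message.
    - submitJob sends SubmitJob.
    - cancelJobWithSavepoint sends CancelJobWithSavepoint.
    - cancelJob sends CancelJob.
    - stopJob sends StopJob.
  • Information queries:
    - requestTaskManagerInstance sends RequestTaskManagerInstance.
    - requestTaskManagerInstances sends RequestRegisteredTaskManagers.
    - requestClassloadingProps sends RequestClassloadingProps.
    - requestMultipleJobDetails sends RequestJobDetails.
    - requestJob sends RequestJob.
    - requestClusterOverview sends RequestStatusOverview.
    - requestJobsOverview sends RequestJobsWithIDsOverview.
    - requestRestAddress sends RequestRestAddress.
  • A few methods send no message of their own:
    - requestJobResult calls requestJob and converts the result with JobResult::createFrom.
    - requestMetricQueryServicePaths derives the path locally from getAddress().
    - requestTaskManagerMetricQueryServicePaths builds its paths from the result of requestTaskManagerInstances.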
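requestMetricQueryServicePaths and requestTaskManagerMetricQueryServicePaths compute their paths by string manipulation rather than by messaging. They cut the actor address after its last `/`. They then append the metric query service name, plus `_` and the resource ID for TaskManagers. The sketch below isolates that logic. Two things are assumptions for illustration: the value `"MetricQueryService"` used for `MetricQueryService.METRIC_QUERY_SERVICE_NAME`, and the sample Akka addresses. In the real code the TaskManager address comes from `instance.getTaskManagerGateway().getAddress()`.

```java
class MetricPathDemo {

    // Assumed value of MetricQueryService.METRIC_QUERY_SERVICE_NAME, for illustration only.
    static final String METRIC_QUERY_SERVICE_NAME = "MetricQueryService";

    // Keep everything up to and including the last '/', i.e. the parent actor path.
    static String parentPath(String actorAddress) {
        return actorAddress.substring(0, actorAddress.lastIndexOf('/') + 1);
    }

    // JobManager: a sibling actor named after the metric query service.
    static String jobManagerMetricPath(String jobManagerAddress) {
        return parentPath(jobManagerAddress) + METRIC_QUERY_SERVICE_NAME;
    }

    // TaskManager: same, with '_' and the TaskManager's resource ID appended.
    static String taskManagerMetricPath(String taskManagerAddress, String resourceId) {
        return parentPath(taskManagerAddress) + METRIC_QUERY_SERVICE_NAME + '_' + resourceId;
    }

    public static void main(String[] args) {
        System.out.println(jobManagerMetricPath("akka.tcp://flink@jm-host:6123/user/jobmanager"));
        // akka.tcp://flink@jm-host:6123/user/MetricQueryService
        System.out.println(taskManagerMetricPath("akka.tcp://flink@tm-host:41234/user/taskmanager_0", "abc123"));
        // akka.tcp://flink@tm-host:41234/user/MetricQueryService_abc123
    }
}
```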

Summary

  • RestfulGateway extends RpcGateway. It declares cancelJob, stopJob, requestRestAddress, requestJob, requestJobResult, requestMultipleJobDetails, requestClusterOverview, requestMetricQueryServicePaths and requestTaskManagerMetricQueryServicePaths. It also provides the default methods triggerSavepoint, disposeSavepoint, requestJobStatus, requestOperatorBackPressureStats, rescaleJob and shutDownCluster, each of which throws UnsupportedOperationException.
  • JobManagerGateway extends RestfulGateway. It declares requestBlobServerPort, submitJob, cancelJobWithSavepoint, cancelJob, stopJob, requestClassloadingProps, requestTaskManagerInstance, requestTaskManagerInstances and requestJobsOverview. It is implemented by AkkaJobManagerGateway.
  • AkkaJobManagerGateway implements JobManagerGateway, and its constructor takes jobManagerGateway (an ActorGateway). Most of its methods work by sending messages through jobManagerGateway, making heavy use of the message objects defined in JobManagerMessages.
