Preface
This article takes a look at Flink's JobManagerGateway.
RestfulGateway
flink-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/RestfulGateway.java
public interface RestfulGateway extends RpcGateway {

    CompletableFuture<Acknowledge> cancelJob(JobID jobId, @RpcTimeout Time timeout);

    CompletableFuture<Acknowledge> stopJob(JobID jobId, @RpcTimeout Time timeout);

    CompletableFuture<String> requestRestAddress(@RpcTimeout Time timeout);

    CompletableFuture<? extends AccessExecutionGraph> requestJob(JobID jobId, @RpcTimeout Time timeout);

    CompletableFuture<JobResult> requestJobResult(JobID jobId, @RpcTimeout Time timeout);

    CompletableFuture<MultipleJobsDetails> requestMultipleJobDetails(
        @RpcTimeout Time timeout);

    CompletableFuture<ClusterOverview> requestClusterOverview(@RpcTimeout Time timeout);

    CompletableFuture<Collection<String>> requestMetricQueryServicePaths(@RpcTimeout Time timeout);

    CompletableFuture<Collection<Tuple2<ResourceID, String>>> requestTaskManagerMetricQueryServicePaths(@RpcTimeout Time timeout);

    default CompletableFuture<String> triggerSavepoint(
            JobID jobId,
            String targetDirectory,
            boolean cancelJob,
            @RpcTimeout Time timeout) {
        throw new UnsupportedOperationException();
    }

    default CompletableFuture<Acknowledge> disposeSavepoint(
            final String savepointPath,
            @RpcTimeout final Time timeout) {
        throw new UnsupportedOperationException();
    }

    default CompletableFuture<JobStatus> requestJobStatus(
            JobID jobId,
            @RpcTimeout Time timeout) {
        throw new UnsupportedOperationException();
    }

    default CompletableFuture<OperatorBackPressureStatsResponse> requestOperatorBackPressureStats(
            JobID jobId,
            JobVertexID jobVertexId) {
        throw new UnsupportedOperationException();
    }

    default CompletableFuture<Acknowledge> rescaleJob(
            JobID jobId,
            int newParallelism,
            RescalingBehaviour rescalingBehaviour,
            @RpcTimeout Time timeout) {
        throw new UnsupportedOperationException();
    }

    default CompletableFuture<Acknowledge> shutDownCluster() {
        throw new UnsupportedOperationException();
    }
}
- The RestfulGateway interface extends RpcGateway. It declares the abstract methods cancelJob, stopJob, requestRestAddress, requestJob, requestJobResult, requestMultipleJobDetails, requestClusterOverview, requestMetricQueryServicePaths and requestTaskManagerMetricQueryServicePaths. It also provides the default methods triggerSavepoint, disposeSavepoint, requestJobStatus, requestOperatorBackPressureStats, rescaleJob and shutDownCluster, each of which throws UnsupportedOperationException unless an implementation overrides it.
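To make the effect of those default methods concrete, here is a minimal sketch that does not depend on Flink. SketchGateway, LegacySketchGateway, DefaultMethodDemo and the REST address string are all hypothetical stand-ins, not Flink classes or values. It assumes only what the interface above shows: a default method whose body throws UnsupportedOperationException.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical, simplified stand-in for RestfulGateway; not Flink's actual interface.
interface SketchGateway {
    // Abstract method: every implementation must provide it.
    CompletableFuture<String> requestRestAddress();

    // Default method: an optional capability that throws unless overridden.
    default CompletableFuture<String> triggerSavepoint(String jobId, String targetDirectory) {
        throw new UnsupportedOperationException();
    }
}

// Hypothetical implementation that leaves the default method untouched.
class LegacySketchGateway implements SketchGateway {
    @Override
    public CompletableFuture<String> requestRestAddress() {
        // Placeholder address, chosen only for illustration.
        return CompletableFuture.completedFuture("http://localhost:8081");
    }
}

class DefaultMethodDemo {
    // Returns true when the optional operation is rejected by the default body.
    static boolean savepointUnsupported(SketchGateway gateway) {
        try {
            gateway.triggerSavepoint("job-1", "/tmp/savepoints");
            return false;
        } catch (UnsupportedOperationException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        SketchGateway gateway = new LegacySketchGateway();
        System.out.println(gateway.requestRestAddress().join());
        System.out.println(savepointUnsupported(gateway));
    }
}
```

Note that the exception is thrown synchronously from the call itself rather than delivered through a failed future, so a caller that only attaches exceptionally(...) to the returned future would not see it.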
JobManagerGateway
flink-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerGateway.java
public interface JobManagerGateway extends RestfulGateway {

    CompletableFuture<Integer> requestBlobServerPort(Time timeout);

    CompletableFuture<Acknowledge> submitJob(JobGraph jobGraph, ListeningBehaviour listeningBehaviour, Time timeout);

    CompletableFuture<String> cancelJobWithSavepoint(JobID jobId, @Nullable String savepointPath, Time timeout);

    CompletableFuture<Acknowledge> cancelJob(JobID jobId, Time timeout);

    CompletableFuture<Acknowledge> stopJob(JobID jobId, Time timeout);

    CompletableFuture<Optional<JobManagerMessages.ClassloadingProps>> requestClassloadingProps(JobID jobId, Time timeout);

    CompletableFuture<Optional<Instance>> requestTaskManagerInstance(ResourceID resourceId, Time timeout);

    CompletableFuture<Collection<Instance>> requestTaskManagerInstances(Time timeout);

    CompletableFuture<JobIdsWithStatusOverview> requestJobsOverview(Time timeout);
}
- The JobManagerGateway interface extends RestfulGateway. It declares requestBlobServerPort, submitJob, cancelJobWithSavepoint, requestClassloadingProps, requestTaskManagerInstance, requestTaskManagerInstances and requestJobsOverview, and it redeclares cancelJob and stopJob, which RestfulGateway already declares. One of its implementations is AkkaJobManagerGateway.
AkkaJobManagerGateway
flink-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/akka/AkkaJobManagerGateway.java
public class AkkaJobManagerGateway implements JobManagerGateway {

    private final ActorGateway jobManagerGateway;
    private final String hostname;

    public AkkaJobManagerGateway(ActorGateway jobManagerGateway) {
        this.jobManagerGateway = Preconditions.checkNotNull(jobManagerGateway);
        final Option<String> optHostname = jobManagerGateway.actor().path().address().host();
        hostname = optHostname.isDefined() ? optHostname.get() : "localhost";
    }

    @Override
    public String getAddress() {
        return jobManagerGateway.path();
    }

    @Override
    public String getHostname() {
        return hostname;
    }

    @Override
    public CompletableFuture<Integer> requestBlobServerPort(Time timeout) {
        return FutureUtils.toJava(
            jobManagerGateway
                .ask(JobManagerMessages.getRequestBlobManagerPort(), FutureUtils.toFiniteDuration(timeout))
                .mapTo(ClassTag$.MODULE$.apply(Integer.class)));
    }

    //--------------------------------------------------------------------------------
    // Job control
    //--------------------------------------------------------------------------------

    @Override
    public CompletableFuture<Acknowledge> submitJob(JobGraph jobGraph, ListeningBehaviour listeningBehaviour, Time timeout) {
        return FutureUtils
            .toJava(
                jobManagerGateway.ask(
                    new JobManagerMessages.SubmitJob(
                        jobGraph,
                        listeningBehaviour),
                    FutureUtils.toFiniteDuration(timeout)))
            .thenApply(
                (Object response) -> {
                    if (response instanceof JobManagerMessages.JobSubmitSuccess) {
                        JobManagerMessages.JobSubmitSuccess success = ((JobManagerMessages.JobSubmitSuccess) response);
                        if (Objects.equals(success.jobId(), jobGraph.getJobID())) {
                            return Acknowledge.get();
                        } else {
                            throw new CompletionException(new FlinkException("JobManager responded for wrong Job. This Job: " +
                                jobGraph.getJobID() + ", response: " + success.jobId()));
                        }
                    } else if (response instanceof JobManagerMessages.JobResultFailure) {
                        JobManagerMessages.JobResultFailure failure = ((JobManagerMessages.JobResultFailure) response);
                        throw new CompletionException(new FlinkException("Job submission failed.", failure.cause()));
                    } else {
                        throw new CompletionException(new FlinkException("Unknown response to SubmitJob message: " + response + '.'));
                    }
                }
            );
    }

    @Override
    public CompletableFuture<String> cancelJobWithSavepoint(JobID jobId, String savepointPath, Time timeout) {
        CompletableFuture<JobManagerMessages.CancellationResponse> cancellationFuture = FutureUtils.toJava(
            jobManagerGateway
                .ask(new JobManagerMessages.CancelJobWithSavepoint(jobId, savepointPath), FutureUtils.toFiniteDuration(timeout))
                .mapTo(ClassTag$.MODULE$.apply(JobManagerMessages.CancellationResponse.class)));
        return cancellationFuture.thenApply(
            (JobManagerMessages.CancellationResponse response) -> {
                if (response instanceof JobManagerMessages.CancellationSuccess) {
                    return ((JobManagerMessages.CancellationSuccess) response).savepointPath();
                } else {
                    throw new CompletionException(new FlinkException("Cancel with savepoint failed.", ((JobManagerMessages.CancellationFailure) response).cause()));
                }
            });
    }

    @Override
    public CompletableFuture<Acknowledge> cancelJob(JobID jobId, Time timeout) {
        CompletableFuture<JobManagerMessages.CancellationResponse> responseFuture = FutureUtils.toJava(
            jobManagerGateway
                .ask(new JobManagerMessages.CancelJob(jobId), FutureUtils.toFiniteDuration(timeout))
                .mapTo(ClassTag$.MODULE$.apply(JobManagerMessages.CancellationResponse.class)));
        return responseFuture.thenApply(
            (JobManagerMessages.CancellationResponse response) -> {
                if (response instanceof JobManagerMessages.CancellationSuccess) {
                    return Acknowledge.get();
                } else {
                    throw new CompletionException(new FlinkException("Cancel job failed " + jobId + '.', ((JobManagerMessages.CancellationFailure) response).cause()));
                }
            });
    }

    @Override
    public CompletableFuture<Acknowledge> stopJob(JobID jobId, Time timeout) {
        CompletableFuture<JobManagerMessages.StoppingResponse> responseFuture = FutureUtils.toJava(
            jobManagerGateway
                .ask(new JobManagerMessages.StopJob(jobId), FutureUtils.toFiniteDuration(timeout))
                .mapTo(ClassTag$.MODULE$.apply(JobManagerMessages.StoppingResponse.class)));
        return responseFuture.thenApply(
            (JobManagerMessages.StoppingResponse response) -> {
                if (response instanceof JobManagerMessages.StoppingSuccess) {
                    return Acknowledge.get();
                } else {
                    throw new CompletionException(new FlinkException("Stop job failed " + jobId + '.', ((JobManagerMessages.StoppingFailure) response).cause()));
                }
            });
    }

    //--------------------------------------------------------------------------------
    // JobManager information
    //--------------------------------------------------------------------------------

    @Override
    public CompletableFuture<Optional<Instance>> requestTaskManagerInstance(ResourceID resourceId, Time timeout) {
        return FutureUtils.toJava(
            jobManagerGateway
                .ask(new JobManagerMessages.RequestTaskManagerInstance(resourceId), FutureUtils.toFiniteDuration(timeout))
                .mapTo(ClassTag$.MODULE$.<JobManagerMessages.TaskManagerInstance>apply(JobManagerMessages.TaskManagerInstance.class)))
            .thenApply(
                (JobManagerMessages.TaskManagerInstance taskManagerResponse) -> {
                    if (taskManagerResponse.instance().isDefined()) {
                        return Optional.of(taskManagerResponse.instance().get());
                    } else {
                        return Optional.empty();
                    }
                });
    }

    @Override
    public CompletableFuture<Collection<Instance>> requestTaskManagerInstances(Time timeout) {
        CompletableFuture<JobManagerMessages.RegisteredTaskManagers> taskManagersFuture = FutureUtils.toJava(
            jobManagerGateway
                .ask(JobManagerMessages.getRequestRegisteredTaskManagers(), FutureUtils.toFiniteDuration(timeout))
                .mapTo(ClassTag$.MODULE$.apply(JobManagerMessages.RegisteredTaskManagers.class)));
        return taskManagersFuture.thenApply(
            JobManagerMessages.RegisteredTaskManagers::asJavaCollection);
    }

    @Override
    public CompletableFuture<Optional<JobManagerMessages.ClassloadingProps>> requestClassloadingProps(JobID jobId, Time timeout) {
        return FutureUtils
            .toJava(jobManagerGateway
                .ask(
                    new JobManagerMessages.RequestClassloadingProps(jobId),
                    FutureUtils.toFiniteDuration(timeout)))
            .thenApply(
                (Object response) -> {
                    if (response instanceof JobManagerMessages.ClassloadingProps) {
                        return Optional.of(((JobManagerMessages.ClassloadingProps) response));
                    } else if (response instanceof JobManagerMessages.JobNotFound) {
                        return Optional.empty();
                    } else {
                        throw new CompletionException(new FlinkException("Unknown response: " + response + '.'));
                    }
                });
    }

    @Override
    public CompletableFuture<MultipleJobsDetails> requestMultipleJobDetails(Time timeout) {
        return FutureUtils.toJava(
            jobManagerGateway
                .ask(new RequestJobDetails(true, true), FutureUtils.toFiniteDuration(timeout))
                .mapTo(ClassTag$.MODULE$.apply(MultipleJobsDetails.class)));
    }

    @Override
    public CompletableFuture<AccessExecutionGraph> requestJob(JobID jobId, Time timeout) {
        CompletableFuture<JobManagerMessages.JobResponse> jobResponseFuture = FutureUtils.toJava(
            jobManagerGateway
                .ask(new JobManagerMessages.RequestJob(jobId), FutureUtils.toFiniteDuration(timeout))
                .mapTo(ClassTag$.MODULE$.apply(JobManagerMessages.JobResponse.class)));
        return jobResponseFuture.thenApply(
            (JobManagerMessages.JobResponse jobResponse) -> {
                if (jobResponse instanceof JobManagerMessages.JobFound) {
                    return ((JobManagerMessages.JobFound) jobResponse).executionGraph();
                } else {
                    throw new CompletionException(new FlinkJobNotFoundException(jobId));
                }
            });
    }

    @Override
    public CompletableFuture<JobResult> requestJobResult(JobID jobId, Time timeout) {
        return requestJob(jobId, timeout).thenApply(JobResult::createFrom);
    }

    @Override
    public CompletableFuture<ClusterOverview> requestClusterOverview(Time timeout) {
        return FutureUtils.toJava(
            jobManagerGateway
                .ask(RequestStatusOverview.getInstance(), FutureUtils.toFiniteDuration(timeout))
                .mapTo(ClassTag$.MODULE$.apply(ClusterOverview.class)));
    }

    @Override
    public CompletableFuture<Collection<String>> requestMetricQueryServicePaths(Time timeout) {
        final String jobManagerPath = getAddress();
        final String jobManagerMetricQueryServicePath = jobManagerPath.substring(0, jobManagerPath.lastIndexOf('/') + 1) + MetricQueryService.METRIC_QUERY_SERVICE_NAME;
        return CompletableFuture.completedFuture(
            Collections.singleton(jobManagerMetricQueryServicePath));
    }

    @Override
    public CompletableFuture<Collection<Tuple2<ResourceID, String>>> requestTaskManagerMetricQueryServicePaths(Time timeout) {
        return requestTaskManagerInstances(timeout)
            .thenApply(
                (Collection<Instance> instances) ->
                    instances
                        .stream()
                        .map(
                            (Instance instance) -> {
                                final String taskManagerAddress = instance.getTaskManagerGateway().getAddress();
                                final String taskManagerMetricQuerServicePath = taskManagerAddress.substring(0, taskManagerAddress.lastIndexOf('/') + 1) +
                                    MetricQueryService.METRIC_QUERY_SERVICE_NAME + '_' + instance.getTaskManagerID().getResourceIdString();
                                return Tuple2.of(instance.getTaskManagerID(), taskManagerMetricQuerServicePath);
                            })
                        .collect(Collectors.toList()));
    }

    @Override
    public CompletableFuture<JobIdsWithStatusOverview> requestJobsOverview(Time timeout) {
        return FutureUtils.toJava(
            jobManagerGateway
                .ask(RequestJobsWithIDsOverview.getInstance(), FutureUtils.toFiniteDuration(timeout))
                .mapTo(ClassTag$.MODULE$.apply(JobIdsWithStatusOverview.class)));
    }

    @Override
    public CompletableFuture<String> requestRestAddress(Time timeout) {
        return FutureUtils.toJava(
            jobManagerGateway
                .ask(JobManagerMessages.getRequestRestAddress(), FutureUtils.toFiniteDuration(timeout))
                .mapTo(ClassTag$.MODULE$.apply(String.class)));
    }
}
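- AkkaJobManagerGateway implements the JobManagerGateway interface. Its constructor takes an ActorGateway (the jobManagerGateway field) and reads the hostname from the actor's address, falling back to "localhost" when no host is defined.
- Job control: requestBlobServerPort sends the RequestBlobManagerPort message; submitJob sends SubmitJob; cancelJobWithSavepoint sends CancelJobWithSavepoint; cancelJob sends CancelJob; stopJob sends StopJob.
- JobManager information: requestTaskManagerInstance sends RequestTaskManagerInstance; requestTaskManagerInstances sends RequestRegisteredTaskManagers; requestClassloadingProps sends RequestClassloadingProps; requestMultipleJobDetails sends RequestJobDetails; requestJob sends RequestJob; requestClusterOverview sends RequestStatusOverview; requestJobsOverview sends RequestJobsWithIDsOverview; requestRestAddress sends RequestRestAddress.
- A few methods send no message of their own: requestJobResult delegates to requestJob and converts the result with JobResult::createFrom; requestMetricQueryServicePaths derives its path from getAddress(); requestTaskManagerMetricQueryServicePaths builds its paths from the result of requestTaskManagerInstances.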
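The message-passing methods above share one shape: ask the actor, convert the reply to a CompletableFuture, then map the reply type to a result or to a CompletionException. The sketch below imitates that shape with plain JDK classes. AskPatternSketch, SubmitSuccess, SubmitFailure, replyWith and the "ACK" return value are hypothetical stand-ins for the Flink and Akka types, and the ask parameter is assumed to play the role of ActorGateway.ask after FutureUtils.toJava.

```java
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.function.Function;

class AskPatternSketch {
    // Hypothetical reply types standing in for JobSubmitSuccess / JobResultFailure.
    static final class SubmitSuccess {
        final String jobId;
        SubmitSuccess(String jobId) { this.jobId = jobId; }
    }

    static final class SubmitFailure {
        final Throwable cause;
        SubmitFailure(Throwable cause) { this.cause = cause; }
    }

    // 'ask' stands in for the actor ask call, already converted to a Java future.
    static CompletableFuture<String> submitJob(
            String jobId, Function<Object, CompletableFuture<Object>> ask) {
        return ask.apply("SubmitJob(" + jobId + ")").thenApply(response -> {
            if (response instanceof SubmitSuccess) {
                SubmitSuccess success = (SubmitSuccess) response;
                if (Objects.equals(success.jobId, jobId)) {
                    return "ACK"; // plays the role of Acknowledge.get()
                }
                throw new CompletionException(
                    new IllegalStateException("Response for wrong job: " + success.jobId));
            } else if (response instanceof SubmitFailure) {
                throw new CompletionException(new IllegalStateException(
                    "Job submission failed.", ((SubmitFailure) response).cause));
            } else {
                throw new CompletionException(
                    new IllegalStateException("Unknown response: " + response));
            }
        });
    }

    // Stub "actor" that answers every message with the same, already completed reply.
    static Function<Object, CompletableFuture<Object>> replyWith(Object reply) {
        return message -> CompletableFuture.completedFuture(reply);
    }

    public static void main(String[] args) {
        System.out.println(submitJob("job-1", replyWith(new SubmitSuccess("job-1"))).join());
        System.out.println(submitJob("job-1", replyWith("unexpected")).isCompletedExceptionally());
    }
}
```

Throwing CompletionException inside thenApply makes the returned future complete exceptionally, so callers see the failure through the future rather than as a synchronous exception.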
Summary
- The RestfulGateway interface extends RpcGateway. It declares cancelJob, stopJob, requestRestAddress, requestJob, requestJobResult, requestMultipleJobDetails, requestClusterOverview, requestMetricQueryServicePaths and requestTaskManagerMetricQueryServicePaths, and it provides the default methods triggerSavepoint, disposeSavepoint, requestJobStatus, requestOperatorBackPressureStats, rescaleJob and shutDownCluster, all of which throw UnsupportedOperationException.
- The JobManagerGateway interface extends RestfulGateway. It declares requestBlobServerPort, submitJob, cancelJobWithSavepoint, cancelJob, stopJob, requestClassloadingProps, requestTaskManagerInstance, requestTaskManagerInstances and requestJobsOverview. One of its implementations is AkkaJobManagerGateway.
- AkkaJobManagerGateway implements JobManagerGateway, and its constructor takes an ActorGateway named jobManagerGateway. Most of its methods are implemented by sending a message through that gateway, making heavy use of the message objects defined in JobManagerMessages.
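The methods that do not send a message, requestMetricQueryServicePaths and requestTaskManagerMetricQueryServicePaths, derive the metric query service path from an actor path by replacing its last segment. A minimal sketch of that string manipulation follows. MetricPathSketch and its method names are hypothetical, the example actor paths are made up, and QUERY_SERVICE_NAME is only a placeholder for MetricQueryService.METRIC_QUERY_SERVICE_NAME, whose value the listing above does not show.

```java
class MetricPathSketch {
    // Placeholder for MetricQueryService.METRIC_QUERY_SERVICE_NAME (value assumed).
    static final String QUERY_SERVICE_NAME = "MetricQueryService";

    // Keeps everything up to and including the last '/' and appends the sibling name.
    static String siblingPath(String actorPath, String siblingName) {
        return actorPath.substring(0, actorPath.lastIndexOf('/') + 1) + siblingName;
    }

    // Mirrors the JobManager case: the sibling is the query service name itself.
    static String jobManagerQueryServicePath(String jobManagerPath) {
        return siblingPath(jobManagerPath, QUERY_SERVICE_NAME);
    }

    // Mirrors the TaskManager case: the name is suffixed with '_' and the resource id.
    static String taskManagerQueryServicePath(String taskManagerPath, String resourceId) {
        return siblingPath(taskManagerPath, QUERY_SERVICE_NAME + '_' + resourceId);
    }

    public static void main(String[] args) {
        System.out.println(jobManagerQueryServicePath("akka.tcp://flink@host:6123/user/jobmanager"));
        System.out.println(taskManagerQueryServicePath("akka.tcp://flink@host:6122/user/taskmanager", "tm-1"));
    }
}
```

Because the path is computed locally, the JobManager variant can return an already completed future, which matches the CompletableFuture.completedFuture call in the listing.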