流程
源码阅读
TransportBulkAction.java
//按分片对bulk请求项进行分组,创建不存在的索引并委托TransportShardBulkAction执行分片级的bulk操作
//协调节点流程:1.参数检查 2.处理pipeline 3.创建索引 4.自动生成id 5.合并请求 6.并行转发 7.等待全部响应 8.回复客户端
public class TransportBulkAction extends HandledTransportAction<BulkRequest, BulkResponse>
TransportBulkAction.doExecute()
@Override
// Coordinating-node entry point for a bulk request.
// Flow: 1) validate each item 2) resolve ingest pipelines 3) auto-create missing
// indices 4) hand off to executeBulk(), which groups items by shard, forwards them
// in parallel, waits for all responses, and replies to the client.
protected void doExecute(Task task, BulkRequest bulkRequest, ActionListener<BulkResponse> listener) {
final long startTime = relativeTime();
// one slot per bulk item; filled concurrently by per-shard listeners
final AtomicArray<BulkItemResponse> responses = new AtomicArray<>(bulkRequest.requests.size());
boolean hasIndexRequestsWithPipelines = false;
final Metadata metadata = clusterService.state().getMetadata();
final Version minNodeVersion = clusterService.state().getNodes().getMinNodeVersion();
for (DocWriteRequest<?> actionRequest : bulkRequest.requests) {
// Convert the DocWriteRequest into an IndexRequest (may be null, e.g. for deletes)
IndexRequest indexRequest = getIndexWriteRequest(actionRequest);
if (indexRequest != null) {
// Must be computed for every index request, because this method also mutates the IndexRequest
boolean indexRequestHasPipeline = resolvePipelines(actionRequest, indexRequest, metadata);
hasIndexRequestsWithPipelines |= indexRequestHasPipeline;
}
// Validate id-related parameters
if (actionRequest instanceof IndexRequest) {
IndexRequest ir = (IndexRequest) actionRequest;
// Clusters with nodes older than 7.5.0 do not support create operations without an explicit id
ir.checkAutoIdWithOpTypeCreateSupportedByVersion(minNodeVersion);
if (ir.getAutoGeneratedTimestamp() != IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP) {
throw new IllegalArgumentException("autoGeneratedTimestamp should not be set externally");
}
}
}
// Handle requests that need ingest-pipeline processing
if (hasIndexRequestsWithPipelines) {
// This method (doExecute) will be called again, but with the bulk requests already
// processed by the ingest node / IngestService; at that point each request's pipeline
// has been set to NOOP_PIPELINE_NAME, which guarantees this branch is not taken on
// the second pass.
try {
if (Assertions.ENABLED) {
// Sanity check: every non-null index request must have had its pipeline resolved
final boolean arePipelinesResolved = bulkRequest.requests()
.stream()
.map(TransportBulkAction::getIndexWriteRequest)
.filter(Objects::nonNull)
.allMatch(IndexRequest::isPipelineResolved);
assert arePipelinesResolved : bulkRequest;
}
// If the local node is ingest-capable, process the pipelines here
if (clusterService.localNode().isIngestNode()) {
processBulkIndexIngestRequest(task, bulkRequest, listener);
// Otherwise forward the request to a randomly chosen ingest-capable node
} else {
ingestForwarder.forwardIngestRequest(BulkAction.INSTANCE, bulkRequest, listener);
}
} catch (Exception e) {
listener.onFailure(e);
}
return;
}
// Auto-create indices if the action is configured to check for them
if (needToCheck()) {
// Attempt to create all indices the bulk needs before starting.
// Step 1: collect all index names referenced by the request
final Set<String> indices = bulkRequest.requests.stream()
// delete requests should not attempt to create a missing index,
// unless external versioning is in use
.filter(request -> request.opType() != DocWriteRequest.OpType.DELETE
|| request.versionType() == VersionType.EXTERNAL
|| request.versionType() == VersionType.EXTERNAL_GTE)
.map(DocWriteRequest::index)
.collect(Collectors.toSet());
// Step 2: narrow down to the indices that do not exist yet and should be auto-created.
// Also build a map of indices that cannot be created, consulted later when executing items.
final Map<String, IndexNotFoundException> indicesThatCannotBeCreated = new HashMap<>();
Set<String> autoCreateIndices = new HashSet<>();
ClusterState state = clusterService.state();
for (String index : indices) {
boolean shouldAutoCreate;
try {
shouldAutoCreate = shouldAutoCreate(index, state);
} catch (IndexNotFoundException e) {
shouldAutoCreate = false;
indicesThatCannotBeCreated.put(index, e);
}
if (shouldAutoCreate) {
autoCreateIndices.add(index);
}
}
// Step 3: create all missing indices (if any), then run the bulk once every creation has returned.
// Each create-index request goes to the master node; the bulk proceeds only after ALL create
// responses (success or failure) have arrived. The master returns only after the new cluster
// state has been published; by default publication succeeds once a majority of nodes respond.
if (autoCreateIndices.isEmpty()) {
executeBulk(task, bulkRequest, startTime, listener, responses, indicesThatCannotBeCreated);
} else {
// Countdown over the indices being created; counter.decrementAndGet() == 0 means all
// creations have completed and the next phase can start.
final AtomicInteger counter = new AtomicInteger(autoCreateIndices.size());
for (String index : autoCreateIndices) {
// Send the create-index request
createIndex(index, bulkRequest.timeout(), minNodeVersion, new ActionListener<>() {
@Override
public void onResponse(CreateIndexResponse result) {
if (counter.decrementAndGet() == 0) {
threadPool.executor(ThreadPool.Names.WRITE).execute(
() -> executeBulk(task, bulkRequest, startTime, listener, responses, indicesThatCannotBeCreated));
}
}
@Override
// A creation failed
public void onFailure(Exception e) {
if (!(ExceptionsHelper.unwrapCause(e) instanceof ResourceAlreadyExistsException)) {
// If the create didn't work, fail every request that targets this index
for (int i = 0; i < bulkRequest.requests.size(); i++) {
DocWriteRequest<?> request = bulkRequest.requests.get(i);
if (request != null && setResponseFailureIfIndexMatches(responses, i, request, index, e)) {
// null out the slot so later phases skip this item
bulkRequest.requests.set(i, null);
}
}
}
if (counter.decrementAndGet() == 0) {
executeBulk(task, bulkRequest, startTime, ActionListener.wrap(listener::onResponse, inner -> {
inner.addSuppressed(e);
listener.onFailure(inner);
}), responses, indicesThatCannotBeCreated);
}
}
});
}
}
} else {
// Auto-creation disabled: run the bulk directly
executeBulk(task, bulkRequest, startTime, listener, responses, emptyMap());
}
}
TransportBulkAction.BulkOperation
//在可重试集群块上重试,解析项请求,构造分片bulk请求并委托TransportShardBulkAction执行分片级的bulk操作,即合并请求
private final class BulkOperation extends ActionRunnable<BulkResponse>
TransportBulkAction.BulkOperation.doRun()
@Override
// Resolves each item request, groups items by target shard, and delegates each
// shard-level group to TransportShardBulkAction — i.e. the request-merging phase.
protected void doRun() {
assert bulkRequest != null;
// Check the (possibly updated) cluster state; bail out on retryable block exceptions
final ClusterState clusterState = observer.setAndGetObservedState();
if (handleBlockExceptions(clusterState)) {
return;
}
// Pre-process every request: validate parameters, auto-generate ids, resolve routing, etc.
// The previous phase may have created indices, so the freshest cluster state is fetched first.
// Each request's index metadata is then looked up to check mapping, routing and id;
// if an index request has no id, process() assigns an auto-generated one.
final ConcreteIndices concreteIndices = new ConcreteIndices(clusterState, indexNameExpressionResolver);
Metadata metadata = clusterState.metadata();
for (int i = 0; i < bulkRequest.requests.size(); i++) {
DocWriteRequest<?> docWriteRequest = bulkRequest.requests.get(i);
// A request can only be null because an earlier step nulled it out; skip it
if (docWriteRequest == null) {
continue;
}
if (addFailureIfIndexIsUnavailable(docWriteRequest, i, concreteIndices, metadata)) {
continue;
}
Index concreteIndex = concreteIndices.resolveIfAbsent(docWriteRequest);
try {
switch (docWriteRequest.opType()) {
case CREATE:
case INDEX:
prohibitAppendWritesInBackingIndices(docWriteRequest, metadata);
IndexRequest indexRequest = (IndexRequest) docWriteRequest;
final IndexMetadata indexMetadata = metadata.index(concreteIndex);
MappingMetadata mappingMd = indexMetadata.mapping();
Version indexCreated = indexMetadata.getCreationVersion();
indexRequest.resolveRouting(metadata);
// validates mapping/id and auto-generates an id when absent
indexRequest.process(indexCreated, mappingMd, concreteIndex.getName());
break;
case UPDATE:
TransportUpdateAction.resolveAndValidateRouting(metadata, concreteIndex.getName(),
(UpdateRequest) docWriteRequest);
break;
case DELETE:
docWriteRequest.routing(metadata.resolveWriteIndexRouting(docWriteRequest.routing(), docWriteRequest.index()));
// If routing is required for this index but none was supplied, fail the item
if (docWriteRequest.routing() == null && metadata.routingRequired(concreteIndex.getName())) {
throw new RoutingMissingException(concreteIndex.getName(), docWriteRequest.id());
}
break;
default: throw new AssertionError("request type not supported: [" + docWriteRequest.opType() + "]");
}
} catch (ElasticsearchParseException | IllegalArgumentException | RoutingMissingException e) {
BulkItemResponse.Failure failure = new BulkItemResponse.Failure(concreteIndex.getName(),
docWriteRequest.id(), e);
BulkItemResponse bulkItemResponse = new BulkItemResponse(i, docWriteRequest.opType(), failure);
responses.set(i, bulkItemResponse);
// Make sure the request is not processed again
bulkRequest.requests.set(i, null);
}
}
// Content routing: build shard-level requests — essentially a merge of item requests.
// First build a ShardId -> operations map representing the per-shard request structure.
Map<ShardId, List<BulkItemRequest>> requestsByShard = new HashMap<>();
// The routing algorithm decides which shard each document belongs to; every surviving
// item is re-wrapped as a BulkItemRequest and added to the map above.
for (int i = 0; i < bulkRequest.requests.size(); i++) {
DocWriteRequest<?> request = bulkRequest.requests.get(i);
if (request == null) {
continue;
}
String concreteIndex = concreteIndices.getConcreteIndex(request.index()).getName();
ShardId shardId = clusterService.operationRouting().indexShards(clusterState, concreteIndex, request.id(),
request.routing()).shardId();
List<BulkItemRequest> shardRequests = requestsByShard.computeIfAbsent(shardId, shard -> new ArrayList<>());
shardRequests.add(new BulkItemRequest(i, request));
}
// Nothing left to execute (everything failed or was nulled out): respond immediately
if (requestsByShard.isEmpty()) {
listener.onResponse(new BulkResponse(responses.toArray(new BulkItemResponse[responses.length()]),
buildTookInMillis(startTimeNanos)));
return;
}
// For each target shard, wrap its items in a BulkShardRequest and send it
final AtomicInteger counter = new AtomicInteger(requestsByShard.size());
String nodeId = clusterService.localNode().getId();
for (Map.Entry<ShardId, List<BulkItemRequest>> entry : requestsByShard.entrySet()) {
final ShardId shardId = entry.getKey();
final List<BulkItemRequest> requests = entry.getValue();
BulkShardRequest bulkShardRequest = new BulkShardRequest(shardId, bulkRequest.getRefreshPolicy(),
requests.toArray(new BulkItemRequest[requests.size()]));
bulkShardRequest.waitForActiveShards(bulkRequest.waitForActiveShards());
bulkShardRequest.timeout(bulkRequest.timeout());
// record the cluster-state version the routing decision was based on
bulkShardRequest.routedBasedOnClusterVersion(clusterState.version());
if (task != null) {
bulkShardRequest.setParentTask(nodeId, task.getId());
}
// Wait for the responses in the listener; each response is also shard-granular.
// Goes through NodeClient.executeLocally().
client.executeLocally(TransportShardBulkAction.TYPE, bulkShardRequest, new ActionListener<>() {
@Override
public void onResponse(BulkShardResponse bulkShardResponse) {
for (BulkItemResponse bulkItemResponse : bulkShardResponse.getResponses()) {
if (bulkItemResponse.getResponse() != null) {
bulkItemResponse.getResponse().setShardInfo(bulkShardResponse.getShardInfo());
}
responses.set(bulkItemResponse.getItemId(), bulkItemResponse);
}
// Last shard response triggers the final reply to the client
if (counter.decrementAndGet() == 0) {
finishHim();
}
}
@Override
public void onFailure(Exception e) {
// Mark every item that targeted this shard as failed
for (BulkItemRequest request : requests) {
final String indexName = concreteIndices.getConcreteIndex(request.index()).getName();
DocWriteRequest<?> docWriteRequest = request.request();
responses.set(request.id(), new BulkItemResponse(request.id(), docWriteRequest.opType(),
new BulkItemResponse.Failure(indexName, docWriteRequest.id(), e)));
}
if (counter.decrementAndGet() == 0) {
finishHim();
}
}
// Assembles the aggregated BulkResponse and hands it to the caller's listener
private void finishHim() {
listener.onResponse(new BulkResponse(responses.toArray(new BulkItemResponse[responses.length()]),
buildTookInMillis(startTimeNanos)));
}
});
}
// Allow the bulk items' memory to be reclaimed before all responses have arrived
bulkRequest = null;
}
NodeClient.executeLocally()
//在本地执行ActionType,返回用于跟踪它的Task,并链接一个ActionListener
//调用TaskManager.registerAndExecute()
// Executes the given ActionType on the local node, registering a Task to track it.
// Completion and failure are bridged from the task callbacks to the supplied listener.
// Delegates to TaskManager.registerAndExecute().
public < Request extends ActionRequest,
Response extends ActionResponse
> Task executeLocally(ActionType<Response> action, Request request, ActionListener<Response> listener) {
    final TransportAction<Request, Response> localAction = transportAction(action);
    return taskManager.registerAndExecute(
        "transport",
        localAction,
        request,
        (createdTask, response) -> listener.onResponse(response),
        (createdTask, exception) -> listener.onFailure(exception));
}
TaskManager.registerAndExecute()
// Registers a Task for the given action, executes the action, and unregisters the
// task (plus any child-node registration) before invoking the completion callbacks.
// Returns the registered Task so callers can track it.
public <Request extends ActionRequest, Response extends ActionResponse>
Task registerAndExecute(String type, TransportAction<Request, Response> action, Request request,
BiConsumer<Task, Response> onResponse, BiConsumer<Task, Exception> onFailure) {
final Releasable unregisterChildNode;
// If this request belongs to a parent task, register this node as a child so the
// parent can cancel/track it; otherwise use a no-op releasable.
if (request.getParentTask().isSet()) {
unregisterChildNode = registerChildNode(request.getParentTask().getId(), lastDiscoveryNodes.getLocalNode());
} else {
unregisterChildNode = () -> {};
}
final Task task;
try {
task = register(type, action.actionName, request);
} catch (TaskCancelledException e) {
// Registration failed: undo the child-node registration before propagating
unregisterChildNode.close();
throw e;
}
// Proceed into TransportAction.execute()
action.execute(task, request, new ActionListener<Response>() {
@Override
public void onResponse(Response response) {
try {
// Unregister first, then notify — the finally block guarantees the callback
// fires even if unregistration throws.
Releasables.close(unregisterChildNode, () -> unregister(task));
} finally {
onResponse.accept(task, response);
}
}
@Override
public void onFailure(Exception e) {
try {
Releasables.close(unregisterChildNode, () -> unregister(task));
} finally {
onFailure.accept(task, e);
}
}
});
return task;
}
TransportShardBulkAction.java
//执行分片级bulk(index、delete或update)操作
public class TransportShardBulkAction extends TransportWriteAction<BulkShardRequest, BulkShardRequest, BulkShardResponse>{}
//用于传输修改某些分片中的数据的操作(如index、delete和shardBulk)的基类
//允许在主分片和副本分片上执行写操作后执行异步操作(例如刷新)
public abstract class TransportWriteAction<
Request extends ReplicatedWriteRequest<Request>,
ReplicaRequest extends ReplicatedWriteRequest<ReplicaRequest>,
Response extends ReplicationResponse & WriteResponse
> extends TransportReplicationAction<Request, ReplicaRequest, Response>{}
//基类,用于应该在主分片和副本分片上执行的请求
//子类可以解析目标分片,并为主操作和副本操作提供实现
//该操作在接收节点上对集群状态进行采样,以便重新路由到具有主分片的节点,在主节点上对请求进行验证,然后在进行主操作之前再次对具有副本分片的节点进行采样,以便执行复制
public abstract class TransportReplicationAction<
Request extends ReplicationRequest<Request>,
ReplicaRequest extends ReplicationRequest<ReplicaRequest>,
Response extends ReplicationResponse
> extends TransportAction<Request, Response> {}
public abstract class TransportAction<Request extends ActionRequest, Response extends ActionResponse>
TransportAction.execute()
//当传输操作应该继续在当前任务的上下文中运行时,使用此方法
// Use this method when the transport action should continue to run in the context
// of the current task. Builds the action-filter chain and starts walking it; the
// chain eventually calls doExecute() once all filters have been applied.
public final void execute(Task task, Request request, ActionListener<Response> listener) {
//... (validation and listener wrapping elided in these notes)
RequestFilterChain<Request, Response> requestFilterChain = new RequestFilterChain<>(this, logger);
requestFilterChain.proceed(task, actionName, request, listener);
}
// Walks the action's filter list one element per proceed() call; after the last
// filter it dispatches to the action's doExecute(). Any further call is an error.
private static class RequestFilterChain<Request extends ActionRequest, Response extends ActionResponse>
implements ActionFilterChain<Request, Response> {
//... (fields elided in these notes; `index` below is the chain's position counter)
@Override
public void proceed(Task task, String actionName, Request request, ActionListener<Response> listener) {
int i = index.getAndIncrement();
try {
if (i < this.action.filters.length) {
// Apply the next filter; the filter calls proceed() again to continue the chain
this.action.filters[i].apply(task, actionName, request, listener, this);
} else if (i == this.action.filters.length) {
// All filters applied — enter TransportReplicationAction.doExecute()
this.action.doExecute(task, request, listener);
} else {
listener.onFailure(new IllegalStateException("proceed was called too many times"));
}
} catch(Exception e) {
logger.trace("Error during transport action execution.", e);
listener.onFailure(e);
}
}
}
TransportReplicationAction.doExecute()
//调用ReroutePhase.run(),转发请求
@Override
// Kicks off the reroute phase, which resolves the primary shard's location and
// forwards (or locally executes) the request. See ReroutePhase for the details.
protected void doExecute(Task task, Request request, ActionListener<Response> listener) {
    assert request.shardId() != null : "request shardId must be set";
    final ReroutePhase reroutePhase = new ReroutePhase((ReplicationTask) task, request, listener);
    reroutePhase.run();
}
//负责路由和重试主分片上失败的操作
//实际的主操作是在具有主分片的节点的ReplicationOperation中完成的
//在将请求路由到目标节点之前,解析请求的索引和分片id
// Responsible for routing the request to the primary shard's node and retrying
// operations that fail on the primary. The actual primary operation is performed
// by a ReplicationOperation on the node holding the primary shard. The request's
// index and shard id are resolved before the request is routed to the target node.
final class ReroutePhase extends AbstractRunnable {
// Forward the request
@Override
protected void doRun() {
setPhase(task, "routing");
// Sample the latest observed cluster state
final ClusterState state = observer.setAndGetObservedState();
final ClusterBlockException blockException = blockExceptions(state, request.shardId().getIndexName());
if (blockException != null) {
// Retryable blocks (e.g. transient cluster-level blocks) schedule a retry;
// everything else fails the request immediately.
if (blockException.retryable()) {
logger.trace("cluster is blocked, scheduling a retry", blockException);
retry(blockException);
} else {
finishAsFailed(blockException);
}
} else {
final IndexMetadata indexMetadata = state.metadata().index(request.shardId().getIndex());
if (indexMetadata == null) {
// Ensure this node's cluster state is at least as recent as that of the node
// which decided where the index lives; if it is stale, retry until it catches up.
if (state.version() < request.routedBasedOnClusterVersion()) {
logger.trace("failed to find index [{}] for request [{}] despite sender thinking it would be here. " +
"Local cluster state version [{}]] is older than on sending node (version [{}]), scheduling a retry...",
request.shardId().getIndex(), request, state.version(), request.routedBasedOnClusterVersion());
retry(new IndexNotFoundException("failed to find index as current cluster state with version [" + state.version() +
"] is stale (expected at least [" + request.routedBasedOnClusterVersion() + "]",
request.shardId().getIndexName()));
return;
} else {
finishAsFailed(new IndexNotFoundException(request.shardId().getIndex()));
return;
}
}
if (indexMetadata.getState() == IndexMetadata.State.CLOSE) {
finishAsFailed(new IndexClosedException(indexMetadata.getIndex()));
return;
}
// Fill in the index-level default for waitForActiveShards when unset
if (request.waitForActiveShards() == ActiveShardCount.DEFAULT) {
request.waitForActiveShards(indexMetadata.getWaitForActiveShards());
}
assert request.waitForActiveShards() != ActiveShardCount.DEFAULT :
"request waitForActiveShards must be set in resolveRequest";
// Look up the primary shard's routing entry
final ShardRouting primary = state.getRoutingTable().shardRoutingTable(request.shardId()).primaryShard();
if (primary == null || primary.active() == false) {
logger.trace("primary shard [{}] is not yet active, scheduling a retry: action [{}], request [{}], "
+ "cluster state version [{}]", request.shardId(), actionName, request, state.version());
retryBecauseUnavailable(request.shardId(), "primary shard is not active");
return;
}
if (state.nodes().nodeExists(primary.currentNodeId()) == false) {
logger.trace("primary shard [{}] is assigned to an unknown node [{}], scheduling a retry: action [{}], request [{}], "
+ "cluster state version [{}]", request.shardId(), primary.currentNodeId(), actionName, request, state.version());
retryBecauseUnavailable(request.shardId(), "primary shard isn't assigned to a known node.");
return;
}
// Resolve the node hosting the primary shard
final DiscoveryNode node = state.nodes().get(primary.currentNodeId());
// If the primary is on this node, execute locally; otherwise forward the request
if (primary.currentNodeId().equals(state.nodes().getLocalNodeId())) {
performLocalAction(state, primary, node, indexMetadata);
} else {
performRemoteAction(state, primary, node);
}
}
}
}