深入理解Eureka Server集群同步(十)

集群启动同步

protected void initEurekaServerContext() throws Exception {
   
    // ....省略N多代码
    // 同步信息
   int registryCount = this.registry.syncUp();
   // ....省略N多代码
}

网上很多文章说是调用syncUp这个方法去其他Eureka Server节点复制注册信息,这个说法不是很准确, 在这个地方,SyncUp()这个方法并不会去其他Eureka Server节点复制信息,而是从本地内存里面获取注册信息, 看源码就知道了。

public int syncUp() {
    // Copy entire entry from neighboring DS node
    // 获取到的注册节点数量
    int count = 0;
    // 如果count==0 , 那么默认重试5次(前提是开启了register-with-eureka = true,否则为0)
    for (int i = 0; ((i < serverConfig.getRegistrySyncRetries()) && (count == 0)); i++) {
        if (i > 0) {
            try {
                // 从第二次开始,每次默认沉睡30秒
                Thread.sleep(serverConfig.getRegistrySyncRetryWaitMs());
            } catch (InterruptedException e) {
                logger.warn("Interrupted during registry transfer..");
                break;
            }
        }
        // 从本地内存里面获取注册实例信息
        Applications apps = eurekaClient.getApplications();
        for (Application app : apps.getRegisteredApplications()) {
            for (InstanceInfo instance : app.getInstances()) {
                try {
                    // 判断是否可以注册
                    if (isRegisterable(instance)) {
                        // 注册到当前Eureka Server里面
                        register(instance, instance.getLeaseInfo().getDurationInSecs(), true);
                        count++;
                    }
                } catch (Throwable t) {
                    logger.error("During DS init copy", t);
                }
            }
        }
    }
    return count;
}

参数说明:

regirstrySyncRetries : 当eureka服务器启动时尝试去获取集群里其他服务器上的注册信息的次数,默认为5,

只有当 eureka.client.register-with-eureka = true 的时候才会是5,如果是false ,则为0

registrySyncRetryWaitMs : 当eureka服务器启动时获取其他服务器的注册信息失败时,会再次尝试获取,期间需要等待的时间,默认为30 * 1000毫秒

count : 获取到的注册实例数量,如果为0 则根据重试次数进行重试,每次重试前沉默 30秒

PS: 在之前的文章中 7. 深入理解Eureka 获取注册信息(七) ,讲过Eureka Client启动的时候默认会自动从Eureka Server获取注册信息, 要想Eureka Server在启动的时候可以同步其他集群节点的注册信息,那么必须开启客户端配置

eureka.client.register-with-eureka = true    ## 是否作为一个Eureka Client 注册到Eureka Server上去
eureka.client.fetch-registry = true              ## 是否需要从Eureka Server上拉取注册信息到本地。

只有开启了上面两个配置,那么集群节点在启动的时候,会初始化Eureka Client端的配置 ,会从其他Eureka Server拉取注册信息到本地,同时

在初始化Eureka Server的时候,会从本地内存里面读取 注册信息,自动注册到本身的服务上

集群同步类型

public enum Action {
    Heartbeat, Register, Cancel, StatusUpdate, DeleteStatusOverride;

    private com.netflix.servo.monitor.Timer timer = Monitors.newTimer(this.name());

    public com.netflix.servo.monitor.Timer getTimer() {
        return this.timer;
    }
}


Heartbeat : 心跳续约

Register : 注册

Cancel : 下线

StatusUpdate : 添加覆盖状态

DeleteStatusOverride : 删除覆盖状态

发起同步

这里以注册的代码为例

@Override
public void register(final InstanceInfo info, final boolean isReplication) {
    int leaseDuration = Lease.DEFAULT_DURATION_IN_SECS;
    if (info.getLeaseInfo() != null && info.getLeaseInfo().getDurationInSecs() > 0) {
        leaseDuration = info.getLeaseInfo().getDurationInSecs();
    }
    // 发起注册
    super.register(info, leaseDuration, isReplication);
    // 注册完成后,在这里发起同步,同步类型为Register
    replicateToPeers(Action.Register, info.getAppName(), info.getId(), info, null, isReplication);
}
private void replicateToPeers(Action action, String appName, String id,
                              InstanceInfo info /* optional */,
                              InstanceStatus newStatus /* optional */, boolean isReplication) {
    Stopwatch tracer = action.getTimer().start();
    try {
        // 判断是否是集群同步请求,如果是,则记录最后一分钟的同步次数
        if (isReplication) {
            numberOfReplicationsLastMin.increment();
        }
        // If it is a replication already, do not replicate again as this will create a poison replication
        // 集群节点为空,或者这是一个Eureka Server 同步请求,直接return
        if (peerEurekaNodes == Collections.EMPTY_LIST || isReplication) {
            return;
        }
        // 循环相邻的Eureka Server Node, 分别发起请求同步
        for (final PeerEurekaNode node : peerEurekaNodes.getPeerEurekaNodes()) {
            // 判断是否是自身的URL,过滤掉
            if (peerEurekaNodes.isThisMyUrl(node.getServiceUrl())) {
                continue;
            }
            // 发起同步请求
            replicateInstanceActionsToPeers(action, appName, id, info, newStatus, node);
        }
    } finally {
        tracer.stop();
    }
}

步骤说明:

1.判断集群节点是否为空,为空则返回

2.isReplication 代表是否是一个复制请求, isReplication = true 表示是其他Eureka Server发过来的同步请求

这个时候是不需要继续往下同步的。否则会陷入同步死循环

3.循环集群节点,过滤掉自身的节点

4.发起同步请求 ,调用replicateInstanceActionsToPeers

PS: 这里提到了PeerEurekaNode , 对于PeerEurekaNodes的集群节点更新及数据读取,可以看这个1. 深入理解Eureka Server启动(一)在服务启动的时候,对PeerEurekaNodes集群开启了线程更新集群节点信息。每15分钟一次

private void replicateInstanceActionsToPeers(Action action, String appName,
                                             String id, InstanceInfo info, InstanceStatus newStatus,
                                             PeerEurekaNode node) {
    try {
        InstanceInfo infoFromRegistry = null;
        CurrentRequestVersion.set(Version.V2);
        switch (action) {
            case Cancel: // 下线
                node.cancel(appName, id);
                break;
            case Heartbeat:
                // 心跳
                InstanceStatus overriddenStatus = overriddenInstanceStatusMap.get(id);
                // 获取本地最新的实例信息
                infoFromRegistry = getInstanceByAppAndId(appName, id, false);
                node.heartbeat(appName, id, infoFromRegistry, overriddenStatus, false);
                break;
            case Register: // 注册
                node.register(info);
                break;
            case StatusUpdate:  // 设置覆盖状态
                infoFromRegistry = getInstanceByAppAndId(appName, id, false);
                node.statusUpdate(appName, id, newStatus, infoFromRegistry);
                break;
            case DeleteStatusOverride: //删除覆盖状态
                infoFromRegistry = getInstanceByAppAndId(appName, id, false);
                node.deleteStatusOverride(appName, id, infoFromRegistry);
                break;
        }
    } catch (Throwable t) {
        logger.error("Cannot replicate information to {} for action {}", node.getServiceUrl(), action.name(), t);
    }
}

这里直接看注册,其他的原理上是一致的。

PeerEurekaNode的register方法如下。

public void register(final InstanceInfo info) throws Exception {
    long expiryTime = System.currentTimeMillis() + getLeaseRenewalOf(info);
    // 默认采用的是批处理
    batchingDispatcher.process(
            taskId("register", info),
            new InstanceReplicationTask(targetHost, Action.Register, info, null, true) {
                public EurekaHttpResponse<Void> execute() {
                    return replicationClient.register(info);
                }
            },
            expiryTime
    );
}

默认采用的是批量任务处理器,就是将task放入任务队列中,然后通过线程获取任务队列里面的任务,模仿ThreadExecutorPool的方式,生成线程,

从队列里面抓取任务处理,统一批量执行,Eureka Server 那边也是统一接收,这样提高了同步效率

批量处理的任务执行器是com.netflix.eureka.cluster.ReplicationTaskProcessor

@Override
public ProcessingResult process(List<ReplicationTask> tasks) {
    // 构建ReplicationInstance放入ReplicationList 
    ReplicationList list = createReplicationListOf(tasks);
    try {
        // 发起批量处理请求
        EurekaHttpResponse<ReplicationListResponse> response = replicationClient.submitBatchUpdates(list);
        int statusCode = response.getStatusCode();
        if (!isSuccess(statusCode)) {
            if (statusCode == 503) {
                logger.warn("Server busy (503) HTTP status code received from the peer {}; rescheduling tasks after delay", peerId);
                return ProcessingResult.Congestion;
            } else {
                // Unexpected error returned from the server. This should ideally never happen.
                logger.error("Batch update failure with HTTP status code {}; discarding {} replication tasks", statusCode, tasks.size());
                return ProcessingResult.PermanentError;
            }
        } else {
            // 处理执行结果 ,成功则调用handleSuccess ,失败则调用handleFailure。
            handleBatchResponse(tasks, response.getEntity().getResponseList());
        }
    } catch (Throwable e) {
        if (isNetworkConnectException(e)) {
            logNetworkErrorSample(null, e);
            return ProcessingResult.TransientError;
        } else {
            logger.error("Not re-trying this exception because it does not seem to be a network exception", e);
            return ProcessingResult.PermanentError;
        }
    }
    return ProcessingResult.Success;
}

请求批量处理的接口地址 : peerreplication/batch/

handleBatchResponse(tasks, response.getEntity().getResponseList()) , 循环调用处理结果,

成功则调用handleSuccess. , 失败则调用handleFailure , 比如hearbeat的时候,调用返回码为

404的时候,会重新发起注册。

ReplicationTask replicationTask = new InstanceReplicationTask(targetHost, Action.Heartbeat, info, overriddenStatus, false) {
    @Override
    public EurekaHttpResponse<InstanceInfo> execute() throws Throwable {
        return replicationClient.sendHeartBeat(appName, id, info, overriddenStatus);
    }

    @Override
    public void handleFailure(int statusCode, Object responseEntity) throws Throwable {
        super.handleFailure(statusCode, responseEntity);
        if (statusCode == 404) {
                // 重新发起注册。
                register(info);
            }
        } else if (config.shouldSyncWhenTimestampDiffers()) {
            InstanceInfo peerInstanceInfo = (InstanceInfo) responseEntity;
            if (peerInstanceInfo != null) {
                syncInstancesIfTimestampDiffers(appName, id, info, peerInstanceInfo);
            }
        }
    }
};

Eureka Server接收同步

程序入口 : com.netflix.eureka.resources.PeerReplicationResource

@Path("batch")
@POST
public Response batchReplication(ReplicationList replicationList) {
    try {
        ReplicationListResponse batchResponse = new ReplicationListResponse();
        // 循环请求的任务
        for (ReplicationInstance instanceInfo : replicationList.getReplicationList()) {
            try {
                // 分发任务,同时将处理结果收集起来,等会统一返回
                batchResponse.addResponse(dispatch(instanceInfo));
            } catch (Exception e) {
                batchResponse.addResponse(new ReplicationInstanceResponse(Status.INTERNAL_SERVER_ERROR.getStatusCode(), null));
                logger.error(instanceInfo.getAction() + " request processing failed for batch item "
                        + instanceInfo.getAppName() + '/' + instanceInfo.getId(), e);
            }
        }
        return Response.ok(batchResponse).build();
    } catch (Throwable e) {
        logger.error("Cannot execute batch Request", e);
        return Response.status(Status.INTERNAL_SERVER_ERROR).build();
    }
}


private ReplicationInstanceResponse dispatch(ReplicationInstance instanceInfo) {
    //  创建实例
    ApplicationResource applicationResource = createApplicationResource(instanceInfo);
    // 创建实例
    InstanceResource resource = createInstanceResource(instanceInfo, applicationResource);
    //获取客户端instance的lastDirtyTimestamp  ,有点类似于版本号的概念。
    String lastDirtyTimestamp = toString(instanceInfo.getLastDirtyTimestamp());
    // 获取覆盖状态
    String overriddenStatus = toString(instanceInfo.getOverriddenStatus());
    // 获取instance的状态
    String instanceStatus = toString(instanceInfo.getStatus());

    Builder singleResponseBuilder = new Builder();
    switch (instanceInfo.getAction()) {
        case Register: // 注册
            singleResponseBuilder = handleRegister(instanceInfo, applicationResource);
            break;
        case Heartbeat: // 心跳续约
            singleResponseBuilder = handleHeartbeat(resource, lastDirtyTimestamp, overriddenStatus, instanceStatus);
            break;
        case Cancel:   // 下线
            singleResponseBuilder = handleCancel(resource);
            break;
        case StatusUpdate:  // 修改覆盖状态
            singleResponseBuilder = handleStatusUpdate(instanceInfo, resource);
            break;
        case DeleteStatusOverride: // 删除覆盖状态
            singleResponseBuilder = handleDeleteStatusOverride(instanceInfo, resource);
            break;
    }
    return singleResponseBuilder.build();
}

以上五个场景,这里就不一一说了,就说一下注册吧,

private static Builder handleRegister(ReplicationInstance instanceInfo, ApplicationResource applicationResource) {
    // 调用Application控制层的接口,添加实例
    applicationResource.addInstance(instanceInfo.getInstanceInfo(), REPLICATION);
    return new Builder().setStatusCode(Status.OK.getStatusCode());
}


@POST
@Consumes({"application/json", "application/xml"})
public Response addInstance(InstanceInfo info,
                            @HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication) {
    // 省略代码1000行
    return Response.status(204).build();  // 204 to be backwards compatible
}

REPLICATION = “true” ,此次请求为true,表示是一个服务端的复制请求。

由上面可以知道,集群同步走的和客户端注册的后续流程是一样的,只不过isReplication=true , 表明这是一个集群同步的请求

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 212,718评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,683评论 3 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 158,207评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,755评论 1 284
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,862评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,050评论 1 291
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,136评论 3 410
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,882评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,330评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,651评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,789评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,477评论 4 333
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,135评论 3 317
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,864评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,099评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,598评论 2 362
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,697评论 2 351

推荐阅读更多精彩内容