Nacos源码分析-服务注册

零、本文纲要

  • 一、源码准备
  • 二、了解服务注册-客户端
    1、Nacos的服务注册表结构
    2、查看Nacos的服务注册源码
    3、跟踪Nacos的服务注册流程
    4、客户端注册的流程图
  • 三、了解服务注册-服务端
    1、确定模块
    2、跟踪Nacos接收处理服务注册源码
    3、服务端注册的流程图
  • 四、Nacos服务注册部分总结

tips:Ctrl + F快速定位所需内容阅读吧。

一、源码准备

1、下载Nacos源码

官方下载连接:Release 1.4.2 (Apr 29th, 2021) · alibaba/nacos · GitHub

下载Nacos源码.png

2、解压导入源码

导入IDEA,此处省略步骤。

3、proto编译

Nacos底层的数据通信会基于protobuf对数据做序列化和反序列化,需要先将proto文件编译为对应的Java代码。

proto编译.png
  • ① 安装protoc

下载protoc:Releases · protocolbuffers/protobuf · GitHub

image.png
  • ② 解压&配置环境变量
复制目录.png
配置系统变量.png
配置环境变量.png

4、编译proto

  • ① 进入目标目录nacos-1.4.2\consistency\src\main
进入目标目录.png
  • ② 打开cmd窗口进行编译

编译consistency.proto到java目录,如下:

protoc --java_out=./java ./proto/consistency.proto

编译Data.proto到java目录,如下:

protoc --java_out=./java ./proto/Data.proto

5、启动Nacos配置

Nacos控制台启动类.png
启动Nacos配置.png

6、测试

访问控制台.png

二、了解服务注册-客户端

1、Nacos的服务注册表结构

  • ① 环境隔离:namespace
  • ② 服务分组:group
  • ③ 服务集群:service cluster
  • ④ 服务实例:service instance
image.png

2、查看Nacos的服务注册源码

  • ① 定位依赖

服务注册与Nacos的依赖有关,所以查看spring-cloud-starter-alibaba-nacos-discovery依赖,如下:

<dependency>
    <groupId>com.alibaba.cloud</groupId>
    <artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
</dependency>
  • ② 在依赖中查看自动装配文件spring.factories
查看自动装配文件.png
  • ③ 定位Nacos服务注册自动配置类
定位Nacos服务注册自动配置类.png
  • ④ 查看NacosServiceRegistryAutoConfiguration类源码
image.png

3、跟踪Nacos的服务注册流程

由上述内容我们可以知道,Nacos服务自动注册是从NacosServiceRegistryAutoConfiguration类开始的,自动注册涉及到NacosAutoServiceRegistration类。

  • ① NacosAutoServiceRegistration类

NacosServiceRegistryAutoConfiguration类最后返回了new出来的NacosAutoServiceRegistration类对象,所以我们继续跟踪该类的构造方法,如下:

NacosAutoServiceRegistration类.png

该类构造方法中初始化其父类,所以我们继续跟踪父类。

  • ② AbstractAutoServiceRegistration类

该类实现了ApplicationListener接口,监听Spring容器启动过程中的WebServerInitializedEvent事件,如下:

AbstractAutoServiceRegistration类.png

在监听到WebServerInitializedEvent(web服务初始化完成)的事件后,执行了bind 方法,如下:

监听到WebServerInitializedEvent事件.png

AbstractAutoServiceRegistration#bind(row91-102)方法,如下:

@Deprecated
public void bind(WebServerInitializedEvent event) {
  // 获取 ApplicationContext 对象
  ApplicationContext context = event.getApplicationContext();
  // 判断服务的 Namespace,一般为 null
  if (context instanceof ConfigurableWebServerApplicationContext) {
    if ("management".equals(((ConfigurableWebServerApplicationContext) context)
        .getServerNamespace())) {
      return;
    }
  }
  // 记录当前 web 服务的端口
  this.port.compareAndSet(0, event.getWebServer().getPort());
  // 启动当前服务注册流程
  this.start();
}

AbstractAutoServiceRegistration#start(row125-147)方法,如下:

public void start() {
  if (!isEnabled()) {
    if (logger.isDebugEnabled()) {
      logger.debug("Discovery Lifecycle disabled. Not starting");
    }
    return;
  }

  // only initialize if nonSecurePort is greater than 0 and it isn't already running
  // because of containerPortInitializer below
  // 当前服务处于未运行状态时,才进行初始化
  if (!this.running.get()) {
    // 发布服务开始注册的事件
    this.context.publishEvent(
        new InstancePreRegisteredEvent(this, getRegistration()));
    // 【关键】开始服务注册
    register();
    if (shouldRegisterManagement()) {
      registerManagement();
    }
    // 发布注册完成事件
    this.context.publishEvent(
        new InstanceRegisteredEvent<>(this, getConfiguration()));
    // 服务状态设置为运行状态,基于AtomicBoolean#compareAndSet(row98-102)
    this.running.compareAndSet(false, true);
  }

}

AbstractAutoServiceRegistration#register(row238-240)方法,如下:

ServiceRegistry的register方法.png
NacosServiceRegistry实现类.png
  • ③ NacosServiceRegistry类

NacosServiceRegistry#register(row59-89)方法,如下:

@Override
public void register(Registration registration) {
  // 判断 ServiceId 是否为空,spring.applicaion.name 不能为空
  if (StringUtils.isEmpty(registration.getServiceId())) {
    log.warn("No service to register for nacos client...");
    return;
  }
  // 获取 Nacos 的命名服务,就是注册中心服务
  NamingService namingService = namingService();
  // 获取 serviceId、group
  String serviceId = registration.getServiceId();
  String group = nacosDiscoveryProperties.getGroup();
  // 封装服务实例,包含:Ip、Port、Weight、ClusterName、Ephemeral等
  Instance instance = getNacosInstanceFromRegistration(registration);

  try {
    // 开始注册服务
    namingService.registerInstance(serviceId, group, instance);
    log.info("nacos registry, {} {} {}:{} register finished", group, serviceId,
        instance.getIp(), instance.getPort());
  }
  catch (Exception e) {
    if (nacosDiscoveryProperties.isFailFast()) {
      log.error("nacos registry, {} register failed...{},", serviceId,
          registration.toString(), e);
      rethrowRuntimeException(e);
    }
    else {
      log.warn("Failfast is false. {} register failed...{},", serviceId,
          registration.toString(), e);
    }
  }
}

可以看到方法中最终是调用NamingService的registerInstance方法实现注册的,如下:

NamingService的registerInstance方法.png

NamingService的实现类NacosNamingService,如下:

image.png
  • ④ NacosNamingService类

NacosNamingService#registerInstance(row204-213)方法,如下:

@Override
public void registerInstance(String serviceName, String groupName, Instance instance) throws NacosException {
    // 检查超时参数是否异常,心跳超时时间(默认15秒)必须大于心跳周期(默认5秒)
    NamingUtils.checkInstanceIsLegal(instance);
    // 拼接得到新的服务名,格式:groupName@@serviceName
    String groupedServiceName = NamingUtils.getGroupedName(serviceName, groupName);
    // 判断是否为临时实例,默认为 true
    if (instance.isEphemeral()) {
        // 是临时实例,需要定时向 Nacos 服务发送心跳
        BeatInfo beatInfo = beatReactor.buildBeatInfo(groupedServiceName, instance);
        beatReactor.addBeatInfo(groupedServiceName, beatInfo);
    }
    // 【关键】发送注册服务实例的请求
    serverProxy.registerService(groupedServiceName, groupName, instance);
}

可以看到registerService最后是由NamingProxy的实现的,如下:

registerService.png

补充:com.alibaba.nacos.api.common.Constants(row167-171),如下:

// 心跳超时时间,15s
public static final long DEFAULT_HEART_BEAT_TIMEOUT = TimeUnit.SECONDS.toMillis(15);
// IP删除超时时间,30s
public static final long DEFAULT_IP_DELETE_TIMEOUT = TimeUnit.SECONDS.toMillis(30);
// 心跳周期,5s
public static final long DEFAULT_HEART_BEAT_INTERVAL = TimeUnit.SECONDS.toMillis(5);
  • ⑤ NamingProxy类

NamingProxy#registerService(row220-248)方法,如下:

public void registerService(String serviceName, String groupName, Instance instance) throws NacosException {
    
    NAMING_LOGGER.info("[REGISTER-SERVICE] {} registering service {} with instance: {}", namespaceId, serviceName,
            instance);
    // 组织请求参数    
    final Map<String, String> params = new HashMap<String, String>(16);
    params.put(CommonParams.NAMESPACE_ID, namespaceId);
    params.put(CommonParams.SERVICE_NAME, serviceName);
    params.put(CommonParams.GROUP_NAME, groupName);
    params.put(CommonParams.CLUSTER_NAME, instance.getClusterName());
    params.put("ip", instance.getIp());
    params.put("port", String.valueOf(instance.getPort()));
    params.put("weight", String.valueOf(instance.getWeight()));
    params.put("enable", String.valueOf(instance.isEnabled()));
    params.put("healthy", String.valueOf(instance.isHealthy()));
    params.put("ephemeral", String.valueOf(instance.isEphemeral()));
    params.put("metadata", JacksonUtils.toJson(instance.getMetadata()));
    // 通过 POST 请求,将上述参数发送到:/nacos/v1/ns/instance
    reqApi(UtilAndComs.nacosUrlInstance, params, HttpMethod.POST);
    
}

这里提交的信息就是Nacos服务注册接口需要的完整参数,核心参数有:

Ⅰ NAMESPACE_ID:环境;
Ⅱ SERVICE_NAME:服务名称;
Ⅲ GROUP_NAME:分组名称;
Ⅳ CLUSTER_NAME:集群名称;
Ⅴ ip:当前实例的IP地址;
Ⅵ port:当前实例的端口。

补充:com.alibaba.nacos.client.naming.utils.UtilAndComs(row30-34),如下:

public static String webContext = "/nacos";
public static String nacosUrlBase = webContext + "/v1/ns";
public static String nacosUrlInstance = nacosUrlBase + "/instance";

4、客户端注册的流程图

客户端注册的流程图.png

三、了解服务注册-服务端

经过以上了解,我们知道最后客户端注册服务实例是通过 POST 请求,将注册参数发送到:/nacos/v1/ns/instance。因此,我们从对应接收此请求的Controller开始。

1、确定模块

  • ① nacos-concle模块

我们启动Nacos服务会使用Nacos-concle模块的启动类,该模块中引用的nacos-naming模块就是我们服务注册相关的模块。

nacos-naming模块.png
  • ② nacos-naming模块

可以看到InstanceController类的请求路由即是我们POST请求的路由的部分,如下:

image.png

因此,我们从InstanceController开始研究接收请求处理服务注册的源码。

2、跟踪Nacos接收处理服务注册源码

  • ① InstanceController类

InstanceController#register方法,如下:

@CanDistro
@PostMapping
@Secured(parser = NamingResourceParser.class, action = ActionTypes.WRITE)
public String register(HttpServletRequest request) throws Exception {
    // 从 request 获取 namespaceId,没有则为默认 public
    final String namespaceId = WebUtils
            .optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID);

    // 获取服务名称 serviceName = "group@@serviceName"
    final String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME);
    NamingUtils.checkServiceNameFormat(serviceName);

    // 把 request 中的参数封装为 Instance 对象
    final Instance instance = parseInstance(request);

    // 【关键】注册实例
    serviceManager.registerInstance(namespaceId, serviceName, instance);
    return "ok";
}

此处会进入ServiceManager的注册方法,如下:

注册实例.png
  • ② ServiceManager类

Ⅰ ServiceManager#serviceMap属性:Map(namespace, Map(group::serviceName, Service)),里面注册着各个服务实例,如下:

Nacos服务注册表.png

Ⅱ ServiceManager#registerInstance方法,如下:

public void registerInstance(String namespaceId, String serviceName, Instance instance) throws NacosException {
    // 如果是第一次,则创建空的服务,放入注册表
    createEmptyService(namespaceId, serviceName, instance.isEphemeral());

    // 从注册表中拿到 service
    Service service = getService(namespaceId, serviceName);

    if (service == null) {
        throw new NacosException(NacosException.INVALID_PARAM,
                "service not found, namespace: " + namespaceId + ", service: " + serviceName);
    }
    // 【关键】添加实例到 service 当中
    addInstance(namespaceId, serviceName, instance.isEphemeral(), instance);
}

Ⅲ ServiceManager#addInstance方法,如下:

public void addInstance(String namespaceId, String serviceName, boolean ephemeral, Instance... ips)
        throws NacosException {
    // 给当前服务生成一个唯一标识,可以理解为 serviceId
    // 临时:com.alibaba.nacos.naming.iplist.ephemeral. + namespaceId + ## + serviceName
    // 永久:com.alibaba.nacos.naming.iplist. + namespaceId + ## + serviceName
    String key = KeyBuilder.buildInstanceListKey(namespaceId, serviceName, ephemeral);

    // 从注册表中拿到 service
    Service service = getService(namespaceId, serviceName);

    // 以 service 为锁对象,同一个服务的多个实例,只能串行来完成注册(不能并发修改)
    synchronized (service) {
        // 【重点】拷贝注册表中 旧的实例列表,在此结合新注册的实例,得到最终的实例列表 COPY ON WRITE
        List<Instance> instanceList = addIpAddresses(service, ephemeral, ips);

        // 封装实例列表到 Instances 对象中
        Instances instances = new Instances();
        instances.setInstanceList(instanceList);

        // 更新注册表(更新本地注册表、数据同步给 Nacos 集群中的其他节点)
        consistencyService.put(key, instances);
    }
}

该方法中对修改服务列表的动作加锁处理,确保线程安全。而在同步代码块中,包含下面几步:

  • 1)先获取要更新的实例列表,addIpAddresses(service, ephemeral, ips);
  • 2)然后将更新后的数据封装到Instances对象中,后面更新注册表时使用
  • 3)最后,调用consistencyService.put()方法完成Nacos集群的数据同步,保证集群一致性。

注意:在第1步的addIPAddress中,会拷贝旧的实例列表,添加新实例到列表中。在第3步中,完成对实例状态更新后,则会用新列表直接覆盖旧实例列表。而在更新过程中,旧实例列表不受影响,用户依然可以读取。

COPY ON WRITE:在更新列表状态过程中,无需阻塞用户的读操作,也不会导致用户读取到脏数据,性能比较好。

【A、更新服务列表List<Instance> instanceList = addIpAddresses(service, ephemeral, ips);

整个过程:

a、获取旧的实例列表,对比新的与旧的;
b、添加新的实例,旧的实例同步id;
c、返回最新的实例列表。

具体源码如下:

ServiceManager#addIpAddresses方法,如下:

private List<Instance> addIpAddresses(Service service, boolean ephemeral, Instance... ips) throws NacosException {
    return updateIpAddresses(service, UtilsAndCommons.UPDATE_INSTANCE_ACTION_ADD, ephemeral, ips);
}

ServiceManager#updateIpAddresses方法,如下:

public List<Instance> updateIpAddresses(Service service, String action, boolean ephemeral, Instance... ips)
        throws NacosException {
    // 从 DataStore 中获取实例列表,可以理解为 Nacos 集群同步来的实例列表
    Datum datum = consistencyService
            .get(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), ephemeral));

    // 从本地注册表中,获取实例列表
    List<Instance> currentIPs = service.allIPs(ephemeral);
    Map<String, Instance> currentInstances = new HashMap<>(currentIPs.size());
    Set<String> currentInstanceIds = Sets.newHashSet();
    // 封装本地注册表中实例列表
    for (Instance instance : currentIPs) {
        currentInstances.put(instance.toIpAddr(), instance);
        currentInstanceIds.add(instance.getInstanceId());
    }

    // 合并与拷贝,旧实例列表
    Map<String, Instance> instanceMap;
    if (datum != null && null != datum.value) {
        // 如果集群同步列表中有数据,则将本地注册列表和 datum 中的列表做合并
        instanceMap = setValid(((Instances) datum.value).getInstanceList(), currentInstances);
    } else {
        instanceMap = new HashMap<>(ips.length);
    }

    // 遍历新实例列表
    for (Instance instance : ips) {
        if (!service.getClusterMap().containsKey(instance.getClusterName())) {
            Cluster cluster = new Cluster(instance.getClusterName(), service);
            cluster.init();
            service.getClusterMap().put(instance.getClusterName(), cluster);
            Loggers.SRV_LOG
                    .warn("cluster: {} not found, ip: {}, will create new cluster with default configuration.",
                            instance.getClusterName(), instance.toJson());
        }

        if (UtilsAndCommons.UPDATE_INSTANCE_ACTION_REMOVE.equals(action)) {
            instanceMap.remove(instance.getDatumKey());
        } else {
            // 尝试获取与当前实例ip、端口一致的旧实例
            Instance oldInstance = instanceMap.get(instance.getDatumKey());
            if (oldInstance != null) {
                // 如果存在,则把旧的 instanceId 赋值作为新的 instanceId
                instance.setInstanceId(oldInstance.getInstanceId());
            } else {
                // 如果不存在,证明是一个全新实例,则生成id
                instance.setInstanceId(instance.generateInstanceId(currentInstanceIds));
            }
            instanceMap.put(instance.getDatumKey(), instance);
        }

    }

    if (instanceMap.size() <= 0 && UtilsAndCommons.UPDATE_INSTANCE_ACTION_ADD.equals(action)) {
        throw new IllegalArgumentException(
                "ip list can not be empty, service: " + service.getName() + ", ip list: " + JacksonUtils
                        .toJson(instanceMap.values()));
    }
    // 返回实例列表
    return new ArrayList<>(instanceMap.values());
}

【B、Nacos集群一致性consistencyService.put(key, instances);

Nacos集群一致性.png

ServiceManager#consistencyService属性,如下:

consistencyService属性.png

可以看到,此处的put方法正是采用DelegateConsistencyServiceImpl的put方法。

DelegateConsistencyServiceImpl#put方法,如下:

@Override
public void put(String key, Record value) throws NacosException {
    // 根据实例是否是临时实例,判断委托对象
    mapConsistencyService(key).put(key, value);
}

DelegateConsistencyServiceImpl#mapConsistencyService方法,如下:

private ConsistencyService mapConsistencyService(String key) {
    // 判断是否是临时实例:
    // 是,选择 ephemeralConsistencyService,也就是 DistroConsistencyServiceImpl类
    // 否,选择 persistentConsistencyService,也就是 PersistentConsistencyServiceDelegateImpl
    return KeyBuilder.matchEphemeralKey(key) ? ephemeralConsistencyService : persistentConsistencyService;
}

默认情况下,所有实例都是临时实例,下面则关注DistroConsistencyServiceImpl类。

  • ③ DistroConsistencyServiceImpl类

DistroConsistencyServiceImpl#put方法,如下:

@Override
public void put(String key, Record value) throws NacosException {
    // 异步,更新本地注册表
    onPut(key, value);
    // 异步,将数据同步给 Nacos 集群中的其他节点
    distroProtocol.sync(new DistroKey(key, KeyBuilder.INSTANCE_LIST_KEY_PREFIX), DataOperation.CHANGE,
            globalConfig.getTaskDispatchPeriod() / 2);
}

onPut(key, value);
key:ServiceManager#addInstance方法中的String key = KeyBuilder.buildInstanceListKey(namespaceId, serviceName, ephemeral);,临时:com.alibaba.nacos.naming.iplist.ephemeral. + namespaceId + ## + serviceName
value:ServiceManager#addInstance方法中的instances.setInstanceList(instanceList);封装了实例列表的Instances对象。

distroProtocol.sync(...)
是通过Distro协议将数据同步给集群中的其它Nacos节点。

【A、更新本地实例列表】

DistroConsistencyServiceImpl#onPut方法,如下:

public void onPut(String key, Record value) {
    // 判断是否是临时实例
    if (KeyBuilder.matchEphemeralInstanceListKey(key)) {
        // 把实例列表封装到 Datum
        Datum<Instances> datum = new Datum<>();
        // value 是服务中的实例列表 Instances
        datum.value = (Instances) value;
        // key 是 serviceId
        datum.key = key;
        datum.timestamp.incrementAndGet();
        // 以 serviceId 为 key,Datum 为 value 缓存起来
        dataStore.put(key, datum);
    }
    //
    if (!listeners.containsKey(key)) {
        return;
    }
    // 【重点】把 serviceId 和当前操作类型存入 notifier
    notifier.addTask(key, DataOperation.CHANGE);
}

此处我们可以看到更新本地列表的操作最后交由notifier对象完成,notifier对象是DistroConsistencyServiceImpl的内部类实例,如下:

Notifier内部类.png

a、将变更事件放入阻塞队列

该对象内部维护了一个阻塞队列,存放服务列表变更的事件,DistroConsistencyServiceImpl#Notifier#tasks属性,如下:

阻塞队列属性.png

DistroConsistencyServiceImpl#Notifier#addTask方法,如下:

public void addTask(String datumKey, DataOperation action) {

    if (services.containsKey(datumKey) && action == DataOperation.CHANGE) {
        return;
    }
    if (action == DataOperation.CHANGE) {
        services.put(datumKey, StringUtils.EMPTY);
    }
    // 把 serviceId 和事件放入阻塞队列
    tasks.offer(Pair.with(datumKey, action));
}

b、异步更新

DistroConsistencyServiceImpl#init方法,如下:

// 一个bean的初始化过程中,方法执行先后顺序为 Constructor > @Autowired > @PostConstruct
@PostConstruct // 在依赖加载后,对象使用前执行,而且只执行一次
public void init() {
    // 利用线程池执行 notifier
    // public class Notifier implements Runnable{...}
    GlobalExecutor.submitDistroNotifyTask(notifier);
}
单线程线程池.png

可以看到Notifier是通过一个单线程的线程池,来不断从阻塞队列中获取任务,执行服务列表的更新。

DistroConsistencyServiceImpl#Notifier#run方法,如下:

@Override
public void run() {
    Loggers.DISTRO.info("distro notifier started");
    // 死循环
    for (; ; ) {
        try {
            // 从阻塞队列中获取任务
            Pair<String, DataOperation> pair = tasks.take();
            // 执行任务,更新服务列表
            handle(pair);
        } catch (Throwable e) {
            Loggers.DISTRO.error("[NACOS-DISTRO] Error while handling notifying task", e);
        }
    }
}

DistroConsistencyServiceImpl#Notifier#handle方法,如下:

private void handle(Pair<String, DataOperation> pair) {
    try {
        // 获取 serviceId
        String datumKey = pair.getValue0();
        // 事件类型,是 CHANGE 类型
        DataOperation action = pair.getValue1();

        services.remove(datumKey);

        int count = 0;

        if (!listeners.containsKey(datumKey)) {
            return;
        }

        for (RecordListener listener : listeners.get(datumKey)) {

            count++;

            try {
                if (action == DataOperation.CHANGE) {
                    // 【重点】这里的 listener 就是 service,当服务变更时,自然就触发了 onChange 事件,处理变更
                    listener.onChange(datumKey, dataStore.get(datumKey).value);
                    continue;
                }

                if (action == DataOperation.DELETE) {
                    listener.onDelete(datumKey);
                    continue;
                }
            } catch (Throwable e) {
                Loggers.DISTRO.error("[NACOS-DISTRO] error while notifying listener of key: {}", datumKey, e);
            }
        }

        if (Loggers.DISTRO.isDebugEnabled()) {
            Loggers.DISTRO
                    .debug("[NACOS-DISTRO] datum change notified, key: {}, listener count: {}, action: {}",
                            datumKey, count, action.name());
        }
    } catch (Throwable e) {
        Loggers.DISTRO.error("[NACOS-DISTRO] Error while handling notifying task", e);
    }
}

c、覆盖实例列表listener.onChange(datumKey, dataStore.get(datumKey).value);

Service#onChange方法,如下:

@Override
public void onChange(String key, Instances value) throws Exception {

    Loggers.SRV_LOG.info("[NACOS-RAFT] datum is changed, key: {}, value: {}", key, value);
    // 对权重做初始化
    for (Instance instance : value.getInstanceList()) {

        if (instance == null) {
            // Reject this abnormal instance list:
            throw new RuntimeException("got null instance " + key);
        }

        if (instance.getWeight() > 10000.0D) {
            instance.setWeight(10000.0D);
        }

        if (instance.getWeight() < 0.01D && instance.getWeight() > 0.0D) {
            instance.setWeight(0.01D);
        }
    }
    // 更新实例列表
    updateIPs(value.getInstanceList(), KeyBuilder.matchEphemeralInstanceListKey(key));

    recalculateChecksum();
}

Service#updateIPs方法,如下:

public void updateIPs(Collection<Instance> instances, boolean ephemeral) {
    // 创建新的 map,相当于一个新的 clusterMap
    Map<String, List<Instance>> ipMap = new HashMap<>(clusterMap.size());
    for (String clusterName : clusterMap.keySet()) {
        ipMap.put(clusterName, new ArrayList<>());
    }
    // 把所有实例放入新的 clusterMap
    for (Instance instance : instances) {
        try {
            if (instance == null) {
                Loggers.SRV_LOG.error("[NACOS-DOM] received malformed ip: null");
                continue;
            }

            if (StringUtils.isEmpty(instance.getClusterName())) {
                instance.setClusterName(UtilsAndCommons.DEFAULT_CLUSTER_NAME);
            }

            if (!clusterMap.containsKey(instance.getClusterName())) {
                Loggers.SRV_LOG
                        .warn("cluster: {} not found, ip: {}, will create new cluster with default configuration.",
                                instance.getClusterName(), instance.toJson());
                Cluster cluster = new Cluster(instance.getClusterName(), this);
                cluster.init();
                getClusterMap().put(instance.getClusterName(), cluster);
            }

            List<Instance> clusterIPs = ipMap.get(instance.getClusterName());
            if (clusterIPs == null) {
                clusterIPs = new LinkedList<>();
                ipMap.put(instance.getClusterName(), clusterIPs);
            }

            clusterIPs.add(instance);
        } catch (Exception e) {
            Loggers.SRV_LOG.error("[NACOS-DOM] failed to process ip: " + instance, e);
        }
    }

    for (Map.Entry<String, List<Instance>> entry : ipMap.entrySet()) {
        //make every ip mine
        // 遍历新的 clusterMap,得到 cluster 中的实例列表
        List<Instance> entryIPs = entry.getValue();
        // 【重点】把新实例列表,更新到注册表中的 cluster 中
        clusterMap.get(entry.getKey()).updateIps(entryIPs, ephemeral);
    }

    setLastModifiedMillis(System.currentTimeMillis());
    getPushService().serviceChanged(this);
    StringBuilder stringBuilder = new StringBuilder();

    for (Instance instance : allIPs()) {
        stringBuilder.append(instance.toIpAddr()).append("_").append(instance.isHealthy()).append(",");
    }

    Loggers.EVT_LOG.info("[IP-UPDATED] namespace: {}, service: {}, ips: {}", getNamespaceId(), getName(),
            stringBuilder.toString());

}

Cluster#updateIps方法,如下:

public void updateIps(List<Instance> ips, boolean ephemeral) {
    // 先得到旧的实例列表
    Set<Instance> toUpdateInstances = ephemeral ? ephemeralInstances : persistentInstances;

    HashMap<String, Instance> oldIpMap = new HashMap<>(toUpdateInstances.size());

    for (Instance ip : toUpdateInstances) {
        oldIpMap.put(ip.getDatumKey(), ip);
    }
    // ips 中包含两部分:新增的实例,要更新的实例
    // 新旧实例列表交集,得到要更新的部分
    List<Instance> updatedIPs = updatedIps(ips, oldIpMap.values());
    if (updatedIPs.size() > 0) {
        for (Instance ip : updatedIPs) {
            Instance oldIP = oldIpMap.get(ip.getDatumKey());

            // do not update the ip validation status of updated ips
            // because the checker has the most precise result
            // Only when ip is not marked, don't we update the health status of IP:
            if (!ip.isMarked()) {
                // 将实例的 health 保持为 oldInstance 的 health
                ip.setHealthy(oldIP.isHealthy());
            }

            if (ip.isHealthy() != oldIP.isHealthy()) {
                // ip validation status updated
                Loggers.EVT_LOG.info("{} {SYNC} IP-{} {}:{}@{}", getService().getName(),
                        (ip.isHealthy() ? "ENABLED" : "DISABLED"), ip.getIp(), ip.getPort(), getName());
            }

            if (ip.getWeight() != oldIP.getWeight()) {
                // ip validation status updated
                Loggers.EVT_LOG.info("{} {SYNC} {IP-UPDATED} {}->{}", getService().getName(), oldIP.toString(),
                        ip.toString());
            }
        }
    }
    // 新旧实例列表相减,得到待新增的实例列表
    List<Instance> newIPs = subtract(ips, oldIpMap.values());
    if (newIPs.size() > 0) {
        Loggers.EVT_LOG
                .info("{} {SYNC} {IP-NEW} cluster: {}, new ips size: {}, content: {}", getService().getName(),
                        getName(), newIPs.size(), newIPs.toString());

        for (Instance ip : newIPs) {
            HealthCheckStatus.reset(ip);
        }
    }
    // 旧新实例列表相减,得到待删除的实例列表(即旧实例列表有,而新实例列表没有,需删除)
    List<Instance> deadIPs = subtract(oldIpMap.values(), ips);

    if (deadIPs.size() > 0) {
        Loggers.EVT_LOG
                .info("{} {SYNC} {IP-DEAD} cluster: {}, dead ips size: {}, content: {}", getService().getName(),
                        getName(), deadIPs.size(), deadIPs.toString());

        for (Instance ip : deadIPs) {
            HealthCheckStatus.remv(ip);
        }
    }

    toUpdateInstances = new HashSet<>(ips);
    // 用新实例列表直接覆盖了 cluster 中的旧实例列表
    if (ephemeral) {
        ephemeralInstances = toUpdateInstances;
    } else {
        persistentInstances = toUpdateInstances;
    }
}

【B、集群数据同步】

DistroConsistencyServiceImpl#sync方法,如下:

public void sync(DistroKey distroKey, DataOperation action, long delay) {
    // 遍历,获取 Nacos 集群中的所有成员,除了自己
    for (Member each : memberManager.allMembersWithoutSelf()) {
        DistroKey distroKeyWithTarget = new DistroKey(distroKey.getResourceKey(), distroKey.getResourceType(),
                each.getAddress());
        // 定义一个Distro的同步任务
        DistroDelayTask distroDelayTask = new DistroDelayTask(distroKeyWithTarget, action, delay);
        // 交给线程池去执行
        distroTaskEngineHolder.getDelayTaskExecuteEngine().addTask(distroKeyWithTarget, distroDelayTask);
        if (Loggers.DISTRO.isDebugEnabled()) {
            Loggers.DISTRO.debug("[DISTRO-SCHEDULE] {} to {}", distroKey, each.getAddress());
        }
    }
}

上述代码中同步的任务封装为一个DistroDelayTask对象,交给了distroTaskEngineHolder.getDelayTaskExecuteEngine()执行,其返回值为NacosDelayTaskExecuteEngine,这个类维护了一个线程池,并且接收任务,执行任务。

getDelayTaskExecuteEngine.png
DistroDelayTaskExecuteEngine.png
NacosDelayTaskExecuteEngine.png

NacosDelayTaskExecuteEngine#processTasks方法,如下:

protected void processTasks() {
    Collection<Object> keys = getAllTaskKeys();
    for (Object taskKey : keys) {
        AbstractDelayTask task = removeTask(taskKey);
        if (null == task) {
            continue;
        }
        NacosTaskProcessor processor = getProcessor(taskKey);
        if (null == processor) {
            getEngineLog().error("processor not found for task, so discarded. " + task);
            continue;
        }
        try {
            // ReAdd task if process failed
            // 尝试执行同步任务,如果失败会重试
            if (!processor.process(task)) {
                retryFailedTask(taskKey, task);
            }
        } catch (Throwable e) {
            getEngineLog().error("Nacos task execute error : " + e.toString(), e);
            retryFailedTask(taskKey, task);
        }
    }
}

可以看出来基于Distro模式的同步是异步进行的,并且失败时会将任务重新入队并充实,因此不保证同步结果的强一致性,属于AP模式的一致性策略。

3、服务端注册的流程图

服务端注册的流程图.png

四、Nacos服务注册部分总结

1、Nacos的注册表结构

Nacos是多级存储模型,最外层通过namespace来实现环境隔离,然后是group分组,分组下就是服务,一个服务有可以分为不同的集群,集群中包含多个实例。

因此其注册表结构为一个Map,类型是:

  • Map<String, Map<String, Service>>:外层key是namespace_id,内层key是group+serviceName
  • ② Service内部维护一个Map,结构是:Map<String,Cluster>,key是clusterName,值是集群信息;
  • ③ Cluster内部维护一个Set集合Set<Instance> ephemeralInstancesSet<Instance> persistentInstances,元素是Instance类型,代表集群中的多个实例。

2、Nacos保证并发写的安全性

  • ① 在注册实例时,会对service加锁,不同service之间本身就不存在并发写问题,互不影响;相同service时通过锁来互斥。
  • ② 在更新实例列表时,是基于异步的线程池来完成,而线程池的线程数量为1。

3、Nacos避免并发读写的冲突

Nacos在更新实例列表时,会采用CopyOnWrite技术,首先将Old实例列表拷贝一份,然后更新拷贝的实例列表,再用更新后的实例列表来覆盖旧的实例列表。

4、Nacos应对内部数十万服务的并发写请求

Nacos内部会将服务注册的任务放入阻塞队列,采用线程池异步来完成实例更新,从而提高并发写能力。

五、结尾

以上即为Nacos源码分析-服务注册的全部内容,感谢阅读。

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 215,012评论 6 497
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,628评论 3 389
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 160,653评论 0 350
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,485评论 1 288
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,574评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,590评论 1 293
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,596评论 3 414
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,340评论 0 270
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,794评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,102评论 2 330
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,276评论 1 344
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,940评论 5 339
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,583评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,201评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,441评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,173评论 2 366
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,136评论 2 352

推荐阅读更多精彩内容