apollo configService用于提供给client获取配置信息,以及配置更新后实时通知client的服务;configservice仅为client提供服务,且每个环境对应相应的configsevice集群。
下面通过源码来分析configservice功能的具体实现
包路径
com.ctrip.framework.apollo.configservice
对外API
ConfigController
/**
* 配置获取控制层,供client根据命名空间获取Config数据信息
* @author Jason Song(song_s@ctrip.com)
*/
@RestController
@RequestMapping("/configs")
public class ConfigController {
/**
* 配置操作服务
*/
@Autowired
private ConfigService configService;
@Autowired
private AppNamespaceServiceWithCache appNamespaceService;
/**
* 命名空间工具类
*/
@Autowired
private NamespaceUtil namespaceUtil;
@Autowired
private InstanceConfigAuditUtil instanceConfigAuditUtil;
/**
* json解析器
*/
@Autowired
private Gson gson;
private static final Type configurationTypeReference = new TypeToken<Map<String, String>>() {
}.getType();
/**
* 查询配置信息
* @param appId 应用ID
* @param clusterName 集群名称
* @param namespace 命名空间
* @param dataCenter 数据中心
* @param clientSideReleaseKey
* @param clientIp 客户端IP
* @param messagesAsString
* @param request
* @param response
* @return
* @throws IOException
*/
@RequestMapping(value = "/{appId}/{clusterName}/{namespace:.+}", method = RequestMethod.GET)
public ApolloConfig queryConfig(@PathVariable String appId, @PathVariable String clusterName,
@PathVariable String namespace,
@RequestParam(value = "dataCenter", required = false) String dataCenter,
@RequestParam(value = "releaseKey", defaultValue = "-1") String clientSideReleaseKey,
@RequestParam(value = "ip", required = false) String clientIp,
@RequestParam(value = "messages", required = false) String messagesAsString,
HttpServletRequest request, HttpServletResponse response) throws IOException {
//构建命名空间
String originalNamespace = namespace;
//strip out .properties suffix
namespace = namespaceUtil.filterNamespaceName(namespace);
//fix the character case issue, such as FX.apollo <-> fx.apollo
namespace = namespaceUtil.normalizeNamespace(appId, namespace);
if (Strings.isNullOrEmpty(clientIp)) {
clientIp = tryToGetClientIp(request);
}
//转换通知消息
ApolloNotificationMessages clientMessages = transformMessages(messagesAsString);
//已发布配置集合
List<Release> releases = Lists.newLinkedList();
String appClusterNameLoaded = clusterName;
if (!ConfigConsts.NO_APPID_PLACEHOLDER.equalsIgnoreCase(appId)) {
//加载给定当前参数下的所有已发布的配置信息
Release currentAppRelease = configService.loadConfig(appId, clientIp, appId, clusterName, namespace,
dataCenter, clientMessages);
if (currentAppRelease != null) {
//添加发布信息
releases.add(currentAppRelease);
//we have cluster search process, so the cluster name might be overridden
appClusterNameLoaded = currentAppRelease.getClusterName();
}
}
//if namespace does not belong to this appId, should check if there is a public configuration
if (!namespaceBelongsToAppId(appId, namespace)) {
//查询公共的发布信息
Release publicRelease = this.findPublicConfig(appId, clientIp, clusterName, namespace,
dataCenter, clientMessages);
if (!Objects.isNull(publicRelease)) {
//添加公共的发布信息
releases.add(publicRelease);
}
}
if (releases.isEmpty()) {
response.sendError(HttpServletResponse.SC_NOT_FOUND,
String.format(
"Could not load configurations with appId: %s, clusterName: %s, namespace: %s",
appId, clusterName, originalNamespace));
Tracer.logEvent("Apollo.Config.NotFound",
assembleKey(appId, clusterName, originalNamespace, dataCenter));
return null;
}
auditReleases(appId, clusterName, dataCenter, clientIp, releases);
//合并发布KEY 用于校验配置是否有变更操作
String mergedReleaseKey = releases.stream().map(Release::getReleaseKey)
.collect(Collectors.joining(ConfigConsts.CLUSTER_NAMESPACE_SEPARATOR));
//校验发布KEY与客户端已存在的发布KEY是否一致
if (mergedReleaseKey.equals(clientSideReleaseKey)) {
//客户端发布EKY与查询到的KEY一致,标识配置未做变更过操作,客户端的配置为最新配置,返回304
// Client side configuration is the same with server side, return 304
response.setStatus(HttpServletResponse.SC_NOT_MODIFIED);
Tracer.logEvent("Apollo.Config.NotModified",
assembleKey(appId, appClusterNameLoaded, originalNamespace, dataCenter));
return null;
}
//构建返回实例信息
ApolloConfig apolloConfig = new ApolloConfig(appId, appClusterNameLoaded, originalNamespace,
mergedReleaseKey);
//添加发布配置信息
apolloConfig.setConfigurations(mergeReleaseConfigurations(releases));
Tracer.logEvent("Apollo.Config.Found", assembleKey(appId, appClusterNameLoaded,
originalNamespace, dataCenter));
return apolloConfig;
}
private boolean namespaceBelongsToAppId(String appId, String namespaceName) {
//Every app has an 'application' namespace
if (Objects.equals(ConfigConsts.NAMESPACE_APPLICATION, namespaceName)) {
return true;
}
//if no appId is present, then no other namespace belongs to it
if (ConfigConsts.NO_APPID_PLACEHOLDER.equalsIgnoreCase(appId)) {
return false;
}
AppNamespace appNamespace = appNamespaceService.findByAppIdAndNamespace(appId, namespaceName);
return appNamespace != null;
}
/**
* 查找所有的公共的发布信息记录
* @param clientAppId the application which uses public config
* @param namespace the namespace
* @param dataCenter the datacenter
*/
private Release findPublicConfig(String clientAppId, String clientIp, String clusterName,
String namespace, String dataCenter, ApolloNotificationMessages clientMessages) {
AppNamespace appNamespace = appNamespaceService.findPublicNamespaceByName(namespace);
//check whether the namespace's appId equals to current one
if (Objects.isNull(appNamespace) || Objects.equals(clientAppId, appNamespace.getAppId())) {
return null;
}
String publicConfigAppId = appNamespace.getAppId();
return configService.loadConfig(clientAppId, clientIp, publicConfigAppId, clusterName, namespace, dataCenter,
clientMessages);
}
/**
* 合并发布的配置信息
* Merge configurations of releases.
* Release in lower index override those in higher index
*/
Map<String, String> mergeReleaseConfigurations(List<Release> releases) {
//构建配置MAP key-V
Map<String, String> result = Maps.newHashMap();
//遍历所有发布的配置信息
for (Release release : Lists.reverse(releases)) {
//组装发布的配置到map集合中
result.putAll(gson.fromJson(release.getConfigurations(), configurationTypeReference));
}
return result;
}
}
ConfigController
- /configs/{appId}/{clusterName}/{namespace:.+} :查询给定参数下所有已发布的配置集合,返回ApolloConfig
- 已发布的配置,包含public和给定参数下配置两部分
- 此API同时记录当前应用实例信息(Instance)到DB中,通过InstanceConfigAuditUtil类
- 此API用于获取client端中Config的原始数据
ConfigFileController
代码略...
ConfigFileController
- /configfiles/{appId}/{clusterName}/{namespace:.+} : 查询给定参数下所有发布的配置集合,组装成给定文件格式的字符串形式返回,(JSON或properties)格式
- 此API存在缓存功能,缓存保存在内存中
- 已发布的配置集合通过ConfigController获取
NotificationControllerV2
代码略...
NotificationControllerV2
- /notifications/v2: 当发布消息有更新时通知client配置已变更
- 接口使用Http Long Polling方式实现,用于配置中心配置变更后动态通知客户端
长连接实际上我们是通过Http Long Polling实现的,具体而言:
- 客户端发起一个Http请求到服务端
- 服务端会保持住这个连接60秒
- 如果在60秒内有客户端关心的配置变化,被保持住的客户端请求会立即返回,并告知客户端有配置变化的namespace信息,客户端会据此拉取对应namespace的最新配置
- 如果在60秒内没有客户端关心的配置变化,那么会返回Http状态码304给客户端
- 客户端在收到服务端请求后会立即重新发起连接,回到第一步
配置查询服务
ConfigService
/**
* 配置加载服务接口,用于加载发布的配置信息
* @author Jason Song(song_s@ctrip.com)
*/
public interface ConfigService extends ReleaseMessageListener {
/**
* 加载发布配置信息
* Load config
*
* @param clientAppId the client's app id 客户端应用ID
* @param clientIp the client ip 客户端IP
* @param configAppId the requested config's app id 配置应用ID
* @param configClusterName the requested config's cluster name 配置的集群名称
* @param configNamespace the requested config's namespace name 配置的命名空间
* @param dataCenter the client data center 客户端的数据中心
* @param clientMessages the messages received in client side 通知消息
* @return the Release
*/
Release loadConfig(String clientAppId, String clientIp, String configAppId, String
configClusterName, String configNamespace, String dataCenter, ApolloNotificationMessages clientMessages);
}
ConfigService接口
- 用于加载给定参数下所有已发布的配置信息
AbstractConfigService
/**
* 抽象的配置加载服务,用于加载发布的配置信息
* @author Jason Song(song_s@ctrip.com)
*/
public abstract class AbstractConfigService implements ConfigService {
@Autowired
private GrayReleaseRulesHolder grayReleaseRulesHolder;
/**
* 加载发布配置的记录
* @param clientAppId the client's app id 客户端应用ID
* @param clientIp the client ip 客户端IP
* @param configAppId the requested config's app id 配置应用ID
* @param configClusterName the requested config's cluster name 配置的集群名称
* @param configNamespace the requested config's namespace name 配置的命名空间
* @param dataCenter the client data center 客户端的数据中心
* @param clientMessages the messages received in client side 通知消息
* @return
*/
@Override
public Release loadConfig(String clientAppId, String clientIp, String configAppId, String configClusterName,
String configNamespace, String dataCenter, ApolloNotificationMessages clientMessages) {
//判断集群名称是否为默认
// load from specified cluster fist
if (!Objects.equals(ConfigConsts.CLUSTER_NAME_DEFAULT, configClusterName)) {
//根据集群名称查配置发布记录
Release clusterRelease = findRelease(clientAppId, clientIp, configAppId, configClusterName, configNamespace,
clientMessages);
if (!Objects.isNull(clusterRelease)) {
return clusterRelease;
}
}
// try to load via data center
if (!Strings.isNullOrEmpty(dataCenter) && !Objects.equals(dataCenter, configClusterName)) {
//根据数据中心查询配置发布记录
Release dataCenterRelease = findRelease(clientAppId, clientIp, configAppId, dataCenter, configNamespace,
clientMessages);
if (!Objects.isNull(dataCenterRelease)) {
return dataCenterRelease;
}
}
//加载默认的配置发布记录
// fallback to default release
return findRelease(clientAppId, clientIp, configAppId, ConfigConsts.CLUSTER_NAME_DEFAULT, configNamespace,
clientMessages);
}
/**
* 查询发布记录
* Find release
*/
private Release findRelease(String clientAppId, String clientIp, String configAppId, String configClusterName,
String configNamespace, ApolloNotificationMessages clientMessages) {
Long grayReleaseId = grayReleaseRulesHolder.findReleaseIdFromGrayReleaseRule(clientAppId, clientIp, configAppId,
configClusterName, configNamespace);
Release release = null;
if (grayReleaseId != null) {
//查询发布记录
release = findActiveOne(grayReleaseId, clientMessages);
}
if (release == null) {
//查询最后的发布记录
release = findLatestActiveRelease(configAppId, configClusterName, configNamespace, clientMessages);
}
return release;
}
/**
* 根据ID查询有效发布记录
* Find active release by id
*/
protected abstract Release findActiveOne(long id, ApolloNotificationMessages clientMessages);
/**
* 根据应用ID,集群名称、命名空间查询发布记录
* Find active release by app id, cluster name and namespace name
*/
protected abstract Release findLatestActiveRelease(String configAppId, String configClusterName,
String configNamespaceName, ApolloNotificationMessages clientMessages);
}
AbstractConfigService
- 查询发布配置的抽象实现,重新抽象了方法findLatestActiveRelease与findActiveOne供子类实现
DefaultConfigService
/**
* 默认的配置查询服务,无缓存功能
* config service with no cache
*
* @author Jason Song(song_s@ctrip.com)
*/
public class DefaultConfigService extends AbstractConfigService {
/**
* 发布记录操作服务,通过操作DB资源获取发布记录
*/
@Autowired
private ReleaseService releaseService;
@Override
protected Release findActiveOne(long id, ApolloNotificationMessages clientMessages) {
//调用发布记录操作服务查询配置发布记录
return releaseService.findActiveOne(id);
}
@Override
protected Release findLatestActiveRelease(String configAppId, String configClusterName, String configNamespace,
ApolloNotificationMessages clientMessages) {
//调用发布记录操作服务查询配置发布记录
return releaseService.findLatestActiveRelease(configAppId, configClusterName,
configNamespace);
}
@Override
public void handleMessage(ReleaseMessage message, String channel) {
// since there is no cache, so do nothing
//无本地缓存,每次获取都是数据库最近的发布记录,所以此处发布记录变更监听处理函数不做任何操作
}
}
DefaultConfigService
- 无缓存功能的实现,通过DB操作资源来查询库中的发布配置数据
- DB操作资源:ReleaseService
ConfigServiceWithCache
/**
* 配置查询服务,使用guava做本地缓存,带有本地缓存功能的实现
* config service with guava cache
*
* @author Jason Song(song_s@ctrip.com)
*/
public class ConfigServiceWithCache extends AbstractConfigService {
private static final Logger logger = LoggerFactory.getLogger(ConfigServiceWithCache.class);
/**
* 默认的缓存失效时长 1h
*/
private static final long DEFAULT_EXPIRED_AFTER_ACCESS_IN_MINUTES = 60;//1 hour
private static final String TRACER_EVENT_CACHE_INVALIDATE = "ConfigCache.Invalidate";
private static final String TRACER_EVENT_CACHE_LOAD = "ConfigCache.LoadFromDB";
private static final String TRACER_EVENT_CACHE_LOAD_ID = "ConfigCache.LoadFromDBById";
private static final String TRACER_EVENT_CACHE_GET = "ConfigCache.Get";
private static final String TRACER_EVENT_CACHE_GET_ID = "ConfigCache.GetById";
private static final Splitter STRING_SPLITTER =
Splitter.on(ConfigConsts.CLUSTER_NAMESPACE_SEPARATOR).omitEmptyStrings();
/**
* 发布记录操作服务
*/
@Autowired
private ReleaseService releaseService;
/**
* 发布消息操作服务
*/
@Autowired
private ReleaseMessageService releaseMessageService;
/**
* 构建一个发布消息ID与配置发布记录对应的关系缓存
*/
private LoadingCache<String, ConfigCacheEntry> configCache;
private LoadingCache<Long, Optional<Release>> configIdCache;
/**
* 空的配置发布实体
*/
private ConfigCacheEntry nullConfigCacheEntry;
public ConfigServiceWithCache() {
nullConfigCacheEntry = new ConfigCacheEntry(ConfigConsts.NOTIFICATION_ID_PLACEHOLDER, null);
}
/**
* 初始化方法,在实例创建后调用
*/
@PostConstruct
void initialize() {
//初始化本地缓存
configCache = CacheBuilder.newBuilder()
.expireAfterAccess(DEFAULT_EXPIRED_AFTER_ACCESS_IN_MINUTES, TimeUnit.MINUTES)
.build(new CacheLoader<String, ConfigCacheEntry>() {
@Override
public ConfigCacheEntry load(String key) throws Exception {
//appId+clusterName+namespaceName
//根据KEY切分命名空间信息集合
List<String> namespaceInfo = STRING_SPLITTER.splitToList(key);
if (namespaceInfo.size() != 3) {
Tracer.logError(
new IllegalArgumentException(String.format("Invalid cache load key %s", key)));
return nullConfigCacheEntry;
}
Transaction transaction = Tracer.newTransaction(TRACER_EVENT_CACHE_LOAD, key);
try {
//查询最后配置发布消息
ReleaseMessage latestReleaseMessage = releaseMessageService.findLatestReleaseMessageForMessages(Lists
.newArrayList(key));
//查询最后的配置发布记录
Release latestRelease = releaseService.findLatestActiveRelease(namespaceInfo.get(0), namespaceInfo.get(1),
namespaceInfo.get(2));
transaction.setStatus(Transaction.SUCCESS);
//构建通知ID,当最后配置发布消息为null 通知ID=-1,标识无通知信息
long notificationId = latestReleaseMessage == null ? ConfigConsts.NOTIFICATION_ID_PLACEHOLDER : latestReleaseMessage
.getId();
//
if (notificationId == ConfigConsts.NOTIFICATION_ID_PLACEHOLDER && latestRelease == null) {
return nullConfigCacheEntry;
}
//构建缓存实例, 通知ID-最后的配置发布记录
return new ConfigCacheEntry(notificationId, latestRelease);
} catch (Throwable ex) {
transaction.setStatus(ex);
throw ex;
} finally {
transaction.complete();
}
}
});
configIdCache = CacheBuilder.newBuilder()
.expireAfterAccess(DEFAULT_EXPIRED_AFTER_ACCESS_IN_MINUTES, TimeUnit.MINUTES)
.build(new CacheLoader<Long, Optional<Release>>() {
@Override
public Optional<Release> load(Long key) throws Exception {
Transaction transaction = Tracer.newTransaction(TRACER_EVENT_CACHE_LOAD_ID, String.valueOf(key));
try {
//查询配置发布消息
Release release = releaseService.findActiveOne(key);
transaction.setStatus(Transaction.SUCCESS);
return Optional.ofNullable(release);
} catch (Throwable ex) {
transaction.setStatus(ex);
throw ex;
} finally {
transaction.complete();
}
}
});
}
@Override
protected Release findActiveOne(long id, ApolloNotificationMessages clientMessages) {
Tracer.logEvent(TRACER_EVENT_CACHE_GET_ID, String.valueOf(id));
return configIdCache.getUnchecked(id).orElse(null);
}
@Override
protected Release findLatestActiveRelease(String appId, String clusterName, String namespaceName,
ApolloNotificationMessages clientMessages) {
//构建缓存KEY
String key = ReleaseMessageKeyGenerator.generate(appId, clusterName, namespaceName);
Tracer.logEvent(TRACER_EVENT_CACHE_GET, key);
//缓存获取
ConfigCacheEntry cacheEntry = configCache.getUnchecked(key);
//校验缓存是否已经失效,失效更新缓存
//cache is out-dated
if (clientMessages != null && clientMessages.has(key) &&
clientMessages.get(key) > cacheEntry.getNotificationId()) {
//invalidate the cache and try to load from db again
invalidate(key);
cacheEntry = configCache.getUnchecked(key);
}
return cacheEntry.getRelease();
}
/**
* 校验缓存中的KEY
* @param key
*/
private void invalidate(String key) {
configCache.invalidate(key);
Tracer.logEvent(TRACER_EVENT_CACHE_INVALIDATE, key);
}
/**
* 发布消息监听回调函数,用于处理新记录的发布回调
* @param message
* @param channel
*/
@Override
public void handleMessage(ReleaseMessage message, String channel) {
logger.info("message received - channel: {}, message: {}", channel, message);
if (!Topics.APOLLO_RELEASE_TOPIC.equals(channel) || Strings.isNullOrEmpty(message.getMessage())) {
return;
}
try {
//校验缓存
invalidate(message.getMessage());
//在缓存中获取当前KEY的值,用于更新缓存信息
//warm up the cache
configCache.getUnchecked(message.getMessage());
} catch (Throwable ex) {
//ignore
}
}
/**
* 发布消息ID与发布记录对应关系实体
*/
private static class ConfigCacheEntry {
private final long notificationId;
private final Release release;
public ConfigCacheEntry(long notificationId, Release release) {
this.notificationId = notificationId;
this.release = release;
}
public long getNotificationId() {
return notificationId;
}
public Release getRelease() {
return release;
}
}
}
ConfigServiceWithCache
带有本地缓存功能的查询服务实现,与DefaultConfigService比多了数据本地缓存功能。
ConfigService对应操作DB库
ApolloConfigDB