工作以来在微服务实践中接触到了很多的RPC实现方式,一直没能做一个系统的分析和总结。本系列将以美团点评开源的pigeon框架为例,从服务方初始化处理和服务注册、端口绑定和消息处理、调用方服务监听和服务调用、熔断和降级多个方面来梳理RPC的来龙去脉。
pigeon版本:2.9.8
github地址:https://github.com/dianping/pigeon/tree/2.9.8
demo服务端:https://github.com/love4coding/rpcserver
demo客户端:https://github.com/love4coding/rpcclient
首先通过一张图来了解下调用方、服务方、注册中心三者之间的关系:
pigeon支持springXml服务配置和Annotation配置,demo中用的是spingXml配置。如下:
<bean class="com.dianping.pigeon.remoting.provider.config.spring.ServiceBean" init-method="init">
<property name="services">
<map>
<entry key="http://service.dianping.com/rpcserver/commonService_1.0.0" value-ref="commonService" />
</map>
</property>
</bean>
services属性下的key是服务全局唯一的标识url(如果一个远程服务未特别设置,url默认是服务接口类名),value是引用的服务bean。
进入到ServiceBean.init():
ServerConfig:为pigeon服务配置,各个属性取默认值,可配置;其中protocol默认值为"default";
ProviderConfig:对应上文配置文件services标签里的每一个实例bean;
进入ServiceFactory,循环处理ProviderConfig
/**
* add the services to pigeon and publish these services to registry
*/
public static void addServices(List<ProviderConfig<?>> providerConfigList) throws RpcException {
if (providerConfigList != null && !providerConfigList.isEmpty()) {
for (ProviderConfig<?> providerConfig : providerConfigList) {
addService(providerConfig);
}
}
}
public static <T> void addService(ProviderConfig<T> providerConfig) throws RpcException {
publishPolicy.doAddService(providerConfig);
}
具体实现在AbstractPublishPolicy.doAddService,添加服务实例并注册
@Override
public void doAddService(ProviderConfig providerConfig) {
try {
//对配置的url做合法性检查,如果为空,则取服务接口类名
//url = providerConfig.getServiceInterface().getCanonicalName();
checkServiceName(providerConfig);
//添加服务配置,将服务方的service bean以url为key缓存ConcurrentHashMap<String, ProviderConfig<?>>中
//将service bean的有效方法缓存到ConcurrentHashMap中
//详细实现见代码片段1》
ServicePublisher.addService(providerConfig);
//ProviderBootStrap.init()启动http协议JettyHttpServer, 此处启动default协议NettyServer
//详细逻辑见下篇端口绑定和消息处理
ServerConfig serverConfig = ProviderBootStrap.startup(providerConfig);
providerConfig.setServerConfig(serverConfig);
//服务注册,详细实现见代码片段2》
ServicePublisher.publishService(providerConfig, false);
} catch (Throwable t) {
throw new RpcException("error while adding service:" + providerConfig, t);
}
}
1》:在ServicePublisher.addService中将服务方的service bean以url为key添加到serviceCache map中,如果有版本号,则进行版本号处理
private static ConcurrentHashMap<String, ProviderConfig<?>> serviceCache = new ConcurrentHashMap<String, ProviderConfig<?>>();
public static <T> void addService(ProviderConfig<T> providerConfig) throws Exception {
String version = providerConfig.getVersion();
String url = providerConfig.getUrl();
if (StringUtils.isBlank(version)) {// default version
serviceCache.put(url, providerConfig);
} else {
//版本号逻辑处理
String urlWithVersion = getServiceUrlWithVersion(url, version);
if (serviceCache.containsKey(url)) {
serviceCache.put(urlWithVersion, providerConfig);
ProviderConfig<?> providerConfigDefault = serviceCache.get(url);
String defaultVersion = providerConfigDefault.getVersion();
if (!StringUtils.isBlank(defaultVersion)) {
if (VersionUtils.compareVersion(defaultVersion, providerConfig.getVersion()) < 0) {
// 以最新版本作为服务方默认处理bean
serviceCache.put(url, providerConfig);
}
}
} else {
serviceCache.put(urlWithVersion, providerConfig);
// use this service as the default provider
serviceCache.put(url, providerConfig);
}
}
T service = providerConfig.getService();
if (service instanceof InitializingService) {
((InitializingService) service).initialize();
}
//遍历服务方service bean的有效方法,以url为key添加到methods map缓存中
ServiceMethodFactory.init(url);
}
简单看下ServiceMethodFactory.init的实现
private static Map<String, ServiceMethodCache> methods = new ConcurrentHashMap<String, ServiceMethodCache>();
public static void init(String url) {
getServiceMethodCache(url);
}
public static ServiceMethodCache getServiceMethodCache(String url) {
ServiceMethodCache serviceMethodCache = methods.get(url);
if (serviceMethodCache == null) {
Map<String, ProviderConfig<?>> services = ServicePublisher.getAllServiceProviders();
ProviderConfig<?> providerConfig = services.get(url);
if (providerConfig != null) {
Object service = providerConfig.getService();
Method[] methodArray = service.getClass().getMethods();
serviceMethodCache = new ServiceMethodCache(url, service);
//遍历service bean的所有方法
for (Method method : methodArray) {
//ingoreMethods为java对象继承自Object.class和Class.class的方法,需要忽略掉
if (!ingoreMethods.contains(method.getName())) {
method.setAccessible(true);
serviceMethodCache.addMethod(method.getName(), new ServiceMethod(service, method));
//isCompact默认为true,按url和方法名称生成id隐射到map中
if (isCompact) {
int id = LangUtils.hash(url + "#" + method.getName(), 0, Integer.MAX_VALUE);
ServiceId serviceId = new ServiceId(url, method.getName());
ServiceId lastId = CompactRequest.PROVIDER_ID_MAP.putIfAbsent(id, serviceId);
if (lastId != null && !serviceId.equals(lastId)) {
throw new IllegalArgumentException("same id for service:" + url + ", method:"
+ method.getName());
}
}
}
}
methods.put(url, serviceMethodCache);
}
}
return serviceMethodCache;
}
以上代码主要是对每一个service bean和service bean中的方法进行map映射,便于pigeon服务接收调用方的消息后取对应的service bean进行消息处理。
2》:在ServicePublisher.publishService中将服务信息注册到注册中心zookeeper上,便于调用方监听和发起请求。
public static <T> void publishService(ProviderConfig<T> providerConfig, boolean forcePublish)
throws RegistryException {
String url = providerConfig.getUrl();
。。。
List<Server> servers = ProviderBootStrap.getServers(providerConfig);
int registerCount = 0;
for (Server server : servers) {
//调用ZK客户端CuratorClient注册服务信息,并设置权重为0
publishServiceToRegistry(url, server.getRegistryUrl(url), server.getPort(),
RegistryManager.getInstance().getGroup(url), providerConfig.isSupported());
registerCount++;
}
。。。
//设置服务权重为1,服务Online
ServiceOnlineTask.start();
providerConfig.setPublished(true);
}
//注册IP:PORT信息到zookeeper,初始化权重
private synchronized static <T> void publishServiceToRegistry(String url, String registryUrl, int port, String group, boolean support) {
String ip = configManager.getLocalIp();
String serverAddress = ip + ":" + port;
//初始化为0
int weight = Constants.WEIGHT_INITIAL;
//注册服务和权重信息
RegistryManager.getInstance().registerService(registryUrl, group, serverAddress, weight);
if (weight >= 0) {
if (!serverWeightCache.containsKey(serverAddress)) {
//设置服务的APP名称和版本号
RegistryManager.getInstance().setServerApp(serverAddress, configManager.getAppName());
RegistryManager.getInstance().setServerVersion(serverAddress, VersionUtils.VERSION);
}
serverWeightCache.put(serverAddress, weight);
}
}
publishServiceToRegistry注册后服务信息如下:
初始化权重信息:
ServiceOnlineTask执行后的服务权重信息:
关于服务启动和注册到此结束,欢迎评论区留言。
转载请备注原文链接。