在此之前我们理清了pigeon服务方的初始化、注册和消息处理逻辑,本篇我们来看看pigeon调用方的实现。
第一部分我们先看看服务调用的实现。
服务调用示例:
@RestController
@RequestMapping("/common")
public class CommonController {
@Autowired
private CommonService commonService;
@RequestMapping(value = "/hello")
@ResponseBody
public String hello(@RequestParam("name") String name) {
System.out.println("enter hello");
return commonService.hello(name);
}
}
CommonService 就是服务方发布的服务接口,可以看到在调用方只需要引入相应服务的api jar包,就可以像调用本地方法一样调用对应的服务接口,这也是大部分RPC框架的实现效果。
CommonService 通过@Autowired注解在spring容器中找到对应的bean,我们来看看相应的bean配置
<bean id="commonService" class="com.dianping.pigeon.remoting.invoker.config.spring.ReferenceBean" init-method="init">
<!-- 服务全局唯一的标识url,默认是服务接口类名,必须设置 -->
<property name="url" value="http://service.dianping.com/rpcserver/commonService_1.0.0" />
<!-- 接口名称,必须设置 -->
<property name="interfaceName" value="com.study.rpcserver.api.CommonService" />
<!-- 超时时间,毫秒,默认5000,建议自己设置 -->
<property name="timeout" value="2000" />
<!-- 序列化,hessian/fst/protostuff,默认hessian,可不设置-->
<property name="serialize" value="hessian" />
<!-- 调用方式,sync/future/callback/oneway,默认sync,可不设置 -->
<property name="callType" value="sync" />
<!-- 失败策略,快速失败failfast/失败转移failover/失败忽略failsafe/并发取最快返回forking,默认failfast,可不设置 -->
<property name="cluster" value="failfast" />
<!-- 是否超时重试,默认false,可不设置 -->
<property name="timeoutRetry" value="false" />
<!-- 重试次数,默认1,可不设置 -->
<property name="retries" value="1" />
</bean>
ReferenceBean继承了spring的FactoryBean接口,来处理复杂bean的生成,通过getObject()方法来返回对应bean实例。接下来我们就以ReferenceBean为入口来切入pigeon调用方的实现思路。
public void init() throws Exception {
if (StringUtils.isBlank(interfaceName)) {
throw new IllegalArgumentException("invalid interface:" + interfaceName);
}
this.objType = ClassUtils.loadClass(this.classLoader, this.interfaceName.trim());
//服务调用相关的配置信息,就是我们对每一个接口服务在xml文件中的配置
InvokerConfig<?> invokerConfig = new InvokerConfig(this.objType, this.url, this.timeout, this.callType,
this.serialize, this.callback, this.group, this.writeBufferLimit, this.loadBalance, this.cluster,
this.retries, this.timeoutRetry, this.vip, this.version, this.protocol);
invokerConfig.setClassLoader(classLoader);
invokerConfig.setSecret(secret);
invokerConfig.setRegionPolicy(regionPolicy);
if (!CollectionUtils.isEmpty(methods)) {
Map<String, InvokerMethodConfig> methodMap = new HashMap<String, InvokerMethodConfig>();
invokerConfig.setMethods(methodMap);
for (InvokerMethodConfig method : methods) {
methodMap.put(method.getName(), method);
}
}
checkMock(); // 降级配置检查
invokerConfig.setMock(mock);
checkRemoteAppkey();
invokerConfig.setRemoteAppKey(remoteAppKey);
//生成接口的代理对象
this.obj = ServiceFactory.getService(invokerConfig);
configLoadBalance(invokerConfig);
}
//FactoryBean返回的bean实例
public Object getObject() {
return this.obj;
}
ServiceFactory.getService(invokerConfig);根据配置的interfaceName生成一个java代理对象
private static ServiceProxy serviceProxy = ServiceProxyLoader.getServiceProxy();
public static <T> T getService(InvokerConfig<T> invokerConfig) throws RpcException {
return serviceProxy.getProxy(invokerConfig);
}
跟踪代码,进入AbstractServiceProxy.getProxy方法,核心代码如下:
protected final static Map<InvokerConfig<?>, Object> services = new ConcurrentHashMap<InvokerConfig<?>, Object>();
@Override
public <T> T getProxy(InvokerConfig<T> invokerConfig) {
//InvokerConfig实现了自定义equals和hashCode方法
service = services.get(invokerConfig);
if (service == null) {
synchronized (interner.intern(invokerConfig)) {
service = services.get(invokerConfig);
if (service == null) {
//此处执行调用方的一些初始化逻辑,包括InvokerProcessHandlerFactory.init();初始化调用方Filter责任链等
InvokerBootStrap.startup();
//生成代理对象
service = SerializerFactory.getSerializer(invokerConfig.getSerialize()).proxyRequest(invokerConfig);
try {
//获取服务信息,创建Client实例
ClientManager.getInstance().registerClients(invokerConfig);
} catch (Throwable t) {
logger.warn("error while trying to setup service client:" + invokerConfig, t);
}
services.put(invokerConfig, service);
}
}
return (T) service;
}
AbstractSerializer.proxyRequest使用我们熟悉的JDK动态代理来生成服务接口的代理对象
@Override
public Object proxyRequest(InvokerConfig<?> invokerConfig) throws SerializationException {
return Proxy.newProxyInstance(ClassUtils.getCurrentClassLoader(invokerConfig.getClassLoader()),
new Class[] { invokerConfig.getServiceInterface() }, new ServiceInvocationProxy(invokerConfig,
InvokerProcessHandlerFactory.selectInvocationHandler(invokerConfig)));
}
//InvokerProcessHandlerFactory.selectInvocationHandler获取调用方请求责任链
public static void init() {
if (!isInitialized) {
if (Constants.MONITOR_ENABLE) {
registerBizProcessFilter(new RemoteCallMonitorInvokeFilter());
}
registerBizProcessFilter(new TraceFilter());
registerBizProcessFilter(new DegradationFilter());
//关于ClusterInvokeFilter后文详细介绍
registerBizProcessFilter(new ClusterInvokeFilter());
registerBizProcessFilter(new GatewayInvokeFilter());
registerBizProcessFilter(new ContextPrepareInvokeFilter());
registerBizProcessFilter(new SecurityFilter());
//远程调用
registerBizProcessFilter(new RemoteCallInvokeFilter());
bizInvocationHandler = createInvocationHandler(bizProcessFilters);
isInitialized = true;
}
}
public static ServiceInvocationHandler selectInvocationHandler(InvokerConfig<?> invokerConfig) {
return bizInvocationHandler;
}
ServiceInvocationProxy继承了java.lang.reflect.InvocationHandler接口,invoke实现逻辑如下:
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
String methodName = method.getName();
//代理对象的非服务方法调用走原有逻辑
Class<?>[] parameterTypes = method.getParameterTypes();
if (method.getDeclaringClass() == Object.class) {
return method.invoke(handler, args);
}
if ("toString".equals(methodName) && parameterTypes.length == 0) {
return handler.toString();
}
if ("hashCode".equals(methodName) && parameterTypes.length == 0) {
return handler.hashCode();
}
if ("equals".equals(methodName) && parameterTypes.length == 1) {
return handler.equals(args[0]);
}
//服务接口执行责任链处理逻辑
return extractResult(handler.handle(new DefaultInvokerContext(invokerConfig, methodName, parameterTypes, args)),
method.getReturnType());
}
同服务端责任链的分析一样,我们首先重点看下RemoteCallInvokeFilter的处理逻辑,核心代码如下:
@Override
public InvocationResponse invoke(ServiceInvocationHandler handler, InvokerContext invocationContext)
throws Throwable {
Client client = invocationContext.getClient();
InvocationRequest request = invocationContext.getRequest();
InvokerConfig<?> invokerConfig = invocationContext.getInvokerConfig();
。。。
//以同步调用场景分析下远程调用逻辑
CallbackFuture future = new CallbackFuture();
response = InvokerUtils.sendRequest(client, invocationContext.getRequest(), future);
invocationContext.getTimeline().add(new TimePoint(TimePhase.Q));
if (response == null) {
response = future.getResponse(request.getTimeout());
}
return response;
}
public static InvocationResponse sendRequest(Client client, InvocationRequest request, Callback callback) {
InvocationResponse response = response = client.write(request);
return response;
}
client.write(request);最终调用NettyClient或HttpInvokerClient的doWrite方法发送请求消息体。
至此我们理清了服务调用的逻辑,简单来说就是通过JDK动态代理来生成服务方接口对应的实例对象,在方法执行逻辑中调用远程服务。
但对于每一个服务接口,调用方是如何知道远程服务的访问地址的呢?以及新注册或者下线的服务地址,调用方如何得到即时通知?
接下来进入本篇第二部分,远程调用Client的初始化和调用方对服务信息的心跳监听。
以请求责任链的ClusterInvokeFilter为入口:
public InvocationResponse invoke(ServiceInvocationHandler handler, InvokerContext invocationContext)
throws Throwable {
InvokerConfig<?> invokerConfig = invocationContext.getInvokerConfig();
//失败策略cluster可配,默认为快速失败failfast
Cluster cluster = ClusterFactory.selectCluster(invokerConfig.getCluster());
if (cluster == null) {
throw new IllegalArgumentException("Unsupported cluster type:" + cluster);
}
return cluster.invoke(handler, invocationContext);
}
跟踪代码进入FailfastCluster.invoke方法,核心代码如下:
private ClientManager clientManager = ClientManager.getInstance();
@Override
public InvocationResponse invoke(ServiceInvocationHandler handler, InvokerContext invocationContext)
throws Throwable {
InvokerConfig<?> invokerConfig = invocationContext.getInvokerConfig();
//构造请求消息对象
InvocationRequest request = InvokerUtils.createRemoteCallRequest(invocationContext, invokerConfig);
//是否超时重试
boolean timeoutRetry = invokerConfig.isTimeoutRetry();
//重试次数
int retry = invokerConfig.getRetries(invocationContext.getMethodName());
//关于重试和重试次数的逻辑在此不做过多说明,只摘取主干代码
//获取远程客户端
Client remoteClient = clientManager.getClient(invokerConfig, request, null);
//就是在这里设置的RemoteCallInvokeFilter中用到的客户端Client
invocationContext.setClient(remoteClient);
try {
//向后执行责任链
return handler.handle(invocationContext);
} catch (NetworkException e) {
remoteClient = clientManager.getClient(invokerConfig, request, null);
invocationContext.setClient(remoteClient);
return handler.handle(invocationContext);
}
}
ClientManager 为单例模式,我们看看内部实现
//私有构造函数
private ClientManager() {
this.providerAvailableListener = new ProviderAvailableListener();
this.clusterListener = new DefaultClusterListener(providerAvailableListener);
this.clusterListenerManager.addListener(this.clusterListener);
providerAvailableThreadPool.execute(this.providerAvailableListener);
RegistryEventListener.addListener(providerChangeListener);
RegistryEventListener.addListener(registryConnectionListener);
RegistryEventListener.addListener(groupChangeListener);
registerThreadPool.getExecutor().allowCoreThreadTimeOut(true);
}
private RouteManager routerManager = DefaultRouteManager.INSTANCE;
public Client getClient(InvokerConfig<?> invokerConfig, InvocationRequest request, List<Client> excludeClients) {
//根据全局唯一标识url获取Client集合
List<Client> clientList = clusterListener.getClientList(invokerConfig);
List<Client> clientsToRoute = new ArrayList<Client>(clientList);
if (excludeClients != null) {
clientsToRoute.removeAll(excludeClients);
}
//根据负载均衡策略选取有效的Client
//此处细节比较多,感兴趣的朋友可以自行细致浏览下源码,限于篇幅不一一讲解了
return routerManager.route(clientsToRoute, invokerConfig, request);
}
距离目标越来越近了,我们继续跟踪代码DefaultClusterListener的实现
private ConcurrentHashMap<String, List<Client>> serviceClients = new ConcurrentHashMap<String, List<Client>>();
public List<Client> getClientList(InvokerConfig<?> invokerConfig) {
//根据url获取对应的Client集合
List<Client> clientList = this.serviceClients.get(invokerConfig.getUrl());
return clientList;
}
问题来了,serviceClients是在什么时候创建的Client实例呢?
我们回顾下AbstractServiceProxy.getProxy中的一段逻辑:
try {
ClientManager.getInstance().registerClients(invokerConfig);
} catch (Throwable t) {
logger.warn("error while trying to setup service client:" + invokerConfig, t);
}
从异常信息我们可以清晰的看到,这里就是创建service client的入口,最终调用到DefaultClusterListener.addConnect添加Client映射关系到serviceClients。调用链路比较长,在此简单贴一下线程调用栈:
至此我们理清了Client的创建,接下来我们看看调用方的心跳监听。
我们直接连接注册中心zookeeper的相关类CuratorClient,用的是curator-framework-2.7.1.jar,这个ZK客户端功能很强大,可以非常方便的对具体的zk节点添加listener回调。
private boolean newCuratorClient() throws InterruptedException {
//根据zk地址创建zkClient
CuratorFramework client = CuratorFrameworkFactory.builder().connectString(address)
.sessionTimeoutMs(sessionTimeout).connectionTimeoutMs(connectionTimeout)
.retryPolicy(new MyRetryPolicy(retries, retryInterval)).build();
//监听连接状态,掉线重连
client.getConnectionStateListenable().addListener(new ConnectionStateListener() {
@Override
public void stateChanged(CuratorFramework client, ConnectionState newState) {
logger.info("zookeeper state changed to " + newState);
if (newState == ConnectionState.RECONNECTED) {
RegistryEventListener.connectionReconnected();
}
monitor.logEvent(EVENT_NAME, "zookeeper:" + newState.name().toLowerCase(), "");
}
});
//监听change事件!!!
client.getCuratorListenable().addListener(new CuratorEventListener(this), curatorEventListenerThreadPool);
client.start();
boolean isConnected = client.getZookeeperClient().blockUntilConnectedOrTimedOut();
CuratorFramework oldClient = this.client;
this.client = client;
close(oldClient);
return isConnected;
}
CuratorEventListener继承org.apache.curator.framework.api.CuratorListener,看下事件处理逻辑
@Override
public void eventReceived(CuratorFramework client, CuratorEvent curatorEvent) throws Exception {
WatchedEvent event = (curatorEvent == null ? null : curatorEvent.getWatchedEvent());
//过滤不敢兴趣的EventType
if (event == null
|| (event.getType() != EventType.NodeCreated && event.getType() != EventType.NodeDataChanged
&& event.getType() != EventType.NodeDeleted && event.getType() != EventType.NodeChildrenChanged)) {
return;
}
try {
//解析节点路径并分类
PathInfo pathInfo = parsePath(event.getPath());
if (pathInfo.type == ADDRESS) {//服务地址
addressChanged(pathInfo);
} else if (pathInfo.type == WEIGHT) {//权重
weightChanged(pathInfo);
} else if (pathInfo.type == APP) {
appChanged(pathInfo);
} else if (pathInfo.type == VERSION) {
versionChanged(pathInfo);
} else if (pathInfo.type == PROTOCOL) {
protocolChanged(pathInfo);
} else if (pathInfo.type == HOST_CONFIG) {
registryConfigChanged(pathInfo);
}
} catch (Throwable e) {
logger.error("Error in ZookeeperWatcher.process()", e);
return;
}
}
/*
* 1. Get newest value from ZK and watch again 2. Determine if changed
* against cache 3. notify if changed 4. pay attention to group fallback
* notification
*/
private void addressChanged(PathInfo pathInfo) throws Exception {
if (shouldNotify(pathInfo)) {
String hosts = client.get(pathInfo.path);
logger.info("Service address changed, path " + pathInfo.path + " value " + hosts);
List<String[]> hostDetail = Utils.getServiceIpPortList(hosts);
serviceChangeListener.onServiceHostChange(pathInfo.serviceName, hostDetail);
}
// Watch again
client.watch(pathInfo.path);
}
addressChanged难得加了注释,判断是否需要回调,回调。
本篇到此结束,内容较多,希望能对大家有所助益。
转载请备注原文链接。