概述
客户端两种应用场景
- 已发布的Tars服务作为客户端调用其他Tars服务
- 独立的应用程序访问Tars服务的接口,通常测试使用
主要流程
获取ServantProxy实例的流程
Communicator.stringToProxy((Class<T> clazz, String objName)
ServantProxyFactory.getServantProxy()
ObjectProxyFactory.getObjectProxy()
{
1、初始化servantProxyConfig,依据Comunicator的配置,和obj的名称
createServantProxyConfig
2、更新Endpoint,// 从主控获取服务对应的Endpoint信息,ip和端口
ObjectProxyFactory.updateServantEndpoints(ServantProxyConfig cfg)
3、创建LoadBalance,现在只支持一种默认的负载均衡策略
createLoadBalance(servantProxyConfig)
4、创建protocolInvoker,
createProtocolInvoker(api, objName, servantProxyConfig)
5、上述信息初始化一个新的ObjectProxy实例
new ObjectProxy<T>(api, objName, servantProxyConfig, loadBalance, protocolInvoker, communicator)
}
ObjectProxyFactory.createProtocolInvoker()
1、创建编解码类实例Codec,目前只支持tars协议,若接口类带有@Servant注释,即为tars协议
2、初始化TarsProtocolInvoker
protocolInvoker = new TarsProtocolInvoker<T>(api, servantProxyConfig, new ServantProtocolFactory(codec), communicator.getThreadPoolExecutor());
ServantProtocolInvoker(Class<T> api, ServantProxyConfig config, ProtocolFactory protocolFactory, ThreadPoolExecutor threadPoolExecutor)
ServantProtocolInvoker.initInvoker()
1、从objectname中解析出来所有服务地址列表,不同的服务用冒号分隔
AppName.ServerName.ObjName@tcp -h 172.0.1.1 -t 60000 -p 12345:tcp -h 172.0.1.1 -t 60000 -p 12346
2、每个URL创建Invoker
TarsProtocolInvoker.create()
TarsProtocolInvoker.create();
1、通过url创建客户端
getClients(url)
1.1 根据配置的连接数量,初始化多个ServantClient,每个ServantClient对应一个Session,也就是一个连接
initClient
2、初始化TarsInvoker
Rpc函数调用流程
根据proxy,method,参数调用函数
ObjectProxy.invoke(Object proxy, Method method, Object[] args)
1、将proxy,method,arg组装成InvokeContext
InvokeContext context = protocolInvoker.createContext(proxy, method, args);
2、通过负载均衡选择合适的invoker
Invoker<T> invoker = loadBalancer.select(protocolInvoker.getInvokers(), context);
3、调用invoke执行函数
invoker.invoke(context);
TarsInvoker.doInvokeServant(ServantInvokeContext inv)
1、根据方法名可以知道是同步调用还是异步调用
boolean isAsync = TarsHelper.isAsync(inv.getMethodName());
2、异步调用,调用对应的函数后返回
invokeWithAsync(method, inv.getArguments(), inv.getAttachments());
return null;
3、同步调用,调用对应的函数返回结果
TarsServantResponse response = invokeWithSync(method, inv.getArguments(), inv.getAttachments());
return response.getResult();
TarsInvoker.invokeWithSync()
1、构造Request请求包
TarsServantRequest request = new TarsServantRequest(client.getIoSession());
2、发送请求
client.invokeWithSync(request);
ServantClient.invokeWithSync()
1、检查连接,若连接没有建立或者已经断开,则主动发起连接
ensureConnected();
2、创建一个Ticket,Ticket用户线程间通信,接收完Response后通知调用方,或者触发回调函数。
ticket = TicketManager.createTicket(request, session, this.syncTimeout);
3、发送请求,若为异步操作,则不会再等待返回结果
current.write(request);
4、等待返回结果
ticket.await(this.syncTimeout, TimeUnit.MILLISECONDS)
5、获取Response
response = ticket.response();
类及类之间的关系
Proxy相关类
- ObjectProxy实际是实现了InvocationHandler接口和ServantProxy接口,最主要的是实现了InvocationHandler的invoke()方法,以及保存了支持invoke的相关变量。
- ObjectProxyFactory 作为ObjectProxy的工厂类,完成ObjectProxy实例的创建。
- ServantProxyFactory 作为客户端调用远程方法的Proxy实例的创建和缓存,其中
ConcurrentHashMap<String, Object> cache
管理了objName,与实现接口Proxy的对象实例,Rpc调用时使用该实例直接调用的。
private <T> Object createProxy(Class<T> clazz, ObjectProxy<T> objectProxy) {
return Proxy.newProxyInstance(Thread.currentThread().getContextClassLoader(), new Class[] { clazz, ServantProxy.class }, objectProxy);
}
是有ObjectProxy类实现了InvocationHandler接口,函数调用的操作是ObjectProxy实现invoke()函数完成的。
- 总结
ObjectProxyFactory 和ObjectProxyFactory 容易混淆,从名称根本不知道两者职责的区别。其实ObjectProxy和JAVA SDK的Proxy名称搞的比较相近的原因。如果将ObjectProxy改为ServantInvocationHandler感觉会清晰一些。
另外,Communicator中保存了,ServantProxyFactory和ObjectProxyFactory实例,而ServantProxyFactory和ObjectProxyFactory也保存了Communicator的实例,这种互相引用会导致类之间的关系不清晰。可以简单的提取Comunicator中用户的Context信息组成一个独立的类,其他使用这些信息的类,引用这个类获取相关信息。
Invoker相关类
- Invoker 实现了invoke(),通过调用ServantClient进行网络通信,实现远程方法调用。
- ProtocolInvoker 维护了多个连接到不同服务器的Invoker,方法调用时通过LoadBalance从中选出一个invoker发送请求。
- 总结 感觉ProtocolInvoker,名字应该改为InvokerPool更形象一些。
Communicator相关类
- CommunicatorFactory
ConcurrentHashMap<Object, Communicator> CommunicatorMap = new ConcurrentHashMap<Object, Communicator>();
Communicator communicator = null;
Communicator通过一个Map管理所有Communicator。同时CommunicatorFactory维护了一个默认的Communicator,Tars服务初始化的时候默认会初始化这个Communicator。
- Communicator
对应的locator或者config,即一个Tars主控制器,一个Tars环境
client的配置文件格式如下:
<tars>
<application>
<client>
locator=tars.tarsregistry.QueryObj@tcp -h 192.168.10.115 -p 17890
sync-invoke-timeout=20000
async-invoke-timeout=20000
refresh-endpoint-interval=60000
stat=tars.tarsstat.StatObj
property=tars.tarsproperty.PropertyObj
report-interval=60000
modulename=TestApp.HelloJavaServer
</client>
</application>
</tars>
所以使用Tars服务中部署的方式Communicator通常是全局唯一,统一采用服务发布中模板的client配置
···
private volatile String id;
private volatile CommunicatorConfig communicatorConfig;
private volatile ThreadPoolExecutor threadPoolExecutor;
private final ServantProxyFactory servantProxyFactory = new ServantProxyFactory(this);
private final ObjectProxyFactory objectProxyFactory = new ObjectProxyFactory(this);
private final QueryHelper queryHelper = new QueryHelper(this);
private final StatHelper statHelper = new StatHelper(this);
private final ReentrantLock lock = new ReentrantLock();
private final AtomicBoolean inited = new AtomicBoolean(false);
···
其他类
- ClientPoolManager
private final static ConcurrentHashMap<CommunicatorConfig, ThreadPoolExecutor> clientThreadPoolMap = new ConcurrentHashMap<CommunicatorConfig, ThreadPoolExecutor>();
private final static ConcurrentHashMap<ServantProxyConfig, SelectorManager> selectorsMap = new ConcurrentHashMap<ServantProxyConfig, SelectorManager>();
管理了两个线程池
- clientThreadPoolMap : 管理所有业务线程池,每个Communicator一个业务线程池
- SelectorManager: 管理所有网络连接池,每个proxy一个网络连接池,这些网络连接都是连接的一个服务端地址。