Tars Java客户端-原理篇

概述

客户端两种应用场景

  • 已发布的Tars服务作为客户端调用其他Tars服务
  • 独立的应用程序访问Tars服务的接口,通常测试使用

主要流程

获取ServantProxy实例的流程

Communicator.stringToProxy((Class<T> clazz, String objName)


ServantProxyFactory.getServantProxy()


ObjectProxyFactory.getObjectProxy()
{
1、初始化servantProxyConfig,依据Comunicator的配置,和obj的名称
createServantProxyConfig
2、更新Endpoint,// 从主控获取服务对应的Endpoint信息,ip和端口
ObjectProxyFactory.updateServantEndpoints(ServantProxyConfig cfg)
3、创建LoadBalance,现在只支持一种默认的负载均衡策略
createLoadBalance(servantProxyConfig)
4、创建protocolInvoker,
createProtocolInvoker(api, objName, servantProxyConfig)
5、上述信息初始化一个新的ObjectProxy实例
new ObjectProxy<T>(api, objName, servantProxyConfig, loadBalance, protocolInvoker, communicator)
}


ObjectProxyFactory.createProtocolInvoker()
1、创建编解码类实例Codec,目前只支持tars协议,若接口类带有@Servant注释,即为tars协议

2、初始化TarsProtocolInvoker
protocolInvoker = new TarsProtocolInvoker<T>(api, servantProxyConfig, new ServantProtocolFactory(codec), communicator.getThreadPoolExecutor());


ServantProtocolInvoker(Class<T> api, ServantProxyConfig config, ProtocolFactory protocolFactory, ThreadPoolExecutor threadPoolExecutor)

ServantProtocolInvoker.initInvoker()
1、从objectname中解析出来所有服务地址列表,不同的服务用冒号分隔
AppName.ServerName.ObjName@tcp -h 172.0.1.1 -t 60000 -p 12345:tcp -h 172.0.1.1 -t 60000 -p 12346

2、每个URL创建Invoker
TarsProtocolInvoker.create()


TarsProtocolInvoker.create();
1、通过url创建客户端
getClients(url)
1.1 根据配置的连接数量,初始化多个ServantClient,每个ServantClient对应一个Session,也就是一个连接
initClient
2、初始化TarsInvoker

Rpc函数调用流程

根据proxy,method,参数调用函数
ObjectProxy.invoke(Object proxy, Method method, Object[] args)


1、将proxy,method,arg组装成InvokeContext
InvokeContext context = protocolInvoker.createContext(proxy, method, args);

2、通过负载均衡选择合适的invoker
Invoker<T> invoker = loadBalancer.select(protocolInvoker.getInvokers(), context);

3、调用invoke执行函数
invoker.invoke(context);


TarsInvoker.doInvokeServant(ServantInvokeContext inv)

1、根据方法名可以知道是同步调用还是异步调用
boolean isAsync = TarsHelper.isAsync(inv.getMethodName());

2、异步调用,调用对应的函数后返回
invokeWithAsync(method, inv.getArguments(), inv.getAttachments());
return null;

3、同步调用,调用对应的函数返回结果
TarsServantResponse response = invokeWithSync(method, inv.getArguments(), inv.getAttachments());
return response.getResult();


TarsInvoker.invokeWithSync()

1、构造Request请求包
TarsServantRequest request = new TarsServantRequest(client.getIoSession());

2、发送请求
client.invokeWithSync(request);


ServantClient.invokeWithSync()

1、检查连接,若连接没有建立或者已经断开,则主动发起连接
ensureConnected();

2、创建一个Ticket,Ticket用户线程间通信,接收完Response后通知调用方,或者触发回调函数。
ticket = TicketManager.createTicket(request, session, this.syncTimeout);

3、发送请求,若为异步操作,则不会再等待返回结果
current.write(request);

4、等待返回结果
ticket.await(this.syncTimeout, TimeUnit.MILLISECONDS)

5、获取Response
response = ticket.response();

类及类之间的关系

Proxy相关类

image.png
  • ObjectProxy实际是实现了InvocationHandler接口和ServantProxy接口,最主要的是实现了InvocationHandler的invoke()方法,以及保存了支持invoke的相关变量。
  • ObjectProxyFactory 作为ObjectProxy的工厂类,完成ObjectProxy实例的创建。
  • ServantProxyFactory 作为客户端调用远程方法的Proxy实例的创建和缓存,其中
    ConcurrentHashMap<String, Object> cache
    管理了objName,与实现接口Proxy的对象实例,Rpc调用时使用该实例直接调用的。
    private <T> Object createProxy(Class<T> clazz, ObjectProxy<T> objectProxy) {
        return Proxy.newProxyInstance(Thread.currentThread().getContextClassLoader(), new Class[] { clazz, ServantProxy.class }, objectProxy);
    }

是有ObjectProxy类实现了InvocationHandler接口,函数调用的操作是ObjectProxy实现invoke()函数完成的。

  • 总结
    ObjectProxyFactory 和ObjectProxyFactory 容易混淆,从名称根本不知道两者职责的区别。其实ObjectProxy和JAVA SDK的Proxy名称搞的比较相近的原因。如果将ObjectProxy改为ServantInvocationHandler感觉会清晰一些。

另外,Communicator中保存了,ServantProxyFactory和ObjectProxyFactory实例,而ServantProxyFactory和ObjectProxyFactory也保存了Communicator的实例,这种互相引用会导致类之间的关系不清晰。可以简单的提取Comunicator中用户的Context信息组成一个独立的类,其他使用这些信息的类,引用这个类获取相关信息。

Invoker相关类

image.png
  • Invoker 实现了invoke(),通过调用ServantClient进行网络通信,实现远程方法调用。
  • ProtocolInvoker 维护了多个连接到不同服务器的Invoker,方法调用时通过LoadBalance从中选出一个invoker发送请求。
  • 总结 感觉ProtocolInvoker,名字应该改为InvokerPool更形象一些。

Communicator相关类

  • CommunicatorFactory
ConcurrentHashMap<Object, Communicator> CommunicatorMap = new ConcurrentHashMap<Object, Communicator>();
Communicator communicator = null;

Communicator通过一个Map管理所有Communicator。同时CommunicatorFactory维护了一个默认的Communicator,Tars服务初始化的时候默认会初始化这个Communicator。

  • Communicator

对应的locator或者config,即一个Tars主控制器,一个Tars环境

client的配置文件格式如下:

<tars>
    <application>
        <client>
            locator=tars.tarsregistry.QueryObj@tcp -h 192.168.10.115 -p 17890
            sync-invoke-timeout=20000
            async-invoke-timeout=20000
            refresh-endpoint-interval=60000
            stat=tars.tarsstat.StatObj
            property=tars.tarsproperty.PropertyObj
            report-interval=60000
            modulename=TestApp.HelloJavaServer
        </client>
    </application>
</tars>

所以使用Tars服务中部署的方式Communicator通常是全局唯一,统一采用服务发布中模板的client配置

···
private volatile String id;
private volatile CommunicatorConfig communicatorConfig;
private volatile ThreadPoolExecutor threadPoolExecutor;
private final ServantProxyFactory servantProxyFactory = new ServantProxyFactory(this);
private final ObjectProxyFactory objectProxyFactory = new ObjectProxyFactory(this);

private final QueryHelper queryHelper = new QueryHelper(this);
private final StatHelper statHelper = new StatHelper(this);

private final ReentrantLock lock = new ReentrantLock();
private final AtomicBoolean inited = new AtomicBoolean(false);

···

其他类

  • ClientPoolManager
    private final static ConcurrentHashMap<CommunicatorConfig, ThreadPoolExecutor> clientThreadPoolMap = new ConcurrentHashMap<CommunicatorConfig, ThreadPoolExecutor>();
    private final static ConcurrentHashMap<ServantProxyConfig, SelectorManager> selectorsMap = new ConcurrentHashMap<ServantProxyConfig, SelectorManager>();

管理了两个线程池

  • clientThreadPoolMap : 管理所有业务线程池,每个Communicator一个业务线程池
  • SelectorManager: 管理所有网络连接池,每个proxy一个网络连接池,这些网络连接都是连接的一个服务端地址。

参考资料

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 218,546评论 6 507
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 93,224评论 3 395
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 164,911评论 0 354
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,737评论 1 294
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,753评论 6 392
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,598评论 1 305
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,338评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,249评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,696评论 1 314
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,888评论 3 336
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 40,013评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,731评论 5 346
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,348评论 3 330
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,929评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 33,048评论 1 270
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 48,203评论 3 370
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,960评论 2 355