在上一章中讲到了dubbo引用过程的第一步,在zk上注册和订阅。本章将接着上一章继续讲解之后的服务引用的步骤:
记得在上一章中我们看到了这段代码:
if (enabled) {
invoker = new InvokerDelegete<T>(protocol.refer(serviceType, url), url, providerUrl);
}
//InvokerDelegete中包装的Invoker实际上是DubboInvoker
public <T> Invoker<T> refer(Class<T> serviceType, URL url) throws RpcException {
// create rpc invoker,这里的url对应的地址和接口否是provider信息,parameters是providerUrl和consumerUrl的汇总信息
DubboInvoker<T> invoker = new DubboInvoker<T>(serviceType, url, getClients(url), invokers);
invokers.add(invoker);
return invoker;
}
/**
* 此代码负责NIO框架Client创建及初始化,和提供者初始化相似,消费者的
* 初始化也基本上围绕着Client、Handler、Channel对象的创建与不断装饰
* 的过程,不同的是消费者底层是与提供者Server建立连接,此过程在remote
* 层下完成,remote层可分为消息交换层与网路传输层即Exchanger与
* Transporter层,还有,我们在说提供者初始化时,说过同个JVM中相同协议
* 的服务共享一个Server,同样在消费者初始化时,引用同一个提供者的所有
* 服务可以共享一个Client进行通信,这也就实现了Server-Client在同一个
* 通道中进行通信,实现长连接的高效通信,但是在服务请求数据量比较大时
* 或请求数比较多时,可以设置每服务每连接或每服务多连接可以提高通信效
* 率,具体是通过消费者方connections=2设置连接数。所有消费者端Client
* 有两种,一种是共享型Client,一种是创建型Client,当然共享型Client
* 属于创建型Client一部分,下面具体说说这两种Client创建的细节,也是服
* 务引用的重要细节
*/
private ExchangeClient[] getClients(URL url){
//是否共享连接
boolean service_share_connect = false;
int connections = url.getParameter(Constants.CONNECTIONS_KEY, 0);
//如果connections不配置,则共享连接,否则每服务每连接
if (connections == 0){
service_share_connect = true;
connections = 1;
}
//有多少个连接就创建多少个Client
ExchangeClient[] clients = new ExchangeClient[connections];
for (int i = 0; i < clients.length; i++) {
if (service_share_connect){ //如果是共享连接的话(客户端只创建一个Client)
clients[i] = getSharedClient(url);
} else {
clients[i] = initClient(url); //这个初始化Client很重要
}
}
return clients;
}
/**
* 获取共享连接
* 此为创建共享型Client,共享型Client是指消费者引用同一
* 提供者的服务时,使用同一个Client来提高通信效率
*/
private ExchangeClient getSharedClient(URL url){
String key = url.getAddress(); //消费者的地址
ReferenceCountExchangeClient client = referenceClientMap.get(key);
if ( client != null ){
if ( !client.isClosed()){
//如果这个共享client不等于null并且这个client没有关闭的话
//将client内部的引用计数加1
client.incrementAndGetCount();
return client;
} else {
//虽然这个client不为空,client确实关闭状态的
//于是要移除这个被关闭的client
referenceClientMap.remove(key);
}
}
//这个client内部已经建立了与服务端的特定端口号的连接
ExchangeClient exchagneclient = initClient(url);
//将client设置为幽灵Client,作用目前还不明确
client = new ReferenceCountExchangeClient(exchagneclient, ghostClientMap);
referenceClientMap.put(key, client);
ghostClientMap.remove(key);
return client;
}
/**
* 创建客户端新连接.
*/
private ExchangeClient initClient(URL url) {
//Client类型设置
String str = url.getParameter(Constants.CLIENT_KEY, url.getParameter(Constants.SERVER_KEY, Constants.DEFAULT_REMOTING_CLIENT));
String version = url.getParameter(Constants.DUBBO_VERSION_KEY);
boolean compatible = (version != null && version.startsWith("1.0."));
url = url.addParameter(Constants.CODEC_KEY, Version.isCompatibleVersion() && compatible ? COMPATIBLE_CODEC_NAME : DubboCodec.NAME);
//默认开启heartbeat
url = url.addParameterIfAbsent(Constants.HEARTBEAT_KEY, String.valueOf(Constants.DEFAULT_HEARTBEAT));
// BIO存在严重性能问题,暂时不允许使用
if (str != null && str.length() > 0 && ! ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str)) {
throw new RpcException("Unsupported client type: " + str + "," +
" supported client type is " + StringUtils.join(ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions(), " "));
}
ExchangeClient client ;
try {
//延迟加载
if (url.getParameter(Constants.LAZY_CONNECT_KEY, false)){
client = new LazyConnectExchangeClient(url ,requestHandler);
} else {
//在这里我们又看到这个Exchangers的门面了,这个要十分注意,因为在这之前讲解服务启动的时候有讲过这个,这个跟上次说的门面十分类似。
client = Exchangers.connect(url ,requestHandler);
}
} catch (RemotingException e) {
throw new RpcException("Fail to create remoting client for service(" + url
+ "): " + e.getMessage(), e);
}
return client;
}
上面提到过在包装的过程中,会初始化与服务端的连接,这一步是在NettyClient中完成的,实际上client = Exchangers.connect(url ,requestHandler);最后返回的实际类型也就是nettyClient,在NettyClient的构造函数中直接完成了与provider链接的建立,具体如下:
public NettyClient(final URL url, final ChannelHandler handler) throws RemotingException{
//wrapChannelHandler(url, handler)操作即返回了后面提到的MultiMessageHandler
super(url, wrapChannelHandler(url, handler));
}
//super(url, wrapChannelHandler(url, handler))
public AbstractClient(URL url, ChannelHandler handler) throws RemotingException {
//见面我简化了代码,只写了核心的逻辑
super(url, handler);
doOpen();
connect();
}
//doOpen和connect都是调用子类NettyClient的方法
@Override
protected void doOpen() throws Throwable {
NettyHelper.setNettyLoggerFactory();
bootstrap = new ClientBootstrap(channelFactory);
// 设置全局的链接配置
// @see org.jboss.netty.channel.socket.SocketChannelConfig
bootstrap.setOption("keepAlive", true);
bootstrap.setOption("tcpNoDelay", true);
bootstrap.setOption("connectTimeoutMillis", getTimeout());
//nettyHandler持有了NettyClient,NettyClient持有了上面说的多层Handler,最终还是委托到持有的Handler来处理
final NettyHandler nettyHandler = new NettyHandler(getUrl(), this);
bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
public ChannelPipeline getPipeline() {
NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyClient.this);
ChannelPipeline pipeline = Channels.pipeline();
pipeline.addLast("decoder", adapter.getDecoder());
pipeline.addLast("encoder", adapter.getEncoder());
pipeline.addLast("handler", nettyHandler);
return pipeline;
}
});
}
// 初始化或者重新覆盖channel
protected void doConnect() throws Throwable {
long start = System.currentTimeMillis();
//直接建立与provider的链接,建立链接之后就可以调用channel.write发送信息到provider
ChannelFuture future = bootstrap.connect(getConnectAddress());
try{
//链接建立的超时时间默认为3s
boolean ret = future.awaitUninterruptibly(getConnectTimeout(), TimeUnit.MILLISECONDS);
if (ret && future.isSuccess()) {
Channel newChannel = future.getChannel();
newChannel.setInterestOps(Channel.OP_READ_WRITE);
try {
// 关闭旧的连接,因为一个NettyClient就对应一个链接,所以当创建新的链接的时候就代表要移除旧的链接
Channel oldChannel = NettyClient.this.channel; // copy reference
if (oldChannel != null) {
try {
if (logger.isInfoEnabled()) {
logger.info("Close old netty channel " + oldChannel + " on create new netty channel " + newChannel);
}
oldChannel.close();
} finally {
NettyChannel.removeChannelIfDisconnected(oldChannel);
}
}
//完成链接替换
} finally {
if (NettyClient.this.isClosed()) {
try {
if (logger.isInfoEnabled()) {
logger.info("Close new netty channel " + newChannel + ", because the client closed.");
}
newChannel.close();
} finally {
NettyClient.this.channel = null;
NettyChannel.removeChannelIfDisconnected(newChannel);
}
} else {
NettyClient.this.channel = newChannel;
}
}
} else if (future.getCause() != null) {
throw new RemotingException(this, "client(url: " + getUrl() + ") failed to connect to server "
+ getRemoteAddress() + ", error message is:" + future.getCause().getMessage(), future.getCause());
} else {
throw new RemotingException(this, "client(url: " + getUrl() + ") failed to connect to server "
+ getRemoteAddress() + " client-side timeout "
+ getConnectTimeout() + "ms (elapsed: " + (System.currentTimeMillis() - start) + "ms) from netty client "
+ NetUtils.getLocalHost() + " using dubbo version " + Version.getVersion());
}
}finally{
if (! isConnected()) {
future.cancel();
}
}
}
上面的过程就是建立链接的过程了,但是在这个过程中有个重要的地方就是多层Handler的包装,因为装饰者模式的层级比较多,所以我们还是采用一贯的措施:层层分解。
包装的层级依次是:
- requestHandler
- HeaderExchangeHandler 包装了requestHandler
- DecodeHandler 包装了HeaderExchangeHandler
- AllChannelHandler 包装了DecodeHandler
- HeartbeatHandler 包装了AllChannelHandler
- MultiMessageHandler 包装了HeartbeatHandler
最终调用分顺序是又下至上开始调用的。
//MultiMessageHandler
public void received(Channel channel, Object message) throws RemotingException {
//dubbo可以将多个返回信息包装到一个信息里去返回,这时候就就用到MultiMessageHandler了。这么做的目的的是节省传输量
if (message instanceof MultiMessage) {
MultiMessage list = (MultiMessage)message;
for(Object obj : list) {
handler.received(channel, obj);
}
} else {
handler.received(channel, message);
}
}
//HeartbeatHandler,判断收到的信息类型是否是心跳类型,如果是心跳类型的话就不会触发后面的handler,直接在此结束请求。
public void received(Channel channel, Object message) throws RemotingException {
setReadTimestamp(channel);
if (isHeartbeatRequest(message)) {
Request req = (Request) message;
// oneWay代表不需要返回数据的请求,twoWay代表需要返回数据的请求
if (req.isTwoWay()) {
Response res = new Response(req.getId(), req.getVersion());
res.setEvent(Response.HEARTBEAT_EVENT);
channel.send(res);
if (logger.isInfoEnabled()) {
int heartbeat = channel.getUrl().getParameter(Constants.HEARTBEAT_KEY, 0);
if(logger.isDebugEnabled()) {
logger.debug("Received heartbeat from remote channel " + channel.getRemoteAddress()
+ ", cause: The channel has no data-transmission exceeds a heartbeat period"
+ (heartbeat > 0 ? ": " + heartbeat + "ms" : ""));
}
}
}
//直接结束请求
return;
}
//客户端只要知道服务端活这就可以了,不需要其他操作
if (isHeartbeatResponse(message)) {
if (logger.isDebugEnabled()) {
logger.debug(
new StringBuilder(32)
.append("Receive heartbeat response in thread ")
.append(Thread.currentThread().getName())
.toString());
}
return;
}
handler.received(channel, message);
}
//AllChannelHandler就是将所有的逻辑都通过线程池来调度,其余的就是直接调用内部handler的received方法
//DecodeHandler 处理编码和解码的逻辑
//HeaderExchangeHandler 主要处理请求的接收,回调(此时不会再对requestHandler进行调用)
在这里我们只是粗略的了解到这些Handler的层级以及各自的关系,但其实在发起请求调用的时候首先进行的是Invoker的invoke调用,这些Handler只是处理请求结果的一些过程逻辑,也就是consumer端收到provider的返回结果之后的处理逻辑。
下面我们要看一下作为consumer是如何把请求发送出去的:
因为我们看到的接口类其实都是代理,所以真正的请求开始也是从代理开始的。
//创建代理的时候指定了处理的InvokerInvocationHandler
public <T> T getProxy(Invoker<T> invoker, Class<?>[] interfaces) {
return (T) Proxy.getProxy(interfaces).newInstance(new InvokerInvocationHandler(invoker));
}
//InvocationHandler的具体实现,在调用toString,hashCode和equals的时候会直接进行调用,不再产生远程的请求
public class InvokerInvocationHandler implements InvocationHandler {
private final Invoker<?> invoker;
public InvokerInvocationHandler(Invoker<?> handler){
this.invoker = handler;
}
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
String methodName = method.getName();
Class<?>[] parameterTypes = method.getParameterTypes();
if (method.getDeclaringClass() == Object.class) {
return method.invoke(invoker, args);
}
if ("toString".equals(methodName) && parameterTypes.length == 0) {
return invoker.toString();
}
if ("hashCode".equals(methodName) && parameterTypes.length == 0) {
return invoker.hashCode();
}
if ("equals".equals(methodName) && parameterTypes.length == 1) {
return invoker.equals(args[0]);
}
//invoke返回的结果是Result类型,recreate表示如果有结果的话就返回真正的调用结果,有异常的话就返回异常信息
//我们在外面看到的Invocation信息都源自这里,主要包含了方法和参数信息
return invoker.invoke(new RpcInvocation(method, args)).recreate();
}
//抛开集群和mock内容来看的话这里的Invoker可以理解为DubboInvoker,所以我们来看看DubboInvoker的invoke操作:
//首先是进入到AbstractInvoker的invoke操作:
public Result invoke(Invocation inv) throws RpcException {
if(destroyed) {
throw new RpcException("Rpc invoker for service " + this + " on consumer " + NetUtils.getLocalHost()
+ " use dubbo version " + Version.getVersion()
+ " is DESTROYED, can not be invoked any more!");
}
RpcInvocation invocation = (RpcInvocation) inv;
//这里的this就是DubboInvoker本身
invocation.setInvoker(this);
//设置attachment参数,设置完之后就可以在服务端取到具体的参数值,很方便服务端和客户端进行通信
if (attachment != null && attachment.size() > 0) {
invocation.addAttachmentsIfAbsent(attachment);
}
Map<String, String> context = RpcContext.getContext().getAttachments();
if (context != null) {
invocation.addAttachmentsIfAbsent(context);
}
if (getUrl().getMethodParameter(invocation.getMethodName(), Constants.ASYNC_KEY, false)){
invocation.setAttachment(Constants.ASYNC_KEY, Boolean.TRUE.toString());
}
//如果是异步的话需要设置一下InvocationId,方便在回调的时候找到对应的数据
RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation);
try {
return doInvoke(invocation);
//将异常信息包装到RpcResult中,至于怎么处理这些异常信息,要看后面的逻辑了,原则上是RpcResult包含了调用的所有返回信息,包括异常信息
} catch (InvocationTargetException e) { // biz exception
Throwable te = e.getTargetException();
if (te == null) {
return new RpcResult(e);
} else {
if (te instanceof RpcException) {
((RpcException) te).setCode(RpcException.BIZ_EXCEPTION);
}
return new RpcResult(te);
}
} catch (RpcException e) {
if (e.isBiz()) {
return new RpcResult(e);
} else {
throw e;
}
} catch (Throwable e) {
return new RpcResult(e);
}
}
}
@Override
protected Result doInvoke(final Invocation invocation) throws Throwable {
RpcInvocation inv = (RpcInvocation) invocation;
final String methodName = RpcUtils.getMethodName(invocation);
inv.setAttachment(Constants.PATH_KEY, getUrl().getPath());
inv.setAttachment(Constants.VERSION_KEY, version);
ExchangeClient currentClient;
//同一个客户端只跟一个服务端建立一个长链接,也可能建立多个,如果是多个长链接的话就默认随机取一个
if (clients.length == 1) {
currentClient = clients[0];
} else {
currentClient = clients[index.getAndIncrement() % clients.length];
}
try {
//是否是异步
boolean isAsync = RpcUtils.isAsync(getUrl(), invocation);
//是否是单工
boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);
int timeout = getUrl().getMethodParameter(methodName, Constants.TIMEOUT_KEY,Constants.DEFAULT_TIMEOUT);
//oneWay形式最简单,发出消息之后就不用管其他的事情了
if (isOneway) {
boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);
currentClient.send(inv, isSent);
RpcContext.getContext().setFuture(null);
return new RpcResult();
//异步调用的话在发出请求之后后面还要取到结果,后面有机会可以讲一下具体如何取
//本质上无论同步还是异步,在服务端都是异步的,调用方在调用发出之后就收到ResponseFuture对象,只不过异步的操作方式不会马上调用get方法获取结果,同步的方法会立马调用get获取数据
} else if (isAsync) {
ResponseFuture future = currentClient.request(inv, timeout) ;
RpcContext.getContext().setFuture(new FutureAdapter<Object>(future));
return new RpcResult();
} else {//同步非单工
RpcContext.getContext().setFuture(null);
return (Result) currentClient.request(inv, timeout).get();
}
}
// 捕捉异常的步骤忽略掉
}
//上面的request就是调用到了HeaderExchangeChannel.request
public ResponseFuture request(Object request, int timeout) throws RemotingException {
if (closed) {
throw new RemotingException(this.getLocalAddress(), null, "Failed to send request " + request + ", cause: The channel " + this + " is closed!");
}
//创建request并设置req的属性,在生成的时候已经在req的内部创建了一个独有的ID
Request req = new Request();
req.setVersion("2.0.0");
req.setTwoWay(true);
//request=RpcInvocation
req.setData(request);
//创建默认的Future
DefaultFuture future = new DefaultFuture(channel, req, timeout);
try{
//继续调用channel的send,这里的channel其实就是NettyClient了,客户端的调用到这里其实已经结束了。接下来就是等待返回结果
channel.send(req);
}catch (RemotingException e) {
future.cancel();
throw e;
}
return future;
}
//feature的ID非常重要,和request,response的ID保持一致,方便从返回的结果Response中找到对应的future
public DefaultFuture(Channel channel, Request request, int timeout){
this.channel = channel;
this.request = request;
this.id = request.getId();
this.timeout = timeout > 0 ? timeout : channel.getUrl().getPositiveParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
// 放到全局的future容器中,以后方便根据ID寻找对应的feature
FUTURES.put(id, this);
//道理同上
CHANNELS.put(id, channel);
}
private static final Map<Long, DefaultFuture> FUTURES = new ConcurrentHashMap<Long, DefaultFuture>();
基本上到这里客户端调用的逻辑都讲到了,涉及到一些细节方面还没有进行梳理,但是大致上我们已经对于服务的调用有了一个轮廓性的认识,后面会逐篇展开其他细节的内容,希望在所有Dubbo内容都解释完的时候能有一个回归性的总结,把一个调用从开始到结束经过的所有过程都讲一下。