Dubbo客户端发送请求调用栈:
Dubbo服务端接受请求图:
介绍下两个概念:
过滤器链
各类协议protocol类均是由ProtocolFilterWrapper类封装的,ProtocolFilterWrapper在服务的暴露与引用的过程中根据KEY是PROVIDER还是CONSUMER来构建服务提供者与消费者的调用过滤器链。ProtocolFilterWrapper的export和refer方法代码如下:
@Override
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
if (Constants.REGISTRY_PROTOCOL.equals(invoker.getUrl().getProtocol())) {
return protocol.export(invoker);
}
return protocol.export(buildInvokerChain(invoker, Constants.SERVICE_FILTER_KEY, Constants.PROVIDER));
}
@Override
public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {
return protocol.refer(type, url);
}
return buildInvokerChain(protocol.refer(type, url), Constants.REFERENCE_FILTER_KEY, Constants.CONSUMER);
}
构建过滤器链的方法是ProtocolFilterWrapper类中的buildInvokerChain,代码如下:
private static <T> Invoker<T> buildInvokerChain(final Invoker<T> invoker, String key, String group) {
Invoker<T> last = invoker;
List<Filter> filters = ExtensionLoader.getExtensionLoader(Filter.class).getActivateExtension(invoker.getUrl(), key, group);
if (!filters.isEmpty()) {
for (int i = filters.size() - 1; i >= 0; i--) {
final Filter filter = filters.get(i);
final Invoker<T> next = last;
last = new Invoker<T>() {
@Override
public Class<T> getInterface() {
return invoker.getInterface();
}
@Override
public URL getUrl() {
return invoker.getUrl();
}
@Override
public boolean isAvailable() {
return invoker.isAvailable();
}
@Override
public Result invoke(Invocation invocation) throws RpcException {
return filter.invoke(next, invocation);
}
@Override
public void destroy() {
invoker.destroy();
}
@Override
public String toString() {
return invoker.toString();
}
};
}
}
return last;
}
在构建调用链时方法先获取Filter列表,然后创建与Fitler数量一样多Invoker结点,接着将这些结点串联在一起,构成一个链表,最后将这个链的首结点返回,随后的调用中,将从首结点开始,依次调用各个结点,完成调用后沿调用链返回。这里各个Invoker结点的串联是通过与其关联的invoke方法来完成的。
在invoke方法内部,通过调用与该invoker关联的filter中的invoke方法来实现结点的连接。调用时将next传入invoke方法,在调用时首先会调用该结点对应的filter的invoke方法,接着调用传入参数next的invoke方法。Next的invoke方法同样会调用自己所关联的filter的invoke方法,这样就完成了结点的串联。可以看到链表的最后一个结点就是buildInvokerChain 方法的入参invoker。最终buildInvokerChain方法通过链表头插法完成调用链的创建。因此在真正的调用请求处理前会经过若干filter进行预处理。
过滤器链的创建可以看作是职责链模式(Chain of Responsibility Pattern)的一个实现。这样系统中增加一个新的过滤器预处理请求时,无须修改原有系统的代码,只需重新建调用链即可。
监听器链
各类协议protocol类均是由ProtocolListenerWrapper类封装的,在服务的暴露和引用过程中,都是调用该类的export和refer方法,在这些方法中完成监听器链的创建。
一、ExporterListener
ProtocolListenerWrapper是在服务暴露时构建了监听器链,在服务暴露(exporter)的过程中调用监听器所提供的回调函数,Dubbo没有实现监听器,但提供了业务扩展的接口,业务可以自定义实现了ExporterListener接口的类,并在xml的<dubbo:service>标签listener属性中配置监听器的类名即可。
@Override
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
if (Constants.REGISTRY_PROTOCOL.equals(invoker.getUrl().getProtocol())) {
return protocol.export(invoker);
}
return new ListenerExporterWrapper<T>(protocol.export(invoker),
Collections.unmodifiableList(ExtensionLoader.getExtensionLoader(ExporterListener.class)
.getActivateExtension(invoker.getUrl(), Constants.EXPORTER_LISTENER_KEY)));
}
二、InvokerListener
ProtocolListenerWrapper在服务引用的时候对Invoker构建了监听器链并封装成ListenerInvokerWrapper对象返回,在封装过程中执行业务定义的监听器类的回调函数,Dubbo没有实现监听器,但提供了业务扩展的接口,业务可以自定义实现了InvokerListener接口的类,并在xml的<dubbo:reference>标签listener属性中配置监听器的类名即可。
@Override
public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {
return protocol.refer(type, url);
}
return new ListenerInvokerWrapper<T>(protocol.refer(type, url),
Collections.unmodifiableList(
ExtensionLoader.getExtensionLoader(InvokerListener.class)
.getActivateExtension(url, Constants.INVOKER_LISTENER_KEY)));
}
服务端接收请求的处理流程
在创建NettyServer对象时,初始化了NettyHandler对象,该对象中的ChannlHandler赋值为NettyServer对象。在接受到消息之后,调用NettyHandler.messageReceived方法,后续的调用链为:NettyServer—>MultiMessageHandler-->HeartbeatHandler—> AllChannelHandler->DecodeHandler->HeaderExchangerHandler->DubboProtocol$requestHandler。在上述调用链中创建了Response对象并封装了RpcInvocation对象。
最后在DubboProtocol$requestHandler处理RpcInvocation对象,根据URL地址生成serviceKey,再查找到服务端的Invoker对象,然后经过过滤器链和监听器链(在暴露服务时创建的),最后在JavassistProxyFactory.getInvoker方法创建的AbstractProxyInvoker对象中通过反射的方式完成具体业务层的调用。
具体流程如下:
参考:https://blog.csdn.net/meilong_whpu/article/details/72178566