说明:本文以以下配置进行服务发布流程分析:
注册中心:zookeeper;
发布协议:dobbo
1、服务发布流程解析
服务发布流程图如下:
(1)、应用使用<dubbo:service>配置
<!-- 声明需要暴露的服务接口 -->
<dubbo:service id="dubboTestApi" interface="dubbo.demo.api.DubboTestApi" ref="dubboTestApiImpl" cache="false" />
(2)、DubboNamespaceHandler
模块:dubbo-config -> dubbo-config-spring
包:org.apache.dubbo.config.spring.schema
主要方法:init()
主要源码:
@Override
public void init() {
registerBeanDefinitionParser("application", new DubboBeanDefinitionParser(ApplicationConfig.class, true));
registerBeanDefinitionParser("module", new DubboBeanDefinitionParser(ModuleConfig.class, true));
registerBeanDefinitionParser("registry", new DubboBeanDefinitionParser(RegistryConfig.class, true));
registerBeanDefinitionParser("config-center", new DubboBeanDefinitionParser(ConfigCenterBean.class, true));
registerBeanDefinitionParser("monitor", new DubboBeanDefinitionParser(MonitorConfig.class, true));
registerBeanDefinitionParser("provider", new DubboBeanDefinitionParser(ProviderConfig.class, true));
registerBeanDefinitionParser("consumer", new DubboBeanDefinitionParser(ConsumerConfig.class, true));
registerBeanDefinitionParser("protocol", new DubboBeanDefinitionParser(ProtocolConfig.class, true));
registerBeanDefinitionParser("service", new DubboBeanDefinitionParser(ServiceBean.class, true));
registerBeanDefinitionParser("reference", new DubboBeanDefinitionParser(ReferenceBean.class, false));
registerBeanDefinitionParser("annotation", new AnnotationBeanDefinitionParser());
}
本处主要定义dubbo的自定义标签的解析处理。
(3)、ServiceBean.export()
模块:dubbo-config -> dubbo-config-spring
包:org.apache.dubbo.config.spring.schema
主要方法:export()
主要源码:
public synchronized void export() {
checkAndUpdateSubConfigs();
if (provider != null) {
if (export == null) {
export = provider.getExport();
}
if (delay == null) {
delay = provider.getDelay();
}
}
if (export != null && !export) {
return;
}
if (delay != null && delay > 0) {
delayExportExecutor.schedule(this::doExport, delay, TimeUnit.MILLISECONDS);
} else {
doExport();
}
}
主要流程:
- 获取provider的export属性及delay属性;
- 若export=false表示不对当前服务进行暴露,则直接返回;
- 若delay有合理值,表示需要延迟暴露,则将暴露操作放到任务队列中;否则立即调用doExport()进行暴露处理。
(4)、ServiceBean.doExport()
模块:dubbo-config -> dubbo-config-spring
包:org.apache.dubbo.config.spring.schema
主要方法:doExport()
主要源码:
protected synchronized void doExport() {
if (unexported) {
throw new IllegalStateException("The service " + interfaceClass.getName() + " has already unexported!");
}
if (exported) {
return;
}
exported = true;
if (StringUtils.isEmpty(path)) {
path = interfaceName;
}
ProviderModel providerModel = new ProviderModel(getUniqueServiceName(), ref, interfaceClass);
ApplicationModel.initProviderModel(getUniqueServiceName(), providerModel);
doExportUrls();
}
(5)、ServiceBean.doExportUrls()
模块:dubbo-config -> dubbo-config-spring
包:org.apache.dubbo.config.spring.schema
主要方法:doExportUrls()
主要源码:
private void doExportUrls() {
List<URL> registryURLs = loadRegistries(true);
for (ProtocolConfig protocolConfig : protocols) {
doExportUrlsFor1Protocol(protocolConfig, registryURLs);
}
}
此处主要是根据根据配置的协议及注册中心进行分别暴露;
(6)、ServiceBean.doExportUrlsFor1Protocol()
模块:dubbo-config -> dubbo-config-spring
包:org.apache.dubbo.config.spring.schema
主要方法:doExportUrlsFor1Protocol()
主要源码:
Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(Constants.EXPORT_KEY, url.toFullString()));
DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);
Exporter<?> exporter = protocol.export(wrapperInvoker);
exporters.add(exporter);
主要处理流程:
- 通过proxyFactory生成实现的代理类,proxyFactory实现为:JavassistProxyFactory;
- 通过protocol对代理类进行暴露处理,protocol实现为:RegistryProtocol。具体为何为RegistryProtocol实现类请参照:
(7)、JavassistProxyFactory.getInvoker()
模块:dubbo-rpc-> dubbo-rpc-api
包:org.apache.dubbo.rpc.proxy.javassist
主要方法:getInvoker()
主要源码:
public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) {
// TODO Wrapper cannot handle this scenario correctly: the classname contains '$'
final Wrapper wrapper = Wrapper.getWrapper(proxy.getClass().getName().indexOf('$') < 0 ? proxy.getClass() : type);
return new AbstractProxyInvoker<T>(proxy, type, url) {
@Override
protected Object doInvoke(T proxy, String methodName,
Class<?>[] parameterTypes,
Object[] arguments) throws Throwable {
return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments);
}
};
}
处理流程:
- 通过Wrapper.getWrapper()动态生成接口的实现类;
- 创建代理的invoker实现;
(8)、Wrapper.getWrapper()
模块:dubbo-common
包:org.apache.dubbo.common.bytecode
主要方法:getWrapper()
主要源码:
public static Wrapper getWrapper(Class<?> c) {
while (ClassGenerator.isDynamicClass(c)) // can not wrapper on dynamic class.
{
c = c.getSuperclass();
}
if (c == Object.class) {
return OBJECT_WRAPPER;
}
Wrapper ret = WRAPPER_MAP.get(c);
if (ret == null) {
ret = makeWrapper(c);
WRAPPER_MAP.put(c, ret);
}
return ret;
}
主要实现是通过makeWrapper()动态生成接口的代理类,makeWrapper()主要是通过动态拼接编译实现。主要实现方法为invokeMethod(),此方法根据调用的具体的实现、方法、参数类型、参数等调用其具体的实现类。
(9)、RegistryProtocol.export()
模块:dubbo-registry->dubbo-registry-api
包:org.apache.dubbo.registry.integration
主要方法:export()
主要源码:
public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {
URL registryUrl = getRegistryUrl(originInvoker);
// url to export locally
URL providerUrl = getProviderUrl(originInvoker);
// Subscribe the override data
// FIXME When the provider subscribes, it will affect the scene : a certain JVM exposes the service and call
// the same service. Because the subscribed is cached key with the name of the service, it causes the
// subscription information to cover.
final URL overrideSubscribeUrl = getSubscribedOverrideUrl(providerUrl);
final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl, originInvoker);
overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener);
providerUrl = overrideUrlWithConfig(providerUrl, overrideSubscribeListener);
//export invoker
final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker, providerUrl);
// url to registry
final Registry registry = getRegistry(originInvoker);
final URL registeredProviderUrl = getRegisteredProviderUrl(providerUrl, registryUrl);
ProviderInvokerWrapper<T> providerInvokerWrapper = ProviderConsumerRegTable.registerProvider(originInvoker,
registryUrl, registeredProviderUrl);
//to judge if we need to delay publish
boolean register = registeredProviderUrl.getParameter("register", true);
if (register) {
register(registryUrl, registeredProviderUrl);
providerInvokerWrapper.setReg(true);
}
// Deprecated! Subscribe to override rules in 2.6.x or before.
registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);
exporter.setRegisterUrl(registeredProviderUrl);
exporter.setSubscribeUrl(overrideSubscribeUrl);
//Ensure that a new exporter instance is returned every time export
return new DestroyableExporter<>(exporter);
}
主要处理流程:
- getRegistryUrl()获取注册中心的url,以“zeekeeper://xxx.xxx.xx.xxx:2181/”开头,后面跟随接口参数等信息;
- getProviderUrl()获取服务提供者的url,以“dubbo://xxx.xxx.xxx.xxx:20880/"开头,后面跟随接口参数等信息;
- getSubscribedOverrideUrl()获取需订阅的url;
- doLocalExport()主要进行本地暴露处理;
- getRegistry()获取注册中心;
- register为true表示配置需要进行注册,则调用register()将接口信息注册到注册中心;
- subscribe()进行接口信息订阅;
注:此处为何调用的是RegistryProtocol请参照://www.greatytc.com/p/336310167bcb
(10)、RegistryProtocol.doLocalExport()
模块:dubbo-registry->dubbo-registry-api
包:org.apache.dubbo.registry.integration
主要方法:doLocalExport()
主要源码:
private <T> ExporterChangeableWrapper<T> doLocalExport(final Invoker<T> originInvoker, URL providerUrl) {
String key = getCacheKey(originInvoker);
ExporterChangeableWrapper<T> exporter = (ExporterChangeableWrapper<T>) bounds.get(key);
if (exporter == null) {
synchronized (bounds) {
exporter = (ExporterChangeableWrapper<T>) bounds.get(key);
if (exporter == null) {
final Invoker<?> invokerDelegete = new InvokerDelegate<T>(originInvoker, providerUrl);
exporter = new ExporterChangeableWrapper<T>((Exporter<T>) protocol.export(invokerDelegete), originInvoker);
bounds.put(key, exporter);
}
}
}
return exporter;
}
此处主要是调用DubboProtocol.export()进行接口暴露。
注:此处为何调用的是DubboProtocol,请参考://www.greatytc.com/p/336310167bcb
(11)、DubboProtocol.export()
模块:dubbo-rpc->dubbo-rpc-dubbo
包:org.apache.dubbo.rpc.protocol.dubbo
主要方法:doLocalExport()
主要源码:
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
URL url = invoker.getUrl();
// export service.
String key = serviceKey(url);
DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap);
exporterMap.put(key, exporter);
//export an stub service for dispatching event
Boolean isStubSupportEvent = url.getParameter(Constants.STUB_EVENT_KEY, Constants.DEFAULT_STUB_EVENT);
Boolean isCallbackservice = url.getParameter(Constants.IS_CALLBACK_SERVICE, false);
if (isStubSupportEvent && !isCallbackservice) {
String stubServiceMethods = url.getParameter(Constants.STUB_EVENT_METHODS_KEY);
if (stubServiceMethods == null || stubServiceMethods.length() == 0) {
if (logger.isWarnEnabled()) {
logger.warn(new IllegalStateException("consumer [" + url.getParameter(Constants.INTERFACE_KEY) +
"], has set stubproxy support event ,but no stub methods founded."));
}
} else {
stubServiceMethodsMap.put(url.getServiceKey(), stubServiceMethods);
}
}
openServer(url);
optimizeSerialization(url);
return exporter;
}
主要调用openServer()进行本地服务暴露。
(12)、DubboProtocol.openServer()
模块:dubbo-rpc->dubbo-rpc-dubbo
包:org.apache.dubbo.rpc.protocol.dubbo
主要方法:openServer()
主要源码:
private void openServer(URL url) {
// find server.
String key = url.getAddress();
//client can export a service which's only for server to invoke
boolean isServer = url.getParameter(Constants.IS_SERVER_KEY, true);
if (isServer) {
ExchangeServer server = serverMap.get(key);
if (server == null) {
synchronized (this) {
server = serverMap.get(key);
if (server == null) {
serverMap.put(key, createServer(url));
}
}
} else {
// server supports reset, use together with override
server.reset(url);
}
}
}
主要是通过createServer创建并初始化ExchangeServer对象。
(13)、DubboProtocol.createServer()
模块:dubbo-rpc->dubbo-rpc-dubbo
包:org.apache.dubbo.rpc.protocol.dubbo
主要方法:createServer()
主要源码:
private ExchangeServer createServer(URL url) {
// send readonly event when server closes, it's enabled by default
url = url.addParameterIfAbsent(Constants.CHANNEL_READONLYEVENT_SENT_KEY, Boolean.TRUE.toString());
// enable heartbeat by default
url = url.addParameterIfAbsent(Constants.HEARTBEAT_KEY, String.valueOf(Constants.DEFAULT_HEARTBEAT));
String str = url.getParameter(Constants.SERVER_KEY, Constants.DEFAULT_REMOTING_SERVER);
if (str != null && str.length() > 0 && !ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str)) {
throw new RpcException("Unsupported server type: " + str + ", url: " + url);
}
url = url.addParameter(Constants.CODEC_KEY, DubboCodec.NAME);
ExchangeServer server;
try {
server = Exchangers.bind(url, requestHandler);
} catch (RemotingException e) {
throw new RpcException("Fail to start server(url: " + url + ") " + e.getMessage(), e);
}
str = url.getParameter(Constants.CLIENT_KEY);
if (str != null && str.length() > 0) {
Set<String> supportedTypes = ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions();
if (!supportedTypes.contains(str)) {
throw new RpcException("Unsupported client type: " + str);
}
}
return server;
}
主要是通过Exchangers.bind()生成服务端的ExchangeServer。
(14)、Exchangers.bind()
模块:dubbo-remoting->dubbo-remoting-api
包:org.apache.dubbo.remoting.exchange
主要方法:bind()
主要源码:
public static ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
if (url == null) {
throw new IllegalArgumentException("url == null");
}
if (handler == null) {
throw new IllegalArgumentException("handler == null");
}
url = url.addParameterIfAbsent(Constants.CODEC_KEY, "exchange");
return getExchanger(url).bind(url, handler);
}
getExchanger通过spi动态加载HeaderExchanger类型的Exchanger,故bind()实际调用的是HeaderExchanger.bind()。
(15)、HeaderExchanger.bind()
模块:dubbo-remoting->dubbo-remoting-api
包:org.apache.dubbo.remoting.exchange.support.header
主要方法:bind()
主要源码:
public ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
return new HeaderExchangeServer(Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler))));
}
此处创建HeaderExchangeServer类型ExchangeServer,实际bind()操作由Transporters.bind()实现,解码器为DecodeHandler,解码器为包装handler的HeaderExchangeHandler类;hander的实现为DubboProtocol中的requestHandler对象。
(16)、Transporters.bind()
模块:dubbo-remoting->dubbo-remoting-api
包:org.apache.dubbo.remoting
主要方法:bind()
主要源码:
public static Server bind(URL url, ChannelHandler... handlers) throws RemotingException {
if (url == null) {
throw new IllegalArgumentException("url == null");
}
if (handlers == null || handlers.length == 0) {
throw new IllegalArgumentException("handlers == null");
}
ChannelHandler handler;
if (handlers.length == 1) {
handler = handlers[0];
} else {
handler = new ChannelHandlerDispatcher(handlers);
}
return getTransporter().bind(url, handler);
}
此处最终调用的是NettyTransporter.bind()对底层连接进程处理的。
(17)、RegistryProtocol.getRegistry()
模块:dubbo-registry->dubbo-registry-api
包:org.apache.dubbo.registry
主要方法:getRegistry()
主要源码:
private Registry getRegistry(final Invoker<?> originInvoker) {
URL registryUrl = getRegistryUrl(originInvoker);
return registryFactory.getRegistry(registryUrl);
}
此处主要是获取注册中心的url,通过RegistryFactory.getRegistry()获取注册中心接口的实现。
(18)、ZookeeperRegistryFactory.getRegistry()
模块:dubbo-registry->dubbo-registry-api
包:org.apache.dubbo.registry.support
主要方法:getRegistry()
主要源码:
public Registry getRegistry(URL url) {
url = url.setPath(RegistryService.class.getName())
.addParameter(Constants.INTERFACE_KEY, RegistryService.class.getName())
.removeParameters(Constants.EXPORT_KEY, Constants.REFER_KEY);
String key = url.toServiceStringWithoutResolving();
// Lock the registry access process to ensure a single instance of the registry
LOCK.lock();
try {
Registry registry = REGISTRIES.get(key);
if (registry != null) {
return registry;
}
//create registry by spi/ioc
registry = createRegistry(url);
if (registry == null) {
throw new IllegalStateException("Can not create registry " + url);
}
REGISTRIES.put(key, registry);
return registry;
} finally {
// Release the lock
LOCK.unlock();
}
}
此处主要是通过createRegistry()创建实际的Registry,即ZookeeperRegistry。
(19)、ZookeeperRegistryFactory.getRegistry()
模块:dubbo-registry->dubbo-registry-zookeeper
包:org.apache.dubbo.registry.zookeeper
主要方法:createRegistry()
主要源码:
@Override
public Registry createRegistry(URL url) {
return new ZookeeperRegistry(url, zookeeperTransporter);
}
此处通过工厂模式创建实际的Registry类ZookeeperRegistry。
(20)、RegistryProtocol.register()
模块:dubbo-registry->dubbo-registry-api
包:org.apache.dubbo.registry
主要方法:register()
主要源码:
public void register(URL registryUrl, URL registeredProviderUrl) {
Registry registry = registryFactory.getRegistry(registryUrl);
registry.register(registeredProviderUrl);
}
此处主要是获取注册中心的实现,并将服务提供者注册到注册中心。
(21)、ZookeeperRegistry.register()
模块:dubbo-registry->dubbo-registry-zookeeper
包:org.apache.dubbo.registry.zookeeper
主要方法:register()
主要源码:
public void register(URL url) {
super.register(url);
removeFailedRegistered(url);
removeFailedUnregistered(url);
try {
// Sending a registration request to the server side
doRegister(url);
} catch (Exception e) {
Throwable t = e;
// If the startup detection is opened, the Exception is thrown directly.
boolean check = getUrl().getParameter(Constants.CHECK_KEY, true)
&& url.getParameter(Constants.CHECK_KEY, true)
&& !Constants.CONSUMER_PROTOCOL.equals(url.getProtocol());
boolean skipFailback = t instanceof SkipFailbackWrapperException;
if (check || skipFailback) {
if (skipFailback) {
t = t.getCause();
}
throw new IllegalStateException("Failed to register " + url + " to registry " + getUrl().getAddress() + ", cause: " + t.getMessage(), t);
} else {
logger.error("Failed to register " + url + ", waiting for retry, cause: " + t.getMessage(), t);
}
// Record a failed registration request to a failed list, retry regularly
addFailedRegistered(url);
}
}
处理流程:
- 将服务提供者的url放入注册队列;
- 从注册失败队列及注销失败队列中移除本url;
- 调用doRegistery()进行实际的注册中心注册操作;
- 若注册失败则将url放入注册失败队列中,等待一定延迟后继续注册;
(22)、ZookeeperRegistry.doRegister()
模块:dubbo-registry->dubbo-registry-zookeeper
包:org.apache.dubbo.registry.zookeeper
主要方法:doRegister()
主要源码:
public void doRegister(URL url) {
try {
zkClient.create(toUrlPath(url), url.getParameter(Constants.DYNAMIC_KEY, true));
} catch (Throwable e) {
throw new RpcException("Failed to register " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
}
}
本处主要是调用ZookeeperClient.create()在zookeeper集群中创建一个节点;
(23)、CuratorZookeeperClient.create()
模块:dubbo-remoting->dubbo-remoting-zookeeper
包:org.apache.dubbo.remoting.zookeeper.curator
主要方法:create()
主要源码:
public void create(String path, boolean ephemeral) {
if (!ephemeral) {
if (checkExists(path)) {
return;
}
}
int i = path.lastIndexOf('/');
if (i > 0) {
create(path.substring(0, i), false);
}
if (ephemeral) {
createEphemeral(path);
} else {
createPersistent(path);
}
}
本处主要是在zookeeper集群上创建目录及节点。
(24)、ZookeeperRegistry.subscribe()
模块:dubbo-registry->dubbo-registry-api
包:org.apache.dubbo.registry.zookeeper
主要方法:subscribe()
主要源码:
public void subscribe(URL url, NotifyListener listener) {
super.subscribe(url, listener);
removeFailedSubscribed(url, listener);
try {
// Sending a subscription request to the server side
doSubscribe(url, listener);
} catch (Exception e) {
Throwable t = e;
List<URL> urls = getCacheUrls(url);
if (CollectionUtils.isNotEmpty(urls)) {
notify(url, listener, urls);
logger.error("Failed to subscribe " + url + ", Using cached list: " + urls + " from cache file: " + getUrl().getParameter(Constants.FILE_KEY, System.getProperty("user.home") + "/dubbo-registry-" + url.getHost() + ".cache") + ", cause: " + t.getMessage(), t);
} else {
// If the startup detection is opened, the Exception is thrown directly.
boolean check = getUrl().getParameter(Constants.CHECK_KEY, true)
&& url.getParameter(Constants.CHECK_KEY, true);
boolean skipFailback = t instanceof SkipFailbackWrapperException;
if (check || skipFailback) {
if (skipFailback) {
t = t.getCause();
}
throw new IllegalStateException("Failed to subscribe " + url + ", cause: " + t.getMessage(), t);
} else {
logger.error("Failed to subscribe " + url + ", waiting for retry, cause: " + t.getMessage(), t);
}
}
// Record a failed registration request to a failed list, retry regularly
addFailedSubscribed(url, listener);
}
}
主要处理流程:
- 将服务提供者的url及监听器放入订阅队列中;
- 从订阅失败队列中移除本url;
- 创建对当前url的监听器;
- 若订阅失败,则将当前url及监听器添加到订阅失败队列中;
2、服务端对客户端请求的处理流程
2.1、传输层相关的处理分析
(1)、Netty服务端创建
以下源码为NettyServer中打开服务端网络监听的实现:
protected void doOpen() throws Throwable {
bootstrap = new ServerBootstrap();
bossGroup = new NioEventLoopGroup(1, new DefaultThreadFactory("NettyServerBoss", true));
workerGroup = new NioEventLoopGroup(getUrl().getPositiveParameter(Constants.IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS),
new DefaultThreadFactory("NettyServerWorker", true));
final NettyServerHandler nettyServerHandler = new NettyServerHandler(getUrl(), this);
channels = nettyServerHandler.getChannels();
bootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childOption(ChannelOption.TCP_NODELAY, Boolean.TRUE)
.childOption(ChannelOption.SO_REUSEADDR, Boolean.TRUE)
.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
.childHandler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) throws Exception {
NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this);
ch.pipeline()//.addLast("logging",new LoggingHandler(LogLevel.INFO))//for debug
.addLast("decoder", adapter.getDecoder())
.addLast("encoder", adapter.getEncoder())
.addLast("handler", nettyServerHandler);
}
});
// bind
ChannelFuture channelFuture = bootstrap.bind(getBindAddress());
channelFuture.syncUninterruptibly();
channel = channelFuture.channel();
}
protected static Codec2 getChannelCodec(URL url) {
String codecName = url.getParameter(Constants.CODEC_KEY, "telnet");
if (ExtensionLoader.getExtensionLoader(Codec2.class).hasExtension(codecName)) {
return ExtensionLoader.getExtensionLoader(Codec2.class).getExtension(codecName);
} else {
return new CodecAdapter(ExtensionLoader.getExtensionLoader(Codec.class)
.getExtension(codecName));
}
}
由以上代码可知:
- 编解码器为getCodec()函数获取的,而实际是通过getChannelCodec()获取的编解码器实现;在getChannelCodec中根据SPI扩展获取暴露协议对应的编解码器,本处为获取dubbo协议对应的编解码器;其具体实现类为:DubboCodec;
- nettyServerHandler为服务端的实际业务的handler,其只是NettyServer相应接口的简单包装,而NettyServer这个handler又是其构造函数中传过来的handler的简单封装,而这个handler的实现请看以下代码。
注:在此主要对流程进行分析,故不对DubboCodec的具体实现细节做过多分析。
(2)、Exchanger层服务的创建:
public ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
return new HeaderExchangeServer(Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler))));
}
由以上可知,NettyServer中的handler实现为DecodeHandler,而DecodeHandler是HeaderExchangeHandler的简单封装,HeaderExchangeHandler主要根据请求类型进行处理,其实际请求的处理是交由handler进行处理的,而此处的handler实现为DubboProtocol中的requestHandler对象,requestHandler的实现类为
ExchangeHandlerAdapter的匿名子类。
2.2、服务端请求处理流程
以下即为请求处理的主要调用流程:
(1)、DubboCodec.decode()
模块:dubbo-rpc->dubbo-rpc-dubbo
包:org.apache.dubbo.rpc.protocol.dubbo
主要方法:decode()
主要源码:
protected Object decode(Channel channel, ChannelBuffer buffer, int readable, byte[] header) throws IOException {
// check magic number.
if (readable > 0 && header[0] != MAGIC_HIGH
|| readable > 1 && header[1] != MAGIC_LOW) {
int length = header.length;
if (header.length < readable) {
header = Bytes.copyOf(header, readable);
buffer.readBytes(header, length, readable - length);
}
for (int i = 1; i < header.length - 1; i++) {
if (header[i] == MAGIC_HIGH && header[i + 1] == MAGIC_LOW) {
buffer.readerIndex(buffer.readerIndex() - header.length + i);
header = Bytes.copyOf(header, i);
break;
}
}
return super.decode(channel, buffer, readable, header);
}
// check length.
if (readable < HEADER_LENGTH) {
return DecodeResult.NEED_MORE_INPUT;
}
// get data length.
int len = Bytes.bytes2int(header, 12);
checkPayload(channel, len);
int tt = len + HEADER_LENGTH;
if (readable < tt) {
return DecodeResult.NEED_MORE_INPUT;
}
// limit input stream.
ChannelBufferInputStream is = new ChannelBufferInputStream(buffer, len);
try {
return decodeBody(channel, is, header);
} finally {
if (is.available() > 0) {
try {
if (logger.isWarnEnabled()) {
logger.warn("Skip input stream " + is.available());
}
StreamUtils.skipUnusedStream(is);
} catch (IOException e) {
logger.warn(e.getMessage(), e);
}
}
}
}
此处主要是对请求数据进行基本的解码处理,然后将消息交由decodeBody()进行进一步的解码处理。
(2)、DubboCodec.decodeBody()
模块:dubbo-rpc->dubbo-rpc-dubbo
包:org.apache.dubbo.rpc.protocol.dubbo
主要方法:decodeBody()
主要源码:
protected Object decodeBody(Channel channel, InputStream is, byte[] header) throws IOException {
byte flag = header[2], proto = (byte) (flag & SERIALIZATION_MASK);
// get request id.
long id = Bytes.bytes2long(header, 4);
if ((flag & FLAG_REQUEST) == 0) {
// decode response.
Response res = new Response(id);
if ((flag & FLAG_EVENT) != 0) {
res.setEvent(Response.HEARTBEAT_EVENT);
}
// get status.
byte status = header[3];
res.setStatus(status);
try {
ObjectInput in = CodecSupport.deserialize(channel.getUrl(), is, proto);
if (status == Response.OK) {
Object data;
if (res.isHeartbeat()) {
data = decodeHeartbeatData(channel, in);
} else if (res.isEvent()) {
data = decodeEventData(channel, in);
} else {
DecodeableRpcResult result;
if (channel.getUrl().getParameter(
Constants.DECODE_IN_IO_THREAD_KEY,
Constants.DEFAULT_DECODE_IN_IO_THREAD)) {
result = new DecodeableRpcResult(channel, res, is,
(Invocation) getRequestData(id), proto);
result.decode();
} else {
result = new DecodeableRpcResult(channel, res,
new UnsafeByteArrayInputStream(readMessageData(is)),
(Invocation) getRequestData(id), proto);
}
data = result;
}
res.setResult(data);
} else {
res.setErrorMessage(in.readUTF());
}
} catch (Throwable t) {
if (log.isWarnEnabled()) {
log.warn("Decode response failed: " + t.getMessage(), t);
}
res.setStatus(Response.CLIENT_ERROR);
res.setErrorMessage(StringUtils.toString(t));
}
return res;
} else {
// decode request.
Request req = new Request(id);
req.setVersion(Version.getProtocolVersion());
req.setTwoWay((flag & FLAG_TWOWAY) != 0);
if ((flag & FLAG_EVENT) != 0) {
req.setEvent(true);
}
try {
Object data;
ObjectInput in = CodecSupport.deserialize(channel.getUrl(), is, proto);
if (req.isHeartbeat()) {
data = decodeHeartbeatData(channel, in);
} else if (req.isEvent()) {
data = decodeEventData(channel, in);
} else {
DecodeableRpcInvocation inv;
if (channel.getUrl().getParameter(
Constants.DECODE_IN_IO_THREAD_KEY,
Constants.DEFAULT_DECODE_IN_IO_THREAD)) {
inv = new DecodeableRpcInvocation(channel, req, is, proto);
inv.decode();
} else {
inv = new DecodeableRpcInvocation(channel, req,
new UnsafeByteArrayInputStream(readMessageData(is)), proto);
}
data = inv;
}
req.setData(data);
} catch (Throwable t) {
if (log.isWarnEnabled()) {
log.warn("Decode request failed: " + t.getMessage(), t);
}
// bad request
req.setBroken(true);
req.setData(t);
}
return req;
}
}
此处主要是将消息解码成DecodeableRpcInvocation对象,并交由业务层的具体的业务handler进行处理。
(3)、DecodeHandler.received()
模块:dubbo-remoting->dubbo-remoting-api
包:org.apache.dubbo.remoting.transport
主要方法:received()
主要源码:
public void received(Channel channel, Object message) throws RemotingException {
if (message instanceof Decodeable) {
decode(message);
}
if (message instanceof Request) {
decode(((Request) message).getData());
}
if (message instanceof Response) {
decode(((Response) message).getResult());
}
handler.received(channel, message);
}
此处主要调用handler.received()进行处理,实际调用的是HeaderExchangeHandler.received()进程处理。
(4)、HeaderExchangeHandler.received()
模块:dubbo-remoting->dubbo-remoting-api
包:org.apache.dubbo.remoting.exchange.support.header
主要方法:received()
主要源码:
public void received(Channel channel, Object message) throws RemotingException {
channel.setAttribute(KEY_READ_TIMESTAMP, System.currentTimeMillis());
final ExchangeChannel exchangeChannel = HeaderExchangeChannel.getOrAddChannel(channel);
try {
if (message instanceof Request) {
// handle request.
Request request = (Request) message;
if (request.isEvent()) {
handlerEvent(channel, request);
} else {
if (request.isTwoWay()) {
handleRequest(exchangeChannel, request);
} else {
handler.received(exchangeChannel, request.getData());
}
}
} else if (message instanceof Response) {
handleResponse(channel, (Response) message);
} else if (message instanceof String) {
if (isClientSide(channel)) {
Exception e = new Exception("Dubbo client can not supported string message: " + message + " in channel: " + channel + ", url: " + channel.getUrl());
logger.error(e.getMessage(), e);
} else {
String echo = handler.telnet(channel, (String) message);
if (echo != null && echo.length() > 0) {
channel.send(echo);
}
}
} else {
handler.received(exchangeChannel, message);
}
} finally {
HeaderExchangeChannel.removeChannelIfDisconnected(channel);
}
}
主要处理流程:
- 若消息类型是Request,若是事件消息,则进行事件处理;若是需要应答的消息,则调用handleRequest()进行进一步处理;若是无需应答的消息,则直接调用实际handler.received()进行实际的消息处理;
- 若未应答消息,则调用handleResponse()进行应答处理;
- 若为String类型的消息,则表示通过telnet进行通信,调用handler.telnet()对消息进行处理;
(5)、DecodeHandler.handleRequest()
模块:dubbo-remoting->dubbo-remoting-api
包:org.apache.dubbo.remoting.exchange.support.header
主要方法:handleRequest()
主要源码:
void handleRequest(final ExchangeChannel channel, Request req) throws RemotingException {
Response res = new Response(req.getId(), req.getVersion());
if (req.isBroken()) {
Object data = req.getData();
String msg;
if (data == null) {
msg = null;
} else if (data instanceof Throwable) {
msg = StringUtils.toString((Throwable) data);
} else {
msg = data.toString();
}
res.setErrorMessage("Fail to decode request due to: " + msg);
res.setStatus(Response.BAD_REQUEST);
channel.send(res);
return;
}
// find handler by message class.
Object msg = req.getData();
try {
// handle data.
CompletableFuture<Object> future = handler.reply(channel, msg);
if (future.isDone()) {
res.setStatus(Response.OK);
res.setResult(future.get());
channel.send(res);
return;
}
future.whenComplete((result, t) -> {
try {
if (t == null) {
res.setStatus(Response.OK);
res.setResult(result);
} else {
res.setStatus(Response.SERVICE_ERROR);
res.setErrorMessage(StringUtils.toString(t));
}
channel.send(res);
} catch (RemotingException e) {
logger.warn("Send result to consumer failed, channel is " + channel + ", msg is " + e);
} finally {
// HeaderExchangeChannel.removeChannelIfDisconnected(channel);
}
});
} catch (Throwable e) {
res.setStatus(Response.SERVICE_ERROR);
res.setErrorMessage(StringUtils.toString(e));
channel.send(res);
}
}
此处主要是调用handler.reply()处理消息,获得返回结果,将结果返回给客户端;
(6)、DubboProtocol.requestHandler.reply()
模块:dubbo-rpc->dubbo-rpc-dubbo
包:org.apache.dubbo.rpc.protocol.dubbo
主要方法:reply()
主要源码:
public CompletableFuture<Object> reply(ExchangeChannel channel, Object message) throws RemotingException {
if (message instanceof Invocation) {
Invocation inv = (Invocation) message;
Invoker<?> invoker = getInvoker(channel, inv);
// need to consider backward-compatibility if it's a callback
if (Boolean.TRUE.toString().equals(inv.getAttachments().get(IS_CALLBACK_SERVICE_INVOKE))) {
String methodsStr = invoker.getUrl().getParameters().get("methods");
boolean hasMethod = false;
if (methodsStr == null || !methodsStr.contains(",")) {
hasMethod = inv.getMethodName().equals(methodsStr);
} else {
String[] methods = methodsStr.split(",");
for (String method : methods) {
if (inv.getMethodName().equals(method)) {
hasMethod = true;
break;
}
}
}
if (!hasMethod) {
logger.warn(new IllegalStateException("The methodName " + inv.getMethodName()
+ " not found in callback service interface ,invoke will be ignored."
+ " please update the api interface. url is:"
+ invoker.getUrl()) + " ,invocation is :" + inv);
return null;
}
}
RpcContext rpcContext = RpcContext.getContext();
boolean supportServerAsync = invoker.getUrl().getMethodParameter(inv.getMethodName(), Constants.ASYNC_KEY, false);
if (supportServerAsync) {
CompletableFuture<Object> future = new CompletableFuture<>();
rpcContext.setAsyncContext(new AsyncContextImpl(future));
}
rpcContext.setRemoteAddress(channel.getRemoteAddress());
Result result = invoker.invoke(inv);
if (result instanceof AsyncRpcResult) {
return ((AsyncRpcResult) result).getResultFuture().thenApply(r -> (Object) r);
} else {
return CompletableFuture.completedFuture(result);
}
}
throw new RemotingException(channel, "Unsupported request: "
+ (message == null ? null : (message.getClass().getName() + ": " + message))
+ ", channel: consumer: " + channel.getRemoteAddress() + " --> provider: " + channel.getLocalAddress());
}
处理流程:
- getInvoker():通调用接口信息获取对应的代理实现类invoke;
- 调用invoker.invoke()实现对具体接口实现类的调用并获取返回数据;
(7)、DubboProtocol.requestHandler.getInvoke()
模块:dubbo-rpc->dubbo-rpc-dubbo
包:org.apache.dubbo.rpc.protocol.dubbo
主要方法:getInvoke()
主要源码:
Invoker<?> getInvoker(Channel channel, Invocation inv) throws RemotingException {
boolean isCallBackServiceInvoke = false;
boolean isStubServiceInvoke = false;
int port = channel.getLocalAddress().getPort();
String path = inv.getAttachments().get(Constants.PATH_KEY);
// if it's callback service on client side
isStubServiceInvoke = Boolean.TRUE.toString().equals(inv.getAttachments().get(Constants.STUB_EVENT_KEY));
if (isStubServiceInvoke) {
port = channel.getRemoteAddress().getPort();
}
//callback
isCallBackServiceInvoke = isClientSide(channel) && !isStubServiceInvoke;
if (isCallBackServiceInvoke) {
path += "." + inv.getAttachments().get(Constants.CALLBACK_SERVICE_KEY);
inv.getAttachments().put(IS_CALLBACK_SERVICE_INVOKE, Boolean.TRUE.toString());
}
String serviceKey = serviceKey(port, path, inv.getAttachments().get(Constants.VERSION_KEY), inv.getAttachments().get(Constants.GROUP_KEY));
DubboExporter<?> exporter = (DubboExporter<?>) exporterMap.get(serviceKey);
if (exporter == null) {
throw new RemotingException(channel, "Not found exported service: " + serviceKey + " in " +
exporterMap.keySet() + ", may be version or group mismatch " + ", channel: consumer: " + channel.getRemoteAddress() + " --> provider: " + channel.getLocalAddress() + ", message:" + inv);
}
return exporter.getInvoker();
}
此处主要是通过接口信息,从exporterMap中获取对应的DubboExporter,此处的exporter即为服务提供者暴露流程生成的exporter;
(8)、AbstractProxyInvoker.invoke()
模块:dubbo-rpc->dubbo-rpc-apr
包:org.apache.dubbo.rpc.proxy
主要方法:invoke()
主要源码:
public Result invoke(Invocation invocation) throws RpcException {
RpcContext rpcContext = RpcContext.getContext();
try {
Object obj = doInvoke(proxy, invocation.getMethodName(), invocation.getParameterTypes(), invocation.getArguments());
if (RpcUtils.isReturnTypeFuture(invocation)) {
return new AsyncRpcResult((CompletableFuture<Object>) obj);
} else if (rpcContext.isAsyncStarted()) { // ignore obj in case of RpcContext.startAsync()? always rely on user to write back.
return new AsyncRpcResult(rpcContext.getAsyncContext().getInternalFuture());
} else {
return new RpcResult(obj);
}
} catch (InvocationTargetException e) {
// TODO async throw exception before async thread write back, should stop asyncContext
if (rpcContext.isAsyncStarted() && !rpcContext.stopAsync()) {
logger.error("Provider async started, but got an exception from the original method, cannot write the exception back to consumer because an async result may have returned the new thread.", e);
}
return new RpcResult(e.getTargetException());
} catch (Throwable e) {
throw new RpcException("Failed to invoke remote proxy method " + invocation.getMethodName() + " to " + getUrl() + ", cause: " + e.getMessage(), e);
}
}
此处主要调用子类的doInvoke()进行实际的接口调用处理;
(9)、AbstractProxyInvoker.doInvoke()
模块:dubbo-rpc->dubbo-rpc-apr
包:org.apache.dubbo.rpc.proxy
主要方法:doInvoke()
主要源码:
public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) {
// TODO Wrapper cannot handle this scenario correctly: the classname contains '$'
final Wrapper wrapper = Wrapper.getWrapper(proxy.getClass().getName().indexOf('$') < 0 ? proxy.getClass() : type);
return new AbstractProxyInvoker<T>(proxy, type, url) {
@Override
protected Object doInvoke(T proxy, String methodName,
Class<?>[] parameterTypes,
Object[] arguments) throws Throwable {
return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments);
}
};
}
此处是服务暴露处理过程中生成动态代理的地方,具体为JavassistProxyFactory类中;此处创建了一个AbstractProxyInvoker的匿名,此类重写了doInvoke()方法,在方法内调用代理类的invokeMethod()方法对具体调用进行处理。上一步中调用的doInvoke()即为本处匿名类实现的doInvoke()。
(10)、接口的Wrapper类
模块:dubbo-rpc->dubbo-rpc-apr
包:org.apache.dubbo.rpc.proxy
主要方法:invokeMethod()
主要源码:
public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) {
// TODO Wrapper cannot handle this scenario correctly: the classname contains '$'
final Wrapper wrapper = Wrapper.getWrapper(proxy.getClass().getName().indexOf('$') < 0 ? proxy.getClass() : type);
return new AbstractProxyInvoker<T>(proxy, type, url) {
@Override
protected Object doInvoke(T proxy, String methodName,
Class<?>[] parameterTypes,
Object[] arguments) throws Throwable {
return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments);
}
};
}
通过Wrapper.getWrapper()动态编译生成的接口的代理类;其invokeMethod()具体实现是,通过方法名称的对比来调用实际的实现类相应的方法;
(11)调用接口的具体实现类
以下即为动态生成的invokeMethod的demo,接口名称为:DubboTestApi,接口只有一个方法:echoTest;通过以下动态生成的代码可以看出,其实现是根据客户端调用的方法名称调用具体实现类的对应方法。
public Object invokeMethod(Object o, String n, Class[] p, Object[] v) throws java.lang.reflect.InvocationTargetException {
dubbo.demo.api.DubboTestApi w;
try {
w = ((dubbo.demo.api.DubboTestApi) $1);
} catch (Throwable e) {
throw new IllegalArgumentException(e);
}
try {
if ("echoTest".equals($2) && $3.length == 1) {
return ($w) w.echoTest((java.lang.String) $4[0]);
}
} catch (Throwable e) {
throw new java.lang.reflect.InvocationTargetException(e);
}
throw new com.alibaba.dubbo.common.bytecode.NoSuchMethodException("Not found method \"" + $2 + "\" in class dubbo.demo.api.DubboTestApi.");
}