整个Rest协议的引用过程按照 RegistryDirectory#notify => RegistryDirectory#refreshInvoker => RegistryDirectory#toInvokers => RestProtocol#refer => RestProtocol#doRefer的流程执行。
public class RegistryDirectory<T> extends AbstractDirectory<T> implements NotifyListener {
public synchronized void notify(List<URL> urls) {
List<URL> invokerUrls = new ArrayList<URL>();
List<URL> routerUrls = new ArrayList<URL>();
List<URL> configuratorUrls = new ArrayList<URL>();
// 分解URLS,这里只关注invokerUrls即provider对应的Url地址
for (URL url : urls) {
String protocol = url.getProtocol();
String category = url.getParameter(Constants.CATEGORY_KEY, Constants.DEFAULT_CATEGORY);
if (Constants.ROUTERS_CATEGORY.equals(category)
|| Constants.ROUTE_PROTOCOL.equals(protocol)) {
} else if (Constants.CONFIGURATORS_CATEGORY.equals(category)
|| Constants.OVERRIDE_PROTOCOL.equals(protocol)) {
} else if (Constants.PROVIDERS_CATEGORY.equals(category)) {
} else {
logger.warn("Unsupported category " + category + " in notified url: " + url + " from registry " + getUrl().getAddress() + " to consumer " + NetUtils.getLocalHost());
// configurators
if (configuratorUrls != null && !configuratorUrls.isEmpty()) {
this.configurators = toConfigurators(configuratorUrls);
// routers
if (routerUrls != null && !routerUrls.isEmpty()) {
List<Router> routers = toRouters(routerUrls);
if (routers != null) { // null - do nothing
List<Configurator> localConfigurators = this.configurators; // local reference
// merge override parameters
this.overrideDirectoryUrl = directoryUrl;
if (localConfigurators != null && !localConfigurators.isEmpty()) {
for (Configurator configurator : localConfigurators) {
this.overrideDirectoryUrl = configurator.configure(overrideDirectoryUrl);
// 根据invokerUrls转为Invoker对象
- RegistryDirectory#notify核心操作包括两步:分类Url和转换Url。
- 分类Url是指根据Url的类别属性分为不同类别,这里关心invokerUrls。
- 转换Url是指对invokerUrls进行转换操作,通过refreshInvoker(invokerUrls)实现。
public class RegistryDirectory<T> extends AbstractDirectory<T> implements NotifyListener {
private void refreshInvoker(List<URL> invokerUrls) {
if (invokerUrls != null && invokerUrls.size() == 1 && invokerUrls.get(0) != null
&& Constants.EMPTY_PROTOCOL.equals(invokerUrls.get(0).getProtocol())) {
this.forbidden = true; // Forbid to access
this.methodInvokerMap = null; // Set the method invoker map to null
destroyAllInvokers(); // Close all invokers
} else {
this.forbidden = false; // Allow to access
Map<String, Invoker<T>> oldUrlInvokerMap = this.urlInvokerMap; // local reference
if (invokerUrls.isEmpty() && this.cachedInvokerUrls != null) {
} else {
this.cachedInvokerUrls = new HashSet<URL>();
this.cachedInvokerUrls.addAll(invokerUrls);//Cached invoker urls, convenient for comparison
if (invokerUrls.isEmpty()) {
// toInvokers(invokerUrls)执行将url转为invoker对象
Map<String, Invoker<T>> newUrlInvokerMap = toInvokers(invokerUrls);// Translate url list to Invoker map
Map<String, List<Invoker<T>>> newMethodInvokerMap = toMethodInvokers(newUrlInvokerMap); // Change method name to map Invoker Map
// state change
// If the calculation is wrong, it is not processed.
if (newUrlInvokerMap == null || newUrlInvokerMap.size() == 0) {
logger.error(new IllegalStateException("urls to invokers error .invokerUrls.size :" + invokerUrls.size() + ", invoker.size :0. urls :" + invokerUrls.toString()));
this.methodInvokerMap = multiGroup ? toMergeMethodInvokerMap(newMethodInvokerMap) : newMethodInvokerMap;
this.urlInvokerMap = newUrlInvokerMap;
try {
destroyUnusedInvokers(oldUrlInvokerMap, newUrlInvokerMap); // Close the unused Invoker
} catch (Exception e) {
logger.warn("destroyUnusedInvokers error. ", e);
- RegistryDirectory#refreshInvoker核心通过toInvokers()转换invoker对象。
public class RegistryDirectory<T> extends AbstractDirectory<T> implements NotifyListener {
private Map<String, Invoker<T>> toInvokers(List<URL> urls) {
// newUrlInvokerMap的key为provider的URL,value为对应的invoker对象
Map<String, Invoker<T>> newUrlInvokerMap = new HashMap<String, Invoker<T>>();
if (urls == null || urls.isEmpty()) {
return newUrlInvokerMap;
Set<String> keys = new HashSet<String>();
String queryProtocols = this.queryMap.get(Constants.PROTOCOL_KEY);
for (URL providerUrl : urls) {
// If protocol is configured at the reference side, only the matching protocol is selected
if (queryProtocols != null && queryProtocols.length() > 0) {
boolean accept = false;
String[] acceptProtocols = queryProtocols.split(",");
for (String acceptProtocol : acceptProtocols) {
if (providerUrl.getProtocol().equals(acceptProtocol)) {
accept = true;
if (!accept) {
URL url = mergeUrl(providerUrl);
String key = url.toFullString(); // The parameter urls are sorted
if (keys.contains(key)) { // Repeated url
// Cache key is url that does not merge with consumer side parameters, regardless of how the consumer combines parameters, if the server url changes, then refer again
Map<String, Invoker<T>> localUrlInvokerMap = this.urlInvokerMap; // local reference
Invoker<T> invoker = localUrlInvokerMap == null ? null : localUrlInvokerMap.get(key);
if (invoker == null) { // Not in the cache, refer again
try {
boolean enabled = true;
if (url.hasParameter(Constants.DISABLED_KEY)) {
enabled = !url.getParameter(Constants.DISABLED_KEY, false);
} else {
enabled = url.getParameter(Constants.ENABLED_KEY, true);
if (enabled) {
// protocol.refer()创建invoker对象
invoker = new InvokerDelegate<T>(protocol.refer(serviceType, url), url, providerUrl);
} catch (Throwable t) {
// invoker不为null保存到newUrlInvokerMap当中
if (invoker != null) { // Put new invoker in cache
newUrlInvokerMap.put(key, invoker);
} else {
newUrlInvokerMap.put(key, invoker);
return newUrlInvokerMap;
- RegistryDirectory#toInvokers内部针对providerUrl执行refer()生成invoker对象。
- RegistryDirectory#toInvokers内部针对成功生成invoker对象按照Url和invoker的kv形式进行存储。
- 如果protocol.refer()生成invoker异常,导致newUrlInvokerMap永远不存在Url对应的invoker,那么该Url对应的invoker会被重复创建,但是因为失败永远不会建成功。
- RegistryDirectory#toInvokers内部针对newUrlInvokerMap中不存在的URL才会重新生成invoker。
public abstract class AbstractProxyProtocol extends AbstractProtocol {
public <T> Invoker<T> refer(final Class<T> type, final URL url) throws RpcException {
// 执行 doRefer(type, url)
final Invoker<T> target = proxyFactory.getInvoker(doRefer(type, url), type, url);
// 生成AbstractInvoker对应的invoker对象
Invoker<T> invoker = new AbstractInvoker<T>(type, url) {
protected Result doInvoke(Invocation invocation) throws Throwable {
try {
Result result = target.invoke(invocation);
Throwable e = result.getException();
if (e != null) {
for (Class<?> rpcException : rpcExceptions) {
if (rpcException.isAssignableFrom(e.getClass())) {
throw getRpcException(type, url, invocation, e);
return result;
} catch (RpcException e) {
if (e.getCode() == RpcException.UNKNOWN_EXCEPTION) {
throw e;
} catch (Throwable e) {
throw getRpcException(type, url, invocation, e);
return invoker;
- RestProtocol#refer()中会调用proxyFactory.getInvoker(doRefer())方法生成target对象,调用RestProtocol#doRefer的方法。
- 通过AbstractInvoker类二次封装target返回invoker对象。
public class RestProtocol extends AbstractProxyProtocol {
private final List<ResteasyClient> clients = Collections.synchronizedList(new LinkedList<ResteasyClient>());
protected <T> T doRefer(Class<T> serviceType, URL url) throws RpcException {
if (connectionMonitor == null) {
connectionMonitor = new ConnectionMonitor();
// TODO more configs to add
PoolingHttpClientConnectionManager connectionManager = new PoolingHttpClientConnectionManager();
// 20 is the default maxTotal of current PoolingClientConnectionManager
connectionManager.setMaxTotal(url.getParameter(Constants.CONNECTIONS_KEY, 20));
connectionManager.setDefaultMaxPerRoute(url.getParameter(Constants.CONNECTIONS_KEY, 20));
RequestConfig requestConfig = RequestConfig.custom()
.setConnectTimeout(url.getParameter(Constants.CONNECT_TIMEOUT_KEY, Constants.DEFAULT_CONNECT_TIMEOUT))
.setSocketTimeout(url.getParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT))
SocketConfig socketConfig = SocketConfig.custom()
CloseableHttpClient httpClient = HttpClientBuilder.create()
.setKeepAliveStrategy(new ConnectionKeepAliveStrategy() {
public long getKeepAliveDuration(HttpResponse response, HttpContext context) {
HeaderElementIterator it = new BasicHeaderElementIterator(response.headerIterator(HTTP.CONN_KEEP_ALIVE));
while (it.hasNext()) {
HeaderElement he = it.nextElement();
String param = he.getName();
String value = he.getValue();
if (value != null && param.equalsIgnoreCase("timeout")) {
return Long.parseLong(value) * 1000;
// TODO constant
return 30 * 1000;
ApacheHttpClient4Engine engine = new ApacheHttpClient4Engine(httpClient/*, localContext*/);
// 创建client并添加到clients
ResteasyClient client = new ResteasyClientBuilder().httpEngine(engine).build();
for (String clazz : Constants.COMMA_SPLIT_PATTERN.split(url.getParameter(Constants.EXTENSION_KEY, ""))) {
if (!StringUtils.isEmpty(clazz)) {
try {
} catch (ClassNotFoundException e) {
throw new RpcException("Error loading JAX-RS extension class: " + clazz.trim(), e);
// TODO protocol
ResteasyWebTarget target = client.target("http://" + url.getHost() + ":" + url.getPort() + "/" + getContextPath(url));
return target.proxy(serviceType);
- RestProtocol对象在Dubbo的上下文中只存在一个实例,所以类中的clients保存了所有的Rest的client对象。
- RestProtocol#doRefer方法内部会创建ResteasyClient的client对象。
- RestProtocol#doRefer方法内部通过client.target()方法返回target对象。
- 如果创建ResteasyClient对象成功但是创建ResteasyWebTarget失败,那么client依然会增加,但是外层的invoker却因为异常导致无法创建成功。
- 上述的异常导致了provider的Url在每次创建invoker对象都会失败进而造成每次该Url重新发布就回走一次创建invoker的过程,最终结果client不停增加而OOM。