A simple RestTemplate example
Register the RestTemplate
@Bean
@LoadBalanced
RestTemplate restTemplate() {
    return new RestTemplate();
}
Send a GET request with the RestTemplate
@RequestMapping("/ask")
public String getHello(@RequestParam(value = "name", required = false) String name) {
    return restTemplate.getForEntity("http://SERVICE-NAME/hello?hello={1}", String.class, name).getBody();
}
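Source code analysis
The @LoadBalanced annotation marks a RestTemplate so that it is configured to use a client-side load balancer, that is, a LoadBalancerClient. Search for the LoadBalancerClient interface: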
public interface LoadBalancerClient extends ServiceInstanceChooser {
<T> T execute(String serviceId, LoadBalancerRequest<T> request) throws IOException;
<T> T execute(String serviceId, ServiceInstance serviceInstance, LoadBalancerRequest<T> request) throws IOException;
URI reconstructURI(ServiceInstance instance, URI original);
}
ServiceInstanceChooser selects a service instance by serviceId:
public interface ServiceInstanceChooser {
ServiceInstance choose(String serviceId);
}
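From the above, a client-side load balancer needs the following capabilities:
- Choose an instance of the target service from the load balancer, given the service name
- Send the request to the instance chosen by the load balancer
- Build a suitable request URI for that instance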
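The three capabilities can be sketched in plain Java. This is only an illustration under simplifying assumptions: ClientSideLbSketch, Instance and Request are hypothetical stand-ins rather than the Spring Cloud types, the balancer holds the instances of a single service, and the instance addresses are made up.

```java
import java.net.URI;
import java.util.Arrays;
import java.util.List;

// Hypothetical, simplified stand-ins; these are NOT the Spring Cloud types.
final class ClientSideLbSketch {

    /** A service instance reduced to host and port. */
    static final class Instance {
        final String host;
        final int port;

        Instance(String host, int port) {
            this.host = host;
            this.port = port;
        }
    }

    /** The work to run once an instance has been chosen. */
    interface Request<T> {
        T apply(Instance instance);
    }

    private final List<Instance> instances;
    private int next = 0;

    ClientSideLbSketch(List<Instance> instances) {
        this.instances = instances;
    }

    // Capability 1: choose an instance for a service name.
    // This sketch holds the instances of one service only, so serviceId is ignored.
    synchronized Instance choose(String serviceId) {
        Instance chosen = instances.get(next);
        next = (next + 1) % instances.size();
        return chosen;
    }

    // Capability 2: run the request against the chosen instance.
    <T> T execute(String serviceId, Request<T> request) {
        return request.apply(choose(serviceId));
    }

    // Capability 3: replace the logical host with the instance's host and port.
    static URI reconstructURI(Instance instance, URI original) {
        String query = original.getRawQuery() == null ? "" : "?" + original.getRawQuery();
        return URI.create(original.getScheme() + "://" + instance.host + ":" + instance.port
                + original.getRawPath() + query);
    }

    public static void main(String[] args) {
        ClientSideLbSketch lb = new ClientSideLbSketch(Arrays.asList(
                new Instance("10.0.0.1", 8080), new Instance("10.0.0.2", 8080)));
        URI original = URI.create("http://SERVICE-NAME/hello?hello=world");
        for (int i = 0; i < 3; i++) {
            URI target = lb.execute("SERVICE-NAME", instance -> reconstructURI(instance, original));
            System.out.println(target);
        }
    }
}
```

Running main alternates between the two made-up addresses, which is the behaviour the real interfaces leave to their implementations.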
LoadBalancerAutoConfiguration is the auto-configuration class that sets up the client-side load balancer:
@Configuration(
proxyBeanMethods = false
)
@ConditionalOnClass({RestTemplate.class})
@ConditionalOnBean({LoadBalancerClient.class})
@EnableConfigurationProperties({LoadBalancerRetryProperties.class})
public class LoadBalancerAutoConfiguration {
...
@Bean
public SmartInitializingSingleton loadBalancedRestTemplateInitializerDeprecated(final ObjectProvider<List<RestTemplateCustomizer>> restTemplateCustomizers) {
return () -> {
restTemplateCustomizers.ifAvailable((customizers) -> {
Iterator var2 = this.restTemplates.iterator();
while(var2.hasNext()) {
RestTemplate restTemplate = (RestTemplate)var2.next();
Iterator var4 = customizers.iterator();
while(var4.hasNext()) {
RestTemplateCustomizer customizer = (RestTemplateCustomizer)var4.next();
customizer.customize(restTemplate);
}
}
});
};
}
...
@Configuration(
proxyBeanMethods = false
)
@ConditionalOnMissingClass({"org.springframework.retry.support.RetryTemplate"})
static class LoadBalancerInterceptorConfig {
LoadBalancerInterceptorConfig() {
}
@Bean
public LoadBalancerInterceptor ribbonInterceptor(LoadBalancerClient loadBalancerClient, LoadBalancerRequestFactory requestFactory) {
return new LoadBalancerInterceptor(loadBalancerClient, requestFactory);
}
@Bean
@ConditionalOnMissingBean
public RestTemplateCustomizer restTemplateCustomizer(final LoadBalancerInterceptor loadBalancerInterceptor) {
return (restTemplate) -> {
List<ClientHttpRequestInterceptor> list = new ArrayList(restTemplate.getInterceptors());
list.add(loadBalancerInterceptor);
restTemplate.setInterceptors(list);
};
}
}
}
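This configuration class takes effect only when two conditions are met:
- The RestTemplate class must be on the classpath
- A LoadBalancerClient bean must exist
The example has no RetryTemplate on the classpath, so a LoadBalancerInterceptor is created: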
@Configuration(
proxyBeanMethods = false
)
@ConditionalOnMissingClass({"org.springframework.retry.support.RetryTemplate"})
static class LoadBalancerInterceptorConfig {
LoadBalancerInterceptorConfig() {
}
@Bean
public LoadBalancerInterceptor ribbonInterceptor(LoadBalancerClient loadBalancerClient, LoadBalancerRequestFactory requestFactory) {
return new LoadBalancerInterceptor(loadBalancerClient, requestFactory);
}
@Bean
@ConditionalOnMissingBean
public RestTemplateCustomizer restTemplateCustomizer(final LoadBalancerInterceptor loadBalancerInterceptor) {
return (restTemplate) -> {
List<ClientHttpRequestInterceptor> list = new ArrayList(restTemplate.getInterceptors());
list.add(loadBalancerInterceptor);
restTemplate.setInterceptors(list);
};
}
}
The LoadBalancerClient implementation is injected through the constructor of LoadBalancerInterceptor:
public class LoadBalancerInterceptor implements ClientHttpRequestInterceptor {
private LoadBalancerClient loadBalancer;
private LoadBalancerRequestFactory requestFactory;
public LoadBalancerInterceptor(LoadBalancerClient loadBalancer, LoadBalancerRequestFactory requestFactory) {
this.loadBalancer = loadBalancer;
this.requestFactory = requestFactory;
}
public LoadBalancerInterceptor(LoadBalancerClient loadBalancer) {
this(loadBalancer, new LoadBalancerRequestFactory(loadBalancer));
}
public ClientHttpResponse intercept(final HttpRequest request, final byte[] body, final ClientHttpRequestExecution execution) throws IOException {
URI originalUri = request.getURI();
String serviceName = originalUri.getHost();
Assert.state(serviceName != null, "Request URI does not contain a valid hostname: " + originalUri);
return (ClientHttpResponse)this.loadBalancer.execute(serviceName, this.requestFactory.createRequest(request, body, execution));
}
}
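When a RestTemplate annotated with @LoadBalanced sends an HTTP request, the interceptor intercepts it, takes the host of the URI as the service name, and calls execute to issue the actual request: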
return (ClientHttpResponse)this.loadBalancer.execute(serviceName, this.requestFactory.createRequest(request, body, execution));
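The first step of intercept, reading the service name from the URI host, can be tried in isolation. The sketch below uses only java.net.URI; the class name ServiceNameFromUriSketch is hypothetical. It also shows a consequence of relying on getHost(): java.net.URI does not report a host for a name containing an underscore, so such a service name would trip the assertion seen in the interceptor.

```java
import java.net.URI;

final class ServiceNameFromUriSketch {

    // Mirrors the interceptor's first step: the URI host is treated as the service name.
    static String serviceName(URI uri) {
        String host = uri.getHost();
        if (host == null) {
            throw new IllegalStateException("Request URI does not contain a valid hostname: " + uri);
        }
        return host;
    }

    public static void main(String[] args) {
        System.out.println(serviceName(URI.create("http://SERVICE-NAME/hello")));
        try {
            // An underscore is not valid in a host name, so getHost() returns null here.
            serviceName(URI.create("http://service_name/hello"));
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```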
The concrete LoadBalancerClient implementation is org.springframework.cloud.netflix.ribbon.RibbonLoadBalancerClient
Its execute method:
public <T> T execute(String serviceId, LoadBalancerRequest<T> request, Object hint) throws IOException {
ILoadBalancer loadBalancer = this.getLoadBalancer(serviceId);
Server server = this.getServer(loadBalancer, hint);
if (server == null) {
throw new IllegalStateException("No instances available for " + serviceId);
} else {
RibbonLoadBalancerClient.RibbonServer ribbonServer = new RibbonLoadBalancerClient.RibbonServer(serviceId, server, this.isSecure(server, serviceId), this.serverIntrospector(serviceId).getMetadata(server));
return this.execute(serviceId, (ServiceInstance)ribbonServer, (LoadBalancerRequest)request);
}
}
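It first obtains a concrete service instance for the given serviceId.
The getServer method shows that the instance is not picked with the choose method of LoadBalancerClient, but with Ribbon's own chooseServer method on ILoadBalancer: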
protected Server getServer(ILoadBalancer loadBalancer, Object hint) {
return loadBalancer == null ? null : loadBalancer.chooseServer(hint != null ? hint : "default");
}
RibbonClientConfiguration shows that ZoneAwareLoadBalancer is the default load balancer implementation:
@Bean
@ConditionalOnMissingBean
public ILoadBalancer ribbonLoadBalancer(IClientConfig config, ServerList<Server> serverList, ServerListFilter<Server> serverListFilter, IRule rule, IPing ping, ServerListUpdater serverListUpdater) {
return (ILoadBalancer)(this.propertiesFactory.isSet(ILoadBalancer.class, this.name) ? (ILoadBalancer)this.propertiesFactory.get(ILoadBalancer.class, config, this.name) : new ZoneAwareLoadBalancer(config, rule, ping, serverList, serverListFilter, serverListUpdater));
}
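Back to the execute method of RibbonLoadBalancerClient.
After ZoneAwareLoadBalancer's chooseServer returns the instance picked by the load-balancing rule, the instance is wrapped in a RibbonServer object. Besides the Server itself, RibbonServer also carries the serviceId and other information. That object is then passed to the apply method of the LoadBalancerRequest created in LoadBalancerInterceptor, which sends the request to one concrete instance: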
public <T> T execute(String serviceId, ServiceInstance serviceInstance, LoadBalancerRequest<T> request) throws IOException {
...
try {
T returnVal = request.apply(serviceInstance);
statsRecorder.recordStats(returnVal);
return returnVal;
} catch (IOException var8) {
statsRecorder.recordStats(var8);
throw var8;
} catch (Exception var9) {
statsRecorder.recordStats(var9);
ReflectionUtils.rethrowRuntimeException(var9);
return null;
}
}
}
ServiceInstance is the abstract definition of a service instance and exposes the basic information every instance must provide:
public interface ServiceInstance {
default String getInstanceId() {
return null;
}
String getServiceId();
String getHost();
int getPort();
boolean isSecure();
URI getUri();
Map<String, String> getMetadata();
default String getScheme() {
return null;
}
}
RibbonServer is the ServiceInstance implementation. Besides the Server instance, it holds the serviceId, a flag for whether HTTPS is used, and a map of metadata:
public static class RibbonServer implements ServiceInstance {
private final String serviceId;
private final Server server;
private final boolean secure;
private Map<String, String> metadata;
...
}
In the createRequest factory method of LoadBalancerRequestFactory:
public LoadBalancerRequest<ClientHttpResponse> createRequest(final HttpRequest request, final byte[] body, final ClientHttpRequestExecution execution) {
return (instance) -> {
HttpRequest serviceRequest = new ServiceRequestWrapper(request, instance, this.loadBalancer);
LoadBalancerRequestTransformer transformer;
if (this.transformers != null) {
for(Iterator var6 = this.transformers.iterator(); var6.hasNext(); serviceRequest = transformer.transformRequest((HttpRequest)serviceRequest, instance)) {
transformer = (LoadBalancerRequestTransformer)var6.next();
}
}
return execution.execute((HttpRequest)serviceRequest, body);
};
}
a ServiceRequestWrapper object is passed in:
public class ServiceRequestWrapper extends HttpRequestWrapper {
private final ServiceInstance instance;
private final LoadBalancerClient loadBalancer;
...
public URI getURI() {
URI uri = this.loadBalancer.reconstructURI(this.instance, this.getRequest().getURI());
return uri;
}
}
When execution.execute in that factory method actually runs, it calls the execute method inside InterceptingClientHttpRequest (in its inner InterceptingRequestExecution class):
public ClientHttpResponse execute(HttpRequest request, byte[] body) throws IOException {
if (this.iterator.hasNext()) {
ClientHttpRequestInterceptor nextInterceptor = (ClientHttpRequestInterceptor)this.iterator.next();
return nextInterceptor.intercept(request, body, this);
} else {
HttpMethod method = request.getMethod();
Assert.state(method != null, "No standard HTTP method");
ClientHttpRequest delegate = InterceptingClientHttpRequest.this.requestFactory.createRequest(request.getURI(), method);
request.getHeaders().forEach((key, value) -> {
delegate.getHeaders().addAll(key, value);
});
if (body.length > 0) {
if (delegate instanceof StreamingHttpOutputMessage) {
StreamingHttpOutputMessage streamingOutputMessage = (StreamingHttpOutputMessage)delegate;
streamingOutputMessage.setBody((outputStream) -> {
StreamUtils.copy(body, outputStream);
});
} else {
StreamUtils.copy(body, delegate.getBody());
}
}
return delegate.execute();
}
}
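Here request.getURI() invokes the overridden getURI method of ServiceRequestWrapper, which calls reconstructURI on the LoadBalancerClient. This builds the URI of a concrete instance in the service-governance environment and completes the proxied call.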
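The execute method shown above walks an iterator over the interceptors and only performs the real call once the iterator is exhausted. A minimal sketch of that pattern, with requests and responses reduced to strings and with hypothetical names (InterceptorChainSketch, Chain, Interceptor, Execution) that are not Spring's types:

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

final class InterceptorChainSketch {

    interface Execution {
        String execute(String request);
    }

    interface Interceptor {
        String intercept(String request, Execution execution);
    }

    /** Hands the request to each interceptor in turn, then to the terminal call. */
    static final class Chain implements Execution {
        private final Iterator<Interceptor> iterator;
        private final Execution terminal;

        Chain(List<Interceptor> interceptors, Execution terminal) {
            this.iterator = interceptors.iterator();
            this.terminal = terminal;
        }

        @Override
        public String execute(String request) {
            if (iterator.hasNext()) {
                // Each interceptor decides when (and with what request) to continue the chain.
                return iterator.next().intercept(request, this);
            }
            return terminal.execute(request);
        }
    }

    static String demo() {
        Interceptor trace = (req, exec) -> "[traced] " + exec.execute(req);
        // Stands in for the load-balancing step: the logical host becomes a made-up address.
        Interceptor rewriteHost = (req, exec) -> exec.execute(req.replace("SERVICE-NAME", "10.0.0.1:8080"));
        Chain chain = new Chain(Arrays.asList(trace, rewriteHost), req -> "sent " + req);
        return chain.execute("http://SERVICE-NAME/hello");
    }

    public static void main(String[] args) {
        System.out.println(demo()); // [traced] sent http://10.0.0.1:8080/hello
    }
}
```

Because the chain object keeps the iterator as state, one chain instance serves exactly one request, which matches how the code above creates a fresh execution per call.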
Load balancers
AbstractLoadBalancer
public abstract class AbstractLoadBalancer implements ILoadBalancer {
public AbstractLoadBalancer() {
}
public Server chooseServer() {
return this.chooseServer((Object)null);
}
public abstract List<Server> getServerList(AbstractLoadBalancer.ServerGroup var1);
public abstract LoadBalancerStats getLoadBalancerStats();
public static enum ServerGroup {
ALL,
STATUS_UP,
STATUS_NOT_UP;
private ServerGroup() {
}
}
}
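AbstractLoadBalancer is the abstract implementation of ILoadBalancer. It defines the ServerGroup enum for grouping servers and implements the no-argument chooseServer, which passes null as the key, meaning the key is ignored when a concrete instance is selected.
getLoadBalancerStats returns the LoadBalancerStats object, which records the attributes and statistics of each service instance in the load balancer.
getServerList returns a different list of service instances depending on the group type.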
BaseLoadBalancer
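BaseLoadBalancer is the basic implementation of the Ribbon load balancer and defines much of the core load balancer machinery.
It defines the list of all servers and the list of servers that are up: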
@Monitor(
name = "LoadBalancer_AllServerList",
type = DataSourceType.INFORMATIONAL
)
protected volatile List<Server> allServerList;
@Monitor(
name = "LoadBalancer_UpServerList",
type = DataSourceType.INFORMATIONAL
)
protected volatile List<Server> upServerList;
It defines the strategy object that performs the instance checks, SerialPingStrategy by default:
private static final BaseLoadBalancer.SerialPingStrategy DEFAULT_PING_STRATEGY = new BaseLoadBalancer.SerialPingStrategy((SyntheticClass_1)null);
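SerialPingStrategy pings the servers one after another in a loop, which gets slow when there are many instances or when the IPing implementation itself is slow. If needed, implement IPingStrategy and override pingServers: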
private static class SerialPingStrategy implements IPingStrategy {
private SerialPingStrategy() {
}
public boolean[] pingServers(IPing ping, Server[] servers) {
int numCandidates = servers.length;
boolean[] results = new boolean[numCandidates];
BaseLoadBalancer.logger.debug("LoadBalancer: PingTask executing [{}] servers configured", numCandidates);
for(int i = 0; i < numCandidates; ++i) {
results[i] = false;
try {
if (ping != null) {
results[i] = ping.isAlive(servers[i]);
}
} catch (Exception var7) {
BaseLoadBalancer.logger.error("Exception while pinging Server: '{}'", servers[i], var7);
}
}
return results;
}
}
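One way to avoid the serial loop is to issue the pings concurrently. The sketch below shows the idea in plain Java and makes no claim about how a production strategy should be tuned: Ping is a hypothetical stand-in for IPing, servers are plain strings, and the pool size cap of 8 is an arbitrary assumption. A real strategy would implement IPingStrategy with the Ribbon types instead.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

final class ParallelPingSketch {

    /** Hypothetical stand-in for IPing, with a plain String as the server. */
    interface Ping {
        boolean isAlive(String server);
    }

    static boolean[] pingServers(Ping ping, String[] servers) {
        boolean[] results = new boolean[servers.length]; // all false by default
        if (ping == null || servers.length == 0) {
            return results;
        }
        ExecutorService pool = Executors.newFixedThreadPool(Math.min(servers.length, 8));
        try {
            List<Callable<Boolean>> tasks = new ArrayList<>();
            for (String server : servers) {
                tasks.add(() -> ping.isAlive(server));
            }
            // invokeAll returns the futures in the same order as the tasks.
            List<Future<Boolean>> futures = pool.invokeAll(tasks);
            for (int i = 0; i < futures.size(); i++) {
                try {
                    results[i] = futures.get(i).get();
                } catch (ExecutionException e) {
                    results[i] = false; // a ping that throws counts as "not alive"
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // keep the interrupt flag, return what we have
        } finally {
            pool.shutdown();
        }
        return results;
    }

    public static void main(String[] args) {
        boolean[] alive = pingServers(s -> !s.startsWith("down"),
                new String[] {"a:8080", "down:8080", "b:8080"});
        System.out.println(java.util.Arrays.toString(alive)); // [true, false, true]
    }
}
```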
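It defines the IRule object that handles load balancing. The choice of instance is delegated to the IRule, and the default implementation is RoundRobinRule: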
...
private static final IRule DEFAULT_RULE = new RoundRobinRule();
...
protected IRule rule;
...
public BaseLoadBalancer() {
...
this.rule = DEFAULT_RULE;
...
}
public Server chooseServer(Object key) {
if (this.counter == null) {
this.counter = this.createCounter();
}
this.counter.increment();
if (this.rule == null) {
return null;
} else {
try {
return this.rule.choose(key);
} catch (Exception var3) {
logger.warn("LoadBalancer [{}]: Error choosing server for key {}", new Object[]{this.name, key, var3});
return null;
}
}
}
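To make the delegation concrete, here is a minimal round-robin chooser in plain Java. It is a sketch of the general technique, not a copy of Ribbon's RoundRobinRule: the class name RoundRobinSketch is hypothetical, servers are plain strings, and liveness checks and retry limits are left out.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

final class RoundRobinSketch {

    private final AtomicInteger nextIndex = new AtomicInteger(0);

    // Returns the servers in order, wrapping around; safe to call from several threads.
    String choose(List<String> servers) {
        if (servers == null || servers.isEmpty()) {
            return null;
        }
        while (true) {
            int current = nextIndex.get();
            int next = (current + 1) % servers.size();
            // The compare-and-set loop guarantees each index is handed out once per cycle.
            if (nextIndex.compareAndSet(current, next)) {
                // The modulo guards against a list that shrank since the last call.
                return servers.get(current % servers.size());
            }
        }
    }

    public static void main(String[] args) {
        RoundRobinSketch rule = new RoundRobinSketch();
        List<String> servers = Arrays.asList("a:8080", "b:8080", "c:8080");
        for (int i = 0; i < 4; i++) {
            System.out.println(rule.choose(servers)); // a, b, c, then a again
        }
    }
}
```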
It starts the ping task, a TimerTask that runs the Pinger with the configured ping strategy:
class PingTask extends TimerTask {
PingTask() {
}
public void run() {
try {
(BaseLoadBalancer.this.new Pinger(BaseLoadBalancer.this.pingStrategy)).runPinger();
} catch (Exception var2) {
BaseLoadBalancer.logger.error("LoadBalancer [{}]: Error pinging", BaseLoadBalancer.this.name, var2);
}
}
}
DynamicServerListLoadBalancer
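DynamicServerListLoadBalancer extends BaseLoadBalancer and adds the ability to update the list of service instances dynamically at runtime. Among its members is: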
volatile ServerList<T> serverListImpl;
T is bounded in the class declaration as a subclass of Server:
public class DynamicServerListLoadBalancer<T extends Server>
ServerList declares two abstract methods:
public interface ServerList<T extends Server> {
// Get the initial list of service instances
List<T> getInitialListOfServers();
// Get the updated list of service instances
List<T> getUpdatedListOfServers();
}
In the package where Spring Cloud integrates Ribbon with Eureka, look up EurekaRibbonClientConfiguration:
@Bean
@ConditionalOnMissingBean
public ServerList<?> ribbonServerList(IClientConfig config, Provider<EurekaClient> eurekaClientProvider) {
if (this.propertiesFactory.isSet(ServerList.class, this.serviceId)) {
return (ServerList)this.propertiesFactory.get(ServerList.class, config, this.serviceId);
} else {
DiscoveryEnabledNIWSServerList discoveryServerList = new DiscoveryEnabledNIWSServerList(config, eurekaClientProvider);
DomainExtractingServerList serverList = new DomainExtractingServerList(discoveryServerList, config, this.approximateZoneFromHostname);
return serverList;
}
}
DiscoveryEnabledNIWSServerList obtains the server list through its private method obtainServersViaDiscovery:
public List<DiscoveryEnabledServer> getInitialListOfServers() {
return this.obtainServersViaDiscovery();
}
public List<DiscoveryEnabledServer> getUpdatedListOfServers() {
return this.obtainServersViaDiscovery();
}
private List<DiscoveryEnabledServer> obtainServersViaDiscovery() {
List<DiscoveryEnabledServer> serverList = new ArrayList();
if (this.eurekaClientProvider != null && this.eurekaClientProvider.get() != null) {
EurekaClient eurekaClient = (EurekaClient)this.eurekaClientProvider.get();
if (this.vipAddresses != null) { // service names
String[] var3 = this.vipAddresses.split(",");
int var4 = var3.length;
for(int var5 = 0; var5 < var4; ++var5) {
String vipAddress = var3[var5];
List<InstanceInfo> listOfInstanceInfo = eurekaClient.getInstancesByVipAddress(vipAddress, this.isSecure, this.targetRegion);
Iterator var8 = listOfInstanceInfo.iterator();
while(var8.hasNext()) {
InstanceInfo ii = (InstanceInfo)var8.next();
if (ii.getStatus().equals(InstanceStatus.UP)) { // check the status
if (this.shouldUseOverridePort) {
if (logger.isDebugEnabled()) {
logger.debug("Overriding port on client name: " + this.clientName + " to " + this.overridePort);
}
InstanceInfo copy = new InstanceInfo(ii);
if (this.isSecure) {
ii = (new Builder(copy)).setSecurePort(this.overridePort).build();
} else {
ii = (new Builder(copy)).setPort(this.overridePort).build();
}
}
DiscoveryEnabledServer des = this.createServer(ii, this.isSecure, this.shouldUseIpAddr);
serverList.add(des);
}
}
if (serverList.size() > 0 && this.prioritizeVipAddressBasedServers) {
break;
}
}
}
return serverList;
} else {
logger.warn("EurekaClient has not been initialized yet, returning an empty list");
return new ArrayList();
}
}
It iterates over the instances and, for each one whose status is UP, converts it into a DiscoveryEnabledServer and collects them into the returned list.
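Stripped of the Eureka types and the port override, the core of obtainServersViaDiscovery is a filter-and-convert loop. A small sketch under those simplifications, where UpInstancesSketch, Registration and Status are hypothetical names and a server is represented as a "host:port" string:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class UpInstancesSketch {

    enum Status { UP, DOWN, OUT_OF_SERVICE }

    /** Hypothetical stand-in for a registry entry. */
    static final class Registration {
        final String host;
        final int port;
        final Status status;

        Registration(String host, int port, Status status) {
            this.host = host;
            this.port = port;
            this.status = status;
        }
    }

    // Keeps only the instances that are UP and converts them to "host:port" servers.
    static List<String> toServerList(List<Registration> registrations) {
        List<String> servers = new ArrayList<>();
        for (Registration r : registrations) {
            if (r.status == Status.UP) {
                servers.add(r.host + ":" + r.port);
            }
        }
        return servers;
    }

    public static void main(String[] args) {
        System.out.println(toServerList(Arrays.asList(
                new Registration("10.0.0.1", 8080, Status.UP),
                new Registration("10.0.0.2", 8080, Status.DOWN),
                new Registration("10.0.0.3", 8080, Status.UP)))); // [10.0.0.1:8080, 10.0.0.3:8080]
    }
}
```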
After DiscoveryEnabledNIWSServerList fetches the server list from Eureka, DomainExtractingServerList processes it further in setZones, adding some required attributes:
private List<DiscoveryEnabledServer> setZones(List<DiscoveryEnabledServer> servers) {
List<DiscoveryEnabledServer> result = new ArrayList();
boolean isSecure = this.ribbon.isSecure(true);
boolean shouldUseIpAddr = this.ribbon.isUseIPAddrForServer();
Iterator var5 = servers.iterator();
while(var5.hasNext()) {
DiscoveryEnabledServer server = (DiscoveryEnabledServer)var5.next();
result.add(new DomainExtractingServer(server, isSecure, shouldUseIpAddr, this.approximateZoneFromHostname));
}
return result;
}
How the server list is updated: to be continued, I have not understood this part yet.
ZoneAwareLoadBalancer
ZoneAwareLoadBalancer extends DynamicServerListLoadBalancer and overrides setServerListForZones:
protected void setServerListForZones(Map<String, List<Server>> zoneServersMap) {
super.setServerListForZones(zoneServersMap);
if (this.balancers == null) {
this.balancers = new ConcurrentHashMap();
}
Iterator var2 = zoneServersMap.entrySet().iterator();
Entry existingLBEntry;
while(var2.hasNext()) {
existingLBEntry = (Entry)var2.next();
String zone = ((String)existingLBEntry.getKey()).toLowerCase();
this.getLoadBalancer(zone).setServersList((List)existingLBEntry.getValue());
}
var2 = this.balancers.entrySet().iterator();
while(var2.hasNext()) {
existingLBEntry = (Entry)var2.next();
if (!zoneServersMap.keySet().contains(existingLBEntry.getKey())) {
((BaseLoadBalancer)existingLBEntry.getValue()).setServersList(Collections.emptyList());
}
}
}
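The override above does two things: it pushes each zone's servers into that zone's own balancer, and it empties the balancers of zones that no longer appear in the map. A reduced sketch of that bookkeeping, where ZoneBalancersSketch is a hypothetical class and each per-zone balancer is simplified to a plain list of server names:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

final class ZoneBalancersSketch {

    // zone name (lower case) -> servers currently assigned to that zone
    private final Map<String, List<String>> balancers = new ConcurrentHashMap<>();

    void setServerListForZones(Map<String, List<String>> zoneServersMap) {
        Set<String> currentZones = new HashSet<>();
        // Step 1: refresh the server list of every zone present in the new map.
        for (Map.Entry<String, List<String>> entry : zoneServersMap.entrySet()) {
            String zone = entry.getKey().toLowerCase();
            currentZones.add(zone);
            balancers.put(zone, new ArrayList<>(entry.getValue()));
        }
        // Step 2: zones seen earlier but missing now keep their balancer with an empty list.
        for (String zone : balancers.keySet()) {
            if (!currentZones.contains(zone)) {
                balancers.put(zone, Collections.<String>emptyList());
            }
        }
    }

    List<String> serversIn(String zone) {
        List<String> servers = balancers.get(zone.toLowerCase());
        return servers == null ? Collections.<String>emptyList() : servers;
    }

    public static void main(String[] args) {
        ZoneBalancersSketch lb = new ZoneBalancersSketch();
        Map<String, List<String>> zones = new HashMap<>();
        zones.put("zone-a", Collections.singletonList("s1"));
        zones.put("zone-b", Collections.singletonList("s2"));
        lb.setServerListForZones(zones);
        lb.setServerListForZones(Collections.singletonMap("zone-a", Collections.singletonList("s3")));
        System.out.println(lb.serversIn("zone-a") + " " + lb.serversIn("zone-b")); // [s3] []
    }
}
```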
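Load-balancing rules
- AbstractLoadBalancerRule: abstract base class for the rules
- RandomRule: picks an instance at random
- RoundRobinRule: round robin
- RetryRule: retries, using RoundRobinRule internally by default
- WeightedResponseTimeRule: weights instances by response time
- BestAvailableRule: picks the least busy instance
- PredicateBasedRule: abstract rule
- AvailabilityFilteringRule: an implementation of PredicateBasedRule that filters first and then round-robins
- ZoneAvoidanceRule: an implementation of PredicateBasedRule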
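As an illustration of a response-time-weighted rule, the sketch below gives each server a weight equal to the total of all average response times minus its own, accumulates the weights, and picks the first server whose cumulative weight reaches a random value. This is my reading of the general idea and an assumption about the details, not a transcription of WeightedResponseTimeRule; the class name and the sample timings are made up.

```java
import java.util.Random;

final class WeightedResponseTimeSketch {

    // weight_i = total - avg_i, so faster servers get larger weights; returns running sums.
    static double[] cumulativeWeights(double[] avgResponseTimes) {
        double total = 0;
        for (double t : avgResponseTimes) {
            total += t;
        }
        double[] cumulative = new double[avgResponseTimes.length];
        double running = 0;
        for (int i = 0; i < avgResponseTimes.length; i++) {
            running += total - avgResponseTimes[i];
            cumulative[i] = running;
        }
        return cumulative;
    }

    // Picks the first index whose cumulative weight is at least randomWeight, or -1 if none.
    static int pick(double[] cumulative, double randomWeight) {
        for (int i = 0; i < cumulative.length; i++) {
            if (cumulative[i] >= randomWeight) {
                return i;
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        // Averages of 10, 40 and 50 ms give weights 90, 60, 50 and running sums 90, 150, 200.
        double[] cumulative = cumulativeWeights(new double[] {10, 40, 50});
        double randomWeight = new Random().nextDouble() * cumulative[cumulative.length - 1];
        System.out.println("picked server index " + pick(cumulative, randomWeight));
    }
}
```

With these sample timings the fastest server owns the interval up to 90 out of 200, so it is chosen about 45% of the time.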
Custom configuration
@Bean
IPing ping() {
    return new PingUrl();
}
@Bean
IRule iRule() {
    return new ZoneAvoidanceRule();
}
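If the custom beans should apply to one client only, Spring Cloud Netflix offers the @RibbonClient annotation. The fragment below is a sketch that assumes the spring-cloud-starter-netflix-ribbon dependency is present; the class names RibbonClientsConfig and HelloServiceRibbonConfig are hypothetical, and RandomRule is chosen only as an example.

```java
import com.netflix.loadbalancer.IPing;
import com.netflix.loadbalancer.IRule;
import com.netflix.loadbalancer.PingUrl;
import com.netflix.loadbalancer.RandomRule;
import org.springframework.cloud.netflix.ribbon.RibbonClient;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

// Applies HelloServiceRibbonConfig only to the Ribbon client named SERVICE-NAME.
@Configuration
@RibbonClient(name = "SERVICE-NAME", configuration = HelloServiceRibbonConfig.class)
class RibbonClientsConfig {
}

// Hypothetical per-client configuration. Keep this class outside the packages covered
// by the application's component scan; otherwise its beans are shared by every client.
@Configuration
class HelloServiceRibbonConfig {

    @Bean
    IPing ping() {
        return new PingUrl();
    }

    @Bean
    IRule rule() {
        return new RandomRule();
    }
}
```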