1,gateway配置类GatewayAutoConfiguration
- 读取配置文件(PropertiesRouteDefinitionLocator)
- 加载过滤器工厂(XXXGatewayFilterFactory)
- 加载路由断言工厂(XXXRoutePredicateFactory)
- 加载路由信息定位器类(RouteDefinitionRouteLocator)
注:配置中设置断言和过滤器 必须使用首字母大写,eg.
routes:
- id: r01
order: 1
#注:使用ld 注册中心必须是同一分组;引入spring-cloud-starter-loadbalancer 依赖;服务名不能有"_"
uri: lb://consumerService
####断言 匹配规则
predicates:
- Path=/v1/**,/v2/**
- Method=POST
- Query=kk,v2.*
- name: Query
args:
param: k1
regexp: v1.*
filters:
####去掉前缀
- StripPrefix=1
- name: Part
把各个断言工厂类名去掉RoutePredicateFactory后的字符串作为key,类实例为value 保存到list中,在绑定值时通过配置中的断言名称name 去list进行获取相应的断言工厂,同理过滤器工厂也是同样的处理。
- RouteDefinitionRouteLocator.java
public Flux<Route> getRoutes() {
Flux<Route> routes = this.routeDefinitionLocator.getRouteDefinitions()
.map(this::convertToRoute);
if (!gatewayProperties.isFailOnRouteDefinitionError()) {
// instead of letting error bubble up, continue
routes = routes.onErrorContinue((error, obj) -> {
if (logger.isWarnEnabled()) {
logger.warn("RouteDefinition id " + ((RouteDefinition) obj).getId()
+ " will be ignored. Definition has invalid configs, "
+ error.getMessage());
}
});
}
private AsyncPredicate<ServerWebExchange> lookup(RouteDefinition route,
PredicateDefinition predicate) {
RoutePredicateFactory<Object> factory = this.predicates.get(predicate.getName());
if (factory == null) {
throw new IllegalArgumentException(
"Unable to find RoutePredicateFactory with name "
+ predicate.getName());
}
if (logger.isDebugEnabled()) {
logger.debug("RouteDefinition " + route.getId() + " applying "
+ predicate.getArgs() + " to " + predicate.getName());
}
// @formatter:off
Object config = this.configurationService.with(factory)
.name(predicate.getName())
.properties(predicate.getArgs())
.eventFunction((bound, properties) -> new PredicateArgsEvent(
RouteDefinitionRouteLocator.this, route.getId(), properties))
.bind();
// @formatter:on
return factory.applyAsync(config);
}
//加载配置中的过滤器
List<GatewayFilter> loadGatewayFilters(String id,
List<FilterDefinition> filterDefinitions) {
ArrayList<GatewayFilter> ordered = new ArrayList<>(filterDefinitions.size());
for (int i = 0; i < filterDefinitions.size(); i++) {
FilterDefinition definition = filterDefinitions.get(i);
GatewayFilterFactory factory = this.gatewayFilterFactories
.get(definition.getName());
if (factory == null) {
throw new IllegalArgumentException(
"Unable to find GatewayFilterFactory with name "
+ definition.getName());
}
if (logger.isDebugEnabled()) {
logger.debug("RouteDefinition " + id + " applying filter "
+ definition.getArgs() + " to " + definition.getName());
}
// @formatter:off
Object configuration = this.configurationService.with(factory)
.name(definition.getName())
.properties(definition.getArgs())
.eventFunction((bound, properties) -> new FilterArgsEvent(
// TODO: why explicit cast needed or java compile fails
RouteDefinitionRouteLocator.this, id, (Map<String, Object>) properties))
.bind();
// @formatter:on
// some filters require routeId
// TODO: is there a better place to apply this?
if (configuration instanceof HasRouteId) {
HasRouteId hasRouteId = (HasRouteId) configuration;
hasRouteId.setRouteId(id);
}
GatewayFilter gatewayFilter = factory.apply(configuration);
if (gatewayFilter instanceof Ordered) {
ordered.add(gatewayFilter);
}
else {
ordered.add(new OrderedGatewayFilter(gatewayFilter, i + 1));
}
}
return ordered;
}
2,gateway 源码执行流程
gateway采用的是webFlux的响应式编程:
- 请求分发 DispatcherHandler
- 请求映射 HandlerMapping
- 请求适配 HanderAdaper
- 请求处理 WebHander
DispatcherHandler.java
@Override
public Mono<Void> handle(ServerWebExchange exchange) {
if (this.handlerMappings == null) {
return createNotFoundError();
}
return Flux.fromIterable(this.handlerMappings)
.concatMap(mapping -> mapping.getHandler(exchange))
.next()
.switchIfEmpty(createNotFoundError())
.flatMap(handler -> invokeHandler(exchange, handler))
.flatMap(result -> handleResult(exchange, result));
}
在组装过滤器时会把全局过滤器和局部过滤器组装在一起,其中使用到适配器模式(GatewayFilterAdapter)。
public Mono<Void> handle(ServerWebExchange exchange) {
Route route = exchange.getRequiredAttribute(GATEWAY_ROUTE_ATTR);
List<GatewayFilter> gatewayFilters = route.getFilters();
List<GatewayFilter> combined = new ArrayList<>(this.globalFilters);
combined.addAll(gatewayFilters);
// TODO: needed or cached?
AnnotationAwareOrderComparator.sort(combined);
if (logger.isDebugEnabled()) {
logger.debug("Sorted gatewayFilterFactories: " + combined);
}
return new DefaultGatewayFilterChain(combined).filter(exchange);
}
其中使用负载均衡(uri: lb://consumerService)也是通过过滤器实现的。
LoadBalancerClientFilter.java
public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
URI url = exchange.getAttribute(GATEWAY_REQUEST_URL_ATTR);
String schemePrefix = exchange.getAttribute(GATEWAY_SCHEME_PREFIX_ATTR);
if (url == null
|| (!"lb".equals(url.getScheme()) && !"lb".equals(schemePrefix))) {
return chain.filter(exchange);
}
// preserve the original url
addOriginalRequestUrl(exchange, url);
if (log.isTraceEnabled()) {
log.trace("LoadBalancerClientFilter url before: " + url);
}
final ServiceInstance instance = choose(exchange);
if (instance == null) {
throw NotFoundException.create(properties.isUse404(),
"Unable to find instance for " + url.getHost());
}
URI uri = exchange.getRequest().getURI();
// if the `lb:<scheme>` mechanism was used, use `<scheme>` as the default,
// if the loadbalancer doesn't provide one.
String overrideScheme = instance.isSecure() ? "https" : "http";
if (schemePrefix != null) {
overrideScheme = url.getScheme();
}
URI requestUrl = loadBalancer.reconstructURI(
new DelegatingServiceInstance(instance, overrideScheme), uri);
if (log.isTraceEnabled()) {
log.trace("LoadBalancerClientFilter url chosen: " + requestUrl);
}
exchange.getAttributes().put(GATEWAY_REQUEST_URL_ATTR, requestUrl);
return chain.filter(exchange);
}
参照:https://blog.csdn.net/qq_45076180/article/details/117112434