spring_cloud_gateway源码分析

网关启动源码

自动装配AutoConfiguration

使用时加入jar包,spring-cloud-starter-gateway -> pom.xml -> spring-cloud-gateway-server -> spring.factories -> GatewayAutoConfiguration.java

配置信息映射

1
2
3
4
@Bean
public GatewayProperties gatewayProperties() {
return new GatewayProperties();
}

这个类对应着yml文件中的参数。

Locator 定位器

发现各种Locator:

  • PropertiesRouteDefinitionLocator:从Properties中读取
  • InMemoryRouteDefinitionRepository:对RouteDefinition进行增、删、查操作,基于内存存储
  • CompositeRouteDefinitionLocator:组合的Locator,在构造函数中设置委托,将PropertiesRouteDefinitionLocator和InMemoryRouteDefinitionRepository组合。

初始化GlobalFilters【FilteringWebHandler】

1
2
3
4
@Bean
public FilteringWebHandler filteringWebHandler(List<GlobalFilter> globalFilters) {
return new FilteringWebHandler(globalFilters);
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public class FilteringWebHandler implements WebHandler {

private final List<GatewayFilter> globalFilters;
//构造函数中设置globalFiltersglobalFilters
public FilteringWebHandler(List<GlobalFilter> globalFilters) {
this.globalFilters = loadFilters(globalFilters);
}
//设置globalFilters
private static List<GatewayFilter> loadFilters(List<GlobalFilter> filters) {
return filters.stream().map(filter -> {
GatewayFilterAdapter gatewayFilter = new GatewayFilterAdapter(filter);
if (filter instanceof Ordered) {
int order = ((Ordered) filter).getOrder();
return new OrderedGatewayFilter(gatewayFilter, order);
}
return gatewayFilter;
}).collect(Collectors.toList());
}
}

初始化predicates,gatewayFilters,getRoutes【GatewayAutoConfiguration–>RouteDefinitionRouteLocator】

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
public class RouteDefinitionRouteLocator
implements RouteLocator, BeanFactoryAware, ApplicationEventPublisherAware {
//构造函数中初始化
public RouteDefinitionRouteLocator(RouteDefinitionLocator routeDefinitionLocator,
List<RoutePredicateFactory> predicates,
List<GatewayFilterFactory> gatewayFilterFactories,
GatewayProperties gatewayProperties, ConversionService conversionService) {
this.routeDefinitionLocator = routeDefinitionLocator;
this.conversionService = conversionService;
initFactories(predicates);
gatewayFilterFactories.forEach(
factory -> this.gatewayFilterFactories.put(factory.name(), factory));
this.gatewayProperties = gatewayProperties;
}

//设置predicate工厂
private void initFactories(List<RoutePredicateFactory> predicates) {
predicates.forEach(factory -> {
String key = factory.name();
if (this.predicates.containsKey(key)) {
this.logger.warn("A RoutePredicateFactory named " + key
+ " already exists, class: " + this.predicates.get(key)
+ ". It will be overwritten.");
}
this.predicates.put(key, factory);
if (logger.isInfoEnabled()) {
logger.info("Loaded RoutePredicateFactory [" + key + "]");
}
});
}

public Flux<Route> getRoutes() {
//从RouteDefinitions转换为Route,转换过程在convertToRoute方法中实现
return this.routeDefinitionLocator.getRouteDefinitions().map(this::convertToRoute)
.map(route -> {
if (logger.isDebugEnabled()) {
logger.debug("RouteDefinition matched: " + route.getId());
}
return route;
});
}

//RouteDefinition到Route的转换
private Route convertToRoute(RouteDefinition routeDefinition) {
//从routeDefinition获取predicate
AsyncPredicate<ServerWebExchange> predicate = combinePredicates(routeDefinition);
//从routeDefinition获取gatewayFilters
List<GatewayFilter> gatewayFilters = getFilters(routeDefinition);
//构造Route
return Route.async(routeDefinition).asyncPredicate(predicate)
.replaceFilters(gatewayFilters).build();
}

//获取GatewayFilters
private List<GatewayFilter> getFilters(RouteDefinition routeDefinition) {
List<GatewayFilter> filters = new ArrayList<>();
//如果默认filter不为空,则去加载
if (!this.gatewayProperties.getDefaultFilters().isEmpty()) {
filters.addAll(loadGatewayFilters(DEFAULT_FILTERS,
this.gatewayProperties.getDefaultFilters()));
}
//如果Filter不为空,则
if (!routeDefinition.getFilters().isEmpty()) {
filters.addAll(loadGatewayFilters(routeDefinition.getId(),
routeDefinition.getFilters()));
}

AnnotationAwareOrderComparator.sort(filters);
return filters;
}

@SuppressWarnings("unchecked")
private List<GatewayFilter> loadGatewayFilters(String id,
List<FilterDefinition> filterDefinitions) {
List<GatewayFilter> filters = filterDefinitions.stream().map(definition -> {
//从gatewayFilterFactories中根据key获取factory
GatewayFilterFactory factory = this.gatewayFilterFactories
.get(definition.getName());
if (factory == null) {
throw new IllegalArgumentException(
"Unable to find GatewayFilterFactory with name "
+ definition.getName());
}
//获取definition设置的Filter值
Map<String, String> args = definition.getArgs();
if (logger.isDebugEnabled()) {
logger.debug("RouteDefinition " + id + " applying filter " + args + " to "
+ definition.getName());
}

Map<String, Object> properties = factory.shortcutType().normalize(args,
factory, this.parser, this.beanFactory);
//每一个工厂中都有一个静态内部类Config,目的是存储我们设置的Filter值
Object configuration = factory.newConfig();
//将后几个参数的信息绑定到configuration
ConfigurationUtils.bind(configuration, properties,
factory.shortcutFieldPrefix(), definition.getName(), validator,
conversionService);
//获得GatewayFilter
GatewayFilter gatewayFilter = factory.apply(configuration);
if (this.publisher != null) {
this.publisher.publishEvent(new FilterArgsEvent(this, id, properties));
}
return gatewayFilter;
}).collect(Collectors.toList());

ArrayList<GatewayFilter> ordered = new ArrayList<>(filters.size());
for (int i = 0; i < filters.size(); i++) {
GatewayFilter gatewayFilter = filters.get(i);
if (gatewayFilter instanceof Ordered) {
ordered.add(gatewayFilter);
}
else {
ordered.add(new OrderedGatewayFilter(gatewayFilter, i + 1));
}
}

return ordered;
}
}

请求处理源码

gateway请求源码流程

①ReactorHttpHandlerAdapter#apply方法是请求到网关执行的入口

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public class ReactorHttpHandlerAdapter implements BiFunction<HttpServerRequest, HttpServerResponse, Mono<Void>> {
public Mono<Void> apply(HttpServerRequest reactorRequest, HttpServerResponse reactorResponse) {
NettyDataBufferFactory bufferFactory = new NettyDataBufferFactory(reactorResponse.alloc());
try {
//获取请求的request和response
ReactorServerHttpRequest request = new ReactorServerHttpRequest(reactorRequest, bufferFactory);
ServerHttpResponse response = new ReactorServerHttpResponse(reactorResponse, bufferFactory);

if (request.getMethod() == HttpMethod.HEAD) {
response = new HttpHeadResponseDecorator(response);
}
//给到HttpWebHandlerAdapter执行构建
return this.httpHandler.handle(request, response)
.doOnError(ex -> logger.trace(request.getLogPrefix() + "Failed to complete: " + ex.getMessage()))
.doOnSuccess(aVoid -> logger.trace(request.getLogPrefix() + "Handling completed"));
}
catch (URISyntaxException ex) {
if (logger.isDebugEnabled()) {
logger.debug("Failed to get request URI: " + ex.getMessage());
}
reactorResponse.status(HttpResponseStatus.BAD_REQUEST);
return Mono.empty();
}
}
}

②HttpWebHandlerAdapter#handle构建网关上下文ServerWebExchange

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public class HttpWebHandlerAdapter extends WebHandlerDecorator implements HttpHandler {
public Mono<Void> handle(ServerHttpRequest request, ServerHttpResponse response) {

if (this.forwardedHeaderTransformer != null) {
request = this.forwardedHeaderTransformer.apply(request);
}
//根据请求的request、response构建网关上下文
ServerWebExchange exchange = createExchange(request, response);

LogFormatUtils.traceDebug(logger, traceOn ->
exchange.getLogPrefix() + formatRequest(exchange.getRequest()) +
(traceOn ? ", headers=" + formatHeaders(exchange.getRequest().getHeaders()) : ""));

return getDelegate().handle(exchange)
.doOnSuccess(aVoid -> logResponse(exchange))
.onErrorResume(ex -> handleUnresolvedError(exchange, ex))
.then(Mono.defer(response::setComplete));
}
}

③DispatcherHandler用于Http请求处理器/控制器的中央分发处理器,把请求分发给已经注册的处理程序处理,DispatcherHandler遍历Mapping获取对应的handler,网关一共有6个handlerMapping【此处会找到RoutePredicateHandlerMapping,通过RoutePredicateHandlerMapping获取FilteringWebHandler,通过FilteringWebHandler获取】

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public class DispatcherHandler implements WebHandler, ApplicationContextAware {
public Mono<Void> handle(ServerWebExchange exchange) {
if (this.handlerMappings == null) {
return createNotFoundError();
}
//遍历mapping获取handler
return Flux.fromIterable(this.handlerMappings)
.concatMap(mapping -> mapping.getHandler(exchange))
.next()
.switchIfEmpty(createNotFoundError())
.flatMap(handler -> invokeHandler(exchange, handler))
.flatMap(result -> handleResult(exchange, result));
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
public class RoutePredicateHandlerMapping extends AbstractHandlerMapping {

private final FilteringWebHandler webHandler;

private final RouteLocator routeLocator;

private final Integer managementPort;

private final ManagementPortType managementPortType;
//网关启动时进行了初始化
public RoutePredicateHandlerMapping(FilteringWebHandler webHandler,
RouteLocator routeLocator, GlobalCorsProperties globalCorsProperties,
Environment environment) {
this.webHandler = webHandler;
this.routeLocator = routeLocator;

this.managementPort = getPortProperty(environment, "management.server.");
this.managementPortType = getManagementPortType(environment);
setOrder(1);
setCorsConfigurations(globalCorsProperties.getCorsConfigurations());
}

protected Mono<?> getHandlerInternal(ServerWebExchange exchange) {
// don't handle requests on management port if set and different than server port
if (this.managementPortType == DIFFERENT && this.managementPort != null
&& exchange.getRequest().getURI().getPort() == this.managementPort) {
return Mono.empty();
}
exchange.getAttributes().put(GATEWAY_HANDLER_MAPPER_ATTR, getSimpleName());

return lookupRoute(exchange)
// .log("route-predicate-handler-mapping", Level.FINER) //name this
.flatMap((Function<Route, Mono<?>>) r -> {
exchange.getAttributes().remove(GATEWAY_PREDICATE_ROUTE_ATTR);
if (logger.isDebugEnabled()) {
logger.debug(
"Mapping [" + getExchangeDesc(exchange) + "] to " + r);
}

exchange.getAttributes().put(GATEWAY_ROUTE_ATTR, r);
//返回FilteringWebHandler
return Mono.just(webHandler);
}).switchIfEmpty(Mono.empty().then(Mono.fromRunnable(() -> {
exchange.getAttributes().remove(GATEWAY_PREDICATE_ROUTE_ATTR);
if (logger.isTraceEnabled()) {
logger.trace("No RouteDefinition found for ["
+ getExchangeDesc(exchange) + "]");
}
})));
}
}

④ RoutePredicateHandlerMapping#lookupRoute匹配路由,根据routeLocator获取我们在配置我文件中配置的Route,和当前请求的路由做匹配

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public class RoutePredicateHandlerMapping extends AbstractHandlerMapping {
protected Mono<Route> lookupRoute(ServerWebExchange exchange) {
//routeLocator获取我们在配置我文件中配置的Route
return this.routeLocator.getRoutes()
.concatMap(route -> Mono.just(route).filterWhen(r -> {
exchange.getAttributes().put(GATEWAY_PREDICATE_ROUTE_ATTR, r.getId());
//当前请求的路由做匹配
return r.getPredicate().apply(exchange);
})
.doOnError(e -> logger.error(
"Error applying predicate for route: " + route.getId(),
e))
.onErrorResume(e -> Mono.empty()))
.next()
.map(route -> {
if (logger.isDebugEnabled()) {
logger.debug("Route matched: " + route.getId());
}
validateRoute(route, exchange);
return route;
});
}
}

⑤FilteringWebHandler创建过滤器链,执行过滤器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
public class FilteringWebHandler implements WebHandler {
//创建过滤器链
public Mono<Void> handle(ServerWebExchange exchange) {
Route route = exchange.getRequiredAttribute(GATEWAY_ROUTE_ATTR);
List<GatewayFilter> gatewayFilters = route.getFilters();

List<GatewayFilter> combined = new ArrayList<>(this.globalFilters);
combined.addAll(gatewayFilters);
AnnotationAwareOrderComparator.sort(combined);

if (logger.isDebugEnabled()) {
logger.debug("Sorted gatewayFilterFactories: " + combined);
}

return new DefaultGatewayFilterChain(combined).filter(exchange);
}

private static class DefaultGatewayFilterChain implements GatewayFilterChain {
//调用过滤器
public Mono<Void> filter(ServerWebExchange exchange) {
return Mono.defer(() -> {
if (this.index < filters.size()) {
GatewayFilter filter = filters.get(this.index);
DefaultGatewayFilterChain chain = new DefaultGatewayFilterChain(this,
this.index + 1);
//执行调用
return filter.filter(exchange, chain);
}
else {
return Mono.empty(); // complete
}
});
}
}
}

限流和熔断

限流和熔断都是加入相应的过滤器实现的。

ribbon

LoadBalancerClientFilter

请求的filter是哪个

NettyRoutingFilter

总结

spring cloud gateway 是由netty + webflux + filter组成

Filter :Filter是一个Servlet规范组件;一个请求可以在Http请求到达Servlet前被一个或多个Filter处理,Servlet处理完后返回给Filter,最后返回给用户。

WebFlux:它是一个异步非阻塞式的web框架,它的作用不是提升接口请求时间,而是在一些阻塞的场景【例如请求DB,等待DB响应数据、打开大文件等】,可以把线程给其它请求使用,从而提升系统吞吐量。Gateway属于网络IO密集型【网关转发请求到下游服务】,通过WebFlux有效的提升网关转发的吞吐量。


spring_cloud_gateway源码分析
http://hanqichuan.com/2022/06/28/spring_cloud/spring_cloud_gateway源码分析/
作者
韩启川
发布于
2022年6月28日
许可协议