网关启动源码 自动装配AutoConfiguration:使用时引入spring-cloud-starter-gateway的jar包,自动装配类的查找链路为 spring-cloud-starter-gateway -> pom.xml -> spring-cloud-gateway-server -> spring.factories -> GatewayAutoConfiguration.java
配置信息映射
/**
 * Registers a freshly constructed {@link GatewayProperties} instance as a bean.
 *
 * @return a new {@code GatewayProperties} object
 */
@Bean
public GatewayProperties gatewayProperties() {
    GatewayProperties properties = new GatewayProperties();
    return properties;
}
这个类对应着yml文件中的参数。
Locator 定位器 发现各种Locator:
PropertiesRouteDefinitionLocator:从Properties中读取
InMemoryRouteDefinitionRepository:对RouteDefinition进行增、删、查操作,基于内存存储
CompositeRouteDefinitionLocator:组合的Locator,在构造函数中设置委托,将PropertiesRouteDefinitionLocator和InMemoryRouteDefinitionRepository组合。
初始化GlobalFilters【FilteringWebHandler】
/**
 * Registers the {@link FilteringWebHandler} bean, handing it every
 * {@link GlobalFilter} that was injected into this factory method.
 *
 * @param globalFilters the global filters to pass to the handler
 * @return a new handler constructed from {@code globalFilters}
 */
@Bean
public FilteringWebHandler filteringWebHandler(List<GlobalFilter> globalFilters) {
    FilteringWebHandler handler = new FilteringWebHandler(globalFilters);
    return handler;
}
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 public class FilteringWebHandler implements WebHandler { private final List<GatewayFilter> globalFilters; public FilteringWebHandler (List<GlobalFilter> globalFilters) { this .globalFilters = loadFilters(globalFilters); } private static List<GatewayFilter> loadFilters (List<GlobalFilter> filters) { return filters.stream().map(filter -> { GatewayFilterAdapter gatewayFilter = new GatewayFilterAdapter (filter); if (filter instanceof Ordered) { int order = ((Ordered) filter).getOrder(); return new OrderedGatewayFilter (gatewayFilter, order); } return gatewayFilter; }).collect(Collectors.toList()); } }
初始化predicates,gatewayFilters,getRoutes【GatewayAutoConfiguration–>RouteDefinitionRouteLocator】
/**
 * Excerpt of the locator that converts {@link RouteDefinition}s into
 * {@link Route}s. Field declarations and some helpers referenced here
 * (for example {@code combinePredicates}) are not part of this excerpt.
 */
public class RouteDefinitionRouteLocator
        implements RouteLocator, BeanFactoryAware, ApplicationEventPublisherAware {

    /**
     * Stores the collaborators and indexes the predicate and filter factories
     * by their {@code name()}.
     *
     * @param routeDefinitionLocator source of route definitions
     * @param predicates             predicate factories to register by name
     * @param gatewayFilterFactories filter factories to register by name
     * @param gatewayProperties      gateway properties (read for default filters)
     * @param conversionService      conversion service passed to configuration binding
     */
    public RouteDefinitionRouteLocator(RouteDefinitionLocator routeDefinitionLocator,
            List<RoutePredicateFactory> predicates,
            List<GatewayFilterFactory> gatewayFilterFactories,
            GatewayProperties gatewayProperties,
            ConversionService conversionService) {
        this.routeDefinitionLocator = routeDefinitionLocator;
        this.conversionService = conversionService;
        initFactories(predicates);
        for (GatewayFilterFactory filterFactory : gatewayFilterFactories) {
            this.gatewayFilterFactories.put(filterFactory.name(), filterFactory);
        }
        this.gatewayProperties = gatewayProperties;
    }

    /**
     * Registers each predicate factory under its name. A later factory with
     * an already-registered name replaces the earlier one after a warning.
     */
    private void initFactories(List<RoutePredicateFactory> predicates) {
        for (RoutePredicateFactory candidate : predicates) {
            String name = candidate.name();
            if (this.predicates.containsKey(name)) {
                this.logger.warn("A RoutePredicateFactory named " + name
                        + " already exists, class: " + this.predicates.get(name)
                        + ". It will be overwritten.");
            }
            this.predicates.put(name, candidate);
            if (logger.isInfoEnabled()) {
                logger.info("Loaded RoutePredicateFactory [" + name + "]");
            }
        }
    }

    /**
     * Maps every route definition from the underlying locator to a
     * {@link Route}, logging each converted route id at debug level.
     *
     * @return the converted routes
     */
    public Flux<Route> getRoutes() {
        Flux<Route> converted = this.routeDefinitionLocator.getRouteDefinitions()
                .map(this::convertToRoute);
        return converted.map(route -> {
            if (logger.isDebugEnabled()) {
                logger.debug("RouteDefinition matched: " + route.getId());
            }
            return route;
        });
    }

    /**
     * Builds one route: predicates are combined first, then the filters are
     * resolved, and finally both are applied to the route builder.
     */
    private Route convertToRoute(RouteDefinition routeDefinition) {
        AsyncPredicate<ServerWebExchange> combinedPredicate = combinePredicates(routeDefinition);
        List<GatewayFilter> routeFilters = getFilters(routeDefinition);
        return Route.async(routeDefinition)
                .asyncPredicate(combinedPredicate)
                .replaceFilters(routeFilters)
                .build();
    }

    /**
     * Collects the default filters followed by the route's own filters and
     * sorts the combined list with {@code AnnotationAwareOrderComparator}.
     */
    private List<GatewayFilter> getFilters(RouteDefinition routeDefinition) {
        List<GatewayFilter> collected = new ArrayList<>();

        List<FilterDefinition> defaultDefinitions = this.gatewayProperties.getDefaultFilters();
        if (!defaultDefinitions.isEmpty()) {
            collected.addAll(loadGatewayFilters(DEFAULT_FILTERS, defaultDefinitions));
        }

        List<FilterDefinition> routeDefinitions = routeDefinition.getFilters();
        if (!routeDefinitions.isEmpty()) {
            collected.addAll(loadGatewayFilters(routeDefinition.getId(), routeDefinitions));
        }

        AnnotationAwareOrderComparator.sort(collected);
        return collected;
    }

    /**
     * Creates a filter for every definition, then gives each filter that is
     * not already {@link Ordered} an order equal to its 1-based position in
     * the definition list.
     *
     * @param id                the route id (or the default-filters key) used in logs and events
     * @param filterDefinitions the filter definitions to instantiate
     * @return the filters in definition order, every element being ordered
     */
    @SuppressWarnings("unchecked")
    private List<GatewayFilter> loadGatewayFilters(String id,
            List<FilterDefinition> filterDefinitions) {
        // Phase 1: instantiate all filters before any ordering wrapper is applied.
        List<GatewayFilter> created = new ArrayList<>(filterDefinitions.size());
        for (FilterDefinition definition : filterDefinitions) {
            created.add(createFilter(id, definition));
        }

        // Phase 2: wrap the filters that do not carry their own order.
        ArrayList<GatewayFilter> ordered = new ArrayList<>(created.size());
        int position = 0;
        for (GatewayFilter candidate : created) {
            position++;
            if (candidate instanceof Ordered) {
                ordered.add(candidate);
            }
            else {
                ordered.add(new OrderedGatewayFilter(candidate, position));
            }
        }
        return ordered;
    }

    /**
     * Resolves the factory named by the definition, binds the normalized
     * arguments onto a new configuration object, applies the factory and,
     * when a publisher is set, publishes a {@code FilterArgsEvent}.
     *
     * @throws IllegalArgumentException if no factory is registered under the definition's name
     */
    @SuppressWarnings("unchecked")
    private GatewayFilter createFilter(String id, FilterDefinition definition) {
        GatewayFilterFactory factory = this.gatewayFilterFactories.get(definition.getName());
        if (factory == null) {
            throw new IllegalArgumentException(
                    "Unable to find GatewayFilterFactory with name " + definition.getName());
        }

        Map<String, String> args = definition.getArgs();
        if (logger.isDebugEnabled()) {
            logger.debug("RouteDefinition " + id + " applying filter " + args
                    + " to " + definition.getName());
        }

        Map<String, Object> properties = factory.shortcutType()
                .normalize(args, factory, this.parser, this.beanFactory);
        Object configuration = factory.newConfig();
        ConfigurationUtils.bind(configuration, properties, factory.shortcutFieldPrefix(),
                definition.getName(), validator, conversionService);

        GatewayFilter gatewayFilter = factory.apply(configuration);
        if (this.publisher != null) {
            this.publisher.publishEvent(new FilterArgsEvent(this, id, properties));
        }
        return gatewayFilter;
    }
}
请求处理源码
①ReactorHttpHandlerAdapter#apply方法是请求到网关执行的入口
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 public class ReactorHttpHandlerAdapter implements BiFunction <HttpServerRequest, HttpServerResponse, Mono<Void>> { public Mono<Void> apply (HttpServerRequest reactorRequest, HttpServerResponse reactorResponse) { NettyDataBufferFactory bufferFactory = new NettyDataBufferFactory (reactorResponse.alloc()); try { ReactorServerHttpRequest request = new ReactorServerHttpRequest (reactorRequest, bufferFactory); ServerHttpResponse response = new ReactorServerHttpResponse (reactorResponse, bufferFactory); if (request.getMethod() == HttpMethod.HEAD) { response = new HttpHeadResponseDecorator (response); } return this .httpHandler.handle(request, response) .doOnError(ex -> logger.trace(request.getLogPrefix() + "Failed to complete: " + ex.getMessage())) .doOnSuccess(aVoid -> logger.trace(request.getLogPrefix() + "Handling completed" )); } catch (URISyntaxException ex) { if (logger.isDebugEnabled()) { logger.debug("Failed to get request URI: " + ex.getMessage()); } reactorResponse.status(HttpResponseStatus.BAD_REQUEST); return Mono.empty(); } } }
②HttpWebHandlerAdapter#handle构建网关上下文ServerWebExchange
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 public class HttpWebHandlerAdapter extends WebHandlerDecorator implements HttpHandler { public Mono<Void> handle (ServerHttpRequest request, ServerHttpResponse response) { if (this .forwardedHeaderTransformer != null ) { request = this .forwardedHeaderTransformer.apply(request); } ServerWebExchange exchange = createExchange(request, response); LogFormatUtils.traceDebug(logger, traceOn -> exchange.getLogPrefix() + formatRequest(exchange.getRequest()) + (traceOn ? ", headers=" + formatHeaders(exchange.getRequest().getHeaders()) : "" )); return getDelegate().handle(exchange) .doOnSuccess(aVoid -> logResponse(exchange)) .onErrorResume(ex -> handleUnresolvedError(exchange, ex)) .then(Mono.defer(response::setComplete)); } }
③DispatcherHandler是HTTP请求处理器/控制器的中央分发器,负责把请求分发给已经注册的处理程序处理。DispatcherHandler遍历handlerMappings获取对应的handler,网关一共有6个handlerMapping【此处会找到RoutePredicateHandlerMapping,通过RoutePredicateHandlerMapping获取FilteringWebHandler,再通过FilteringWebHandler获取过滤器链并执行】
1 2 3 4 5 6 7 8 9 10 11 12 13 14 public class DispatcherHandler implements WebHandler , ApplicationContextAware { public Mono<Void> handle (ServerWebExchange exchange) { if (this .handlerMappings == null ) { return createNotFoundError(); } return Flux.fromIterable(this .handlerMappings) .concatMap(mapping -> mapping.getHandler(exchange)) .next() .switchIfEmpty(createNotFoundError()) .flatMap(handler -> invokeHandler(exchange, handler)) .flatMap(result -> handleResult(exchange, result)); } }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 public class RoutePredicateHandlerMapping extends AbstractHandlerMapping { private final FilteringWebHandler webHandler; private final RouteLocator routeLocator; private final Integer managementPort; private final ManagementPortType managementPortType; public RoutePredicateHandlerMapping (FilteringWebHandler webHandler, RouteLocator routeLocator, GlobalCorsProperties globalCorsProperties, Environment environment) { this .webHandler = webHandler; this .routeLocator = routeLocator; this .managementPort = getPortProperty(environment, "management.server." ); this .managementPortType = getManagementPortType(environment); setOrder(1 ); setCorsConfigurations(globalCorsProperties.getCorsConfigurations()); } protected Mono<?> getHandlerInternal(ServerWebExchange exchange) { if (this .managementPortType == DIFFERENT && this .managementPort != null && exchange.getRequest().getURI().getPort() == this .managementPort) { return Mono.empty(); } exchange.getAttributes().put(GATEWAY_HANDLER_MAPPER_ATTR, getSimpleName()); return lookupRoute(exchange) .flatMap((Function<Route, Mono<?>>) r -> { exchange.getAttributes().remove(GATEWAY_PREDICATE_ROUTE_ATTR); if (logger.isDebugEnabled()) { logger.debug( "Mapping [" + getExchangeDesc(exchange) + "] to " + r); } exchange.getAttributes().put(GATEWAY_ROUTE_ATTR, r); return Mono.just(webHandler); }).switchIfEmpty(Mono.empty().then(Mono.fromRunnable(() -> { exchange.getAttributes().remove(GATEWAY_PREDICATE_ROUTE_ATTR); if (logger.isTraceEnabled()) { logger.trace("No RouteDefinition found for [" + getExchangeDesc(exchange) + "]" ); } }))); } }
④ RoutePredicateHandlerMapping#lookupRoute匹配路由:通过routeLocator获取我们在配置文件中配置的Route,再逐个与当前请求做匹配
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 public class RoutePredicateHandlerMapping extends AbstractHandlerMapping { protected Mono<Route> lookupRoute (ServerWebExchange exchange) { return this .routeLocator.getRoutes() .concatMap(route -> Mono.just(route).filterWhen(r -> { exchange.getAttributes().put(GATEWAY_PREDICATE_ROUTE_ATTR, r.getId()); return r.getPredicate().apply(exchange); }) .doOnError(e -> logger.error( "Error applying predicate for route: " + route.getId(), e)) .onErrorResume(e -> Mono.empty())) .next() .map(route -> { if (logger.isDebugEnabled()) { logger.debug("Route matched: " + route.getId()); } validateRoute(route, exchange); return route; }); } }
⑤FilteringWebHandler创建过滤器链,执行过滤器
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 public class FilteringWebHandler implements WebHandler { public Mono<Void> handle (ServerWebExchange exchange) { Route route = exchange.getRequiredAttribute(GATEWAY_ROUTE_ATTR); List<GatewayFilter> gatewayFilters = route.getFilters(); List<GatewayFilter> combined = new ArrayList <>(this .globalFilters); combined.addAll(gatewayFilters); AnnotationAwareOrderComparator.sort(combined); if (logger.isDebugEnabled()) { logger.debug("Sorted gatewayFilterFactories: " + combined); } return new DefaultGatewayFilterChain (combined).filter(exchange); } private static class DefaultGatewayFilterChain implements GatewayFilterChain { public Mono<Void> filter (ServerWebExchange exchange) { return Mono.defer(() -> { if (this .index < filters.size()) { GatewayFilter filter = filters.get(this .index); DefaultGatewayFilterChain chain = new DefaultGatewayFilterChain (this , this .index + 1 ); return filter.filter(exchange, chain); } else { return Mono.empty(); } }); } } }
限流和熔断 限流和熔断都是加入相应的过滤器实现的。
ribbon LoadBalancerClientFilter
实际向下游服务发起请求的filter是哪个:NettyRoutingFilter
总结 spring cloud gateway 是由netty + webflux + filter组成
Filter :在Servlet规范中,Filter是一个规范组件;一个请求可以在Http请求到达Servlet前被一个或多个Filter处理,Servlet处理完后返回给Filter,最后返回给用户。Gateway中的过滤器(GlobalFilter、GatewayFilter)借鉴了同样的过滤器链思想,但它们运行在WebFlux之上,并不是Servlet Filter。
WebFlux:它是一个异步非阻塞式的web框架,它的作用不是缩短单次接口请求的耗时,而是在一些阻塞的场景【例如请求DB,等待DB响应数据、打开大文件等】下,可以把线程让给其它请求使用,从而提升系统吞吐量。Gateway属于网络IO密集型应用【网关转发请求到下游服务】,通过WebFlux可以有效地提升网关转发的吞吐量。