上一节做了一个很简单的示例,微服务通过注册到eureka上,然后网关通过服务发现访问到对应的微服务。本节将简单地对整个gateway请求转发过程做一个简单的分析。
一、核心流程
主要流程:
- Gateway Client向 Spring Cloud Gateway 发送请求
- 请求首先会被HttpWebHandlerAdapter 进行提取组装成网关上下文
- 然后网关的上下文会传递到DispatcherHandler ,它负责将请求分发给 RoutePredicateHandlerMapping
- RoutePredicateHandlerMapping负责路由查找,并根据路由断言判断路由是否可用
- 如果过断言成功,由FilteringWebHandler 创建过滤器链并调用
- 通过特定于请求的 Fliter 链运行请求,Filter 被虚线分隔的原因是Filter可以在发送代理请求之前(pre)和之后(post)运行逻辑
- 执行所有pre过滤器逻辑。然后进行代理请求。发出代理请求后,将运行“post”过滤器逻辑。
- 处理完毕之后将 Response 返回到 Gateway 客户端
二、具体分析
请求过来,会经过HttpWebHandlerAdapter.handle方法,可以理解为这就是请求的主入口
@Override
public Mono<Void> handle(ServerHttpRequest request, ServerHttpResponse response) {if (this.forwardedHeaderTransformer != null) {try {request = this.forwardedHeaderTransformer.apply(request);}catch (Throwable ex) {if (logger.isDebugEnabled()) {logger.debug("Failed to apply forwarded headers to " + formatRequest(request), ex);}response.setStatusCode(HttpStatus.BAD_REQUEST);return response.setComplete();}}//组装上下文ServerWebExchange exchange = createExchange(request, response);LogFormatUtils.traceDebug(logger, traceOn ->exchange.getLogPrefix() + formatRequest(exchange.getRequest()) +(traceOn ? ", headers=" + formatHeaders(exchange.getRequest().getHeaders()) : ""));
//委派给delegate来处理return getDelegate().handle(exchange).doOnSuccess(aVoid -> logResponse(exchange)).onErrorResume(ex -> handleUnresolvedError(exchange, ex)).then(Mono.defer(response::setComplete));
}
这一个delegate是个啥,看一下接口定义:
就是一个处理器,所有参数封装在上下文exchange中
public interface WebHandler {/*** Handle the web server exchange.* @param exchange the current server exchange* @return {@code Mono<Void>} to indicate when request handling is complete*/Mono<Void> handle(ServerWebExchange exchange);}
最终会调到DispatcherHandler
@Override
public Mono<Void> handle(ServerWebExchange exchange) {if (this.handlerMappings == null) {return createNotFoundError();}if (CorsUtils.isPreFlightRequest(exchange.getRequest())) {return handlePreFlight(exchange);}return Flux.fromIterable(this.handlerMappings).concatMap(mapping -> mapping.getHandler(exchange)).next().switchIfEmpty(createNotFoundError()).flatMap(handler -> invokeHandler(exchange, handler)).flatMap(result -> handleResult(exchange, result));
}
handlerMappings是啥
是一个列表,HandlerMapping可以根据当前请求,找到对应的处理器
如果当前请求比如/hello/world,在gateway服务上是一个controller对应的接口,那这个就可以通过RequestMappingHandlerMapping找到一个RequestMappingHandlerAdapter。
如果当前请求,是需要转发给下游微服务的,则找到RoutePredicateHandlerMapping
RoutePredicateHandlerMapping查找路由主要逻辑
@Override
protected Mono<?> getHandlerInternal(ServerWebExchange exchange) {// don't handle requests on management port if set and different than server portif (this.managementPortType == DIFFERENT && this.managementPort != null&& exchange.getRequest().getLocalAddress() != null&& exchange.getRequest().getLocalAddress().getPort() == this.managementPort) {return Mono.empty();}exchange.getAttributes().put(GATEWAY_HANDLER_MAPPER_ATTR, getSimpleName());return lookupRoute(exchange)// .log("route-predicate-handler-mapping", Level.FINER) //name this.flatMap((Function<Route, Mono<?>>) r -> {exchange.getAttributes().remove(GATEWAY_PREDICATE_ROUTE_ATTR);if (logger.isDebugEnabled()) {logger.debug("Mapping [" + getExchangeDesc(exchange) + "] to " + r);}//把找到的路由放到exchange上下文中exchange.getAttributes().put(GATEWAY_ROUTE_ATTR, r);//返回的handler实际上是webHandlerreturn Mono.just(webHandler);}).switchIfEmpty(Mono.empty().then(Mono.fromRunnable(() -> {exchange.getAttributes().remove(GATEWAY_PREDICATE_ROUTE_ATTR);if (logger.isTraceEnabled()) {logger.trace("No RouteDefinition found for [" + getExchangeDesc(exchange) + "]");}})));
}
看下查找路由的具体方式:原来是将所有的路由,用predicate作一下匹配,找出符合当前请求的路由
protected Mono<Route> lookupRoute(ServerWebExchange exchange) {return this.routeLocator.getRoutes()// individually filter routes so that filterWhen error delaying is not a// problem.concatMap(route -> Mono.just(route).filterWhen(r -> {// add the current route we are testingexchange.getAttributes().put(GATEWAY_PREDICATE_ROUTE_ATTR, r.getId());//用predicate作一下匹配,找出符合当前请求的路由return r.getPredicate().apply(exchange);})// instead of immediately stopping main flux due to error, log and// swallow it.doOnError(e -> logger.error("Error applying predicate for route: " + route.getId(), e)).onErrorResume(e -> Mono.empty()))// .defaultIfEmpty() put a static Route not found// or .switchIfEmpty()// .switchIfEmpty(Mono.<Route>empty().log("noroute")).next()// TODO: error handling.map(route -> {if (logger.isDebugEnabled()) {logger.debug("Route matched: " + route.getId());}validateRoute(route, exchange);return route;});/** TODO: trace logging if (logger.isTraceEnabled()) {* logger.trace("RouteDefinition did not match: " + routeDefinition.getId()); }*/
}
routeLocator包含了哪些路由,Debug看一下
可以看到,用了服务注册和发现后,实际上,一个微服务会自动注册一个路由,比如上面的hello-service,自动注册了一个路径为:/hello-service/**的路由。这就是为什么我们yml配置文件中明明什么路由也没有配置,也能自动转发hello-service的请求。
同时,可以看到,这个路由下面有一个ReWritePathFilter,会自动去掉服务名,将请求转发给下游微服务。
下一步再看看FilteringWebHandler中的处理逻辑
@Override
public Mono<Void> handle(ServerWebExchange exchange) {
//从上下文中取出路由,路由中包含filtersRoute route = exchange.getRequiredAttribute(GATEWAY_ROUTE_ATTR);List<GatewayFilter> gatewayFilters = route.getFilters();
//spring容器中的Global Filter也取出来List<GatewayFilter> combined = new ArrayList<>(this.globalFilters);combined.addAll(gatewayFilters);// TODO: needed or cached?//做个排序AnnotationAwareOrderComparator.sort(combined);if (logger.isDebugEnabled()) {logger.debug("Sorted gatewayFilterFactories: " + combined);}
//后面就是filter链式调用了return new DefaultGatewayFilterChain(combined).filter(exchange);
}
可以看到代码中主要有两个逻辑:
1、从上下文中取出路由,路由中包含filters
2、spring容器中的Global Filter也取出来
3、合并上面的两种filter,并且排序
4、filters列表组装成责任链来进行调用
可以通过源码,再来看看核心流程的那个图,这样就比较清楚了。
总结了另一个稍微详细一点的图:
个人看这块源码的体会:整个核心流程并不复杂,难点大概是reactor响应式编程,如果之前没接触过这一块,那么看着会有种不知道下一步往哪里的困惑!所以要学习这块源码,还得学习reactor。