1.前言
spring cloud gateway的基本组成和作用就不细赘述,此篇适合对此有一定了解的人阅读。
spring cloud gateway版本: Hoxton.SR1
spring cloud gateway的配置使用yml配置:
server:port: 9527y#根据微服务名称进行动态路由的配置
spring:application:name: cloud-gatewaycloud:gateway:discovery:locator:enabled: true #开启从注册中心动态创建路由的功能,利用微服务名称进行路由routes: - id: config-clienturi: lb://config-clientpredicates:- Path=/config/**filters:- RewritePath=/config/?(?<segment>.*),/config/v1/$\{segment}
2. 流程图
先看一张官网文档给的图,此图大概描述了请求的处理原理,各个组件大致的位置。
3.源码剖析
http底层处理是基于netty,netty是一个高性能异步事件驱动的通讯框架,对于netty的处理流程可以查阅其源码。netty读取完数据经过pipeline管道处理后,最终调用到reactor.netty.http.server.HttpServerHandle#onStateChange方法。然后经过层层方法调用到核心类org.springframework.web.reactive.DispatcherHandler#handle
public Mono<Void> handle(ServerWebExchange exchange) {if (this.handlerMappings == null) {//没有合适的handler返回失败return createNotFoundError();}return Flux.fromIterable(this.handlerMappings)//mapping.getHandler是关键方法,根据handlerMapping找到对应的handler.concatMap(mapping -> mapping.getHandler(exchange)).next().switchIfEmpty(createNotFoundError())//invokeHandler是关键方法,调用处理逻辑.flatMap(handler -> invokeHandler(exchange, handler))//处理结果,写出.flatMap(result -> handleResult(exchange, result));}private Mono<HandlerResult> invokeHandler(ServerWebExchange exchange, Object handler) {if (this.handlerAdapters != null) {for (HandlerAdapter handlerAdapter : this.handlerAdapters) {if (handlerAdapter.supports(handler)) {//查找合适的handlerAdapter处理,默认会调用到SimpleHandlerAdapter#handlereturn handlerAdapter.handle(exchange, handler);}}}return Mono.error(new IllegalStateException("No HandlerAdapter: " + handler));}private Mono<Void> handleResult(ServerWebExchange exchange, HandlerResult result) {return getResultHandler(result).handleResult(exchange, result).checkpoint("Handler " + result.getHandler() + " [DispatcherHandler]").onErrorResume(ex ->result.applyExceptionHandler(ex).flatMap(exResult -> {String text = "Exception handler " + exResult.getHandler() +", error=\"" + ex.getMessage() + "\" [DispatcherHandler]";return getResultHandler(exResult).handleResult(exchange, exResult).checkpoint(text);}));}private HandlerResultHandler getResultHandler(HandlerResult handlerResult) {if (this.resultHandlers != null) {for (HandlerResultHandler resultHandler : this.resultHandlers) {if (resultHandler.supports(handlerResult)) {return resultHandler;}}}throw new IllegalStateException("No HandlerResultHandler for " + handlerResult.getReturnValue());}
handlerMappings的注入类看下图,最后通过RoutePredicateHandlerMapping找到合适的处理类。handlerMappings中的其他几种Mapping方式,是别的策略或者配置时会用到,可以思考是怎么用的。
先来看看mapping.getHandler的处理逻辑,默认会调用到org.springframework.web.reactive.handler.AbstractHandlerMapping#getHandler
@Overridepublic Mono<Object> getHandler(ServerWebExchange exchange) {//getHandlerInternal,根据exchange真正去查找合适的处理handler,根据上面解释,//getHandlerInternal调用到RoutePredicateHandlerMapping类中去return getHandlerInternal(exchange).map(handler -> {if (logger.isDebugEnabled()) {logger.debug(exchange.getLogPrefix() + "Mapped to " + handler);}//跨域处理if (hasCorsConfigurationSource(handler)) {ServerHttpRequest request = exchange.getRequest();CorsConfiguration config = (this.corsConfigurationSource != null ? this.corsConfigurationSource.getCorsConfiguration(exchange) : null);CorsConfiguration handlerConfig = getCorsConfiguration(handler, exchange);config = (config != null ? config.combine(handlerConfig) : handlerConfig);if (!this.corsProcessor.process(config, exchange) || CorsUtils.isPreFlightRequest(request)) {return REQUEST_HANDLED_HANDLER;}}return handler;});}
现在调用到了org.springframework.cloud.gateway.handler.RoutePredicateHandlerMapping#getHandlerInternal方法中,这里一个关键点就来了。Predicates断言,是路由配置的关键,根据predicates的结果,满足的话就会转发请求到对应的Router配置的uri上。
@Overrideprotected Mono<?> getHandlerInternal(ServerWebExchange exchange) {// don't handle requests on management port if set and different than server portif (this.managementPortType == DIFFERENT && this.managementPort != null&& exchange.getRequest().getURI().getPort() == this.managementPort) {return Mono.empty();}exchange.getAttributes().put(GATEWAY_HANDLER_MAPPER_ATTR, getSimpleName());//lookupRoute(exchange)去查找满足断言条件的路由Routerreturn lookupRoute(exchange)// 满足的router会被组装到exchange中,然后返回webHandler.flatMap((Function<Route, Mono<?>>) r -> {exchange.getAttributes().remove(GATEWAY_PREDICATE_ROUTE_ATTR);if (logger.isDebugEnabled()) {logger.debug("Mapping [" + getExchangeDesc(exchange) + "] to " + r);}exchange.getAttributes().put(GATEWAY_ROUTE_ATTR, r);return Mono.just(webHandler);}).switchIfEmpty(Mono.empty().then(Mono.fromRunnable(() -> {exchange.getAttributes().remove(GATEWAY_PREDICATE_ROUTE_ATTR);if (logger.isTraceEnabled()) {logger.trace("No RouteDefinition found for ["+ getExchangeDesc(exchange) + "]");}})));}protected Mono<Route> lookupRoute(ServerWebExchange exchange) {return this.routeLocator.getRoutes()// 遍历所有的Router,此行r.getPredicate().apply(exchange)是验证是否满足断言要求,// 满足的Router会被返回.concatMap(route -> Mono.just(route).filterWhen(r -> {// add the current route we are testingexchange.getAttributes().put(GATEWAY_PREDICATE_ROUTE_ATTR, r.getId());return r.getPredicate().apply(exchange);})// instead of immediately stopping main flux due to error, log and// swallow it.doOnError(e -> logger.error("Error applying predicate for route: " + route.getId(),e)).onErrorResume(e -> Mono.empty()))// .defaultIfEmpty() put a static Route not found// or .switchIfEmpty()// .switchIfEmpty(Mono.<Route>empty().log("noroute")).next()// TODO: error handling.map(route -> {if (logger.isDebugEnabled()) {logger.debug("Route matched: " + route.getId());}validateRoute(route, exchange);return route;});/** TODO: trace logging if (logger.isTraceEnabled()) {* logger.trace("RouteDefinition did not match: " + routeDefinition.getId()); }*/}
apply调用进入org.springframework.cloud.gateway.handler.AsyncPredicate.DefaultAsyncPredicate#apply方法,查看delegate.test(t)调用的实现类,可以发现所有断言的调用,此处根据我们配置的断言规则调用对应的断言,返回Boolean。
通过断言拿到对应的handler后回到DispatcherHandler#handle方法接下来调用invokeHandler(exchange, handler)
//这个handler 是 FilteringWehHandler
private Mono<HandlerResult> invokeHandler(ServerWebExchange exchange, Object handler) {if (this.handlerAdapters != null) {for (HandlerAdapter handlerAdapter : this.handlerAdapters) {if (handlerAdapter.supports(handler)) {//使用SimpleHandlerAdapter来处理return handlerAdapter.handle(exchange, handler);}}}return Mono.error(new IllegalStateException("No HandlerAdapter: " + handler));}
然后调用org.springframework.web.reactive.result.SimpleHandlerAdapter#handle
@Overridepublic Mono<HandlerResult> handle(ServerWebExchange exchange, Object handler) {WebHandler webHandler = (WebHandler) handler;//handler是FilteringWehHandler,所以调用到FilteringWehHandler.handleMono<Void> mono = webHandler.handle(exchange);return mono.then(Mono.empty());}
org.springframework.cloud.gateway.handler.FilteringWebHandler#handle
@Overridepublic Mono<Void> handle(ServerWebExchange exchange) {Route route = exchange.getRequiredAttribute(GATEWAY_ROUTE_ATTR);//取出之前匹配的Router,取出filters,如果配置了的话List<GatewayFilter> gatewayFilters = route.getFilters();//GatewayFilter和globalFilter合并,并按order排序List<GatewayFilter> combined = new ArrayList<>(this.globalFilters);combined.addAll(gatewayFilters);// TODO: needed or cached?AnnotationAwareOrderComparator.sort(combined);if (logger.isDebugEnabled()) {logger.debug("Sorted gatewayFilterFactories: " + combined);}//进入过滤链调用filtersreturn new DefaultGatewayFilterChain(combined).filter(exchange);}@Overridepublic Mono<Void> filter(ServerWebExchange exchange) {return Mono.defer(() -> {if (this.index < filters.size()) {GatewayFilter filter = filters.get(this.index);//index 每次+1,设置到chain中,传递到下一次filter,下一次filter时取就是next的filterDefaultGatewayFilterChain chain = new DefaultGatewayFilterChain(this,this.index + 1);//filter链执行return filter.filter(exchange, chain);}else {return Mono.empty(); // complete}});}
filter链很重要,是spring cloud gateway的扩展点,可以做扩展逻辑,比如权限校验,登录认证,日志等。默认情况下的filter链如下,需要关注一下LoadBalancerClientFilter和NettyRoutingFilter
org.springframework.cloud.gateway.filter.LoadBalancerClientFilter#filter
@Override@SuppressWarnings("Duplicates")public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {URI url = exchange.getAttribute(GATEWAY_REQUEST_URL_ATTR);//使用协议 http还是lbString schemePrefix = exchange.getAttribute(GATEWAY_SCHEME_PREFIX_ATTR);if (url == null|| (!"lb".equals(url.getScheme()) && !"lb".equals(schemePrefix))) {return chain.filter(exchange);}// 保存原始请求urladdOriginalRequestUrl(exchange, url);if (log.isTraceEnabled()) {log.trace("LoadBalancerClientFilter url before: " + url);}//根据注册中心的信息,使用负载均衡算法,找一个可用的服务final ServiceInstance instance = choose(exchange);if (instance == null) {throw NotFoundException.create(properties.isUse404(),"Unable to find instance for " + url.getHost());}URI uri = exchange.getRequest().getURI();// if the `lb:<scheme>` mechanism was used, use `<scheme>` as the default,// if the loadbalancer doesn't provide one.String overrideScheme = instance.isSecure() ? "https" : "http";if (schemePrefix != null) {overrideScheme = url.getScheme();}//替换成真实服务器的地址,后续调用使用URI requestUrl = loadBalancer.reconstructURI(new DelegatingServiceInstance(instance, overrideScheme), uri);if (log.isTraceEnabled()) {log.trace("LoadBalancerClientFilter url chosen: " + requestUrl);}exchange.getAttributes().put(GATEWAY_REQUEST_URL_ATTR, requestUrl);return chain.filter(exchange);}
org.springframework.cloud.gateway.filter.NettyRoutingFilter#filter 处理http和https请求的发送
@Override@SuppressWarnings("Duplicates")public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {URI requestUrl = exchange.getRequiredAttribute(GATEWAY_REQUEST_URL_ATTR);String scheme = requestUrl.getScheme();if (isAlreadyRouted(exchange)|| (!"http".equals(scheme) && !"https".equals(scheme))) {return chain.filter(exchange);}setAlreadyRouted(exchange);ServerHttpRequest request = exchange.getRequest();final HttpMethod method = HttpMethod.valueOf(request.getMethodValue());final String url = requestUrl.toASCIIString();HttpHeaders filtered = filterRequest(getHeadersFilters(), exchange);final DefaultHttpHeaders httpHeaders = new DefaultHttpHeaders();filtered.forEach(httpHeaders::set);boolean preserveHost = exchange.getAttributeOrDefault(PRESERVE_HOST_HEADER_ATTRIBUTE, false);Route route = exchange.getAttribute(GATEWAY_ROUTE_ATTR);//发送请求Flux<HttpClientResponse> responseFlux = httpClientWithTimeoutFrom(route).headers(headers -> {headers.add(httpHeaders);// Will either be set below, or later by Nettyheaders.remove(HttpHeaders.HOST);if (preserveHost) {String host = request.getHeaders().getFirst(HttpHeaders.HOST);headers.add(HttpHeaders.HOST, host);}}).request(method).uri(url).send((req, nettyOutbound) -> {if (log.isTraceEnabled()) {nettyOutbound.withConnection(connection -> log.trace("outbound route: "+ connection.channel().id().asShortText()+ ", inbound: " + exchange.getLogPrefix()));}return nettyOutbound.send(request.getBody().map(dataBuffer -> ((NettyDataBuffer) dataBuffer).getNativeBuffer()));}).responseConnection((res, connection) -> {// Defer committing the response until all route filters have run// Put client response as ServerWebExchange attribute and write// response later NettyWriteResponseFilterexchange.getAttributes().put(CLIENT_RESPONSE_ATTR, res);exchange.getAttributes().put(CLIENT_RESPONSE_CONN_ATTR, connection);ServerHttpResponse response = exchange.getResponse();// put headers and status so filters can modify the responseHttpHeaders headers = new HttpHeaders();res.responseHeaders().forEach(entry -> headers.add(entry.getKey(), entry.getValue()));String contentTypeValue = headers.getFirst(HttpHeaders.CONTENT_TYPE);if (StringUtils.hasLength(contentTypeValue)) {exchange.getAttributes().put(ORIGINAL_RESPONSE_CONTENT_TYPE_ATTR,contentTypeValue);}setResponseStatus(res, response);// make sure headers filters run after setting status so it is// available in responseHttpHeaders filteredResponseHeaders = HttpHeadersFilter.filter(getHeadersFilters(), headers, exchange, Type.RESPONSE);if (!filteredResponseHeaders.containsKey(HttpHeaders.TRANSFER_ENCODING)&& filteredResponseHeaders.containsKey(HttpHeaders.CONTENT_LENGTH)) {// It is not valid to have both the transfer-encoding header and// the content-length header.// Remove the transfer-encoding header in the response if the// content-length header is present.response.getHeaders().remove(HttpHeaders.TRANSFER_ENCODING);}exchange.getAttributes().put(CLIENT_RESPONSE_HEADER_NAMES,filteredResponseHeaders.keySet());response.getHeaders().putAll(filteredResponseHeaders);return Mono.just(res);});Duration responseTimeout = getResponseTimeout(route);if (responseTimeout != null) {responseFlux = responseFlux.timeout(responseTimeout, Mono.error(new TimeoutException("Response took longer than timeout: " + responseTimeout))).onErrorMap(TimeoutException.class,th -> new ResponseStatusException(HttpStatus.GATEWAY_TIMEOUT,th.getMessage(), th));}return responseFlux.then(chain.filter(exchange));}
以上,就是请求进来的处理过程。