这篇文章,我们将从Ocelot的中间件源码分析,目前Ocelot已经实现那些功能,还有那些功能在我们实际项目中暂时还未实现,如果我们要使用这些功能,应该如何改造等方面来说明。
一、Ocelot源码解读
在使用一个组件前,最好我们要了解其中的一些原理,否则在使用过程中遇到问题,也无从下手,今天我带着大家一起来解读下Ocelot源码,并梳理出具体实现的原理和流程,便于我们根据需求扩展应用。
Ocelot源码地址[https://github.com/ThreeMammals/Ocelot],
Ocelot文档地址[https://ocelot.readthedocs.io/en/latest/]
查看.NETCORE
相关中间件源码,我们优先找到入口方法,比如Ocelot中间件使用的是app.UseOcelot()
,我们直接搜索UserOcelot,我们会找到OcelotMiddlewareExtensions
方法,里面是Ocelot中间件实际运行的方式和流程。
然后继续顺藤摸瓜,查看详细的实现,我们会发现如下代码
public static async Task<IApplicationBuilder> UseOcelot(this IApplicationBuilder builder, OcelotPipelineConfiguration pipelineConfiguration) { //创建配置信息var configuration = await CreateConfiguration(builder); //监听配置信息ConfigureDiagnosticListener(builder); //创建执行管道return CreateOcelotPipeline(builder, pipelineConfiguration);}
然后我们继续跟踪到创建管道方法,可以发现Ocelot的执行流程已经被找到,现在问题变的简单了,直接查看
private static IApplicationBuilder CreateOcelotPipeline(IApplicationBuilder builder, OcelotPipelineConfiguration pipelineConfiguration){ var pipelineBuilder = new OcelotPipelineBuilder(builder.ApplicationServices); //详细创建的管道顺序在此方法pipelineBuilder.BuildOcelotPipeline(pipelineConfiguration); var firstDelegate = pipelineBuilder.Build(); /*inject first delegate into first piece of asp.net middleware..maybe not like thisthen because we are updating the http context in ocelot it comes out correct forrest of asp.net..*/builder.Properties["analysis.NextMiddlewareName"] = "TransitionToOcelotMiddleware";builder.Use(async (context, task) =>{ var downstreamContext = new DownstreamContext(context); await firstDelegate.Invoke(downstreamContext);}); return builder;
}
管道创建流程及实现,会不会感觉到摸到大动脉了,核心的功能及原理基本找到了,那以后动手术也就可以避开一些坑了,我们可以对着这个执行顺序,再查看详细的源码,按照这个执行顺序查看源码,您就会发现整个思路非常清晰,每一步的实现一目了然。为了更直观的介绍源码的解读方式,这里我们就拿我们后续要操刀的中间件来讲解下中间件的具体实现。
public static class OcelotPipelineExtensions{ public static OcelotRequestDelegate BuildOcelotPipeline(this IOcelotPipelineBuilder builder,OcelotPipelineConfiguration pipelineConfiguration){ // This is registered to catch any global exceptions that are not handled// It also sets the Request Id if anything is set globallybuilder.UseExceptionHandlerMiddleware(); // If the request is for websockets upgrade we fork into a different pipelinebuilder.MapWhen(context => context.HttpContext.WebSockets.IsWebSocketRequest,app =>{ app.UseDownstreamRouteFinderMiddleware(); app.UseDownstreamRequestInitialiser(); app.UseLoadBalancingMiddleware(); app.UseDownstreamUrlCreatorMiddleware(); app.UseWebSocketsProxyMiddleware();}); // Allow the user to respond with absolutely anything they want.builder.UseIfNotNull(pipelineConfiguration.PreErrorResponderMiddleware); // This is registered first so it can catch any errors and issue an appropriate responsebuilder.UseResponderMiddleware(); // Then we get the downstream route informationbuilder.UseDownstreamRouteFinderMiddleware(); // This security module, IP whitelist blacklist, extended security mechanismbuilder.UseSecurityMiddleware(); //Expand other branch pipesif (pipelineConfiguration.MapWhenOcelotPipeline != null){ foreach (var pipeline in pipelineConfiguration.MapWhenOcelotPipeline){ builder.MapWhen(pipeline);}} // Now we have the ds route we can transform headers and stuff?builder.UseHttpHeadersTransformationMiddleware(); // Initialises downstream requestbuilder.UseDownstreamRequestInitialiser(); // We check whether the request is ratelimit, and if there is no continue processingbuilder.UseRateLimiting(); // This adds or updates the request id (initally we try and set this based on global config in the error handling middleware)// If anything was set at global level and we have a different setting at re route level the global stuff will be overwritten// This means you can get a scenario where you have a different request id from the first piece of middleware to the request id middleware.builder.UseRequestIdMiddleware(); // Allow pre authentication logic. The idea being people might want to run something custom before what is built in.builder.UseIfNotNull(pipelineConfiguration.PreAuthenticationMiddleware); // Now we know where the client is going to go we can authenticate them.// We allow the ocelot middleware to be overriden by whatever the// user wantsif (pipelineConfiguration.AuthenticationMiddleware == null){ builder.UseAuthenticationMiddleware();} else{ builder.Use(pipelineConfiguration.AuthenticationMiddleware);} // The next thing we do is look at any claims transforms in case this is important for authorisationbuilder.UseClaimsToClaimsMiddleware(); // Allow pre authorisation logic. The idea being people might want to run something custom before what is built in.builder.UseIfNotNull(pipelineConfiguration.PreAuthorisationMiddleware); // Now we have authenticated and done any claims transformation we // can authorise the request// We allow the ocelot middleware to be overriden by whatever the// user wantsif (pipelineConfiguration.AuthorisationMiddleware == null){//使用自定义认证,移除默认的认证方式//builder.UseAuthorisationMiddleware();}else{ builder.Use(pipelineConfiguration.AuthorisationMiddleware);} // Now we can run the claims to headers transformation middlewarebuilder.UseClaimsToHeadersMiddleware(); // Allow the user to implement their own query string manipulation logicbuilder.UseIfNotNull(pipelineConfiguration.PreQueryStringBuilderMiddleware); // Now we can run any claims to query string transformation middlewarebuilder.UseClaimsToQueryStringMiddleware(); // Get the load balancer for this requestbuilder.UseLoadBalancingMiddleware(); // This takes the downstream route we retrieved earlier and replaces any placeholders with the variables that should be usedbuilder.UseDownstreamUrlCreatorMiddleware(); // Not sure if this is the best place for this but we use the downstream url // as the basis for our cache key.builder.UseOutputCacheMiddleware(); //We fire off the request and set the response on the scoped data repobuilder.UseHttpRequesterMiddleware(); return builder.Build();} private static void UseIfNotNull(this IOcelotPipelineBuilder builder,Func<DownstreamContext, Func<Task>, Task> middleware){ if (middleware != null){ builder.Use(middleware);}}}
限流中间件实现解析
实现代码如下builder.UseRateLimiting();
,我们转到定义,得到如下代码,详细的实现逻辑在ClientRateLimitMiddleware
方法里,继续转定义到这个方法,我把方法里用到的内容注释了下。
public static class RateLimitMiddlewareExtensions{ public static IOcelotPipelineBuilder UseRateLimiting(this IOcelotPipelineBuilder builder) { return builder.UseMiddleware<ClientRateLimitMiddleware>();}
}public class ClientRateLimitMiddleware : OcelotMiddleware{ private readonly OcelotRequestDelegate _next; private readonly IRateLimitCounterHandler _counterHandler; private readonly ClientRateLimitProcessor _processor; public ClientRateLimitMiddleware(OcelotRequestDelegate next,IOcelotLoggerFactory loggerFactory,IRateLimitCounterHandler counterHandler):base(loggerFactory.CreateLogger<ClientRateLimitMiddleware>()) {_next = next;_counterHandler = counterHandler;_processor = new ClientRateLimitProcessor(counterHandler);} //熟悉的Tnvoke方法,所有的逻辑都在此方法里。public async Task Invoke(DownstreamContext context) { var options = context.DownstreamReRoute.RateLimitOptions; // 校验是否启用限流配置if (!context.DownstreamReRoute.EnableEndpointEndpointRateLimiting){//未启用直接进入下一个中间件Logger.LogInformation($"EndpointRateLimiting is not enabled for {context.DownstreamReRoute.DownstreamPathTemplate.Value}"); await _next.Invoke(context); return;} // 获取配置的校验客户端的方式var identity = SetIdentity(context.HttpContext, options); // 校验是否为白名单if (IsWhitelisted(identity, options)){//白名单直接放行Logger.LogInformation($"{context.DownstreamReRoute.DownstreamPathTemplate.Value} is white listed from rate limiting"); await _next.Invoke(context); return;} var rule = options.RateLimitRule; if (rule.Limit > 0){//限流数是否大于0// 获取当前客户端请求情况,这里需要注意_processor是从哪里注入的,后续重var counter = _processor.ProcessRequest(identity, options); // 校验请求数是否大于限流数if (counter.TotalRequests > rule.Limit){ //获取下次有效请求的时间,就是避免每次请求,都校验一次var retryAfter = _processor.RetryAfterFrom(counter.Timestamp, rule); // 写入日志LogBlockedRequest(context.HttpContext, identity, counter, rule, context.DownstreamReRoute); var retrystring = retryAfter.ToString(System.Globalization.CultureInfo.InvariantCulture); // 抛出超出限流异常并把下次可请求时间写入header里。await ReturnQuotaExceededResponse(context.HttpContext, options, retrystring); return;}} //如果启用了限流头部if (!options.DisableRateLimitHeaders){ var headers = _processor.GetRateLimitHeaders(context.HttpContext, identity, options);context.HttpContext.Response.OnStarting(SetRateLimitHeaders, state: headers);} //进入下一个中间件await _next.Invoke(context);} public virtual ClientRequestIdentity SetIdentity(HttpContext httpContext, RateLimitOptions option) { var clientId = "client"; if (httpContext.Request.Headers.Keys.Contains(option.ClientIdHeader)){clientId = httpContext.Request.Headers[option.ClientIdHeader].First();} return new ClientRequestIdentity(clientId,httpContext.Request.Path.ToString().ToLowerInvariant(),httpContext.Request.Method.ToLowerInvariant());} public bool IsWhitelisted(ClientRequestIdentity requestIdentity, RateLimitOptions option) { if (option.ClientWhitelist.Contains(requestIdentity.ClientId)){ return true;} return false;} public virtual void LogBlockedRequest(HttpContext httpContext, ClientRequestIdentity identity, RateLimitCounter counter, RateLimitRule rule, DownstreamReRoute downstreamReRoute) {Logger.LogInformation( $"Request {identity.HttpVerb}:{identity.Path} from ClientId {identity.ClientId} has been blocked, quota {rule.Limit}/{rule.Period} exceeded by {counter.TotalRequests}. Blocked by rule { downstreamReRoute.UpstreamPathTemplate.OriginalValue }, TraceIdentifier {httpContext.TraceIdentifier}.");} public virtual Task ReturnQuotaExceededResponse(HttpContext httpContext, RateLimitOptions option, string retryAfter) { var message = string.IsNullOrEmpty(option.QuotaExceededMessage) ? $"API calls quota exceeded! maximum admitted {option.RateLimitRule.Limit} per {option.RateLimitRule.Period}." : option.QuotaExceededMessage; if (!option.DisableRateLimitHeaders){httpContext.Response.Headers["Retry-After"] = retryAfter;}httpContext.Response.StatusCode = option.HttpStatusCode; return httpContext.Response.WriteAsync(message);} private Task SetRateLimitHeaders(object rateLimitHeaders) { var headers = (RateLimitHeaders)rateLimitHeaders;headers.Context.Response.Headers["X-Rate-Limit-Limit"] = headers.Limit;headers.Context.Response.Headers["X-Rate-Limit-Remaining"] = headers.Remaining;headers.Context.Response.Headers["X-Rate-Limit-Reset"] = headers.Reset; return Task.CompletedTask;}}
通过源码解析,发现实现一个限流还是很简单的吗!再进一步解析,IRateLimitCounterHandler
ClientRateLimitProcessor里的相关接口
又是怎么实现的呢?这时候我们就需要了解下.NETCORE 的运行原理,其中ConfigureServices
方法实现了依赖注入(DI)的配置。这时候我们看下Ocelot
是在哪里进行注入的呢?
services.AddOcelot()
是不是印象深刻呢?原来所有的注入信息都写在这里,那么问题简单了,Ctrl+F
查找AddOcelot
方法,马上就能定位到ServiceCollectionExtensions
方法,然后再转到定义OcelotBuilder
public static class ServiceCollectionExtensions{ public static IOcelotBuilder AddOcelot(this IServiceCollection services) { var service = services.First(x => x.ServiceType == typeof(IConfiguration)); var configuration = (IConfiguration)service.ImplementationInstance; return new OcelotBuilder(services, configuration);} public static IOcelotBuilder AddOcelot(this IServiceCollection services, IConfiguration configuration) { return new OcelotBuilder(services, configuration);}
}
又摸到大动脉啦,现在问题迎刃而解,原来所有的注入都写在这里,从这里可以找下我们熟悉的几个接口注入。
public OcelotBuilder(IServiceCollection services, IConfiguration configurationRoot)
{Configuration = configurationRoot;Services = services;Services.Configure<FileConfiguration>(configurationRoot);Services.TryAddSingleton<IOcelotCache<FileConfiguration>, InMemoryCache<FileConfiguration>>();Services.TryAddSingleton<IOcelotCache<CachedResponse>, InMemoryCache<CachedResponse>>();Services.TryAddSingleton<IHttpResponseHeaderReplacer, HttpResponseHeaderReplacer>();Services.TryAddSingleton<IHttpContextRequestHeaderReplacer, HttpContextRequestHeaderReplacer>();Services.TryAddSingleton<IHeaderFindAndReplaceCreator, HeaderFindAndReplaceCreator>();Services.TryAddSingleton<IInternalConfigurationCreator, FileInternalConfigurationCreator>();Services.TryAddSingleton<IInternalConfigurationRepository, InMemoryInternalConfigurationRepository>();Services.TryAddSingleton<IConfigurationValidator, FileConfigurationFluentValidator>();Services.TryAddSingleton<HostAndPortValidator>();Services.TryAddSingleton<IReRoutesCreator, ReRoutesCreator>();Services.TryAddSingleton<IAggregatesCreator, AggregatesCreator>();Services.TryAddSingleton<IReRouteKeyCreator, ReRouteKeyCreator>();Services.TryAddSingleton<IConfigurationCreator, ConfigurationCreator>();Services.TryAddSingleton<IDynamicsCreator, DynamicsCreator>();Services.TryAddSingleton<ILoadBalancerOptionsCreator, LoadBalancerOptionsCreator>();Services.TryAddSingleton<ReRouteFluentValidator>();Services.TryAddSingleton<FileGlobalConfigurationFluentValidator>();Services.TryAddSingleton<FileQoSOptionsFluentValidator>();Services.TryAddSingleton<IClaimsToThingCreator, ClaimsToThingCreator>();Services.TryAddSingleton<IAuthenticationOptionsCreator, AuthenticationOptionsCreator>();Services.TryAddSingleton<IUpstreamTemplatePatternCreator, UpstreamTemplatePatternCreator>();Services.TryAddSingleton<IRequestIdKeyCreator, RequestIdKeyCreator>();Services.TryAddSingleton<IServiceProviderConfigurationCreator,ServiceProviderConfigurationCreator>();Services.TryAddSingleton<IQoSOptionsCreator, QoSOptionsCreator>();Services.TryAddSingleton<IReRouteOptionsCreator, ReRouteOptionsCreator>();Services.TryAddSingleton<IRateLimitOptionsCreator, RateLimitOptionsCreator>();Services.TryAddSingleton<IBaseUrlFinder, BaseUrlFinder>();Services.TryAddSingleton<IRegionCreator, RegionCreator>();Services.TryAddSingleton<IFileConfigurationRepository, DiskFileConfigurationRepository>();Services.TryAddSingleton<IFileConfigurationSetter, FileAndInternalConfigurationSetter>();Services.TryAddSingleton<IServiceDiscoveryProviderFactory, ServiceDiscoveryProviderFactory>();Services.TryAddSingleton<ILoadBalancerFactory, LoadBalancerFactory>();Services.TryAddSingleton<ILoadBalancerHouse, LoadBalancerHouse>();Services.TryAddSingleton<IOcelotLoggerFactory, AspDotNetLoggerFactory>();Services.TryAddSingleton<IRemoveOutputHeaders, RemoveOutputHeaders>();Services.TryAddSingleton<IClaimToThingConfigurationParser, ClaimToThingConfigurationParser>();Services.TryAddSingleton<IClaimsAuthoriser, ClaimsAuthoriser>();Services.TryAddSingleton<IScopesAuthoriser, ScopesAuthoriser>();Services.TryAddSingleton<IAddClaimsToRequest, AddClaimsToRequest>();Services.TryAddSingleton<IAddHeadersToRequest, AddHeadersToRequest>();Services.TryAddSingleton<IAddQueriesToRequest, AddQueriesToRequest>();Services.TryAddSingleton<IClaimsParser, ClaimsParser>();Services.TryAddSingleton<IUrlPathToUrlTemplateMatcher, RegExUrlMatcher>();Services.TryAddSingleton<IPlaceholderNameAndValueFinder, UrlPathPlaceholderNameAndValueFinder>();Services.TryAddSingleton<IDownstreamPathPlaceholderReplacer, DownstreamTemplatePathPlaceholderReplacer>();Services.TryAddSingleton<IDownstreamRouteProvider, DownstreamRouteFinder>();Services.TryAddSingleton<IDownstreamRouteProvider, DownstreamRouteCreator>();Services.TryAddSingleton<IDownstreamRouteProviderFactory, DownstreamRouteProviderFactory>();Services.TryAddSingleton<IHttpRequester, HttpClientHttpRequester>();Services.TryAddSingleton<IHttpResponder, HttpContextResponder>();Services.TryAddSingleton<IErrorsToHttpStatusCodeMapper, ErrorsToHttpStatusCodeMapper>();Services.TryAddSingleton<IRateLimitCounterHandler, MemoryCacheRateLimitCounterHandler>();Services.TryAddSingleton<IHttpClientCache, MemoryHttpClientCache>();Services.TryAddSingleton<IRequestMapper, RequestMapper>();Services.TryAddSingleton<IHttpHandlerOptionsCreator, HttpHandlerOptionsCreator>();Services.TryAddSingleton<IDownstreamAddressesCreator, DownstreamAddressesCreator>();Services.TryAddSingleton<IDelegatingHandlerHandlerFactory, DelegatingHandlerHandlerFactory>();Services.TryAddSingleton<IHttpRequester, HttpClientHttpRequester>();// see this for why we register this as singleton http://stackoverflow.com/questions/37371264/invalidoperationexception-unable-to-resolve-service-for-type-microsoft-aspnetc// could maybe use a scoped data repositoryServices.TryAddSingleton<IHttpContextAccessor, HttpContextAccessor>();Services.TryAddSingleton<IRequestScopedDataRepository, HttpDataRepository>();Services.AddMemoryCache();Services.TryAddSingleton<OcelotDiagnosticListener>();Services.TryAddSingleton<IMultiplexer, Multiplexer>();Services.TryAddSingleton<IResponseAggregator, SimpleJsonResponseAggregator>();Services.TryAddSingleton<ITracingHandlerFactory, TracingHandlerFactory>();Services.TryAddSingleton<IFileConfigurationPollerOptions, InMemoryFileConfigurationPollerOptions>();Services.TryAddSingleton<IAddHeadersToResponse, AddHeadersToResponse>();Services.TryAddSingleton<IPlaceholders, Placeholders>();Services.TryAddSingleton<IResponseAggregatorFactory, InMemoryResponseAggregatorFactory>();Services.TryAddSingleton<IDefinedAggregatorProvider, ServiceLocatorDefinedAggregatorProvider>();Services.TryAddSingleton<IDownstreamRequestCreator, DownstreamRequestCreator>();Services.TryAddSingleton<IFrameworkDescription, FrameworkDescription>();Services.TryAddSingleton<IQoSFactory, QoSFactory>();Services.TryAddSingleton<IExceptionToErrorMapper, HttpExeptionToErrorMapper>();//add security this.AddSecurity();//add asp.net services..var assembly = typeof(FileConfigurationController).GetTypeInfo().Assembly;Services.AddMvcCore().AddApplicationPart(assembly).AddControllersAsServices().AddAuthorization().AddJsonFormatters();Services.AddLogging();Services.AddMiddlewareAnalysis();Services.AddWebEncoders();
}
至此Ocelot
源码解析就到这里了,其他的具体实现代码就根据流程一个一个查看即可,这里就不详细讲解了,因为我们已经掌握整个Ocelot代码的运行原理和实现方式及流程,项目里其他的一大堆的代码都是围绕这个流程去一步一步实现的。
有没有感觉添加一个中间件不是很复杂呢,是不是都跃跃欲试,准备尝试开发自己的自定义中间件啦,本篇就不介绍中间件的具体开发流程了,后续实战中会包含部分项目中需要用到的中间件,到时候会详细讲解如何规划和开发一个满足自己项目需求的中间件。
二、结合项目梳理功能
在完整学习完Ocelot文档和源码后,我们基本掌握了Ocelot目前已经实现的功能,再结合我们实际项目需求,我们梳理下还有哪些功能可能需要自己扩展实现。
项目设计网关基本需求包括路由、认证、授权、限流、缓存,仔细学习文档和源码后发现功能都已经存在,那是不是我们就可以直接拿来使用呢?这时候我们需要拿出一些复杂业务场景来对号入座,看能否实现复杂场景的一些应用。
1、授权
能否为每一个客户端设置独立的访问权限,如果客户端A可以访问服务A、服务B,客户端B只能访问服务A,从网关层面直接授权,不满足需求不路由到具体服务。从文档和代码分析后发现暂时未实现。
2、限流
能否为每一个客户端设置不能限流规则,例如客户端A为我们内容应用,我希望对服务A不启用限流,客户端B为第三方接入应用,我需要B访问服务A访问进行单独限流(30次/分钟),看能否通过配置实现自定义限流。从文档和代码分析后发现暂时未实现。
3、缓存
通过代码发现目前缓存实现的只是Dictionary方式实现的缓存,不能实现分布式结构的应用。
通过分析我们发现列举的5个基本需求,尽然有3个在我们实际项目应用中可能会存在问题,如果不解决这些问题,很难直接拿这个完美的网关项目应用到正式项目,所以我们到通过扩展Ocelot方法来实现我们的目的。
如何扩展呢
为了满足我们项目应用的需要,我们需要为每一个路由进行单独设置,如果还采用配置文件的方式,肯定无法满足需求,且后续网关动态增加路由、授权、限流等无法控制,所以我们需要把网关配置信息从配置文件中移到数据库中,由数据库中的路由表、限流表、授权表等方式记录当前网关的应用,且后续扩展直接在数据库中增加或减少相关配置,然后动态更新网关配置实现网关的高可用。
想一想是不是有点小激动,原来只要稍微改造下宝骏瞬间变宝马,那接下来的课程就是网关改造之旅,我会从设计、思想、编码等方面讲解下如何实现我们的第一辆宝马。
本系列文章我也是边想边写边实现,如果发现中间有任何描述或实现不当的地方,也请各位大神批评指正,我会第一时间整理并修正,避免让后续学习的人走弯路。
相关文章:
AspNetCore中使用Ocelot之 IdentityServer4
Ocelot-基于.NET Core的开源网关实现
.NET Core微服务之基于Ocelot+IdentityServer实现统一验证与授权
Swagger如何访问Ocelot中带权限验证的API
Ocelot.JwtAuthorize:一个基于网关的Jwt验证包
.NET Core微服务之基于Ocelot实现API网关服务
.NET Core微服务之基于Ocelot实现API网关服务(续)
.NET微服务体系结构中为什么使用Ocelot实现API网关
Ocelot简易教程(一)之Ocelot是什么
Ocelot简易教程(二)之快速开始1
Ocelot简易教程(二)之快速开始2
Ocelot简易教程(三)之主要特性及路由详解
Ocelot简易教程(四)之请求聚合以及服务发现
Ocelot简易教程(五)之集成IdentityServer认证以及授权
Ocelot简易教程(六)之重写配置文件存储方式并优化响应数据
Ocelot简易教程(七)之配置文件数据库存储插件源码解析
ASP.NET Core中Ocelot的使用:API网关的应用
ASP.NET Core中Ocelot的使用:基于Spring Cloud Netflix Eureka的动态路由
ASP.NET Core中Ocelot的使用:基于服务发现的负载均衡
原文地址: https://www.cnblogs.com/jackcao/p/9937213.html
.NET社区新闻,深度好文,欢迎访问公众号文章汇总 http://www.csharpkit.com