为了可以更灵活地在Webapi应用服务中分配线程资源,BeetleX.FastHttpApi在线程调度上直接细化到Action级别;组件不仅可以精准控制每个Action的最大RPS限制,还能精细到控制使用多少线程资源来处理这些API的请求。接下来详细讲解组件针对这一块的实现结构和代码。
需求
为什么要做到这么精细的控制呢?如果有足够资源那是不用考虑这方面的问题;但实际应用中资源不足是经常需要面对的问题。在整个服务中往往有些API非常占用资源,这个时候就希望通过简单配置来控制API使用的线程数达到一个理想的资源分配结果。在控制上最直接的办法是控制对应的RPS数量,但有时候希望以线程资源的方式来分配。
使用
组件可以在控制器的Action上根据需求标记对应的限制属性
//每秒最大处理数100,超过就拒绝[RequestMaxRPS(100)]public object MasRps(IHttpContext context){return DateTime.Now;}//不限制,由框架通过线程池调度public object None(string name, IHttpContext context){return $"Name:{name}|QueueID:{context.Queue?.ID}|Time:{DateTime.Now}";}//所有请求用一个线程有序处理[ThreadQueue(ThreadQueueType.Single)]public object SingleQueue(string name, IHttpContext context){return $"Name:{name}|QueueID:{context.Queue?.ID}|Time:{DateTime.Now}";}//所有请求分配两个线程有序处理[ThreadQueue(ThreadQueueType.Multiple, 2)]public object MultipleQueue(string name, IHttpContext context){return $"Name:{name}|QueueID:{context.Queue?.ID}|Time:{DateTime.Now}";}//根据Name的值一致线程处理,同一值会分配到一个线程中有序处理[ThreadQueue("name")]public object UniqueQueue(string name, IHttpContext context){return $"Name:{name}|QueueID:{context.Queue?.ID}|Time:{DateTime.Now}";}
设计实现
接下来看一下BeetleX.FastHttpApi组件代码是如何进行工作的。由于需要线程控制,那自然就需要一个队列;组件提供一个NextQueue的队列来完成这方面的工作,每个NextQueue会分配一个线程来处理。
public class NextQueue : IDisposable{public NextQueue(){mQueue = new System.Collections.Concurrent.ConcurrentQueue<IEventWork>();ID = System.Threading.Interlocked.Increment(ref mID);}public long ID { get; set; }private static long mID;private readonly object _workSync = new object();private bool _doingWork;private int mCount;private System.Collections.Concurrent.ConcurrentQueue<IEventWork> mQueue;public int Count => mCount;//添加任务到队列中public void Enqueue(IEventWork item){mQueue.Enqueue(item);System.Threading.Interlocked.Increment(ref mCount);lock (_workSync){//当前队列是否工作中if (!_doingWork){//获取一个线程进行工作System.Threading.ThreadPool.QueueUserWorkItem(OnStart);_doingWork = true;}}}private void OnError(Exception e, IEventWork work){try{Error?.Invoke(e, work);}catch{}}public static Action<Exception, IEventWork> Error { get; set; }private async void OnStart(object state){while (true){//获取队列任务并执行while (mQueue.TryDequeue(out IEventWork item)){System.Threading.Interlocked.Decrement(ref mCount);using (item){try{//等待任务执行await item.Execute();}catch (Exception e_){OnError(e_, item);}}}lock (_workSync){//队列为空跑出线程if (mQueue.IsEmpty){try{Unused?.Invoke();}catch { }_doingWork = false;return;}}}}public Action Unused { get; set; }public void Dispose(){while (mQueue.TryDequeue(out IEventWork work)){try{work.Dispose();}catch{}}}}
NextQueue是一个支持异步任务的处理队列,它确保添加进来的任务都是有序执行,即使任务内部处理的任务是异步。
ActionContext
该对象是用于执行控制器方法,包括webapi控制器和Websocket控制器。在这里只讲述控制怎样调度执行的,更详细了解可以查看
https://github.com/beetlex-io/FastHttpApi/blob/master/src/ActionContext.cs
主要讲解一下Execute方法是怎样调用控制器方法的
internal async Task Execute(IActionResultHandler resultHandler){//验证RPSif (Handler.ValidateRPS()){Handler.IncrementRequest();//是否存在队列控制配置if (Handler.ThreadQueue == null || Handler.ThreadQueue.Type == ThreadQueueType.None){if (Handler.Async)//异步方法{await OnAsyncExecute(resultHandler);}else{//同步方法OnExecute(resultHandler);}}else{//配置了队列控制r妊ActionTask actionTask = new ActionTask(this, resultHandler,new TaskCompletionSource<object>());//获取异步队列var queue = Handler.ThreadQueue.GetQueue(this.HttpContext);//阶列是否有效,为了安全队列都有最大等待数限制,超过就拒绝处理if (Handler.ThreadQueue.Enabled(queue)){this.HttpContext.Queue = queue;//把当前任务插入队列queue.Enqueue(actionTask);//等待队执行结果通知await actionTask.CompletionSource.Task;}else{Handler.IncrementError();resultHandler.Error(new Exception($"{Handler.SourceUrl} process error,out of queue limit!"), EventArgs.LogType.Warring, 500);}}}else{Handler.IncrementError();resultHandler.Error(new Exception($"{Handler.SourceUrl} process error,out of max rps!"), EventArgs.LogType.Warring, 509);}}
GetQueue
应该方法根据当前请示信息和配置来获取对应的异步队列
public NextQueue GetQueue(IHttpContext context){//单队执行,永远返回针对当前控制器方法的第一个队列if (Type == ThreadQueueType.Single)return QueueGroup.Queues[0];//轮循当前分配最大队列数else if (Type == ThreadQueueType.Multiple)return QueueGroup.Next();//针对请求数据做一致性队列分配else if (Type == ThreadQueueType.DataUnique){string value = null;if (UniqueName != null){if (string.Compare(UniqueName, "$path", true) == 0){value = context.Request.GetSourcePath();}else if(UniqueName.IndexOf("__")==0){return mUniqueQueueGroup.Has(UniqueName.GetHashCode());}else{value = context.Request.Header[UniqueName];if (value == null)context.Data.TryGetString(UniqueName, out value);}}if (value == null)value = context.Request.GetSourceUrl();return mUniqueQueueGroup.Has(value.GetHashCode());}//如果都没匹配到就获取轮循的下一个return QueueGroup.Next();}
ActionTask
方法异步任务对象,队列会有序地执行相关对象,这对象的实现非常简单。
struct ActionTask : IEventWork{public ActionTask(ActionContext context, IActionResultHandler resultHandler, TaskCompletionSource<object> completionSource){Context = context;ResultHandler = resultHandler;CompletionSource = completionSource;}public TaskCompletionSource<object> CompletionSource { get; set; }public ActionContext Context { get; set; }public IActionResultHandler ResultHandler { get; set; }public void Dispose(){}public async Task Execute(){try{if (Context.Handler.Async){//异步方法await Context.OnAsyncExecute(ResultHandler);}else{//同步方法Context.OnExecute(ResultHandler);}}finally{//回调执行完成,让队列继续下一个任务。CompletionSource?.TrySetResult(new object());}}}
总结
到这里整个线程调度的核心就介绍完成了,如果不了解一些基础知识会感觉完成这些功能很复杂,其实都是一些基础功能的应用; 完成这些功能主要涉及几个基础知识分别是:队列,线程池和用于处理异步回调的TaskCompletionSource对象。
BeetleX
开源跨平台通讯框架(支持TLS)
提供高性能服务和大数据处理解决方案
https://beetlex.io