UE4异步编程专题 - 线程池FQueuedThreadPool

1. FQueuedThreadPool & IQueuedWork

FQueuedThreadPool是UE4中抽象出的线程池。线程池由若干个Worker线程,和一个同步队列构成。UE4把同步队列执行的任务抽象为IQueuedWork. 线程池的同步队列,就是一个IQueuedWork的队列了。借用wiki上线程池的图, UE4的FQueuedThreadPool也是如图中所示的结构:

Thread pool

生产者生产IQueuedWork的实例对象。线程池会向生产者提供入队的接口。线程池中的Worker线程都是消费者,会不停地从队列中取出IQueuedWork,并执行work.

下面的代码就是FQueuedThreadPool给用户使用的接口:

class CORE_API FQueuedThreadPool
{
public:// 创建线程池,指定线程数,还有每个worker栈大小及优先级virtual bool Create( uint32 InNumQueuedThreads, uint32 StackSize, EThreadPriority ThreadPriority) = 0;// 销毁线程池,对Task Queue和每个worker线程执行清理操作virtual void Destroy() = 0;// 生产者使用这个接口,向同步队列添加IQueuedWorkvirtual void AddQueuedWork(IQueuedWork* InQueuedWork) = 0;// 生产者使用这个接口,尝试删除一个IQueuedWorkvirtual bool RetractQueuedWork(IQueuedWork* InQueuedWork) = 0;// 获取线程池中worker线程的数目virtual int32 GetNumThreads() const = 0;
};

需要提及的是,RetractQueuedWork接口只能尝试去删除或取消一个work对象。如果work不在队列当中,或者请求删除时已经在执行和执行完成,都无法取消。

IQueuedWork是同步队列中,任务对象的抽象。代码如下:

class IQueuedWork
{
public:virtual void DoThreadedWork() = 0;virtual void Abandon() = 0;
};

IQueuedWork的接口很简单,我们只需要实现代码中的两个接口,分别是任务的执行流程和废弃当前任务的接口。

2. FQueuedThread

FQueuedThread就是线程池worker线程的实现了。它是一个FRunnable的实现类,并内聚了一个FRunnableThread的实例对象。

class FQueuedThread : public FRunnable
{
protected:FRunnableThread* Thread;virtual uint32 Run() override;
};

FQueuedThread实现的Run函数,就是类似上一篇我们实现的MyRunnable的空闲等待的流程。我们回顾一下,实现所需的部件:

  1. 一个原子布尔变量作为循环的标识位
  2. 一个FEvent用来让线程在无任务可做时挂起,而不占用系统资源;

按照上面的思路,我们继续补完代码:

class FQueuedThread : public FRunnable
{
protected:FEvent*             DoWorkEvent;TAtomic<bool>       TimeToDie;FRunnableThread*    Thread;virtual uint32 Run() override{while(TimeToDie.Load(EMemoryOrder::Relaxed)){DoWorkEvent->Wait();// TODO ... do work}}
};

这样的实现有很严重的缺陷。无穷时间的等待,线程被挂起后,UE4无法获取这些线程的状态了。因此,UE4采用的是等待10ms,再check是否继续等待。

while(TimeToDie.Load(EMemoryOrder::Relaxed))
{bool bContinueWaiting = true; while(bContinueWaiting){DECLARE_SCOPE_CYCLE_COUNTER(...);       // record statusbContinueWaiting = !DoWorkEvent->Wait( 10 );}// TODO ... do work
}

被唤醒后意味着两种情况:

  1. 新的任务分配下来,有活干了;
  2. 线程池发出清理指令,线程即将退出;

把执行Work的代码加入,如下所示:

class FQueuedThread : public FRunnable
{
protected:FEvent*                 DoWorkEvent;TAtomic<bool>           TimeToDie;FRunnableThread*        Thread;IQueuedWork* volatile   QueuedWork;virtual uint32 Run() override{while(TimeToDie.Load(EMemoryOrder::Relaxed)){bool bContinueWaiting = true; while(bContinueWaiting){DECLARE_SCOPE_CYCLE_COUNTER(...);       // record statusbContinueWaiting = !DoWorkEvent->Wait( 10 );}IQueuedWork* LocalQueuedWork = QueuedWork;QueuedWork = nullptr;FPlatformMisc::MemoryBarrier();check(LocalQueuedWork || TimeToDie.Load(EMemoryOrder::Relaxed));while (LocalQueuedWork){LocalQueuedWork->DoThreadedWork();LocalQueuedWork = OwningThreadPool->ReturnToPoolOrGetNextJob(this);} }return 0;}
};

QueuedWork就是需要执行的work对象的指针,它被volatile修饰,说明还有其他的线程会修改这个指针,防止编译器生成直接从缓存中读取代码的优化。check方法,明显地指明了被唤醒时只有前面提及的两种情况。如果Work不为空,则调用IQueuedWork的DoThreadedWork接口。任务完成后的下一行代码,就是向所属线程池的同步队列再申请一个任务。如果队列中有任务,则继续执行新的任务。若队列已经为空,则将线程归还到线程池。线程池有一个QueuedThreads成员,记录线程池中的空闲的线程。

个人觉得UE4在check之后的实现略有不妥。在同时有Work要执行和TimeToDie为true时,UE4选择了继续执行完Work再退出。笔者认为TimeToDie为true时,应该放弃执行当前的work,直接退出。当然,这里不同的策略差别也不大,也不重要。

还有一个重要的函数,就是FQueuedThread::DoWork. 它是由生产者调用线程池的AddQueuedWork,线程池对象在进行调度的时候调用的。DoWork函数代码如下:

void FQueuedThread::DoWork(IQueuedWork* InQueuedWork)
{// ...QueuedWork = InQueuedWork;FPlatformMisc::MemoryBarrier();// Tell the thread to wake up and do its jobDoWorkEvent->Trigger();
}

值得提及的是两个函数中的内存屏障代码,FPlatformMisc::MemoryBarrier(). DoWork中会对QueuedWork进行写操作,而在Run函数中会对QueuedWork进行读操作,而且DoWork与Run发生在不同的线程,这样就产生了竞争条件(race condition). 一般的情况是上一个mutex lock,而UE4却没有,只使用了内存屏障。原因是这个竞争条件发生的时候,有且仅有一个线程写,有且仅有一个线程读;并且DoWork中的DoWorkEvent->Trigger(),发出一个事件告知已经准备好一个IQueuedWork,一定发生在Run函数中读取IQueuedWork之前。所以UE4使用内存屏障来保证顺序一致性,让Run函数从另外一个线程读取IQueuedWork时,能够读取到已经同步过后的值。关于无锁编程,大家感兴趣可以上purecpp的相关专题一起讨论。

3. FQueuedThreadPoolBase

再来看看线程池的实现类。FQueuedThreadPool的实现类只有一个,就是FQueuedThreadPoolBase类。我们从它的数据成员,可以很清晰地可以看出,该线程池的结构与第一节的所示的线程池的结构图是基本吻合的:

class FQueuedThreadPoolBase : public FQueuedThreadPool
{
protected:/** The work queue to pull from. */TArray<IQueuedWork*> QueuedWork;/** The thread pool to dole work out to. */TArray<FQueuedThread*> QueuedThreads;/** All threads in the pool. */TArray<FQueuedThread*> AllThreads;/** The synchronization object used to protect access to the queued work. */FCriticalSection* SynchQueue;/** If true, indicates the destruction process has taken place. */bool TimeToDie;// ....
}

数组QueuedWork和互斥锁SynchQueue,组成了一个线程安全的同步队列。AllThreads管理着全部的worker线程。TimeToDie是标识线程池生命状态,如果置为true,线程池的清理工作正在进行,或者已经进行完毕了。还有一个QueuedThreads成员,它管理着空闲的线程,也就是上一节FQueuedThread归还自己到线程池的空闲队列。

线程池的创建,会依次创建每个worker线程。线程池销毁的时候,会依次向每个worker线程发出销毁的命令,并等待线程退出。线程池的销毁会放弃还未执行的work. 创建和销毁的流程较为简单,就不详细展开了。后文着重讨论生产者向线程池添加work的流程。

生产者创建了一个IQueuedWork实现对象后,会调用第一节提及的AddQueuedWork接口,向线程池添加要执行的work. UE4控制线程池添加work的流程,实现的较为精细。它将线程池的状态分成了两类,来分别处理。这两种状态分别为: 1. 线程池中还有空闲线程,即QueuedThreads不为空,并且QueuedWork一定为空; 2. 线程池中已经没有空闲的线程,即QueuedThreads为空;

第一个情景的处理策略是从空闲线程数组中,取一个线程,并直接唤醒该线程执行由生产者当前传递进来的work. 第二个情景,较为简单,由于没有空闲线程可用,就直接将work入队即可。

void FQueuedThreadPoolBase::AddQueuedWork(IQueuedWork* InQueuedWork) /*override*/
{// ....FQueuedThread* Thread = nullptr;{FScopeLock sl(SynchQueue);const int32 AvailableThreadCount = QueuedThreads.Num();if (AvailableThreadCount == 0){// situation 2:QueuedWork.Add(InQueuedWork);return;}// situation 1:const int32 ThreadIndex = AvailableThreadCount - 1;Thread = QueuedThreads[ThreadIndex];QueuedThreads.RemoveAt(ThreadIndex, 1, false);}Thread->DoWork(InQueuedWork);
}

UE4处理情景一的实现,有两个优点。

第一,UE4并不是简单地让每个线程抢占任务队列中work. 而是在当有空闲线程的时候,小心地获取一个空闲线程,指定work并唤醒这一个线程。这样做的好处,是不会出现惊群效应,而让CPU浪费时间做无用的线程调度。

第二,从代码中可以看出,UE4每次获取空闲线程都是取数组的最末尾的空闲线程,也就是最近归还的work线程。这样做的好处是,最近归还的线程意味着它相比其他空闲线程是更近期使用过的。它有更大的概率,操作系统还未对它进行context切换,或者它的context数据还留存在缓存当中。优先使用该线程,就有更大的概率获取较为低廉的线程切换开销。

最后,线程池为worker线程提供的,从线程池获取下一个可用的work和归还空闲线程的接口,ReturnToPoolOrGetNextJob函数:

IQueuedWork* FQueuedThreadPoolBase::ReturnToPoolOrGetNextJob(FQueuedThread* InQueuedThread) /*override*/
{// ... omitted codesIQueuedWork* Work = nullptr;FScopeLock sl(SynchQueue);// ... omitted codesif (QueuedWork.Num() > 0){Work = QueuedWork[0];QueuedWork.RemoveAt(0, 1, false);}if (!Work)QueuedThreads.Add(InQueuedThread);return Work;
}

当任务队列中还有work时,就从队列头部取出一个,是一个FIFO的同步队列。当任务队列为空,无法取出新的任务时,线程就将自己归还给到线程池中,标记为空闲队列。UE4这里实现的不太妥当的就是QueuedWork是一个TArray<IQueuedWork*>数组。数组对非尾部元素的Remove操作,是会对数组元素进行移动的。虽然移动指针并不是很昂贵,而且UE4也禁止了Remove导致的shrink操作,但开销依然是存在的。这里最好的方案是使用一个可以扩容的环状队列。

4. 小结

本文讨论了UE4中线程池的实现细节。线程池FQueuedThreadPool的实现是由一个元素为IQueuedWork*的同步队列,及若干个worker线程所组成。UE4中的线程池,将IQueuedWork队列化,并用FIFO的调度策略。线程池为IQueuedWork的生产者提供了入队接口,并为worker线程(消费者)提供了获取出队接口。UE4对线程池的性能优化也做了不少的工作。例如避免线程池抢占IQueuedWork时可能会发生的惊群现象,以及取最近使用的线程,还有无锁编程等。

专题的下一篇,我们将讨论UE4中的AsyncTask. 这也是UE4迈向现代C++设计的有力步伐。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/432181.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

UE4异步编程专题 - 多线程

专题的第二篇&#xff0c;我们聊聊UE4中的多线程的基础设施。UE4中最基础的模型就是FRunnable和FRunnableThread&#xff0c;FRunnable抽象出一个可以执行在线程上的对象&#xff0c;而FRunnableThread是平台无关的线程对象的抽象。后面的篇幅会详细讨论这些基础设施。 1. FRu…

坑爹的UICollectionView

最近用UICoolectionView的时候遇到一个很DT的问题&#xff0c;我往VC里加12个视图&#xff0c;结果显示成这样&#xff08;右边是期待的样子&#xff09;&#xff1a; 研究了一下午&#xff0c;终于发现了问题&#xff1a; interface FpLabelCell : UICollectionViewCellproper…

UE4异步编程专题 - TFunction

0. 关于这个专题 游戏要给用户良好的体验&#xff0c;都会尽可能的保证60帧或者更高的fps。一帧留给引擎的时间也不过16ms的时长&#xff0c;再除去渲染时间&#xff0c;留给引擎时间连10ms都不到&#xff0c;能做的事情是极其有限的。同步模式执行耗时的任务&#xff0c;时长…

UE4高级功能--初探超大无缝地图的实现LevelStream

LevelStream 实现超大无缝地图--官方文档学习 The Level Streaming feature makes it possible to load and unload map files into memory as well as toggle their visibility all during play. This makes it possible to have worlds broken up into smaller chunks so th…

inside uboot (二) 启动流程

1. S3C6410 启动流程 1). 6410上电后&#xff0c;首先执行片内iROM的程序&#xff08;BL0&#xff09;&#xff0c;初始化时钟和看门狗等外围器件。 2). 然后把flash中头4K&#xff08;BL1&#xff09;的内容加载到片内的SRAM中执行。 3). 在SRAM中执行的BL1&#xff0c;初始…

你理解我的意思么?

在最近一次的电话会议里, 某leader前后说了十来句, "你理解我的意思么?". 说实话, 有些我都没理解, 不过我听到的大家的答复都是"理解!", "明白!". Leader问这样的问题, 期望是得到对方给你反馈, 结果大部分人不会直接对上级说, "是的,我不…

inside uboot (三) 异常向量表

1. 异常向量表概述 从上面的地址映射来看&#xff0c;中断向量表的地址为0xD0037400&#xff0c;因此如果我们想在SRAM中&#xff0c;也就是BL1中处理异常的话&#xff0c; 就需要把我们的异常向量表拷贝到这个地址上。或者我们可以在链接脚本中直接指定代码的地址。 如果在主…

有关GNU GCC的基本内容整理

一、GCC简介 GCC&#xff08;GNU Compiler Collection&#xff0c;GNU编译器集合&#xff09;是一套由GNU工程开发的支持多种编程语言的编译器。GCC是自由软件发展过程中的著名例子&#xff0c;由自由软件基金会 以GPL协议发布。当年Richard Stallman 刚开始写作 GCC 的时候&am…

[UE4]C++静态加载问题:ConstructorHelpers::FClassFinder()和FObjectFinder()

相关内容&#xff1a; C实现动态加载的问题&#xff1a;LoadClass<T>()和LoadObject<T>() http://aigo.iteye.com/blog/2281558C实现动态加载UObject&#xff1a;StaticLoadObject()&#xff1b;以Texture和Material为例 http://aigo.iteye.com/blog/2268056 这…

inside uboot (五) DRAM的构成

DRAM(Dynamic Random Access Memory)&#xff0c;即动态随机存取存储器. 1. Storage Cell 如上图&#xff0c;一个DRAM的基本存储单元由4个部分组成。 Storage Capacitor&#xff0c;即存储电容&#xff0c;它通过存储在其中的电荷的多和少&#xff0c;或者说电容两端电压差的…

使用json-lib进行Java和JSON之间的转换

转自http://www.cnblogs.com/mailingfeng/archive/2012/01/18/2325707.html 1. json-lib是一个java类库&#xff0c;提供将Java对象&#xff0c;包括beans, maps, collections, java arrays and XML等转换成JSON&#xff0c;或者反向转换的功能。 2. json-lib 主页 &#xff1a…

inside uboot (六) DRAM芯片的控制线及时序

Clock &#xff08;差分信号&#xff0c;CLK和nCLK&#xff09;为时钟信号 &#xff08;同一个rank共用&#xff09; CKE 时钟信号使能 &#xff08;同一个rank共用&#xff09; RAS 为行选…

MVC — 初步理解IIS工作流程

声明&#xff1a;本文只是自己的总结和积累。IIS7.x 目录 IIS流程及组成部分ASP.NET流程及组成部分IIS与ASP.NET MVC一、IIS流程及组成部分  1、Http.SYS&#xff1a;负责监听HTTP请求&#xff08;它不属于IIS范畴&#xff0c;但是和IIS联系紧密&#xff09; Http.SYS和IIS是…

卷积积分这样学!

卷积积分是一种数学运算&#xff0c;那么既然是数学运算&#xff0c;那么就得有数学的特性——定义、性质、定理。 本文将从卷积积分的理论、案例、求解方法、知识图谱四方面介绍卷积积分&#xff01; 一、【理论】卷积积分的理论 卷积积分定义&#xff1a; 卷积图解01 卷积…

世界地图并不是世界的真实样貌!甚至误差非常大

现在所用的世界地图并不是世界的真实样貌&#xff0c;甚至误差大的离谱。 地球属于三维球体&#xff0c;想完整地表现到二维平面上是不可能的&#xff0c;必须牺牲一些真实属性。因为三维降到二维肯定存在扭曲失真&#xff0c;这是维度差异所决定的&#xff0c;不可避免。 我们…

error MSB6006: cmd.exe exited with code 3

利用vs2012和qt5.5.1&#xff0c;在编译例子时发生如下错误&#xff1a; C:\Program Files (x86)\MSBuild\Microsoft.Cpp\v4.0\V110\Microsoft.CppCommon.targets(172,5): error MSB6006: “cmd.exe”已退出&#xff0c;代码为 3。 图片如下&#xff1a; 解决办法如下&#x…

数学天才用5万字让你读懂:微积分!

前面接连发了三篇麦克斯韦方程组的文章&#xff08;积分篇、微分篇和电磁波篇&#xff09;&#xff0c;从理论上来说&#xff0c;讲麦克斯韦方程组不讲微积分是不行的&#xff0c;因为人家本来就是一组积分方程和一组微分方程。 但是&#xff0c;为了让更多人&#xff0c;尤其是…

指定一个actor对pawn不可见

1. 把一个staticmesh作成一个actor 2. 给actor添加一个tag 3. 在pawn的beginPlay里面查找这个actor&#xff0c;并设置actor的owner为pawn&#xff0c;然后调用set owner no see

oracle中DECODE与CASE的用法区别

对于CASE与DECODE其实并没有太多的区别&#xff0c;他们都是用来实现逻辑判断。Oracle的DECODE函数功能很强&#xff0c;灵活运用的话可以避免多次扫描&#xff0c;从而提高查询的性能。而CASE是9i以后提供的语法&#xff0c;这个语法更加的灵活&#xff0c;提供了IF THEN ELSE…