1. FQueuedThreadPool & IQueuedWork
FQueuedThreadPool是UE4中抽象出的线程池。线程池由若干个Worker线程,和一个同步队列构成。UE4把同步队列执行的任务抽象为IQueuedWork. 线程池的同步队列,就是一个IQueuedWork的队列了。借用wiki上线程池的图, UE4的FQueuedThreadPool也是如图中所示的结构:
Thread pool
生产者生产IQueuedWork的实例对象。线程池会向生产者提供入队的接口。线程池中的Worker线程都是消费者,会不停地从队列中取出IQueuedWork,并执行work.
下面的代码就是FQueuedThreadPool给用户使用的接口:
class CORE_API FQueuedThreadPool
{
public:// 创建线程池,指定线程数,还有每个worker栈大小及优先级virtual bool Create( uint32 InNumQueuedThreads, uint32 StackSize, EThreadPriority ThreadPriority) = 0;// 销毁线程池,对Task Queue和每个worker线程执行清理操作virtual void Destroy() = 0;// 生产者使用这个接口,向同步队列添加IQueuedWorkvirtual void AddQueuedWork(IQueuedWork* InQueuedWork) = 0;// 生产者使用这个接口,尝试删除一个IQueuedWorkvirtual bool RetractQueuedWork(IQueuedWork* InQueuedWork) = 0;// 获取线程池中worker线程的数目virtual int32 GetNumThreads() const = 0;
};
需要提及的是,RetractQueuedWork接口只能尝试去删除或取消一个work对象。如果work不在队列当中,或者请求删除时已经在执行和执行完成,都无法取消。
IQueuedWork是同步队列中,任务对象的抽象。代码如下:
class IQueuedWork
{
public:virtual void DoThreadedWork() = 0;virtual void Abandon() = 0;
};
IQueuedWork的接口很简单,我们只需要实现代码中的两个接口,分别是任务的执行流程和废弃当前任务的接口。
2. FQueuedThread
FQueuedThread就是线程池worker线程的实现了。它是一个FRunnable的实现类,并内聚了一个FRunnableThread的实例对象。
class FQueuedThread : public FRunnable
{
protected:FRunnableThread* Thread;virtual uint32 Run() override;
};
FQueuedThread实现的Run函数,就是类似上一篇我们实现的MyRunnable的空闲等待的流程。我们回顾一下,实现所需的部件:
- 一个原子布尔变量作为循环的标识位
- 一个FEvent用来让线程在无任务可做时挂起,而不占用系统资源;
按照上面的思路,我们继续补完代码:
class FQueuedThread : public FRunnable
{
protected:FEvent* DoWorkEvent;TAtomic<bool> TimeToDie;FRunnableThread* Thread;virtual uint32 Run() override{while(TimeToDie.Load(EMemoryOrder::Relaxed)){DoWorkEvent->Wait();// TODO ... do work}}
};
这样的实现有很严重的缺陷。无穷时间的等待,线程被挂起后,UE4无法获取这些线程的状态了。因此,UE4采用的是等待10ms,再check是否继续等待。
while(TimeToDie.Load(EMemoryOrder::Relaxed))
{bool bContinueWaiting = true; while(bContinueWaiting){DECLARE_SCOPE_CYCLE_COUNTER(...); // record statusbContinueWaiting = !DoWorkEvent->Wait( 10 );}// TODO ... do work
}
被唤醒后意味着两种情况:
- 新的任务分配下来,有活干了;
- 线程池发出清理指令,线程即将退出;
把执行Work的代码加入,如下所示:
class FQueuedThread : public FRunnable
{
protected:FEvent* DoWorkEvent;TAtomic<bool> TimeToDie;FRunnableThread* Thread;IQueuedWork* volatile QueuedWork;virtual uint32 Run() override{while(TimeToDie.Load(EMemoryOrder::Relaxed)){bool bContinueWaiting = true; while(bContinueWaiting){DECLARE_SCOPE_CYCLE_COUNTER(...); // record statusbContinueWaiting = !DoWorkEvent->Wait( 10 );}IQueuedWork* LocalQueuedWork = QueuedWork;QueuedWork = nullptr;FPlatformMisc::MemoryBarrier();check(LocalQueuedWork || TimeToDie.Load(EMemoryOrder::Relaxed));while (LocalQueuedWork){LocalQueuedWork->DoThreadedWork();LocalQueuedWork = OwningThreadPool->ReturnToPoolOrGetNextJob(this);} }return 0;}
};
QueuedWork就是需要执行的work对象的指针,它被volatile修饰,说明还有其他的线程会修改这个指针,防止编译器生成直接从缓存中读取代码的优化。check方法,明显地指明了被唤醒时只有前面提及的两种情况。如果Work不为空,则调用IQueuedWork的DoThreadedWork接口。任务完成后的下一行代码,就是向所属线程池的同步队列再申请一个任务。如果队列中有任务,则继续执行新的任务。若队列已经为空,则将线程归还到线程池。线程池有一个QueuedThreads成员,记录线程池中的空闲的线程。
个人觉得UE4在check之后的实现略有不妥。在同时有Work要执行和TimeToDie为true时,UE4选择了继续执行完Work再退出。笔者认为TimeToDie为true时,应该放弃执行当前的work,直接退出。当然,这里不同的策略差别也不大,也不重要。
还有一个重要的函数,就是FQueuedThread::DoWork. 它是由生产者调用线程池的AddQueuedWork,线程池对象在进行调度的时候调用的。DoWork函数代码如下:
void FQueuedThread::DoWork(IQueuedWork* InQueuedWork)
{// ...QueuedWork = InQueuedWork;FPlatformMisc::MemoryBarrier();// Tell the thread to wake up and do its jobDoWorkEvent->Trigger();
}
值得提及的是两个函数中的内存屏障代码,FPlatformMisc::MemoryBarrier(). DoWork中会对QueuedWork进行写操作,而在Run函数中会对QueuedWork进行读操作,而且DoWork与Run发生在不同的线程,这样就产生了竞争条件(race condition). 一般的情况是上一个mutex lock,而UE4却没有,只使用了内存屏障。原因是这个竞争条件发生的时候,有且仅有一个线程写,有且仅有一个线程读;并且DoWork中的DoWorkEvent->Trigger(),发出一个事件告知已经准备好一个IQueuedWork,一定发生在Run函数中读取IQueuedWork之前。所以UE4使用内存屏障来保证顺序一致性,让Run函数从另外一个线程读取IQueuedWork时,能够读取到已经同步过后的值。关于无锁编程,大家感兴趣可以上purecpp的相关专题一起讨论。
3. FQueuedThreadPoolBase
再来看看线程池的实现类。FQueuedThreadPool的实现类只有一个,就是FQueuedThreadPoolBase类。我们从它的数据成员,可以很清晰地可以看出,该线程池的结构与第一节的所示的线程池的结构图是基本吻合的:
class FQueuedThreadPoolBase : public FQueuedThreadPool
{
protected:/** The work queue to pull from. */TArray<IQueuedWork*> QueuedWork;/** The thread pool to dole work out to. */TArray<FQueuedThread*> QueuedThreads;/** All threads in the pool. */TArray<FQueuedThread*> AllThreads;/** The synchronization object used to protect access to the queued work. */FCriticalSection* SynchQueue;/** If true, indicates the destruction process has taken place. */bool TimeToDie;// ....
}
数组QueuedWork和互斥锁SynchQueue,组成了一个线程安全的同步队列。AllThreads管理着全部的worker线程。TimeToDie是标识线程池生命状态,如果置为true,线程池的清理工作正在进行,或者已经进行完毕了。还有一个QueuedThreads成员,它管理着空闲的线程,也就是上一节FQueuedThread归还自己到线程池的空闲队列。
线程池的创建,会依次创建每个worker线程。线程池销毁的时候,会依次向每个worker线程发出销毁的命令,并等待线程退出。线程池的销毁会放弃还未执行的work. 创建和销毁的流程较为简单,就不详细展开了。后文着重讨论生产者向线程池添加work的流程。
生产者创建了一个IQueuedWork实现对象后,会调用第一节提及的AddQueuedWork接口,向线程池添加要执行的work. UE4控制线程池添加work的流程,实现的较为精细。它将线程池的状态分成了两类,来分别处理。这两种状态分别为: 1. 线程池中还有空闲线程,即QueuedThreads不为空,并且QueuedWork一定为空; 2. 线程池中已经没有空闲的线程,即QueuedThreads为空;
第一个情景的处理策略是从空闲线程数组中,取一个线程,并直接唤醒该线程执行由生产者当前传递进来的work. 第二个情景,较为简单,由于没有空闲线程可用,就直接将work入队即可。
void FQueuedThreadPoolBase::AddQueuedWork(IQueuedWork* InQueuedWork) /*override*/
{// ....FQueuedThread* Thread = nullptr;{FScopeLock sl(SynchQueue);const int32 AvailableThreadCount = QueuedThreads.Num();if (AvailableThreadCount == 0){// situation 2:QueuedWork.Add(InQueuedWork);return;}// situation 1:const int32 ThreadIndex = AvailableThreadCount - 1;Thread = QueuedThreads[ThreadIndex];QueuedThreads.RemoveAt(ThreadIndex, 1, false);}Thread->DoWork(InQueuedWork);
}
UE4处理情景一的实现,有两个优点。
第一,UE4并不是简单地让每个线程抢占任务队列中work. 而是在当有空闲线程的时候,小心地获取一个空闲线程,指定work并唤醒这一个线程。这样做的好处,是不会出现惊群效应,而让CPU浪费时间做无用的线程调度。
第二,从代码中可以看出,UE4每次获取空闲线程都是取数组的最末尾的空闲线程,也就是最近归还的work线程。这样做的好处是,最近归还的线程意味着它相比其他空闲线程是更近期使用过的。它有更大的概率,操作系统还未对它进行context切换,或者它的context数据还留存在缓存当中。优先使用该线程,就有更大的概率获取较为低廉的线程切换开销。
最后,线程池为worker线程提供的,从线程池获取下一个可用的work和归还空闲线程的接口,ReturnToPoolOrGetNextJob函数:
IQueuedWork* FQueuedThreadPoolBase::ReturnToPoolOrGetNextJob(FQueuedThread* InQueuedThread) /*override*/
{// ... omitted codesIQueuedWork* Work = nullptr;FScopeLock sl(SynchQueue);// ... omitted codesif (QueuedWork.Num() > 0){Work = QueuedWork[0];QueuedWork.RemoveAt(0, 1, false);}if (!Work)QueuedThreads.Add(InQueuedThread);return Work;
}
当任务队列中还有work时,就从队列头部取出一个,是一个FIFO的同步队列。当任务队列为空,无法取出新的任务时,线程就将自己归还给到线程池中,标记为空闲队列。UE4这里实现的不太妥当的就是QueuedWork是一个TArray<IQueuedWork*>数组。数组对非尾部元素的Remove操作,是会对数组元素进行移动的。虽然移动指针并不是很昂贵,而且UE4也禁止了Remove导致的shrink操作,但开销依然是存在的。这里最好的方案是使用一个可以扩容的环状队列。
4. 小结
本文讨论了UE4中线程池的实现细节。线程池FQueuedThreadPool的实现是由一个元素为IQueuedWork*的同步队列,及若干个worker线程所组成。UE4中的线程池,将IQueuedWork队列化,并用FIFO的调度策略。线程池为IQueuedWork的生产者提供了入队接口,并为worker线程(消费者)提供了获取出队接口。UE4对线程池的性能优化也做了不少的工作。例如避免线程池抢占IQueuedWork时可能会发生的惊群现象,以及取最近使用的线程,还有无锁编程等。
专题的下一篇,我们将讨论UE4中的AsyncTask. 这也是UE4迈向现代C++设计的有力步伐。