c++11、14多线程从原理到线程池
- 一.初识
- 二.std::thread对象生命周期和线程等待与分离
- 1.主线程不退出,thread对象被销毁,子线程仍然在运行。
- 2.主线程阻塞,等待子线程退出
- 3.子线程与主线程分离(守护线程)
- 三.线程创建的多种方式
- 1.全局函数(静态成员函数)作为线程入口
- 线程的参数在传递进线程内时做了复制
- 参数的生命周期
- 为什么会有三次析构?
- 2.指针、引用参数传递的一些坑
- 传递的空间已经销毁
- 多线程共享访问同一块空间
- 传递的指针变量的生命周期小于线程
- 3.普通成员函数作为线程入口
- 4.lambda临时函数作为线程入口
- 临时的lambda
- 类成员的lambda
- 5.线程基类的封装
- 四.多线程通信和同步
- 1.多线程状态
- 2.竞争状态和临界区
- 3.互斥体和锁mutex
- 3.1 互斥锁的坑
- 3.2 超时锁应用(避免长时间死锁)
- 3.3 递归锁(可重入锁)应用
- 3.4 共享锁
- 1. 主要方法
- 2. 解决读写问题
- 3. 读写锁的问题
- 写饥饿问题
- 解决写饥饿问题的方法
- C++标准库中的解决方案
- 4.利用栈特性自动释放锁RAII
- 4.1 手动实现RAII
- 五.RAII管理锁
- 1. C++11支持的RAII lock_guard
- 2. C++11支持的unique_lock
- 2.1 临时释放锁
- 2.2 支持adopt_lock
- 2.3 支持defer_lock
- 2.4 支持try_to_lock
- 3.C++14 shared_lock
- 4.C++17 scoped_lock
- 六.项目案例:线程通信
- 七.条件变量
- 1.生产者-消费者模型
- 八.多线程异步通信
- 1.promise和future
- 值得注意的地方
- 2.packaged_task异步调用函数打包
- 3.async(异步函数调用)
- 九.C++多核运算实现
- 手动实现多核base16编码
- 1.测试单核base16编码效率
- 注意
- 2.测试多核base16编码效率
- 注意
- 3.测试C++17多核base16编码效率
- 十.线程池的实现
- 1.基础版本
- 1.1初始化线程池
- 1.2启动所有线程
- 1.3准备任务处理基类
- 1.4插入任务
- 1.5获取任务接口
- 1.6执行任务线程入口函数
- 2.版本v2.0
- 2.1 增加线程池的退出
- 2.2 显示线程池中正在运行的任务数量
- 2.3 使用智能指针管理线程对象和任务对象的生命周期
- 2.4 异步获取线程池中任务执行的结果
- 3.使用线程池实现音视频批量转码任务
C+11 14 17 多线程从原理到线程池
为什么要使用多线程?
- 任务分解
- 耗时的操作,任务分解,实时响应
- 数据分解
- 充分利用多核cpu处理数据
- 数据流分解
- 读写分离,解耦合设计
一.初识
当启动一个C++程序时,操作系统会为该程序创建一个新的进程。进程是程序的执行实例,它拥有自己的内存空间、资源和执行环境。在这个新创建的进程中,操作系统会自动为程序创建一个主线程,该线程的执行将从程序的
main
函数开始。这个主线程负责执行程序的主要逻辑,包括初始化、运行程序的主要功能以及清理工作。在主线程执行
main
函数期间,程序可以创建额外的线程来执行其他任务。这些额外的线程可以并行地执行不同的工作,例如处理用户输入、执行后台任务、响应网络请求等。与主线程不同,这些额外的线程通常被称为工作线程或辅助线程。
线程的相关函数都在命名空间std下。
如何区分每一个线程?
通过线程id号
让线程休眠一会
-
sleep_for
用于让当前线程休眠一段指定的时间。 -
chrono::seconds(1)
: chrono 是 C++11 中的命名空间,提供了时间相关的功能。seconds(1)
创建了一个chrono::seconds
类型的时间段,表示1秒的时间长度。
二.std::thread对象生命周期和线程等待与分离
测试线程对象生命周期
1.主线程不退出,thread对象被销毁,子线程仍然在运行。
2.主线程阻塞,等待子线程退出
但是我们不想主线程还要维护子线程,导致主线程执行受到影响。
3.子线程与主线程分离(守护线程)
不过最常用的还是join,但是需要设置某些变量,当主线程退出的时候可以通知子线程退出,以免子线程访问主线程拥有的堆栈空间,出现访问错误。
三.线程创建的多种方式
注意:线程的参数在传递进线程内时做了复制。
1.全局函数(静态成员函数)作为线程入口
线程的参数在传递进线程内时做了复制
- C++11中 thread 构造函数会将其参数进行复制,因此在创建线程时,传递的参数会被复制一份给线程函数。这样做的原因是为了确保每一个线程函数能够获取到一个独立的拷贝,避免数据竞争和线程安全问题。
在上面代码中,线程创建完后,局部变量f被回收,但是线程执行函数中正确打印了3.1,说明对参数进行了复制。
参数的生命周期
那么现在来研究一下参数的生命周期。
这里我们创建一个类,根据类对象的生命周期来跟踪线程中参数的生命周期。
为什么会有三次析构?
当调用 std::thread
的构造函数时,会发生以下过程:
- 首先,在
main
函数中创建的Para
对象p
会在其作用域结束时析构。 - 在创建线程时,
std::thread
的构造函数会复制参数,以便将参数传递给线程函数。这个过程中会调用拷贝构造函数创建一个新的Para
对象,并在构造函数结束后销毁这个临时对象。 - 线程开始执行后,
Para
对象作为参数被复制到线程函数的堆栈中。当线程函数执行结束时,这个对象会被销毁。
对于较大的对象,我们传指针或者引用会更高效,因为这样可以避免不必要的拷贝构造和析构操作。传递指针或引用只会传递对象的地址,不会复制对象的内容,从而减少了开销和潜在的性能问题。
2.指针、引用参数传递的一些坑
传递的空间已经被销毁
多线程共享访问同一块空间
传递的指针变量的生命周期小于线程
传递的空间已经销毁
可以看到,如果传递指针或引用进线程,那么如果指针所指的对象被销毁了,线程中也无法访问了。
解决的方法:
- 传堆中的参数,控制堆空间释放在线程结束后
- 静态对象在程序的生命周期内都存在,因此可以保证在任何线程使用时都不会被销毁。
- 参数放在类中,确保对象的生命周期和线程一致。
注意:
没有使用
std::ref()
,而直接传递参数p
给线程时,编译器会尝试匹配ThreadMainRef(Para p)
这样的函数。但是因为std::thread
的构造函数模板需要匹配线程函数的函数类型,而ThreadMainRef
函数期望的参数是一个引用类型,而不是一个对象。因此,编译器无法找到匹配的函数,导致编译失败。使用
std::ref()
可以将参数转换为引用类型,这样就能够匹配ThreadMainRef(Para& p)
这样的函数,从而解决了编译器无法找到匹配函数的问题。
多线程共享访问同一块空间
当多个线程共享访问同一个对象或内存区域时,如果没有正确的同步机制,可能会导致数据竞争,出现数据不一致的情况。这通常发生在以下场景:
-
一个线程在写入数据,而另一个线程在读取或写入同一数据。
-
没有使用任何同步机制,如互斥锁(std::mutex)。
传递的指针变量的生命周期小于线程
当你将指针传递给线程时,如果指针变量的生命周期结束,而线程仍在运行,可能会导致线程访问无效的指针,导致未定义行为或程序崩溃。
解决方法:
- 使用智能指针:使用
std::shared_ptr
或std::unique_ptr
来管理对象的生命周期,确保对象在所有线程使用完毕后才被销毁。 - 延长指针的生命周期:确保指针在所有线程结束之前有效。
3.普通成员函数作为线程入口
如果用一个对象本身来创建线程,那么这个对象的成员都是线程的参数,通过this指针来取,所以我们传递参数的时候也需要传递this指针。
4.lambda临时函数作为线程入口
[捕获列表] (参数) mutable -> 返回值类型 {函数体}
临时的lambda
类成员的lambda
注意:将this放进捕获列表中才可以访问到成员变量。
5.线程基类的封装
需求就是创建的每一种线程都是一种类,继承了线程基类,我们可以通过类的方法来控制线程。
线程管理的封装:
- 线程的启动、停止和等待操作被封装在基类中,派生类不需要重复实现这些逻辑,从而提高了代码的复用性和可维护性。
抽象与约束:
- 基类中的纯虚函数
Main
强制派生类必须实现具体的线程入口函数。这种设计提供了一种模板模式,确保所有派生类都有一个一致的线程入口。
隐藏实现细节:
- 子类只需要专注于具体任务的实现,而不需要关心线程的创建和管理细节。这使得代码更简洁,并降低了出错的可能性。
具体实现:
其中Stop函数里设置进程退出变量为true后,调用Wait
的目的是确保在线程停止之前,先等待线程执行完毕。这样做的原因是为了避免在主线程调用Stop
后立即销毁线程,从而导致线程对象在执行期间被销毁,可能会导致未定义的行为。通过调用Wait
方法,可以确保主线程等待子线程执行完毕后再继续执行后续操作。
cout << "." << flush;
的作用是将一个点字符.
输出到标准输出流,并立即刷新输出流,将其立即显示在终端上,而不是等到缓冲区满或者程序结束才显示。
四.多线程通信和同步
1.多线程状态
- 初始化(init):该线程正在被创建。
- 就绪(Ready):该线程在就绪列表中,等待CPU调度。
- 运行(Running):该线程正在运行。
- 阻塞(Blocked):该线程被阻塞挂起。Blocked状态包括:pend(锁、事件、信号量等阻塞)、suspend(主动pend)、delay(延时阻塞)、pendtime(因为锁、事件、信号量时间等超时等待)。
- 退出(Exit):线程运行结束,等待父线程回收其控制资源。
2.竞争状态和临界区
- 竞争状态
- 多线程同时读写共享数据:读没问题,写有问题
- 临界区
- 读写共享数据的代码片段
避免竞争状态的策略:对临界区进行保护,确保同时只能有一个线程进入临界区。
3.互斥体和锁mutex
不加互斥锁
加互斥锁
如果期望没有抢占到锁也做一些相应的处理,而不是阻塞等待,可以用try_lock()。
try_lock监控竞争锁的过程
缺点:try_lock有资源开销,但是try_lock这个函数有性能开销,因此try_lock后需要手动让线程阻塞一会。
3.1 互斥锁的坑
理想情况就是线程排队获取锁
- 线程抢占不到资源
- unlock后紧跟lock
因为unlock释放锁后,操作系统并不是立即响应,紧接着又获取锁,会造成同一个线程一直持有这种资源,其他线程抢占不到。
3.2 超时锁应用(避免长时间死锁)
3.3 递归锁(可重入锁)应用
可重入锁(Reentrant Lock),也称递归锁,是一种特殊的锁,允许同一个线程多次获取同一把锁而不会发生死锁。当线程持有锁时,可以再次获取这个锁,而不会被阻塞,这样就可以避免了死锁情况的发生。
可重入锁的实现通常会记录锁被持有的次数,每次获取锁时,记录持有锁的线程和获取锁的次数。只有当线程释放锁的次数与获取锁的次数相等时,其他线程才能获取这个锁。这样,线程在持有锁期间可以多次获取锁,而不会因为重复获取锁而发生死锁。
3.4 共享锁
- C++14共享超时互斥锁shared_timed_mutex
- c++17共享互斥锁shared_mutex
- 如果只有写时需要互斥,读时不需要,用普通锁如何实现?
共享锁(shared lock),也称为读写锁(reader-writer lock),是一种允许多线程并发读取但独占写入的同步机制。这种锁有两个主要操作模式:共享模式和独占模式。共享模式允许多个线程同时持有锁进行读操作,而独占模式只允许一个线程持有锁进行写操作。
1. 主要方法
- 共享锁操作:
lock_shared()
: 获取共享锁。如果其他线程已经持有独占锁,则阻塞。try_lock_shared()
: 尝试获取共享锁。如果成功则返回true
,否则返回false
。try_lock_shared_for()
: 尝试在指定的时间内获取共享锁。try_lock_shared_until()
: 尝试在指定的时间点前获取共享锁。unlock_shared()
: 释放共享锁。
- 独占锁操作:
lock()
: 获取独占锁。如果其他线程已经持有共享锁或独占锁,则阻塞。try_lock()
: 尝试获取独占锁。如果成功则返回true
,否则返回false
。try_lock_for()
: 尝试在指定的时间内获取独占锁。try_lock_until()
: 尝试在指定的时间点前获取独占锁。unlock()
: 释放独占锁。
2. 解决读写问题
-
线程读的时候,其他线程可以读。
-
线程写的时候,其他线程不能读也不能写。
-
主要针对写线程,如果写线程想要获取锁,需要等待所有持有读锁的线程全部释放完才可以获取。
3. 读写锁的问题
写饥饿问题
在高并发的读操作环境中,读线程可以频繁地获取和释放共享锁,而写线程必须等待所有读线程释放锁之后才能获取独占锁进行写操作。如果读线程源源不断,写线程可能永远无法获取到锁,导致写操作的延迟或完全无法执行。
解决写饥饿问题的方法
有几种方法可以缓解或解决写饥饿问题:
- 读者优先与写者优先策略:
- 读者优先(Reader-Preferred):默认策略,允许多个读线程同时持有锁,写线程只有在没有读线程时才能获取锁。这种策略容易导致写饥饿。
- 写者优先(Writer-Preferred):写线程一旦请求锁,会阻止后续读线程获取锁,直到写线程完成操作。这种策略可以避免写饥饿,但可能导致读饥饿。
- 公平读写锁:
- 使用公平锁(Fair Read-Write Lock)确保锁的获取顺序公平,无论是读线程还是写线程,都会按照请求的顺序获取锁。这种策略既可以避免写饥饿,又可以避免读饥饿。
- 提升写线程的优先级:
- 在读线程中检查是否有等待的写线程,如果有,则让出锁给写线程。
- 定时写操作:
- 写线程可以使用带超时的锁尝试函数(如
try_lock_for
),并在超时后进行一些处理或重试。
- 写线程可以使用带超时的锁尝试函数(如
C++标准库中的解决方案
C++标准库中没有直接提供解决写饥饿的高级同步机制,但可以通过自定义实现一些策略来缓解这个问题。
4.利用栈特性自动释放锁RAII
问题:之前锁都是我们自己手动释放的,但是如果忘记释放锁,就会导致死锁的发生。包括出现异常的情况,哪怕我们有对应的处理函数,发生异常后会导致锁没有被释放。
RAII(Resource Acquisition Is Initialization)使用局部对象来管理资源的技术称为资源获取即初始化。它的生命周期由操作系统来管理,无需人工介入。其核心思想是将资源的生命周期与对象的生命周期绑定在一起,通过对象的构造和析构来管理资源的获取和释放,从而确保在任何情况下资源都能被正确释放,避免资源泄漏。
RAII的主要原则包括:
- 资源获取即初始化: 在对象的构造函数中获取资源,在对象的析构函数中释放资源。通过对象的生命周期来管理资源的生命周期,确保资源在合适的时候被释放。
- 对象生命周期管理资源生命周期: 对象的创建和销毁是由编译器自动管理的,因此可以确保资源的获取和释放是在正确的时机进行的,不受外部因素的影响。
- 异常安全性: RAII可以保证在发生异常时资源能够被正确释放,避免资源泄漏和程序状态不一致的情况。
- 简化资源管理: RAII可以大大简化资源管理的代码,不再需要手动管理资源的获取和释放,提高了代码的可读性和可维护性。
4.1 手动实现RAII
五.RAII管理锁
1. C++11支持的RAII lock_guard
- lock_guard实现严格基于作用域的互斥体所有权包装器。
- 通过{}控制锁的临界区。
- 每一个lock_guard作用域结束,锁都会被自动释放
-
- lock_guard还可以对已经上锁的锁使用,再管理锁。
adopt_lock
关键字告诉lock_guard
构造函数,这个互斥锁已经被锁定,它只需要接管管理锁的责任,而不需要再次锁定它。
2. C++11支持的unique_lock
unique_lock实现可移动的互斥体所有权包装器。
支持临时释放锁。
支持adopt_lock。
支持defer_lock。
支持try_to_lock。
2.1 临时释放锁
2.2 支持adopt_lock
2.3 支持defer_lock
defer_lock
用于延迟锁定互斥锁。也就是说,在构造std::unique_lock
对象时,不会立即锁定互斥锁,而是由开发者在需要时手动锁定。
2.4 支持try_to_lock
try_to_lock
用于尝试锁定互斥锁。如果在构造std::unique_lock
对象时能够立即锁定互斥锁,则锁定成功;否则,不会阻塞,构造函数返回时互斥锁未锁定。
3.C++14 shared_lock
shared_lock实现可移动的共享互斥体所有权封装器。
4.C++17 scoped_lock
scoped_lock用于多个互斥体的免死锁RAII封装器。
先来看一下死锁的两个线程。
如果是C++11这么解决:
在C++11中,可以使用std::lock
函数来避免死锁问题。std::lock
函数提供了一种安全的方式来同时锁定多个互斥锁,从而避免死锁。std::lock
函数会尝试同时获取所有提供的锁,并确保在任何时候都不会发生死锁。
C++17:
六.项目案例:线程通信
封装线程基类控制线程的启动和停止。
模拟消息服务器线程,接收字符串消息,并模拟处理。
互斥访问list<string>消息队列。
主线程定时发送消息给子线程。
- 主线程通过向消息队列添加消息,将数据传递给子线程,子线程则从消息队列中读取并处理这些消息。
七.条件变量
1.生产者-消费者模型
生产者消费者共享资源变量(消息队列)。
生产者生产一个产品,通知消费者消费。
消费者阻塞等待信号-获取信号后消费产品。
在[项目六](# 六.项目案例:线程通信)中,实现的线程通信其实也是一种生产者消费者模型,但是两者是没有同步的,生产者生产一个产品没有立刻通知消费者,极端情况下,每次消费者都会延迟10ms(项目六)再去队列中取数据。
接下来引入条件变量解决这个问题。
std::condition_variable::wait
有两种常见的使用方式:一种是纯阻塞等待,另一种是使用 lambda 表达式或谓词进行条件检查。它们的行为略有不同。
- 纯阻塞等待的 wait:这种方式仅仅是阻塞线程,直到收到通知(
notify_one
或notify_all
),然后线程继续执行。 - 传入 lambda 的 wait:这种方式不仅阻塞线程,还在每次被唤醒时检查条件。首先通过 lambda 表达式(或谓词)检查条件,如果条件为真则立即继续执行,如果条件为假则继续等待。
纯阻塞等待 wait:
- 线程调用 wait,进入阻塞状态,释放互斥锁。
- 另一个线程调用 notify_one 或 notify_all,唤醒阻塞的线程。
- 被唤醒的线程重新获取互斥锁,继续执行后续代码。
传入 lambda 表达式的 wait:
- 线程调用 wait,首先检查 lambda 表达式的返回值。
- 如果返回 true,wait 立即返回,线程继续执行。
- 如果返回 false,线程进入阻塞状态,释放互斥锁。
- 另一个线程调用 notify_one或 notify_all,唤醒阻塞的线程。
- 被唤醒的线程重新获取互斥锁,再次检查 lambda 表达式的返回值。
- 如果返回 true,wait返回,线程继续执行。
- 如果返回 false,线程重新进入等待状态。
使用 lambda 表达式(或谓词)的 wait方法可以避免虚假唤醒。虚假唤醒是指线程在没有收到实际通知的情况下被唤醒。如果没有 lambda 表达式,每次被唤醒时都需要手动检查条件,并在条件不满足时再次调用 wait。
这里使用wait的第二种版本,如果条件为假,将会一直调用wait阻塞。那么在唤醒线程的时候,如果消息队列不为空,就会重新获得锁然后开始处理消息。
但是执行的时候会发现,线程一直没有退出,只有wait会造成线程卡死。分析一下,其实是lambda一直返回false,因为一共就发了十条消息,之后消息队列一直为空,此时恰巧线程在wait里,因为一直返回false,所以不停地wait,造成卡死。
归根到底,其实是Stop函数没有通知等待的线程,导致线程无法退出循环,同时在 cv.wait的条件中增加对 is_exit_的检查,以确保线程可以在收到退出信号后退出。
如果用线程基类提供的默认Stop函数,只是把最外层循环终止了,解决不了线程卡死在wait函数中的问题,因此需要修改一下Stop函数和wait函数使用的lambda表达式。
因此在Stop函数中增加一行通知所有线程。同时在线程处理函数中,判断如果线程此时需要退出(标志位),直接返回true。
八.多线程异步通信
1.promise和future
- promise用于异步传输变量
- promise提供存储异步通信的值,再通过其对象创建的future异步获得结果。promise只能使用一次。
- future提供访问异步操作结果的机制。
值得注意的地方
std::promise
对象不能被复制,只能被移动。- 只要子线程set_value,future.get()就停止阻塞开始返回。
future.get()
方法会阻塞调用它的线程,直到结果可用或出现异常,因此主线程不会在结果准备好之前继续执行。
2.packaged_task异步调用函数打包
packaged_task包装函数为一个对象,用于异步调用,其返回值能通过future对象访问。
与bind的区别,可以异步调用,函数访问和获取返回值分开调用。
不过一般来说,我们在执行异步操作的时候,主线程会等待多个执行异步操作的线程返回结果。但是如果出现死锁或者返回结果时间过长,会影响我们后续的操作,为了避免主线程无限期地等待某个异步操作的结果,可以使用超时机制。
3.async(异步函数调用)
异步运行函数,并返回保存函数运行结果的future。
- launch::deferred,延迟执行,在调用wait和get的时候,才调用函数代码,并且在调用时在主线程中执行。
- launch::async,创建线程执行线程函数,默认。
- 返回的futre类型是线程函数的返回值类型。
- get获取结果,会阻塞等待。
九.C++多核运算实现
手动实现多核base16编码
Base16编码将每个字节(8位)表示为两个十六进制字符(每个为4位)。
十六进制字符包括数字(0-9)和字母(A-F),或其小写形式(a-f)。每个十六进制字符表示4位二进制数据,因此两个十六进制字符可以表示一个字节的数据。
Base16编码的具体过程如下:
- 将二进制数据转换为十六进制表示:
- 每个字节(二进制数据的8位)被分成两个4位的部分。
- 每个4位部分被映射为一个十六进制字符。
- 组合十六进制字符:
- 将每个字节的两个十六进制字符组合起来,得到Base16编码的字符串。
1.测试单核base16编码效率
注意
在处理二进制数据时,我们主要关心的是数据的位级表示,而不需要关心数据的符号。使用无符号类型(如 unsigned char)可以避免与符号扩展相关的问题。
在 Base16 编码中,每个字节被分为两个 4 位的部分,高 4 位和低 4 位。这两个部分分别表示 0 到 15 的值,可以直接用作 base16 映射表的下标。
可以看到100兆字节数据,单线程处理耗时160毫秒。
2.测试多核base16编码效率
多线程切片划分的基本思想是将大的任务(数据集或计算任务)分割成若干较小的子任务,然后将这些子任务分配给多个线程并行处理。每个线程独立处理分配给它的子任务,所有线程完成各自任务后,主线程合并结果。
注意
当我们进行多线程处理时,每个线程处理一部分数据。如果不进行正确的偏移,多个线程可能会覆盖彼此的输出数据。因此,需要确保每个线程的输出数据位置正确。
in_data.data() + offset
确保每个线程从不同的位置开始处理数据。
因为每个输入字节会被转换成两个输出字符,所以输出数据的位置相对于输入数据的位置是两倍。out_data.data() + offset * 2
确保每个线程的输出数据不会覆盖其他线程的数据。
3.测试C++17多核base16编码效率
将输入大小改为256,打印容器的内容,这三种方法的结果都是一样的。
当数据大小为1G的时候,for_each的开销会很大,原因在于:for_each的并行执行策略会为每个元素创建一个独立的任务。如果数据量较大且每个任务处理时间较短,会导致大量的任务调度和函数调用开销。
对于较小的数据块,函数调用和任务调度的开销可能会超过并行处理带来的性能提升。
这次修改比之前耗时少多了,我们拷贝一份in_data和out_data的指针,通过减少容器相关操作( data()以及[ ] ),使得内存访问路径更短、更直接,优化了内存访问效率。
十.线程池的实现
1.基础版本
1.1初始化线程池
1.2启动所有线程
1.3准备任务处理基类
1.4插入任务
1.5获取任务接口
在获取任务接口的实现中,发现如果任务队列没有任务,需要阻塞线程直到有任务出现,因此需要用条件变量。
1.6执行任务线程入口函数
我们在任务基类中定义了一个纯虚函数Run,因此我们取到任务后,就通过对象调用自身的Run函数,这里用异常处理,以免一个线程发生异常导致整个程序崩溃。
2.版本v2.0
2.1 增加线程池的退出
is_exit_ 是一个标志变量,用于指示线程池是否应该退出。由于 is_exit_在整个程序生命周期中只会由 false变为true,并且这种状态变化是一次性的,因此它不需要特别的线程安全处理。多线程环境中对这个变量的读取操作不会导致数据竞争。
在Stop的具体实现中,我们设置完is_exit_的值后,通知所有阻塞在wait的线程。
同时,如果线程池退出了,此时执行任务的线程也要立刻退出,所以在线程池中开放了一个接口is_exit来访问is_exit_变量的值。在XTask基类中,增加一个函数指针,由线程池在添加任务的时候将接口is_exit传入。
function头文件:#include<functional>
2.2 显示线程池中正在运行的任务数量
run_task_num_ 是一个计数器,用于跟踪当前正在运行的任务数量。因为多个线程会同时读取和修改这个变量,所以需要使用线程安全的方式来管理它。这是典型的竞态条件(race condition),需要用原子操作来确保计数的正确性。
2.3 使用智能指针管理线程对象和任务对象的生命周期
我们的任务开辟在栈空间上,如果线程还在处理任务,结果任务对象被释放了,这就会导致程序崩溃。如果开辟在堆空间,那么又要考虑什么时候去delete这个对象。
采用智能指针的好处是,操作系统自动管理指针对象,我们不需要手动清理对象。
跟踪智能指针的生命周期。
从main函数开始分析,我们首先创建了一个task对象,然后调用线程池的AddTask方法将该对象存入任务队列中。
在main函数中,shared_ptr的引用计数是1,调用AddTask后shared_ptr的引用计数是2(在AddTask实现中,任务队列拷贝了一个副本指向同一块地址空间)。执行完这个语句块后,shared_ptr的引用计数减1。
在取出任务的实现中,我们创建一个临时对象保存这个指针,因此shared_ptr的引用计数加1,之后函数结束,栈空间释放,shared_ptr的引用计数又减1保持不变。
因为我们是在线程池的线程的入口函数中调用取出任务,同时定义一个对象存储取出的任务,此时shared_ptr的引用计数为2,直到任务执行完毕,离开作用域,shared_ptr的引用计数为0,系统自动清理资源。
2.4 异步获取线程池中任务执行的结果
我们首先在XTask基类中定义一个用来获取Run()执行结果的promise对象,同时定义一个接口设置promise对象的值,一个接口用来阻塞直到获得了设置的值。
接着我们在取出任务并执行任务的时候,获得任务的返回值,然后SetValue把值传递给promise对象。
最后我们在main函数中创建任务并插入任务队列后,调用GetReturn获得任务执行结果。
3.使用线程池实现音视频批量转码任务
这里使用控制台命令:ffmpeg -y -i test.mp4 -s 400x200 400.mp4
我们发现输出是直接打印在控制台中,我们不想要输出显示在控制台中,因此需要重定向到日志文件中。此外ffmpeg没使用默认输出,用的是错误输出2,因此2也重定向到日志文件中。
经过测试,控制台指令是可以用的,接下来考虑用linux和windows通用的c函数system在线程池中执行转码任务。
目标如下: