重点内容
视频讲解:《C++Linux编程进阶:从0实现muduo C++网络框架系列》-第6讲.C++死锁问题如何分析调试-原子操作,互斥量,条件变量的封装
代码改动
lesson6代码
-
实现:base/Atomic.h
-
实现:base/Mutex.h
-
实现:base/Condition.h/cc
-
examples/test_atomic_mutex.cc
1 AtomicIntegerT原子操作封装
1.1 封装意义
目的是更符号项目的用法,用起来更得心应手。
这种封装在网络库中特别有用,因为网络库经常需要处理并发场景,比如:
-
统计连接数
-
管理连接状态
-
处理引用计数
-
实现无锁队列
-
实现线程安全的计数器
1.2 类图
AtomicIntegerT<T>
├── 私有成员
│ └── volatile T value_
├── 构造/析构
│ ├── AtomicIntegerT() // 默认构造函数,初始值为0
│ └── explicit AtomicIntegerT(T x) // 构造函数,禁止隐式类型转换
├── 原子读取操作
│ └── T get() // 原子性地读取value_的值
├── 原子修改操作
│ ├── T getAndSet(T newValue) // 原子性地将value_设置为newValue,并返回之前的值
│ ├── T getAndAdd(T x)// 原子性地将value_加上x,并返回之前的值
│ ├── T addAndGet(T x)// 原子性地将value_加上x,并返回新的值
│ ├── T incrementAndGet() // 原子性地将value_加1,并返回之前的值
│ └── T decrementAndGet() // 原子性地将value_减1,并返回之前的值
└── 便捷操作├── void add(T x) // 原子性地将value_加上x├── void increment() // 原子性地将value_加1└── void decrement() // 原子性地将value_减1
使用typedef 定义了AtomicInt32 AtomicInt64
// 定义32位和64位原子整数类型
typedef detail::AtomicIntegerT<int32_t> AtomicInt32;
typedef detail::AtomicIntegerT<int64_t> AtomicInt64;
1.3 原子操作实现原理
-
使用GCC内置的原子操作函数:
__sync_val_compare_and_swap // CAS操作
__sync_lock_test_and_set // 原子交换
__sync_fetch_and_add // 原子加法// 原子性地读取value_的值
T get()
{return __sync_val_compare_and_swap(&value_, 0, 0);
}// 原子性地将value_设置为newValue,并返回之前的值
T getAndSet(T newValue)
{return __sync_lock_test_and_set(&value_, newValue);
}// 原子性地将value_加上x,并返回之前的值
T getAndAdd(T x)
{return __sync_fetch_and_add(&value_, x);
}
-
这些函数在编译时会被转换为CPU的原子指令
-
比如x86平台会使用lock前缀的指令
1.4 在muduo中的使用场景
class Thread : noncopyable
{static AtomicInt32 numCreated_; //静态变量,用来计算创建的线程序号
};void Thread::setDefaultName()
{int num = numCreated_.incrementAndGet(); //创建的线程序号if (name_.empty()){char buf[32];snprintf(buf, sizeof buf, "Thread%d", num); //设置唯一的线程名字name_ = buf;}
}
class TcpServer {AtomicInt32 started_; // 标记服务是否启动
};void TcpServer::start()
{if (started_.getAndSet(1) == 0) // 默认值是0,获取之前的值并设置新值,避免程序二次启动{threadPool_->start(threadInitCallback_);assert(!acceptor_->listening());loop_->runInLoop(std::bind(&Acceptor::listen, get_pointer(acceptor_)));}
}
class Timer : noncopyable
{static AtomicInt64 s_numCreated_; //创建的定时器序号Timer(TimerCallback cb, Timestamp when, double interval): callback_(std::move(cb)),expiration_(when),interval_(interval),repeat_(interval > 0.0),sequence_(s_numCreated_.incrementAndGet()) //当前定时器的序号{ }
}
1.5 使用示例
// 测试原子操作
void testAtomic()
{printf("Testing Atomic operations...\n");AtomicInt32 a0;assert(a0.get() == 0);assert(a0.getAndAdd(1) == 0);assert(a0.get() == 1);assert(a0.addAndGet(2) == 3);assert(a0.get() == 3);assert(a0.incrementAndGet() == 4);assert(a0.get() == 4);assert(a0.decrementAndGet() == 3);assert(a0.get() == 3);printf("Atomic test passed!\n");
}
2 MutexLock/MutexLockGuard互斥量封装
2.1 封装意义
-
提供RAII风格的互斥锁管理
-
追踪锁的持有者,方便调试
-
防止误用和死锁
2.2 类图
MutexLock
├── 私有成员
│ ├── pthread_mutex_t mutex_
│ └── pid_t holder_
├── 构造/析构
│ ├── MutexLock()
│ └── ~MutexLock()
├── 锁操作
│ ├── void lock()
│ └── void unlock()
├── 调试功能
│ ├── bool isLockedByThisThread()
│ └── void assertLocked()
└── 友元类└── ConditionMutexLockGuard
├── 私有成员
│ └── MutexLock& mutex_
├── 构造/析构
│ ├── MutexLockGuard(MutexLock& mutex)
│ └── ~MutexLockGuard()
└── 禁止拷贝└── noncopyable
2.3 实现原理
-
使用pthread_mutex作为底层实现
-
通过RAII自动管理锁的获取和释放
-
通过gettid()追踪锁的持有者
2.4 在muduo中的使用场景
使用的场景较多,这里只展示了在EventLoop,ThreadPool中的使用。
void EventLoop::queueInLoop(Functor cb)
{{MutexLockGuard lock(mutex_); //加锁pendingFunctors_.push_back(std::move(cb));}if (!isInLoopThread() || callingPendingFunctors_){wakeup();}
}
size_t ThreadPool::queueSize() const
{MutexLockGuard lock(mutex_); //加锁return queue_.size();
}
2.4 使用示例
muduo库封装的Mutex功能很多,但我们目前知道调用
MutexLockGuard lock(mutex); 加锁就行,其他的用法不做要求。
isLockedByThisThread 是一个用于调试和断言的重要函数,让我详细解释它的使用:
1.基本用法(掌握这里就行)
MutexLock mutex;
{MutexLockGuard lock(mutex);
}
2.常见使用场景(不做要求)
class ThreadSafeClass {
public:void method1() {MutexLockGuard lock(mutex_);// 确保在持有锁的情况下调用method2method2(); // 内部方法调用}private:void method2() {// 确保调用此方法时已经持有锁mutex_.assertLocked(); // 如果未持有锁,会触发断言// 执行操作...}MutexLock mutex_;
};
3.调试死锁(不做要求)
class Resource {
public:void access() {MutexLockGuard lock(mutex_);// 检查是否真的持有锁if (!mutex_.isLockedByThisThread()) {printf("Warning: Lock not held by current thread!\n");}// 执行操作...}private:MutexLock mutex_;
};
4.防止误用(不做要求)
class SafeCounter {
public:void increment() {// 确保在持有锁的情况下修改数据if (!mutex_.isLockedByThisThread()) {printf("Error: Must hold lock to modify counter!\n");return;}count_++;}private:MutexLock mutex_;int count_;
};
5.实际应用示例(不做要求)
class ThreadSafeQueue {
public:void push(int value) {MutexLockGuard lock(mutex_);// 确保在持有锁的情况下调用内部方法doPush(value);}private:void doPush(int value) {// 确保调用此方法时已经持有锁mutex_.assertLocked();queue_.push(value);}MutexLock mutex_;std::queue<int> queue_;
};
3 Condition条件变量的封装
3.1 封装意义
-
提供线程间的同步机制
-
实现生产者-消费者模式
-
支持超时等待功能
-
与MutexLock配合使用,确保线程安全
3.2 类图
Condition
├── 私有成员
│ ├── MutexLock& mutex_ // 互斥锁引用
│ └── pthread_cond_t pcond_ // 条件变量
├── 构造/析构
│ ├── explicit Condition(MutexLock& mutex)
│ └── ~Condition()
├── 等待操作
│ ├── void wait() // 等待通知
│ └── bool waitForSeconds(double seconds) // 超时等待
└── 通知操作├── void notify() // 通知一个等待线程└── void notifyAll() // 通知所有等待线程
3.3 实现原理
本质是调用pthread线程库的接口 以及封装的MutexLock。
class Condition : noncopyable....MutexLock& mutex_; // 互斥锁引用pthread_cond_t pcond_; // 条件变量
};
3.4 在muduo中的使用场景
用于ThreadPool任务队列的生产者-消费者模型。
用于EventLoopThread等等IO Loop线程创建成功。
class ThreadPool : noncopyable
{mutable MutexLock mutex_;Condition notEmpty_ GUARDED_BY(mutex_);Condition notFull_ GUARDED_BY(mutex_); //用来处理任务队列(生产者消费者模式)
}
EventLoop* EventLoopThread::startLoop()
{assert(!thread_.started());thread_.start();EventLoop* loop = NULL;{MutexLockGuard lock(mutex_);while (loop_ == NULL){cond_.wait(); //等等loop线程创建成功}loop = loop_;}return loop;
}void EventLoopThread::threadFunc()
{EventLoop loop;if (callback_){callback_(&loop);}{MutexLockGuard lock(mutex_);loop_ = &loop;cond_.notify(); //发通知,io loop线程已经准备好了}loop.loop();//assert(exiting_);MutexLockGuard lock(mutex_);loop_ = NULL;
}
使用示例
// 生产者-消费者模式
class Buffer {
public:Buffer(int size) : size_(size), notEmpty_(mutex_), notFull_(mutex_) {}void put(int item) {MutexLockGuard lock(mutex_);while (queue_.size() == size_) {notFull_.wait(); // 等待队列不满}queue_.push(item);notEmpty_.notify(); // 通知消费者}int get() {MutexLockGuard lock(mutex_);while (queue_.empty()) {notEmpty_.wait(); // 等待队列不空}int item = queue_.front();queue_.pop();notFull_.notify(); // 通知生产者return item;}private:MutexLock mutex_;Condition notEmpty_;Condition notFull_;std::queue<int> queue_;int size_;
};
关键特性
-
与MutexLock配合使用
-
支持超时等待
-
提供广播通知
-
自动管理锁的释放和获取
使用建议
-
总是与MutexLock配合使用
-
使用while循环检查条件
-
注意通知的时机
-
合理使用notify和notifyAll
-
考虑使用超时机制避免死锁
这种封装在网络库中特别有用,因为:
-
需要处理异步事件
-
需要实现任务队列
-
需要处理定时器
-
需要实现线程池
-
需要处理生产者-消费者模式
4 完整的测试范例
完整测试代码:examples/test_atomic_mutex.cc
4.1 测试框架图
test_atomic_mutex.cc
├── 基础功能测试
│ ├── testAtomic() // 原子操作测试
│ └── testMutex() // 互斥锁测试
└── 条件变量测试└── testCondition() // 生产者-消费者模式测试├── Buffer类├── producer线程└── consumer线程
4.2 原子操作测试流程图
testAtomic()
├── 初始化测试
│ └── AtomicInt32 a0
├── 基本操作测试
│ ├── get() == 0
│ ├── getAndAdd(1) == 0
│ └── get() == 1
├── 复合操作测试
│ ├── addAndGet(2) == 3
│ └── get() == 3
└── 自增自减测试├── incrementAndGet() == 4├── get() == 4├── decrementAndGet() == 3└── get() == 3
4.3 互斥锁测试流程图
testMutex()
├── 创建互斥锁
│ └── MutexLock mutex
├── 加锁测试
│ ├── MutexLockGuard lock(mutex)
│ └── assert(mutex.isLockedByThisThread())
└── 解锁测试└── 作用域结束自动解锁
4.4 生产者-消费者模式原理图
[生产者线程1] [生产者线程2]↓ ↓└──────┐ ┌──┘↓ ↓[Buffer缓冲区]↓ ↓┌──────┘ └──┐↓ ↓
[消费者线程1] [消费者线程2] [消费者线程3]
4.5 Buffer类结构图
Buffer类
├── 私有成员
│ ├── size_t size_ // 缓冲区大小
│ ├── MutexLock mutex_ // 互斥锁
│ ├── Condition notEmpty_ // 不空条件变量
│ ├── Condition notFull_ // 不满条件变量
│ └── std::queue<int> queue_ // 数据队列
└── 公共方法├── put(int item) // 生产者方法└── get() // 消费者方法
4.6 线程同步流程图
4.6 测试输出
具体看代码打印
lesson6/build$ ./bin/test_atomic_mutex
5 章节总结
重点:
-
原子封装的目的是提供对于项目更易用的接口,并大致了解下
-
gcc原子操作相关的函数,比如__sync_fetch_and_add
-
了解Atomic MutexLock和Condition等封装在muduo网络库的使用。
-
掌握MutexLock类里保存获得锁的线程ID的作用,这样更方便分析死锁问题。