目录
🤫基础知识
🎂整体概述
🌼单例模式
懒汉 -- 双检锁
懒汉 -- 局部静态变量
饿汉
🌼条件变量 && 生产/消费者模型
条件变量 API 与 陷阱
基础 API
陷阱一
陷阱二
生产/消费者 模型
🎂阻塞队列 -- 代码
自定义队列
🤫基础知识
- 日志
服务器自动创建,并记录 运行状态 / 错误信息,访问数据的文件- 同步日志
日志写入函数 与 工作线程 串行执行,由于涉及 I / O 操作,当单条日志比较大的时候
同步模式会阻塞整个处理流程
服务器所能处理的并发能力有所下降
尤其在 峰值 的时候,写日志 可能成为系统的瓶颈- 生产者 / 消费者模型
并发编程中的经典模型
多线程为例,为了实现线程间数据同步,生产者线程 与 消费者线程 共享一个缓冲区,其中,生产者线程 向 缓冲区 push消息,消费者线程 从 缓冲区 pop消息- 阻塞队列
将 生产者 / 消费者模型进行封装,使用循环数组实现队列,作为两者共享的缓冲区- 异步日志
将所写的日志内容,先存入阻塞队列,写线程从阻塞队列中,取出内容,写入日志- 单例模式
最简单,也是考察最多的设计模式之一
保证一个类只创建一个实例,同时提供 全局访问 的方法
🎂整体概述
webserver中,使用单例模式创建日志系统
对服务器 运行状态 / 错误信息 / 访问数据 进行记录
该系统可以实现 按天分类,超行分类 功能
可以根据实际情况,分别使用同步和异步写入两种方式
其中,异步写入方式,将生产者 / 消费者模型,封装为阻塞队列,创建一个
写线程,工作线程将要写的内容 push 进队列
写线程从队列取出内容,写入日志文件
日志系统大致可以分为两部分,其一,单例模式与阻塞队列的定义
其二,日志类的定义与使用
本文内容
本篇介绍单例模式与阻塞队列的定义
具体的,涉及单例模式,生产/消费者模型,阻塞队列的代码实现
- 单例模式:描述懒汉与饿汉两种单例,并结合线程安全进行讨论
- 生产者 / 消费者模型,描述条件变量,基于该同步机制实现简单的生产者 / 消费者模型
- 代码实现,结合代码对阻塞队列的设计进行详解
🌼单例模式
引用一篇之前的文章👇
单例模式 C++-CSDN博客
以下再跟一遍 Github 的代码,加深印象
单例模式,一个类仅有一个实例,提供一个全局访问点,该实例被所有程序模块共享
实现思路👇
私有化它的构造函数,以防止外界创建单例类对象;
使用类的私有静态指针变量,指向类的唯一实例,并用一个公有的静态方法获取该实例
懒汉模式 -- 非常懒,不用的时候不去初始化,要到第一次被使用才进行初始化
饿汉模式 -- 即迫不及待,程序运行时,立即初始化
懒汉 -- 双检锁
经典的线程安全懒汉模式,使用 双检锁 模式
class single {
private:static single *p; // 静态指针变量,指向唯一实例static pthread_mutex_t lock; // 静态互斥锁single() { // 构造函数pthread_mutex_init(&lock, NULL); // 初始化互斥锁}~single(){} // 析构函数public:static single* getinstance(); // 获取实例的静态方法
};pthread_mutex_t single::lock; // 定义静态成员变量 locksingle* single::p = NULL; // 初始化静态指针 psingle* single::getinstance() { // 实现获取实例的静态方法if (NULL == p) { // 检查实例是否已经存在pthread_mutex_lock(&lock); // 加锁if (NULL == p) { // 再次检查实例是否已经存在p = new single; // 创建新实例}pthread_mutex_unlock(&lock); // 解锁}return p; // 返回实例指针
}
为什么要用 双检锁 呢,只检测一次不行吗?
如果只检测一次,每次调用获取实例的方法时,都需要加锁,会严重影响程序性能
而,双层检测,可以避免
仅在第一次创建单例时加锁,其他时候,都不再符合 NULL == p,
直接返回已创建好的实例
补充解释
双重检测(double-checked locking)是一种常见的单例模式实现方式,主要是为了在多线程环境下保证单例对象只被创建一次,同时尽可能地减少加锁的次数,提高程序性能。下面对为什么需要双重检测进行详细解释:
单次检测不够:如果只进行一次检测,即只有一个 if 语句来检查实例是否为空,那么在多线程环境下可能存在以下问题:
- 线程 A 检查到实例为空,然后获取锁并创建实例。
- 在线程 A 创建实例的过程中,线程 B 也通过了第一个检查,但此时实例还未被创建,线程 B 也会进入临界区。
- 这样就导致了多个线程同时创建实例,破坏了单例的唯一性。
双重检测的作用:
- 第一次检测:通过第一次 if 语句检查实例是否为空,可以避免多个线程进入临界区。如果实例已经被创建,则直接返回现有实例,不需要加锁。
- 加锁:只有当需要创建实例时,才会进入加锁的临界区,确保只有一个线程可以创建实例。
- 第二次检测:在获取锁之后再次检查实例是否为空,这是为了防止在等待锁的时候,另一个线程已经创建了实例。如果在第二次检测时发现实例已经被创建,那么当前线程就不需要再创建实例了,直接返回已有的实例
懒汉 -- 局部静态变量
前面的双检锁模式,不太优雅
《Effective C++》提供了另一种方式 -- 函数内的局部静态对象,
这种方式不用 加锁 和 解锁
class single {
private:single() {} // 私有构造函数,防止外部实例化对象~single() {} // 私有析构函数,防止外部删除对象public:static single* getinstance(); // 静态方法用于获取单例实例
};single* single::getinstance() {static single obj; // 静态局部变量,保证在程序运行期间只被初始化一次return &obj; // 返回单例对象的指针
}
这时候有人说了,这种方法 不加锁,会不会造成线程安全问题?
其实,C++oX以后,要求编译器保证内部静态变量的线程安全性,故C++0X之后,
该实现是线程安全的,C++0X之前仍需加锁,其中C++0X是C++11标准成为正是标准之前的临时草案名字
所以,使用C++11之前的标准,还是需要加锁,这里同样给出加锁版本
#include <pthread.h>class single {
private:static pthread_mutex_t lock; // 静态互斥锁,用于实现线程安全single() { // 私有构造函数,初始化互斥锁pthread_mutex_init(&lock, NULL);}~single(){} // 私有析构函数,防止外部删除对象public:static single* getinstance(); // 静态方法用于获取单例实例
};pthread_mutex_t single::lock; // 初始化静态互斥锁single* single::getinstance() {pthread_mutex_lock(&lock); // 加锁,确保只有一个线程进入临界区static single obj; // 静态局部变量,保证在程序运行期间只被初始化一次pthread_mutex_unlock(&lock); // 解锁,允许其他线程进入临界区return &obj; // 返回单例对象的指针
}
饿汉
饿汉模式,不需要加锁,就可以实现线程安全
原因在于,程序运行时就定义了对象,并对其初始化
之后,不管哪个线程调用成员函数 getinstance()
都只不过返回一个对象的指针而已
所以是线程安全的,不需要在获取实例的成员函数中加锁
#include <iostream>class single {
private:static single* p; // 静态成员变量,用于持有单例对象single() {} // 私有构造函数,防止外部实例化对象~single() {} // 私有析构函数,防止外部删除对象public:static single* getinstance(); // 静态方法用于获取单例实例
};single* single::p = new single(); // 在类外初始化静态成员变量,保证程序运行期间只被初始化一次single* single::getinstance() {return p; // 返回单例对象的指针
}int main() {single* p1 = single::getinstance(); // 获取单例对象single* p2 = single::getinstance(); // 再次获取单例对象if (p1 == p2) // 判断两个指针是否相同std::cout << "same" << std::endl; // 输出结果为 "same"system("pause"); // 用于在控制台暂停,仅用于特定编译环境return 0;
}
饿汉模式虽好,但其存在隐藏的问题,在于非静态对象(函数外的 static 对象),
在不同的编译单元中的初始化顺序是未定义的
如果初始化完成之前调用 getInstance() 方法,会返回一个未定义的实例
🌼条件变量 && 生产/消费者模型
条件变量 API 与 陷阱
总结
条件变量(Condition Variable)详解-阿里云开发者社区 (aliyun.com)
条件变量,提供了一种线程间的通信机制,当某个共享数据达到某个值,
唤醒等待这个共享数据的线程
基础 API
- pthread_cond_init() 函数,初始化条件变量
- pthread_cond_destory() 函数,销毁条件变量
- pthread_cond_broadscast(),广播方式,唤醒所有等待目标条件变量的线程
- pthread_cond_wait(),等待目标条件变量。该函数调用时,传入 mutex参数(加锁的互斥锁),函数执行时,先把调用线程放入条件变量的请求队列,然后将互斥锁 mutex 解锁,当函数返回 0 时,表示重新抢到了互斥锁,互斥锁会再次被锁上,
即 函数内部会有一次 解锁 和 加锁 操作
pthread_cond_wait() 使用方式👇
pthread_mutex_lock(&mutex); // 获取互斥锁while (condition) {pthread_cond_wait(&cond, &mutex); // 等待条件变量,同时释放互斥锁
}pthread_mutex_unlock(&mutex); // 释放互斥锁
pthread_cond_wait() 执行后的内部操作分以下几步👇
- 线程放在条件变量的请求队列后,内部解锁
- 线程等待被 pthread_cond_broadcast 信号
或
pthread_cond_signal 信号
唤醒,唤醒后去竞争锁- 若竞争到互斥锁,内部再次加锁
陷阱一
使用前要 加锁,为什么要 加锁?
多线程访问,为了避免资源竞争,所以要加锁,使得每个线程互斥的访问公有资源
pthread_cond_wait() 内部为什么要 解锁?
while 或 if 判断时,满足执行条件,线程便会调用 pthread_cond_wait 阻塞自己,此时它还在持有锁,如果不解锁,那么其他线程无法访问公有资源
具体到 pthread_cond_wait() 内部实现,当 pthread_cond_wait() 被调用线程阻塞时,
pthread_cond_wait() 会自动释放互斥锁
为什么要把 调用线程 放入条件变量的 请求队列 后再解锁?
线程是并发执行的,如果把调用线程A放在等待队列之前,就释放了互斥锁,意味着其他线程,比如线程B可以获得互斥锁去访问公有资源,这时候线程A所等待的条件改变了,但是它没有被放在等待队列上,导致A忽略了等待条件被满足的信号
如果在线程A调用 pthread)cond_wait() 开始,到把A放在等待队列过程中,都持有互斥锁,其他线程无法得到互斥锁,就不能改变公有资源
在使用条件变量时,通常会结合互斥锁一起使用来保护共享资源。当线程等待某个条件成立时,会通过调用
pthread_cond_wait()
函数来等待条件变量,并在等待过程中释放互斥锁。这是因为条件变量的目的是让线程在等待某个条件成立时能够释放互斥锁,让其他线程有机会去修改共享资源,并在条件满足时通知等待的线程
为什么最后还要加锁?
将线程放在条件变量的请求队列后,将其解锁,此时等待被唤醒,若成功竞争到互斥锁,再次加锁
陷阱二
为什么判断 线程执行 的条件用 while 而不是 if ?
一般来说,多线程资源竞争的时候,在一个使用资源的线程里面,
(消费者)判断资源是否可用,不可用,就调用 pthread_cond_wait()
在另一个线程里面,(生产者)如果判断资源可用,
则调用 pthread_cond_signal() 发送一个资源可用的信号
在 wait 成功之后,资源就一定可以被使用吗?不一定
如果同时有两个或者以上的线程,正在等待此资源
wait 返回后,资源可能已经被使用
再具体点,如果有多个线程都在等待这个资源的可用信号,
信号发出后只有一个资源可用,但是有 A,B 两个线程都在等待,
B 速度较快,获得互斥锁,然后加锁,消耗资源,然后解锁,
之后 A 获得互斥锁,但 A 回去发现资源已经被使用了,那么 A 有两种选择:
1)访问不存在的资源
2)继续等待
继续等待的条件只能使用 while,否则 if 的话,pthread_cond_wait() 返回后,就会
顺序执行下去
所以,这种情况,应该用 while 而不是 if 👇
while (resource == FALSE)pthread_cond_wait(&conf, &mutex);
如果只有一个消费者,使用 if 也可以
生产/消费者 模型
总结
- 生产者消费者模式是指多个进程共享一个固定大小的缓冲区,其中一个进程负责生产数据,另一个进程负责消费数据。
- 使用生产者消费者模式可以平衡生产者和消费者之间的处理能力,避免出现生产者等待消费者或消费者处理等待的情况。
- 缓冲区的作用是存储生产者生产的数据,起到数据缓存和解耦的作用。
- 特点包括保证生产者不会在缓冲区满时继续放入数据,消费者不会在缓冲区空时消耗数据,并通过进入休眠状态和唤醒来实现生产者和消费者之间的协调。
正文
以下摘抄《Unix环境高级编程》第 11 章的内容
关于 pthread_cond_wait() 介绍中,有一个 生产 / 消费者 的例子👇
process_msg 消费者,enqueue_msg 是生产者,struct msg* workq 缓冲队列
生产者,消费者是互斥关系,两者对缓冲区访问互斥,同时生产者和消费者又是一个相互协作与同步的关系,只有生产者生产之后,消费者才能消费
#include<pthread.h>struct msg {struct msg *m_next; // 消息队列中下一个消息的指针/* value */
};struct msg* workq; // 消息队列头指针
pthread_cond_t qready = PTHREAD_COND_INITIALIZER; // 条件变量,用于等待消息队列非空
pthread_mutex_t qlock = PTHREAD_MUTEX_INITIALIZER; // 互斥锁,用于保护消息队列访问// 处理消息队列的消息
void process_msg() {struct msg* mp;for (;;) {pthread_mutex_lock(&qlock); // 加锁互斥量// 这里需要 while,而不是 ifwhile (workq == NULL) { // 如果消息队列为空,则等待消息队列非空pthread_cond_wait(&qread, &qlock);}mp = workq; // 取出队列中的消息workq = mp->m_next; // 更新队列头指针pthread_mutex_unlock(&qlock); // 解锁互斥量/* now process the message mp */ // 对消息进行处理}
}// 向消息队列添加消息
void enqueue_msg(struct msg* mp) {pthread_mutex_lock(&qlock); // 加锁互斥量mp->m_next = workq; // 将消息插入队列头部workq = mp; // 更新队列头指针pthread_mutex_unlock(&qlock); // 解锁互斥量// 此时另外一个线程在signal之前,执行了 process_msg,// 刚好把 mp 元素拿走pthread_cond_signal(&qready); // 唤醒等待的线程// 此时执行signal,pthread_cond_wait 中等待的线程被唤醒// 但是 mp 已经被另外一个线程拿走,所以,workq 还是 NULL,// 因此需要继续等待
}
补充👇
1)消息队列
消息队列通常用于处理并发请求。当多个客户端同时发送请求给服务器时,服务器需要将这些请求按照一定的顺序进行处理。
消息队列是一个存储请求的数据结构,它可以是队列、链表或其他形式。每当服务器接收到一个请求,它会将该请求添加到消息队列的末尾。然后,服务器会按照先入先出的原则从消息队列中取出请求,并逐个进行处理
🎂阻塞队列 -- 代码
阻塞队列中,封装了 生产 / 消费者 模型,其实 push成员 是生产者,pop成员 是消费者
使用 循环数组 实现队列,作为两者共享的缓冲区,当然,也可用 STL 的 queue
自定义队列
当队列为空,从队列中获取元素的线程会被挂起;
当队列满,向队列添加元素的线程会被挂起
👇仅罗列部分代码 -- push 和 pop成员函数
// 模板类,用于创建阻塞队列
class block_queue
{public:// 初始化 阻塞队列 私有成员block_queue(int max_size = 1000) {if (max_size <= 0)exit(-1); // 终止程序// 构造函数创建循环数组m_max_size = max_size;m_array = new T[max_size]; // 循环数组m_size = 0; // 大小m_front = -1; // 队头m_back = -1; // 队尾// 创建 互斥锁 和 条件变量m_mutex = new pthread__mutex_t;m_conf = new pthread_cond_t;pthread_mutex_init(m_mutex, NULL);pthread_cond_init(m_cond, NULL);}// 向队列添加元素 push// 需要将所有使用队列的线程先唤醒// 元素 push 进队列,相当于生产者生产了一个元素// 没有线程等待条件变量时,唤醒无意义bool push(const T &item){pthread_mutex_lock(m_mutex); // 获取互斥锁if (m_size >= m_max_size) { // 队列已满pthread_cond_broadcat(m_cond); // 发送广播pthread_mutex_unlock(m_mutex); // 释放互斥锁return false;}// 将所有新增数据放在循环数组的对应位置m_back = (m_back + 1) % m_max_size;m_array[m_back] = item;m_size++;pthread_cond_broadcast(m_cond);pthread_mutex_unlock(m_mutex);return true;}// pop时,如果当前队列没有元素,就等待条件变量bool pop(T &item){pthread_mutex_lock(m_mutex);// 多个消费者的话,用 while 不用 ifwhile (m_size <= 0) { // 队列为空// 重新抢到互斥锁后,pthread_cond_wait 返回 0// pthread_cond_wait(,) 等待条件变量if (0 != pthread_cond_wait(m_cond, m_mutex)){pthread_mutex_unlock(m_mutex);return false;}}// 取出队首的元素(循环数组模拟队列)m_front = (m_front + 1) % m_max_size;item = m_array[m_front];m_size--;pthread_mutex_unlock(m_mutex);return true;}// 增加超时处理,项目中未用到// pthrad_cond_wait 基础上增加了等待时间,// 指定时间抢到互斥锁即可// 其他逻辑不变bool pop(T &item, int ms_timeout){struct timespec t = {0, 0};struct timeval now = {0, 0};gettimeofday(&now, NULL); // 获取当前时间pthread_mutex_lock(m_mutex);if (m_size <= 0) { // 队列为空,设置超时时间t.tv_sec = now.tv_sec + ms_timeout / 1000;t.tv_nsec = (ms_timeout % 1000) * 1000;if (0 != pthread_cond_timedwait(m_cond, m_mutex, &t)){pthread_mutex_unlock(m_mutex);return false;}}if (m_size <= 0) {pthread_mutex_unlock(m_mutex);return false;}m_front = (m_front + 1) % m_max_size;item = m_array[m_front];m_size--;pthread_mutex_unlock(m_mutex);return true;}
};