一.生产者-消费者模型
生产者-消费者模型是一个十分经典的多线程并发协作模式。所谓的生产者-消费者,实际上包含了两类线程,一种是生产者线程用于生产数据,另一种是消费者线程用于消费数据,为了解耦生产者和消费者的关系,通常会采用共享的数据区域(临界区)。就像是一个仓库,生产者生产数据之后直接放置在共享数据区中,并不需要关心消费者的行为;而消费者只需要从共享数据区中获取数据,不需要关心生产者的行为。举一个简单的例子,在网络通信过程中消费者可以是多个数据写入线程,负责向输入缓冲区写入以太网数据。消费者为多个数据处理线程,负责网络数据解析与处理。需要注意,这里的生产者与消费者是两类线程,实现了数据收发与数据处理的解耦合。好处一方面在于某一线程的错误不会导致整体传输-解析架构的崩溃,另一方面是可以使得整体程序结构变得清晰明了利于后期维护。
二.实现方式(条件变量)
std::conditon_variable(c11)在头文件<condition_variable>定义,它是与std::mutex一起使用的同步原语。用于阻塞一个线程或者同时阻塞多个线程,直至另一个线程修改共享条件变量并通知。阻塞多个线程等待通知有利于减少CPU负载,特别是在多个线程的情况下。举个例子:生产者线程修改输入缓存区后,通知消费者线程进行处理。消费者线程在没有接到通知时,一直处在阻塞等待通知的状态,对比死循环加条件判断的形式更节省OS资源消耗。
有意修改条件变量状态的线程需要注意:
1.必须获得std::mutex所有权。(上锁)
2.在保有锁的时候进行修改操作。
3.在std::condition_variable上执行notify_one或notify_all(释放锁后通知)。
任何有意在std::condition_variable上等待的线程需要注意:
1.在用于保护共享变量的互斥体上获得std::unique_lock<std::mutex>。
2.执行检查:
1.检查条件。
2.调用wait,wait_for,wait_until。
3.检查条件,并在为满足的条件下继续阻塞等待。
示例:与mutex同步实现进程间通信
#include <condition_variable>
#include <iostream>
#include <mutex>
#include <string>
#include <thread>std::mutex m;
std::condition_variable cv;
std::string data;
bool ready = false;
bool processed = false;void worker_thread()
{// 等待直至 main() 发送数据std::unique_lock lk(m);cv.wait(lk, []{ return ready; });// 等待后,我们占有锁std::cout << "工作线程正在处理数据\n";data += "(处理后)";// 发送数据回 main()processed = true;std::cout << "工作线程指示数据已经处理完成\n";// 通知前完成手动解锁,以避免等待线程才被唤醒就阻塞(细节见 notify_one)lk.unlock();cv.notify_one();
}int main()
{std::thread worker(worker_thread);data = "数据样例";// 发送数据到 worker 线程{std::lock_guard lk(m);ready = true;std::cout << "main() 指示数据已准备好进行处理\n";}cv.notify_one();// 等候 worker{std::unique_lock lk(m);cv.wait(lk, []{ return processed; });}std::cout << "返回 main(),data = " << data << '\n';worker.join();
}
需注意,在通知前先释放锁,不然会出现子线程唤醒后再次阻塞的风险(假通知)。这是一种常见的条件竞争。
#include <chrono>
#include <condition_variable>
#include <iostream>
#include <thread>
using namespace std::chrono_literals;std::condition_variable cv;
std::mutex cv_m;
int i = 0;
bool done = false;void waits()
{std::unique_lock<std::mutex> lk(cv_m);std::cout << "等待... \n";cv.wait(lk, []{ return i == 1; });std::cout << "...结束等待; i == " << i << '\n';done = true;
}void signals()
{std::this_thread::sleep_for(200ms);std::cout << "假通知...\n";cv.notify_one(); // 等待线程被通知且 i == 0。// cv.wait 被唤醒,检查 i,再回到等待std::unique_lock<std::mutex> lk(cv_m);i = 1;while (!done) {std::cout << "真的改动通知...\n";lk.unlock();cv.notify_one(); // 等待线程被通知且 i == 1,cv.wait 返回std::this_thread::sleep_for(300ms);lk.lock();}
}int main()
{std::thread t1(waits), t2(signals);t1.join(); t2.join();
}
三.使用条件变量实现简单的多线程通信示例
#include <iostream>
#include <string>
#include <sstream>
#include <list>
#include <thread>
#include <condition_variable>
#include <mutex>std::mutex mux;
std::condition_variable cv;
std::list<std::string> buffer;void thRead(int i){for (;;) {std::unique_lock<std::mutex> lock(mux);cv.wait(lock, [](){ return !buffer.empty(); }); //wait解锁阻塞等待唤醒,上锁判断Lambda//如果Lambda返回true获得锁进入处理逻辑while (!buffer.empty()){std::cout << "Read Thread " << i << "\n" << buffer.front() << std::endl;buffer.pop_front();}}
}
void thWrite(){for (int i = 0; ; ++i) {std::unique_lock<std::mutex> lock(mux); //获取锁std::stringstream ss;ss << "Write " << i + 1 << " Times.";buffer.push_back(ss.str());lock.unlock();//先解锁再进行通知,防止读线程死锁cv.notify_one();std::this_thread::sleep_for(std::chrono::seconds(1)); //1s写入一次数据}
}int main() {std::thread write(thWrite);write.detach();for (int i = 0; i < 3; ++i) {std::thread th(thRead, i + 1);th.detach();}getchar();return 0;
}
四.通过信号量通知线程关闭
//
// Created by zty19 on 24-6-3.
//
#include "xmessage.h"
#include <iostream>void XMessage::stop(){exit_flag_ = true;cv.notify_all();wait();
}void XMessage::send(const std::string &msg) {std::unique_lock<std::mutex> lock(buffer_mutex_);send_buffer_.push_back(std::move(msg));lock.unlock();cv.notify_one(); //发布唤醒标志
}void XMessage::Main() {while (!is_exit()) {std::unique_lock<std::mutex> lock(buffer_mutex_);//std::this_thread::sleep_for(std::chrono::microseconds(100));cv.wait(lock, [this](){if (is_exit()) return true;return !send_buffer_.empty(); });if (is_exit()) break;while (!send_buffer_.empty()){std::string tmp = send_buffer_.front();send_buffer_.pop_front();std::cout << tmp << std::endl;}}
}