目录
抢票逻辑代码:
thread.hpp
thread.cc
运行结果:
为什么票会抢为负数?
概念前言
临界资源
临界区
原子性
数据不一致
为什么数据不一致?
互斥
概念
pthread_mutex_init(初始化互斥锁)
pthread_mutex_lock(申请互斥锁)
pthread_mutex_unlock(释放互斥锁)
pthread_mutex_destory(销毁互斥锁)
全局的互斥锁
thread.cc 代码
局部的互斥锁
thread.cc 代码
抢票逻辑代码:
thread.hpp
#ifndef __THREAD_HPP__
#define __THREAD_HPP__
#include<vector>
#include<iostream>
#include<string>
#include<functional>
#include<pthread.h>
#include<unistd.h>namespace ThreadModule
{//给 函数参数为T(T为任意类型)的引用,返回值为 void 的函数 重命名为func_ttemplate<typename T>using func_t=std::function<void(T&)>;template<typename T>class Thread{public://线程的任务void Excute(){_func(_data);}public://构造函数Thread(func_t<T> func, T &data,const std::string &name="none-name"):_func(func),_data(data),_threadname(name),_stop(true){ }//如果没有static,由于 this 指针,函数的参数有2个,而pthread_create要求函数参数只能有void*//加上static,则要求函数不能访问类内的非静态成员变量,也就避免了this指针作为函数参数 static void* threadroute(void* args){//参数从void* 类型转为Thread<T> *类型,static_cast是一种相对安全的类型转换方式Thread<T> *self=static_cast<Thread<T> *>(args);//由于没有了this指针,所以需要封装Excute函数来传递 _data参数,从而执行任务self->Excute();return nullptr;}//开始执行任务bool Start(){//创建线程int n=pthread_create(&_tid,nullptr,threadroute,this);if(!n){ _stop=false;//修改状态return true;}else{return false;}}void Detach(){//有线程启动了才分离线程if(!_stop)pthread_detach(_tid);}void Join(){if(!_stop)pthread_join(_tid,nullptr);}std::string name(){return _threadname;}void Stop(){_stop=true;}//析构函数~Thread(){ }private:std::string _threadname;//线程名bool _stop;//该线程是否启动,true表示未启动,false表示已启动pthread_t _tid;T &_data;func_t<T> _func;//线程调用的函数};
}
#endif
thread.cc
#include<iostream>
#include<pthread.h>
#include<unistd.h>
#include<string>
#include<vector>
using namespace std;
#include"thread.hpp"
using namespace ThreadModule;int g_tickets=10000;//共享资源,未保护
const int num=4;void route(int &tickets)
{//票没抢完就一直抢while(true){if(tickets>0)//还有票,可以继续抢{usleep(1000);//目前抢到的票printf("get tickets:%d\n",tickets);tickets--;}else//已经没票了,不能抢了,退出{break;}}
}int main()
{std::vector<Thread<int>> threads;//创建线程for(int i=0;i<num;i++){std::string name="thread-"+std::to_string(i+1);threads.emplace_back(route,g_tickets,name);}//启动线程for(auto &threads:threads){threads.Start();}//等待线程for(auto &threads:threads){threads.Join();std::cout<<"wait thread done, thread is: "<<threads.name()<<std::endl;}//return 0;
}
运行结果:
发现票数被减为了负数,且有的票被重复抢了,每次运行的结果都不一样。
为什么票会抢为负数?
概念前言
临界资源
多线程执行流共享的资源称为临界资源。
临界区
每个线程内部,访问了临界资源的代码称为临界区。
原子性
不会被任何调度机制打断的操作,该操作只有两态,要么完成,要么未完成。
数据不一致
在多线程或分布式系统中,由于并发操作或其他因素导致的数据状态不符合预期的情况。当多个线程或进程同时访问和修改共享资源时,如果没有适当的同步机制,可能会出现数据不一致的问题。这可能导致系统的不稳定、错误的结果或难以调试的行为。例如,上述的运行结果中出现了剩余的票数为负数的情况,而剩余的票数不应该出现负数,数据状态不符合预期,即数据不一致。
为什么数据不一致?
我们可以来模拟一下代码的整个运行过程。
在代码中,我们创建了4个线程,每个线程都要执行以下代码,函数内中一共有 3 个地方访问了临界资源,标为1、2、3:
void route(int &tickets)
{//票没抢完就一直抢while(true){//还有票,可以继续抢if(tickets>0)//1{usleep(1000);//目前剩下的票数printf("get tickets:%d\n",tickets);//2tickets--;//3}else//已经没票了,不能抢了,退出{break;}}
}
假设现在只剩一张票了,即 g_tickets = 1:
假设现在执行的是线程 1,线程 1 进行 tickets>0 的判断,这个判断过程是由 CPU 来完成的。
系统把 g_tickets 的值从内存读到 CPU 的寄存器 ebx 中,判断结果为真,线程 1 开始执行 if 的代码块,还没执行到打印操作,线程 1 被挂起并切走了,切走时线程 1 带走了寄存器的上下文数据,g_tickets 还是 1,还没有写回到内存中!
此时轮到线程 2 执行函数了,线程 2 也进行了 tickets>0 的判断,由于 g_tickets 的值依旧为 1,和线程 1 的过程一样,所以线程 2 也执行 if 的代码块, 线程 2 也还没有执行到打印操作,就被挂起并切走了。线程 3 同理。
再次轮到线程 1 时,由于已经进行过 if 判断了,线程 1 直接执行打印操作,对 tickets -- 并把 tickets 的值写回到内存中,g_tickets 的值变为 0。 这里我们需要了解到,tickets-- 看似只有一句代码,其实要分为三个过程来执行:
- 把 tickets 从内存中读到 CPU 中;
- CPU 进行 -- 操作;
- 把 tickets 的值写回内存中。
再次轮到线程 2,因为线程 2 已经进行过 if 判断了,线程 2 以为 tickets 还是1,也直接执行打印操作,把 tickets-- 并写回到内存中,但线程 2 在进行 tickets -- 操作时,读到的 tickets 已经是 0 了,-- 操作后,tickets 变为 -1,内存中的 g_tickets 的值变为 -1.
线程 3 也是同理,-- 后 tickets 变为 -2,写回到内存后,内存中的 g_tickets 的值变为 -2.
就这样 g_tickets 的值被减到了负数!
也就是说,多线程访问共享资源 g_tickets 时,由于共享资源 g_tickets 未被保护,且 -- 操作不是原子的,在执行任何一个步骤时线程都可能被切换,导致产生了计算过程的中间状态!
互斥
由于多个执行流访问全局数据的代码,所以会发生上面的问题,多个线程共享的全局数据就是临界资源,访问了全局数据的代码其实就是临界区,换句话说,保护临界区,就可以保护临界资源,就可以解决上面数据不一致的问题。
概念
任何时刻,互斥保证有且只有一个执行流进入临界区,访问临界资源,对临界资源起保护作用。
pthread_mutex_init(初始化互斥锁)
#include <pthread.h>int pthread_mutex_init(pthread_mutex_t *mutex, const pthread_mutexattr_t *attr);
初始化一个互斥锁。
mutex
:指向要初始化的互斥锁对象的指针。
attr
:指向互斥锁属性对象的指针,可以为NULL
以使用默认属性。
pthread_mutex_lock(申请互斥锁)
#include <pthread.h>int pthread_mutex_lock(pthread_mutex_t *mutex);
尝试获取互斥锁。如果锁已经被其他线程持有,则当前线程将被阻塞,直到锁可用。
mutex
:指向要锁定的互斥锁对象的指针。
pthread_mutex_unlock(释放互斥锁)
#include <pthread.h>int pthread_mutex_unlock(pthread_mutex_t *mutex);
释放一个互斥锁,允许其他等待的线程获取该锁。
mutex
:指向要解锁的互斥锁对象的指针。
pthread_mutex_destory(销毁互斥锁)
#include <pthread.h>int pthread_mutex_destroy(pthread_mutex_t *mutex);
销毁一个互斥锁,释放相关资源。
mutex
:指向要销毁的互斥锁对象的指针。
加锁
注意加锁应该精细,只需要在临界区加锁,非临界区不需要加锁! 加锁成功后,只有申请到互斥锁的线程才可以访问临界区,其他线程阻塞等待,直到锁释放后,其他线程成功竞争到互斥锁,才可以访问临界区!
全局的互斥锁
如果互斥锁是全局的,或者静态的,则不需要 init 和 destory。
thread.cc 代码
#include<iostream>
#include<pthread.h>
#include<unistd.h>
#include<string>
#include<vector>
using namespace std;
#include"thread.hpp"
using namespace ThreadModule;int g_tickets=10000;//共享资源,未保护
const int num=4;//一把全局的锁
pthread_mutex_t gmutex=PTHREAD_MUTEX_INITIALIZER;void route(int &tickets)
{//票没抢完就一直抢while(true){pthread_mutex_lock(&gmutex);//申请锁//还有票,可以继续抢if(tickets>0)//1{usleep(1);//目前剩下的票数printf("get tickets:%d\n",tickets);//2tickets--;//3pthread_mutex_unlock(&gmutex);//释放锁}else//已经没票了,不能抢了,退出{pthread_mutex_unlock(&gmutex);//释放锁break;}}
}int main()
{std::vector<Thread<int>> threads;//创建线程for(int i=0;i<num;i++){std::string name="thread-"+std::to_string(i+1);threads.emplace_back(route,g_tickets,name);}//启动线程for(auto &threads:threads){threads.Start();}//等待线程for(auto &threads:threads){threads.Join();std::cout<<"wait thread done, thread is: "<<threads.name()<<std::endl;}//return 0;
}
不再出现抢到负数的票和抢到重复的票的情况了,且每次运行结果都一样:
局部的互斥锁
thread.cc 代码
#include<iostream>
#include<pthread.h>
#include<unistd.h>
#include<string>
#include<vector>
#include<mutex>
using namespace std;
#include"thread.hpp"
using namespace ThreadModule;int g_tickets=10000;//共享资源,未保护
const int num=4;//因为锁变成局部的,为了让route可以访问互斥锁,且统计每个线程抢到了多少张票,定义一个类
class ThreadData
{
public:ThreadData(int &tickets,std::string name,pthread_mutex_t &mutex):_tickets(tickets),_name(name),_mutex(mutex),_total(0){ }~ThreadData(){ }
public:int &_tickets;//所有线程最终都会引用同一个全局变量g_ticketsstd::string _name;int _total;pthread_mutex_t &_mutex;
};void route(ThreadData *td)
{//票没抢完就一直抢while(true){pthread_mutex_lock(&td->_mutex);//申请锁//还有票,可以继续抢if(td->_tickets>0)//1{usleep(1);//目前剩下的票数printf("%s running, get tickets:%d\n",td->_name.c_str(),td->_tickets);//2td->_tickets--;//3td->_total++;pthread_mutex_unlock(&td->_mutex);//释放锁}else//已经没票了,不能抢了,退出{pthread_mutex_unlock(&td->_mutex);//释放锁break;}}
}int main()
{//一把局部的锁pthread_mutex_t mutex;pthread_mutex_init(&mutex,nullptr);//初始化互斥锁std::vector<Thread<ThreadData*>> threads;std::vector<ThreadData*> datas;//创建线程for(int i=0;i<num;i++){std::string name="thread-"+std::to_string(i+1);ThreadData* td = new ThreadData(g_tickets,name,mutex);threads.emplace_back(route,td,name);datas.emplace_back(td);}//启动线程for(auto &threads:threads){threads.Start();}//等待线程for(auto &threads:threads){threads.Join();//std::cout<<"wait thread done, thread is: "<<threads.name()<<std::endl;}for(auto data:datas){std::cout<<data->_name<<" : "<<data->_total<<std::endl;delete data;}pthread_mutex_unlock(&mutex);//return 0;
}
封装成类
LockGuard.hpp 代码
#ifndef __LOCK_GUARD_HPP__
#define __LOCK_GUARD_HPP__
#include<pthread.h>
#include<iostream>
class LockGuard
{
public:LockGuard(pthread_mutex_t *mutex):_mutex(mutex){pthread_mutex_lock(_mutex);//加锁}~LockGuard(){pthread_mutex_unlock(_mutex);//解锁}
private:pthread_mutex_t *_mutex;
};#endif
thread.cc 代码
#include<iostream>
#include<pthread.h>
#include<unistd.h>
#include<string>
#include<vector>
#include<mutex>
using namespace std;
#include"thread.hpp"
#include"LockGuard.hpp"
using namespace ThreadModule;int g_tickets=10000;//共享资源,未保护
const int num=4;//因为锁变成局部的,为了让route可以访问互斥锁,且统计每个线程抢到了多少张票,定义一个类
class ThreadData
{
public:ThreadData(int &tickets,std::string name,pthread_mutex_t &mutex):_tickets(tickets),_name(name),_mutex(mutex),_total(0){ }~ThreadData(){ }
public:int &_tickets;//所有线程最终都会引用同一个全局变量g_ticketsstd::string _name;int _total;pthread_mutex_t &_mutex;
};void route(ThreadData *td)
{//票没抢完就一直抢while(true){LockGuard guard(&td->_mutex);//临时对象//还有票,可以继续抢if(td->_tickets>0)//1{usleep(1);//目前剩下的票数printf("%s running, get tickets:%d\n",td->_name.c_str(),td->_tickets);//2td->_tickets--;//3td->_total++;}else//已经没票了,不能抢了,退出{break;}}
}int main()
{//一把局部的锁pthread_mutex_t mutex;pthread_mutex_init(&mutex,nullptr);//初始化互斥锁std::vector<Thread<ThreadData*>> threads;std::vector<ThreadData*> datas;//创建线程for(int i=0;i<num;i++){std::string name="thread-"+std::to_string(i+1);ThreadData* td = new ThreadData(g_tickets,name,mutex);threads.emplace_back(route,td,name);datas.emplace_back(td);}//启动线程for(auto &threads:threads){threads.Start();}//等待线程for(auto &threads:threads){threads.Join();//std::cout<<"wait thread done, thread is: "<<threads.name()<<std::endl;}for(auto data:datas){std::cout<<data->_name<<" : "<<data->_total<<std::endl;delete data;}pthread_mutex_unlock(&mutex);//return 0;
}