Linux系统编程 --- 多线程

 线程:是进程内的一个执行分支,线程的执行粒度,要比进程要细。

一、线程的概念

1、Linux中线程该如何理解

地址空间就是进程的资源窗口。

  • 在一个程序里的一个执行路线就叫做线程(thread)。更准确的定义是:线程是一个进程内部的控制序列”
  • 一切进程至少都有一个执行线程
  • 线程在进程内部运行,本质是在进程地址空间内运行
  • Linux系统中,在CPU眼中,看到的PCB都要比传统的进程更加轻量化
  • 透过进程虚拟地址空间,可以看到进程的大部分资源,将进程资源合理分配给每个执行流,就形成了线程执行流

Linux实现方案:

        在Linux中,线程在进程内部执行,线程在进程的地址空间内运行。任何执行流要执行,都要有资源,地址空间是进程的资源窗口。

        在Linux中,线程的执行粒度要比进程要更细,线程执行进程代码的一部分。

2、重新定义线程和进程

什么叫做线程呢?线程是操作系统调度的基本单位。

什么叫做进程呢?内核观点,进程是承担分配系统资源的基本实体。

执行流也是资源。

如何理解我们以前的进程呢?

操作系统以进程为单位,分配资源,我们当前的进程内部,只有一个执行流。常规情况就是,一个进程里面有多个执行流。

Linux 设计者复用进程数据结构和管理算法。struct task_struct --- 模拟线程。Linux没有真正意义的线程,他没有对应的 TCB 结构,而是使用进程的内核和数据结构模拟线程。

3、重谈地址空间

虚拟地址是如何转换到物理地址的?

虚拟地址是32位的, 并不是一个整体,把他分成了10 + 10 + 12前十位找到二级页表,中间十位找到页框,后面12位找到页框里的地址。

二级页表大部分情况都是不全的 ,线程目前分配资源,本质就是分配地址空间的范围。

4、Linux线程周边的概念

线程 vs 进程 切换问题

线程比进程更轻量化,为什么?

a.创建和释放轻量化。

b.切换更加轻量化(运行) 整个生命周期。

线程内的切换,不需要重新切换cache中的数据。、

共享:文件描述符表,私有:栈和上下文

样例代码:

#include <iostream>
#include <pthread.h>
#include <unistd.h>void* threadRun(void* thread)
{while(true){std::cout << "new thread tid: " << getpid() << std::endl;sleep(1);}return nullptr;
}
int main()
{//创建一个线程pthread_t tid;pthread_create(&tid,nullptr,threadRun,nullptr);while(true){std::cout << "main thread tid: " << getpid() << std::endl;sleep(1);}
}

内核中没有线程的概念,操作系统不会提供线程的系统调用,只会给我们提供轻量级进程的系统调用!我们用户需要线程的接口,所以就有了应用层pthread线程库,轻量级进程接口进行封装,为用户提供直接线程的接口。几乎所有的LInux平台,都是默认自带这个库的!Linux中编写多线程代码,需要使用第三方pthread库!

快速使用线程接口:

创建线程:

thread 线程id,attr 线程的属性设置成nullptr,start_routine 函数指针

void* 返回值,返回任意的指针类型,8个字节。void大小为1个字节

arg 创建线程成功,新县城回调线程函数的时候,需要参数,这个参数就是给线程函数传递的。

成功0被返回,errno不会被设置。!0表示错误码。

查看轻量级进程。

轻量级进程的lwp,light weight process

 

任何一个线程被杀掉了,进程也会退出, kill -9 发给了进程。全区变量是线程间共享的。 

线程id是什么东西呢?乍一看他像地址,实质上他是什么呢?

拿到自己的线程id

clone函数被我们的原生线程库封装。

线程库调用系统调用,参数为回调函数,独立栈。所以线程的概念是库给我们维护的。书写的原生线程库,需要加载到内存中,所以我们的讨论都在内存中。线程库需要维护线程的概念不用维护线程的执行流。所以线程库里面都要维护多个线程属性集合,线程库需要管理线程,先描述再组织。我们的线程称为用户级线程。

tid是每一个线程的库级别的tcb的起始地址。是共享区中的一个地址。除了主线程,所有其他线程的独立栈,都是在共享区,具体来说是在pthread库中,tid指向的用户tcb中!

每一个线程在运行的时候都有独自的栈结构,都有自己的调用链。

给新线程传递参数。

二、线程的控制

线程的等待:

#include <iostream>
#include <cstdio>
#include <unistd.h>
#include <pthread.h>
//实现一个累加器,让新线程计算,返回给主线程
//设置请求任务
class request
{
public:request(int start,int end,std::string threadname):_start(start),_end(end),_threadname(threadname){}
public:int _start;int _end;std::string _threadname;
};//任务的回复
class response
{
public:response(int result,int exitcode):_result(result),_exitcode(exitcode){}
public:int _result;int _exitcode;
};void* sumCount(void* args)
{//强转为request指针request* rq = static_cast<request*>(args);//计算求和response* res = new response(0,0);for(int i = rq->_start;i <= rq->_end; i++){   std::cout << rq->_threadname << " is runing, caling..., " << i << std::endl;res->_result += i;usleep(1000);}delete rq;return res;
}
int main()
{//创建新线程pthread_t tid;request* rq = new request(1,100,"[new thread]");pthread_create(&tid,nullptr,sumCount,rq);//进行等待//8个字节void* res;pthread_join(tid,&res);//强转response* rsp = static_cast<response*>(res);std::cout << "rsp->result: " << rsp->_result << ", exitcode: " << rsp->_exitcode << std::endl;delete rsp;return 0;
}

主线程最后退出。

thread:等待线程的id,

value_ptr: 二级指针传参,解引用访问到void*  x   函数内部 *retval = z 就把z传递到了x中。就拿到了新线程的退出状态。

线程函数执行完后,线程就退出了。主线程等待的时候,默认是阻塞等待的! 

#include <iostream>
#include <unistd.h>
#include <pthread.h>
int g_val = 0;
void* threadRoutine(void* args)
{const char* name = static_cast<const char*>(args);int cnt = 5;while (true){//printf("%s,  pid: %d, g_val: %d, &g_val: 0x%p\n", name,getpid(), g_val, &g_val);std::cout << "thread name:" << name << std::endl;sleep(1);if(cnt == 0){break;}cnt--;}//新线程发生异常时,主线程会自动退出,新线程发送 值为-1 的宏return nullptr;
}
int main()
{//创建线程pthread_t tid;pthread_create(&tid,nullptr,threadRoutine,(void*)"new thread");//pthread_joinvoid* retval;pthread_join(tid,&retval);std::cout << "main thread quit ..., ret: " << (long long int)retval << std::endl;return 0;
}
#include <iostream>
#include <unistd.h>
#include <pthread.h>
int g_val = 0;
void* threadRoutine(void* args)
{const char* name = static_cast<const char*>(args);int cnt = 5;while (true){//printf("%s,  pid: %d, g_val: %d, &g_val: 0x%p\n", name,getpid(), g_val, &g_val);std::cout << "thread name:" << name << std::endl;sleep(1);if(cnt == 0){break;}//发生浮点数错误int x = 10;int z = x/0;cnt--;}//新线程发生异常时,主线程会自动退出return nullptr;
}
int main()
{//创建线程pthread_t tid;pthread_create(&tid,nullptr,threadRoutine,(void*)"new thread");//pthread_joinvoid* retval;pthread_join(tid,&retval);std::cout << "main thread quit ..., ret: " << (long long int)retval << std::endl;return 0;
}

上述代码会产生浮点数错误。

exit 是用来终止进程的!不能直接用来终止线程。

        

终止线程的函数。也可以使用返回值的方式。

void* threadRoutine(void* args)
{const char* name = static_cast<const char*>(args);int cnt = 5;while (true){//printf("%s,  pid: %d, g_val: %d, &g_val: 0x%p\n", name,getpid(), g_val, &g_val);std::cout << "thread name:" << name << std::endl;sleep(1);//exit(22);if(cnt == 0){break;}//发生浮点数错误// int x = 10;// int z = x/0;//cnt--;pthread_exit((void*)100);}//新线程发生异常时,主线程会自动退出return nullptr;
}

线程取消,不常见

int main()
{//创建线程pthread_t tid;pthread_create(&tid,nullptr,threadRoutine,(void*)"new thread");//join的时候会得到一个宏,PTHREAD_CANCEL 值为-1。pthread_cancel(tid);//pthread_joinvoid* retval;pthread_join(tid,&retval);std::cout << "main thread quit ..., ret: " << (long long int)retval << std::endl;return 0;
}

join的时候会得到一个宏,PTHREAD_CANCEL 值为-1。

所有语言的多线程都会封装Linux的原生线程库。

用户级线程 + 内核的轻量级进程 = Linux线程。Linux线程就是用户级线程。

都有独立的栈结构,其实线程和线程之间没有秘密,线程的栈上的数据,也是可以被其他线程看到并访问的。

全局变量是被所有的线程同时看到并访问的!就变成了临界资源。如果线程想要一个私有的全局变量呢?定义全局变量时在前面加上__thread,它只能定义内置类型,自定义变量不可以被它修饰,称为线程的局部存储。它是线程级别的全局变量。在线程里面获取pid和tid时我们不需要传参,直接使用局部存储的变量就可以了。

代码:

#include <iostream>
#include <unistd.h>
#include <pthread.h>
int g_val = 100;
void *threadRoutine(void *args)
{const char *name = static_cast<const char *>(args);int cnt = 5;while (true){// printf("%s,  pid: %d, g_val: %d, &g_val: 0x%p\n", name,getpid(), g_val, &g_val);printf("%s,  pid: %d, g_val: %d, &g_val: 0x%p\n", name,getpid(), g_val, &g_val);//std::cout << "thread name:" << name << std::endl;sleep(1);// exit(22);if (cnt == 0){break;}// 发生浮点数错误//  int x = 10;//  int z = x/0;// cnt--;// pthread_exit((void*)100);cnt--;}// 新线程发生异常时,主线程会自动退出return nullptr;
}
int main()
{// 创建线程pthread_t tid;pthread_create(&tid, nullptr, threadRoutine, (void *)"new thread");// join的时候会得到一个宏,PTHREAD_CANCEL 值为-1。// pthread_cancel(tid);while (true){printf("main thread pid: %d, g_val: %d, &g_val: 0x%p, create new thread tid: %p\n", getpid(), g_val, &g_val, tid);std::cout << "main thread, pid: " << getpid() << ", g_val: " << g_val << ", &g_val:" << &g_val << std::endl;sleep(1);g_val++;}// pthread_joinvoid *retval;pthread_join(tid, &retval);std::cout << "main thread quit ..., ret: " << (long long int)retval << std::endl;return 0;
}

 

线程分离:

默认情况下,新创建的线程是 joinable 的,线程退出后,需要对其进行 pthread_join 操作,否则无法释放 资源,从而造成系统泄漏。
如果不关心线程的返回值, join 是一种负担,这个时候,我们可以告诉系统,当线程退出时,自动释放线 程资源。

代码:

int main()
{// 创建线程pthread_t tid;pthread_create(&tid, nullptr, threadRoutine, (void *)"new thread");// join的时候会得到一个宏,PTHREAD_CANCEL 值为-1。// pthread_cancel(tid);// while (true)// {//     printf("main thread pid: %d, g_val: %d, &g_val: 0x%p, create new thread tid: %p\n", getpid(), g_val, &g_val, tid);//     std::cout << "main thread, pid: " << getpid() << ", g_val: " << g_val << ", &g_val:" << &g_val << std::endl;//     sleep(1);//     g_val++;// }pthread_detach(tid);// pthread_join//void *retval;//pthread_join(tid, &retval);//std::cout << "main thread quit ..., ret: " << (long long int)retval << std::endl;return 0;
}

三、线程的互斥

共享数据,数据的不一致问题,肯定是和多线程并发访问是有关系的。对一个全局变量进行多线程并发操作不是安全的。

代码:

#include <cstdio>
#include <iostream>
#include <cstring>
#include <string>
#include <unistd.h>
#include <vector>
#define NUM 10
int tickets = 1000;
class threaddata
{
public:threaddata(int name){threadname = "thread-" + std::to_string(name);}
public:std::string threadname;
};
void* getTicks(void* args)
{//强转threaddata* td = static_cast<threaddata*>(args);const char* name = td->threadname.c_str();while (true){if(tickets > 0){usleep(1000);printf("who=%s,get a ticket:%d\n",name,tickets);tickets--;}else{break;}}printf("%s ... quit\n", name);return nullptr;
}
int main()
{std::vector<threaddata*> dates;std::vector<pthread_t> tids;for(int i = 1; i <= NUM; i++){threaddata* date = new threaddata(i);dates.push_back(date);pthread_t tid;//创建子线程pthread_create(&tid,nullptr,getTicks,dates[i - 1]);tids.push_back(tid);}//等待for(auto thread:tids){pthread_join(thread,nullptr);}//释放资源for(auto td: dates){delete td;}return 0;
}

ticket-- 

1、先将tickets读入到CPU寄存器中;2、CPU内部进行运算操作。3、将计算结果写回内存。每一步都会对应一条汇编操作。

--操作并不是原子性操作,会导致数据不一致。
这给问题怎么解决呢?对共享数据的任何访问,保证任何时候只有一个执行流访问! ------ 互斥!引入锁的概念。

生成锁:

使用锁:

加锁的本质是使用时间换安全。加锁的表现是线程对于临界区代码串行执行。加锁的原则是尽量保证临界区代码越少越好!

代码:

#include <cstdio>
#include <iostream>
#include <cstring>
#include <string>
#include <unistd.h>
#include <vector>
#define NUM 10
int tickets = 1000;
class threaddata
{
public:threaddata(int name,pthread_mutex_t* lock){threadname = "thread-" + std::to_string(name);_mutex = lock;}
public:std::string threadname;pthread_mutex_t* _mutex;
};
void* getTicks(void* args)
{//强转threaddata* td = static_cast<threaddata*>(args);const char* name = td->threadname.c_str();while (true){//使用锁pthread_mutex_lock(td->_mutex);if(tickets > 0){usleep(1000);printf("who=%s,get a ticket:%d\n",name,tickets);tickets--;pthread_mutex_unlock(td->_mutex);}else{pthread_mutex_unlock(td->_mutex);break;}//不加这个会产生其他线程的饥饿问题,通过同步机制解决这个问题。usleep(13);}printf("%s ... quit\n", name);return nullptr;
}
int main()
{//生成一个锁pthread_mutex_t lock;//初始化一个锁pthread_mutex_init(&lock,nullptr);std::vector<threaddata*> dates;std::vector<pthread_t> tids;for(int i = 1; i <= NUM; i++){threaddata* date = new threaddata(i,&lock);dates.push_back(date);pthread_t tid;//创建子线程pthread_create(&tid,nullptr,getTicks,dates[i - 1]);tids.push_back(tid);}//等待for(auto thread:tids){pthread_join(thread,nullptr);}//释放资源for(auto td: dates){delete td;}//释放一个锁pthread_mutex_destroy(&lock);return 0;
}

申请锁成功才能往后走。不成功,阻塞等待。

细节问题:

没有sleep一直再被一个线程抢是正常的,线程对于锁的竞争能力会不同。

我们抢到了票,我们不会立马抢下一张。其实多线程还要执行得到票之后的后续动作。usleep模拟。

纯互斥环境,如果锁分配不够合理,容易导致其他线程的饥饿问题。不是说只要有互斥,必有饥饿。适合纯互斥的场景,就用互斥。为了解决这个问题,设置了两个规则:线程必须排队,出来的人,不能立马重新申请锁,必须排到队列的尾部。让所有的线程获取锁,按照一定的顺序。按照一定的顺序性获取资源叫做同步问题!

锁本身就是共享资源!那么谁来保护锁的安全呢?申请锁和释放锁本身就被设计称为了原子性操作。那么这是怎么做到的呢?锁的原理。
原子:一条汇编语句就是原子的!

为了实现互斥锁操作 , 大多数体系结构都提供了 swap exchange 指令 , 该指令的作用是把寄存器和内存单 元的数据相交换, 由于只有一条指令 , 保证了原子性 , 即使是多处理器平台 , 访问内存的 总线周期也有先后 , 一 个处理器上的交换指令执行时另一个处理器的交换指令只能等待总线周期。 现在我们把lock unlock 的伪 代码改一下

return 0 申请锁成功。 交换的本质就是把内存中的数据,交换到CPU的寄存器中。把数据交换到线程的硬件上下文中。把一个共享的锁,让一个线程一条交换汇编的方式,交换到自己的上下文中。当前线程持有锁。

在临界区中,线程可以被切换,在线程被切换出去的时候,是持有锁被切换出去的。所以我不在期间,照样没有人能进入临界区访问临界资源!对于其他线程来讲,一个线程要么持有锁,要么释放锁。当前线程访问临界区的过程,对于其他线程是原子的!

锁的应用:

做一个锁的封装。

代码:

#pragma once
#include <pthread.h>class mutex
{
public:mutex( pthread_mutex_t * mutex):_lock(mutex){}void Lock(){pthread_mutex_lock(_lock);}void UnLock(){pthread_mutex_unlock(_lock);}~mutex(){}
private:pthread_mutex_t *_lock;
};class lockGuard
{
public:lockGuard(pthread_mutex_t *mutex):_mutex(mutex){_mutex.Lock();}~lockGuard(){_mutex.UnLock();}
private:mutex _mutex;
};
#include <cstdio>
#include <iostream>
#include <cstring>
#include <string>
#include <unistd.h>
#include <vector>
#include "lockGuard.hpp"
#define NUM 10
int tickets = 1000;
// 全局锁
pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
class threaddata
{
public:threaddata(int name /*pthread_mutex_t* lock*/){threadname = "thread-" + std::to_string(name);//_mutex = lock;}public:std::string threadname;// pthread_mutex_t* _mutex;
};
// void* getTicks(void* args)
// {
//     //强转
//     threaddata* td = static_cast<threaddata*>(args);
//     const char* name = td->threadname.c_str();
//     while (true)
//     {
//         //使用锁
//         pthread_mutex_lock(td->_mutex);
//         if(tickets > 0)
//         {
//             usleep(1000);
//             printf("who=%s,get a ticket:%d\n",name,tickets);
//             tickets--;
//             pthread_mutex_unlock(td->_mutex);
//         }
//         else
//         {
//             pthread_mutex_unlock(td->_mutex);
//             break;
//         }
//         //不加这个会产生其他线程的饥饿问题,通过同步机制解决这个问题。
//         usleep(13);
//     }
//     printf("%s ... quit\n", name);
//     return nullptr;
// }void *getTicks(void *args)
{// 强转threaddata *td = static_cast<threaddata *>(args);const char *name = td->threadname.c_str();while (true){// 使用锁{lockGuard _lockGuard(&lock);  //C++ RAIIif (tickets > 0){usleep(1000);printf("who=%s,get a ticket:%d\n", name, tickets);tickets--;}else{break;}}// 不加这个会产生其他线程的饥饿问题,通过同步机制解决这个问题。usleep(13);}printf("%s ... quit\n", name);return nullptr;
}
int main()
{// 生成一个锁// pthread_mutex_t lock;// 初始化一个锁// pthread_mutex_init(&lock,nullptr);std::vector<threaddata *> dates;std::vector<pthread_t> tids;for (int i = 1; i <= NUM; i++){threaddata *date = new threaddata(i /*&lock*/);dates.push_back(date);pthread_t tid;// 创建子线程pthread_create(&tid, nullptr, getTicks, dates[i - 1]);tids.push_back(tid);}// 等待for (auto thread : tids){pthread_join(thread, nullptr);}// 释放资源for (auto td : dates){delete td;}// 释放一个锁// pthread_mutex_destroy(&lock);return 0;
}

线程安全和重入的概念:

线程安全:多个线程并发同一段代码时,不会出现不同的结果。常见对全局变量或者静态变量进行操作,并且没有锁保护的情况下,会出现该问题。
重入:同一个函数被不同的执行流调用,当前一个流程还没有执行完,就有其他的执行流再次进入,我们称之为重入。一个函数在重入的情况下,运行结果不会出现任何不同或者任何问题,则该函数被称为可重入函数,否则,是不可重入函数。

四、死锁问题

死锁是指在一组进程中的各个进程均占有不会释放的资源,但因互相申请被其他进程所站用不会释放的资 源而处于的一种永久等待状态。

死锁的四个必要条件:

1、互斥条件:一个资源每次只能被一个执行流使用。

2、请求与保持条件:一个执行流因请求资源而阻塞时,对已获得的资源保持不妨。

3、不剥夺条件:一个执行流已获得的资源,在未使用完之前,不能强行剥夺。

4、循环等待条件:若干执行流之间形成一种头尾相接的循环等待资源的关系。

避免死锁:破坏死锁的四个必要条件;加锁顺序一致;避免锁未释放的场景;资源一次性分配

 五、线程同步

同步问题是保证数据安全的情况下,  让我们的线程访问资源具有一定的顺序性。

1、提出解决方案

Linux中的条件变量,条件变量必须依赖于锁的使用。因为申请锁资源的时候申请不上,所以我们需要让线程到条件变量上按一定顺序等待。

代码:

#include <iostream>
#include <pthread.h>
#include <unistd.h>
//创建的线程数
#define NUM 5
int cnt = 0;
//创建锁和条件变量
pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
void* Count(void* argc)
{//分离线程pthread_detach(pthread_self());uint64_t num = (uint64_t)argc;//访问临界资源while (true){//加锁以后其他线程访问不到pthread_mutex_lock(&lock);//线程访问到以后,查看资源是否具备,不具备进行等待,等待的过程中释放锁pthread_cond_wait(&cond,&lock);std::cout << "thread:" << num << ", cnt:" << cnt++ << std::endl;pthread_mutex_unlock(&lock);}
}
int main() 
{//创建线程for(uint64_t i = 0;i < NUM; i++){pthread_t tid;//创建线程pthread_create(&tid,nullptr,Count,(void*)i);usleep(1000);}sleep(3);std::cout << "main thread ctrl begin: " << std::endl;//唤醒线程while(true){sleep(1);pthread_cond_signal(&cond);std::cout << "signal a thread ... " << std::endl;}return 0;
}

2、CP --- 问题

生产者消费者模式就是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通讯,而 通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者 要数据,而是直接从阻塞队列里取,阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。这个阻塞队 列就是用来给生产者和消费者解耦的。

生产者和消费者的行为,进行一定程度的解耦

生产者和消费者都是由线程承担。

执行流在做通信,如何高效安全的通信。看到同一份资源,共享资源,所以会有并发问题。

生产者 vs 生产者:互斥关系

消费者 vs 消费者:互斥关系 

生产者 vs 消费者:互斥,同步关系。  统一为三种关系。 

二种角色:生产和消费

一个交易场所:特定结构的内存空间。 统称为321原则。

优点:1、支持忙闲不均。 

           2、生产和消费进行解耦。

3、快速实现CP问题 

实现一个基于阻塞队列的生产消费者模型。

在多线程编程中阻塞队列 (Blocking Queue) 是一种常用于实现生产者和消费者模型的数据结构。其与普通的队列区别 在于,当队列为空时,从队列获取元素的操作将会被阻塞,直到队列中被放入了元素;当队列满时,往队列里存放元 素的操作也会被阻塞,直到有元素被从队列中取出( 以上的操作都是基于不同的线程来说的,线程在对阻塞队列进程 操作时会被阻塞)

代码:

#pragma once
#include <iostream>
#include <unistd.h>
#include <pthread.h>
#include <queue>
//类模板
template <class T>
class BlockQueue
{static const int defaultmax = 5;
public:BlockQueue(int maxcap = defaultmax):maxcap_(maxcap){pthread_mutex_init(&mutex_,nullptr);pthread_cond_init(&_c_cond_,nullptr);pthread_cond_init(&_p_cond_,nullptr);lowwater = maxcap_/3;highwater = (maxcap_ * 2)/3;}T pop(){   pthread_mutex_lock(&mutex_);while(queue_.size() == 0)  //while循环是为了防止伪唤醒{pthread_cond_wait(&_c_cond_,&mutex_);}T out = queue_.front();queue_.pop();if(queue_.size() < lowwater) pthread_cond_signal(&_p_cond_);pthread_mutex_unlock(&mutex_);return out;}void push(T data){//互斥pthread_mutex_lock(&mutex_);//判断临界资源状态,没有就绪阻塞。while(queue_.size() == maxcap_) //while循环是为了防止伪唤醒{pthread_cond_wait(&_p_cond_,&mutex_);}queue_.push(data);if(queue_.size() > highwater) pthread_cond_signal(&_c_cond_);pthread_mutex_unlock(&mutex_);}~BlockQueue(){pthread_mutex_destroy(&mutex_);pthread_cond_destroy(&_c_cond_);pthread_cond_destroy(&_p_cond_);}
private:std::queue<T> queue_;int maxcap_;    //极大值pthread_mutex_t mutex_; //互斥锁pthread_cond_t _c_cond_;  //消费条件变量pthread_cond_t _p_cond_;  //生产条件变量变量int lowwater;             int highwater;
};

#include "blockqueue.hpp"void* Consumer(void* argc)
{BlockQueue<int>* bq = static_cast<BlockQueue<int>*>(argc);while (true){sleep(1);int num = bq->pop();std::cout << "消费了一个任务:" << num << std::endl;}return nullptr;
}void* Productor(void* argc)
{BlockQueue<int>* bq = static_cast<BlockQueue<int>*>(argc);int data = 0;while(true){bq->push(data++);std::cout << "生产了一个任务!" << std::endl;}return nullptr;
}int main()
{pthread_t c,p;BlockQueue<int>* bq = new BlockQueue<int>();//消费者线程pthread_create(&c,nullptr,Consumer,bq);//生产者线程pthread_create(&p,nullptr,Productor,bq);//等待线程pthread_join(c,nullptr);pthread_join(p,nullptr);
}

生产者的数据可以从用户或者网络中产生,生产者生产数据也是要花时间获取的。消费者需要做数据的加工处理,也要花时间。两者在执行临界区外的代码是并发访问的。这就提高了效率。

判断资源状态为什么要在临界区内?因为判断临界资源调试是否满足,也是在访问临界资源!

pthread_cond_wait接口会自动释放锁。

如果线程wait时,被误唤醒了呢?多个线程被唤醒,而正好阻塞队列满了,但是没有被消费,还是生产者进程拿到了互斥锁还会往阻塞队列里面写数据。由于阻塞队列是满的,所以会发生写入数据错误。这就是伪唤醒。通过while循环防止伪唤醒。

 改成多线程模式

代码:

#pragma once
#include <iostream>
#include <string>std::string opers="+-*/%";enum{DivZero=1,ModZero,Unknown
};class Task
{
public:Task(int x, int y, char op) : data1_(x), data2_(y), oper_(op), result_(0), exitcode_(0){}void run(){switch (oper_){case '+':result_ = data1_ + data2_;break;case '-':result_ = data1_ - data2_;break;case '*':result_ = data1_ * data2_;break;case '/':{if(data2_ == 0) exitcode_ = DivZero;else result_ = data1_ / data2_;}break;case '%':{if(data2_ == 0) exitcode_ = ModZero;else result_ = data1_ % data2_;}            break;default:exitcode_ = Unknown;break;}}void operator ()(){run();}std::string GetResult(){std::string r = std::to_string(data1_);r += oper_;r += std::to_string(data2_);r += "=";r += std::to_string(result_);r += "[code: ";r += std::to_string(exitcode_);r += "]";return r;}std::string GetTask(){std::string r = std::to_string(data1_);r += oper_;r += std::to_string(data2_);r += "=?";return r;}~Task(){}private:int data1_;int data2_;char oper_;int result_;int exitcode_;
};
#include "blockqueue.hpp"
#include "Task.hpp"
#include <ctime>void *Consumer(void *argc)
{BlockQueue<Task> *bq = static_cast<BlockQueue<Task> *>(argc);sleep(1);while (true){Task task = bq->pop();// 处理数据task.run();std::cout << "得到一个运算结果:" << task.GetResult() << std::endl;}return nullptr;
}void *Productor(void *argc)
{int len = opers.size();BlockQueue<Task> *bq = static_cast<BlockQueue<Task> *>(argc);// int data = 0;// 产生数据while (true){int x = rand() % 10 + 1; //[1,10]int y = rand() % 10 + 1; //[1,10]char op = opers[rand() % len];Task task(x, y, op);bq->push(task);std::cout << "生产了一个任务!" << task.GetTask() << std::endl;sleep(1);}return nullptr;
}
//BlockQueue 和上面的BlockQueue相同int main()
{srand(time(NULL));pthread_t c[5], p[5];BlockQueue<Task> *bq = new BlockQueue<Task>();// 消费者线程for (int i = 0; i < 5; i++){pthread_create(c + i, nullptr, Consumer, bq);}// 生产者线程for (int i = 0; i < 5; i++){pthread_create(p + i, nullptr, Productor, bq);}// 等待线程for (int i = 0; i < 5; i++){pthread_join(c[i], nullptr);}for (int i = 0; i < 5; i++){pthread_join(p[i], nullptr);}
}

六、POSIX信号量

上述的阻塞队列,queue被当作整体使用,queue只有一份,加锁,但是共享资源也可以被看作多份。

信号量的本质是一把计数器,那么这把计数器的本质是用来描述资源数目的,把资源是否就绪放在了临界区之外,申请信号量时,其实就间接的已经在做判断了。

基于环形队列的生产消费模型。满了的时候和空的时候head和tail指向的是同一个位置。空和满的时候,tail和head指向的是同一个位置,无法判断是空还是满。

解决方案:添加计数器。空一个位置。

1、指向同一个位置的时候,不能同时访问。

空:生产者。

满:消费者。

2、消费者不能超过生产者。 

空或者满的时候会指向同一个位置,不空和不满的时候指向不同的位置,我们可以同时访问!

3、生产者不能套消费者一个圈。正常使用必须满足这三个条件。

Productor 关注环形队列还有多少剩余空间。SpaceSem空余信号量

Consumer关注环形队列还有多少剩余数据。DataSem 数据信号量

代码:

#include <iostream>
#include <pthread.h>
#include <vector>
#include <semaphore.h>
const static int defaultcap = 30;
template <class T>
class RingQueue
{
private://封装PV操作void p(sem_t& sem){sem_wait(&sem);}void v(sem_t& sem){sem_post(&sem);}void lock(pthread_mutex_t& lock){pthread_mutex_lock(&lock);}void unlock(pthread_mutex_t& lock){pthread_mutex_unlock(&lock);}
public://给构造函数传入我们需要多大的环形队列,这里使用缺省参数给定RingQueue(int num = defaultcap):cap_(num),_ringQ(num),c_step_(0),p_step_(0){//初始的资源信号量有0个sem_init(&_cdata_sem,0,0);//初始的空信号量有cap_个sem_init(&_pspace_sem,0,cap_);pthread_mutex_init(&_c_lock,nullptr);pthread_mutex_init(&_p_lock,nullptr);}void push(const T& in){//要访问临界资源首先申请信号量p(_pspace_sem); //先申请信号量再加锁的原因是: 1、pv操作是原子的 2、提高并发度lock(_p_lock);_ringQ[p_step_] = in;//位置后移,维护环形队列p_step_++;p_step_%=cap_;unlock(_p_lock);v(_cdata_sem);}void pop(T* out){p(_cdata_sem);lock(_c_lock);*out = _ringQ[c_step_];c_step_++;c_step_%= cap_;unlock(_c_lock);v(_pspace_sem);}~RingQueue(){sem_destroy(&_cdata_sem);sem_destroy(&_pspace_sem);pthread_mutex_destroy(&_c_lock);pthread_mutex_destroy(&_p_lock);}
private://这里我们需要实现一个基于环形队列的生产消费模型。//使用STL容器queuestd::vector<T> _ringQ;int cap_;  //定义环形队列的最大值//还需要定义两个指针,指向当前消费者和生产者资源的位置。int c_step_;int p_step_;//考虑到需要资源数量,这里使用信号量作为线程同步互斥的实现。///由于生产者和消费者关心的资源不同我们需要两个信号量表示生产消费者所需要的模型//定义两个信号量,一个表示空位置有几个,另一个表示有多少个资源数量sem_t _cdata_sem;sem_t _pspace_sem;//为了实现多线程的互斥我们需要定义两把锁pthread_mutex_t _c_lock;pthread_mutex_t _p_lock;
};
#include "RingQueue.hpp"
#include "Task.hpp"
#include <ctime>
#include <unistd.h>
#define PNUM 3
#define CNUM 2// 线程执行的函数
void *Productor(void *args)
{RingQueue<Task> *rq = static_cast<RingQueue<Task> *>(args);while (true){// 生成数据int data1 = rand() % 10 + 1;usleep(3);int data2 = rand() % 10;char op = opers[rand() % opers.size()];Task t(data1, data2, op);// 生成任务rq->push(t);std::cout << "Productor task done, task is : " << t.GetTask() << std::endl;sleep(1);}return nullptr;
}
void *Consumer(void *args)
{RingQueue<Task> *rq = static_cast<RingQueue<Task> *>(args);// 拿到数据while (true){Task t;rq->pop(&t);// 处理任务t();std::cout << "Consumer get task, task is : " << t.GetTask() << " result: " << t.GetResult() << std::endl;}// sleep(1);return nullptr;
}int main()
{srand(time(nullptr) ^ getpid());// 实例化RingQueue类的对象,需要放入任务。所以类模板实例化为Task类RingQueue<Task> *rq = new RingQueue<Task>();// 生成生产消费者模型的线程// 是共享区中的线程结构体的地址。pthread_t c[CNUM], p[PNUM];for (int i = 0; i < PNUM; i++){pthread_create(p + i, nullptr, Productor, rq);}for (int i = 0; i < CNUM; i++){pthread_create(c + i, nullptr, Consumer, rq);}for (int i = 0; i < PNUM; i++){pthread_join(p[i], nullptr);}for (int i = 0; i < CNUM; i++){pthread_join(c[i], nullptr);}return 0;
}

七、线程池

C++类内创建线程,使用原生线程。

代码:

#pragma once
#include <iostream>
#include <vector>
#include <string>
#include <queue>
#include <pthread.h>
#include "Task.hpp"struct ThreadInfo
{pthread_t tid;std::string name;
};
// 默认最大可以创建多少个线程
const static int defaultnum = 5;
template <class T>
class ThreadPool
{
private:void lock(){pthread_mutex_lock(&lock_);}void unlock(){pthread_mutex_unlock(&lock_);}void wakeup(){pthread_cond_signal(&cond_);}void threadsleep(){pthread_cond_wait(&cond_, &lock_);}bool GetEmpty(){return tasks_.empty();}std::string GetThreadTid(pthread_t tid){for (auto &t : threads_){if (t.tid == tid){return t.name;}}return "None";}public:static void *threadHandler(void *argc){ThreadPool<T> *tp = static_cast<ThreadPool<T> *>(argc);std::string name = tp->GetThreadTid(pthread_self());while (true){// 拿任务处理任务tp->lock();while (tp->GetEmpty()){tp->threadsleep();}T t = tp->pop();tp->unlock();t();std::cout << name << " run, "<< "result: " << t.GetResult() << std::endl;}}void Start(){// 创建线程int num = threads_.size();for (int i = 0; i < num; i++){threads_[i].name = "thread-" + std::to_string(i + 1);pthread_create(&(threads_[i].tid), nullptr, threadHandler, this);}}void push(const T &in){// 向队列里push任务lock();tasks_.push(in);// 唤醒进程。wakeup();unlock();}T pop(){T t = tasks_.front();tasks_.pop();return t;}static ThreadPool<T> *GetInstance_(){// 防止多次争夺锁if (nullptr == tp_){pthread_mutex_lock(&mutex_);if (nullptr == tp_){std::cout << "log: singleton create done first!" << std::endl;tp_ = new ThreadPool<T>();}pthread_mutex_unlock(&mutex_);}return tp_;}private:ThreadPool(int num = defaultnum) : threads_(num){// 在这里初始化互斥锁和条件变量pthread_mutex_init(&lock_, nullptr);pthread_cond_init(&cond_, nullptr);}~ThreadPool(){pthread_mutex_destroy(&lock_);pthread_cond_destroy(&cond_);}ThreadPool(const ThreadPool<T> &) = delete;const ThreadPool<T> &operator=(const ThreadPool<T> &) = delete;private:// 我们需要一个vector来管理线程的信息std::vector<ThreadInfo> threads_;// 需要一个队列存放任务std::queue<Task> tasks_;// 需要一个锁pthread_mutex_t lock_;// 需要一个条件变量实现同步和互斥pthread_cond_t cond_;// 修改成单例模式static ThreadPool<T> *tp_;static pthread_mutex_t mutex_;
};
template <class T>
ThreadPool<T> *ThreadPool<T>::tp_ = nullptr;
template <class T>
pthread_mutex_t ThreadPool<T>::mutex_ = PTHREAD_MUTEX_INITIALIZER;#include "ThreadPool.hpp"
#include <ctime>
#include <unistd.h>
int main()
{//ThreadPool<Task>* tp = new ThreadPool<Task>(5);srand(time(nullptr)^getpid());  //懒汉单例模式ThreadPool<Task>::GetInstance_()->Start();  while (true){//生成任务int x = rand()%10 + 1;usleep(3);int y = rand()%10;char op = opers[rand()%opers.size()];Task t(x,y,op);//交给线程池处理ThreadPool<Task>::GetInstance_()->push(t);std::cout << "main thread make task: " << t.GetTask() << std::endl;}return 0;
}

八、线程安全的单例模式

某些类 , 只应该具有一个对象 ( 实例 ), 就称之为单例 .
例如一个男人只能有一个媳妇。
在很多服务器开发场景中 , 经常需要让服务器加载很多的数据 ( 上百 G) 到内存中 . 此时往往要用一个单例的类来管理这些数据。

懒汉模式:延迟加载,能够优化服务器的启动速度。线程池已经改成了懒汉模式的单例模式

饿汉模式:直接加载 

全局变量在程序启动的时候就会创建。 

九、STL,智能指针和线程安全

STL容器不是线程安全的。

智能指针:

对于 unique_ptr, 由于只是在当前代码块范围内生效 , 因此不涉及线程安全问题 .
对于 shared_ptr, 多个对象需要共用一个引用计数变量 , 所以会存在线程安全问题 . 但是标准库实现的时候考虑到了这 个问题, 基于原子操作 (CAS) 的方式保证 shared_ptr 能够高效 , 原子的操作引用计数

十、其他常见的各种锁

  • 悲观锁:在每次取数据时,总是担心数据会被其他线程修改,所以会在取数据前先加锁(读锁,写锁,行 锁等),当其他线程想要访问数据时,被阻塞挂起。
  • 乐观锁:每次取数据时候,总是乐观的认为数据不会被其他线程修改,因此不上锁。但是在更新数据前, 会判断其他数据在更新前有没有对数据进行修改。主要采用两种方式:版本号机制和CAS操作。
  • CAS操作:当需要更新数据时,判断当前内存值和之前取得的值是否相等。如果相等则用新值更新。若不 等则失败,失败则重试,一般是一个自旋的过程,即不断重试。
  • 自旋锁,公平锁,非公平锁

 自旋锁:取决于其他线程执行临界区的时长,之前学到的锁都是挂起等待锁。会有资源上的浪费。我们可以使用自旋锁循环访问临界资源。可以使用这个接口实现一个自旋锁。需要外套一个while循环。

pthread库里面有自旋锁的调用接口。

访问临界区的时间长短,决定了是否需要自旋锁。

十一、读者写者问题

遵守321原则

三种关系:写 vs 写 互斥。

                  写 vs 读 互斥,同步。

                  读 vs 读 共享关系。

为什么读者vs读者是共享关系,消费者之间却是互斥?

数据的存留问题,因为读者不会把数据拿走,而消费者会把数据拿走。

两种角色:读者R,写者W,线程承担。

一个交易场所:数据交换的地点。

相关接口:

读写的理解 :读多和写少的情况。读写之间的同步问题。读者优先,写者的饥饿问题。写者优先,读者饥饿问题。

伪代码:

系统部分完结。 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/877425.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

聊聊场景及场景测试

在我们进行测试过程中&#xff0c;有一种黑盒测试叫场景测试&#xff0c;我们完全是从用户的角度去理解系统&#xff0c;从而可以挖掘用户的隐含需求。 场景是指用户会使用这个系统来完成预定目标的所有情况的集合。 场景本身也代表了用户的需求&#xff0c;所以我们可以认为…

SpringBoot+Vue在线商城(电子商城)系统-附源码与配套论文

摘 要 随着互联网技术的发展和普及&#xff0c;电子商务在全球范围内得到了迅猛的发展&#xff0c;已经成为了一种重要的商业模式和生活方式。电子商城是电子商务的重要组成部分&#xff0c;是一个基于互联网的商业模式和交易平台&#xff0c;通过网络进行产品和服务的销售。…

计算机图形学 | 动画模拟

动画模拟 布料模拟 质点弹簧系统&#xff1a; 红色部分很弱地阻挡对折 Steep connection FEM:有限元方法 粒子系统 粒子系统本质上就是在定义个体和群体的关系。 动画帧率 VR游戏要不晕需要达到90fps Forward Kinematics Inverse Kinematics 只告诉末端p点&#xff0c;中间…

Delphi5实现色板程序——滑块型组件实例

效果图 参考 Delphi程序设计基础&#xff1a;教程、实验、习题 代码 unit Unit1;interfaceusesSysUtils, WinTypes, WinProcs, Messages, Classes, Graphics, Controls,Dialogs, Forms,Form, Formprpt, ExtCtrls, StdCtrls;typeTForm1 class(MForm)Label1: TLabel;Label2: …

公式编辑器 -vue-formula-editor

前言 公式编辑旨在帮助用户使用可视化的前提&#xff0c;能便捷的使用平台&#xff0c;例如低代码平台使用广泛 vue-formula-editor vue-formula-editor是一款开源的Vue公式计算组件&#xff0c;可以帮助开发者快速集成公式编辑 在线体验 demo & 源码 安装 npm i vue-form…

[Python学习日记-9] Python中的运算符

简介 计算机可以进行的运算有很多种&#xff0c;但可不只加减乘除这么简单&#xff0c;运算按种类可分为算数运算、比较运算、逻辑运算、赋值运算、成员运算、身份运算、位运算&#xff0c;而本篇我们暂只介绍算数运算、比较运算、逻辑运算、赋值运算 算数运算 一、运算符描述…

FunHPC算力平台评测

作为内测老用户&#xff0c;已经用DeepLn平台&#xff08;现改名为FunHPC平台&#xff09;好久了&#xff0c;一路见证了平台从最初100多人的小群到现在满群的状态&#xff0c;FunHpc平台确实在一步步的走向成熟&#xff0c;一步步的变大。趁着现在活动的时间&#xff0c;发篇文…

XSS反射型和DOM型+DOM破坏

目录 第一关 源码分析 payload 第二关 源码分析 payload 第三关 源码分析 payload 第四关 源码分析 payload 第五关 源码分析 payload 第六关 源码分析 第七关 源码分析 方法一&#xff1a;构造函数 方法二&#xff1a;parseInt 方法三&#xff1a;locat…

项目问题 | CentOS 7停止维护导致yum失效的解决办法

目录 centos停止维护意味着yum相关源伴随失效。 报错&#xff1a; 解决方案&#xff1a;将图中四个文件替换掉/etc/yum.repos.d/目录下同名文件 资源提交在博客头部&#xff0c;博客结尾也提供文件源码内容 CentOS-Base.repo CentOS-SCLo-scl.repo CentOS-SCLo-scl-rh.rep…

HTML5服装电商网上商城模板源码

文章目录 1.设计来源1.1 主界面1.2 购物车界面1.3 电子产品界面1.4 商品详情界面1.5 联系我们界面1.6 各种标签演示界面 2.效果和源码2.1 动态效果2.2 源代码 源码下载万套模板&#xff0c;程序开发&#xff0c;在线开发&#xff0c;在线沟通 【博主推荐】&#xff1a;前些天发…

【系统分析师】-综合知识-系统架构

1、设计模式 1&#xff09;观察者模式定义了对象间的一种一对多依赖关系&#xff0c;使得每当一个对象改变状态&#xff0c;则所有依赖于它的对象都会得到通知并被自动更新【消息订阅】。在该模式中&#xff0c;发生改变的对象称为观察目标&#xff0c;被通知的对象称为观察者&…

Python爬虫使用实例

IDE&#xff1a;大部分是在PyCharm上面写的 解释器装的多 → 环境错乱 → error&#xff1a;没有配置&#xff0c;no model 爬虫可以做什么&#xff1f; 下载数据【文本/二进制数据&#xff08;视频、音频、图片&#xff09;】、自动化脚本【自动抢票、答题、采数据、评论、点…

vue3 响应式 API:ref() 和 reactive()

在 Vue 3 中&#xff0c;响应式系统是其核心特性之一&#xff0c;它使得数据的变化能够自动触发视图的更新。 官方文档&#xff1a; 响应式 API&#xff1a;核心 要更好地了解响应式 API&#xff0c;推荐阅读官方指南中的章节&#xff1a; 响应式基础 (with the API preference…

SX_初识GitLab_1

1、对GitLab的理解&#xff1a; 目前对GitLab的理解是其本质是一个远程代码托管平台&#xff0c;上面托管多个项目&#xff0c;每个项目都有一个master主分支和若干其他分支&#xff0c;远程代码能下载到本机&#xff0c;本机代码也能上传到远程平台 1.分支的作用&#xff1a…

Oracle大师Roger Cornejo的推荐:使用ASH诊断Oracle解析故障

这篇文章被Oracle大师Roger Cornejo在X平台上推荐&#xff08;见下图&#xff09;&#xff0c;英文原文在&#xff1a; Diagnosing Parsing Issue with ASH 解析&#xff0c;尤其是硬解析&#xff0c;是非生产性操作&#xff0c;会消耗大量系统资源&#xff0c;导致库缓存争用…

Meta 如何实现 99.99999999% 的缓存一致性

曾经的故事 Meta&#xff08;Facebook&#xff09; 曾经运行一个简单的技术栈——PHP 和 MySQL。 但随着更多用户的加入&#xff0c;他们面临着可扩展性问题。因此他们建立了一个分布式缓存。 虽然这暂时解决了可扩展性问题&#xff0c;但保持缓存数据的新鲜度变得困难。以下…

SpringBoot 整合 Redis 实现验证码登录功能

一、整合Redis 在pom.xml中添加Redis相关依赖&#xff1b; <!--Spring Data Redis依赖配置--> <dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-redis</artifactId> </dependency>…

阿里云账户注册与实名认证详细教程

在开始使用阿里云服务之前&#xff0c;您需要先有一个阿里云账号&#xff0c;拥有阿里云账号后&#xff0c;您可以选购和使用云产品和服务。如果您没有阿里云账号&#xff0c;需要先注册一个阿里云账号。 说明 一个手机号码下最多可以注册6个阿里云账号。如果您的手机号码已经…

一分钟了解VMware虚拟机三种网络模式区别

VMware虚拟机提供了三种主要的网络模式&#xff0c;分别是桥接模式&#xff08;Bridged Mode&#xff09;、网络地址转换模式&#xff08;NAT Mode&#xff09;和仅主机模式&#xff08;Host-Only Mode&#xff09;。这三种模式各有其特点和适用场景&#xff0c;以下是对它们之…

Perfectly Clear WorkBench中文绿色版,让每一张照片都完美无瑕

软件简介 你是否曾经为了一张不完美的照片而感到遗憾&#xff1f;是否曾经因为照片中的小瑕疵而不得不花费大量时间进行后期处理&#xff1f;现在&#xff0c;有了Perfectly Clear WorkBench&#xff0c;这些问题都将迎刃而解。作为全球领先的智能图像校正技术商推出的图像清晰…