为了用c语言实现队列进行多线程通信,用于实现一个状态机。
下面是实现过程
1.实现多线程队列入栈和出栈,不加锁
发送线程发送字符1,接收线程接收字符并打印。
多线程没有加锁,会有危险
#include "stdio.h"
#include <thread>
#include <unistd.h>
#include <pthread.h>typedef struct MutiThreadCharQueNode
{unsigned char data;struct MutiThreadCharQueNode* next;
}MutiThreadCharQueNode;typedef struct MutiThreadCharQueue
{MutiThreadCharQueNode* phead;MutiThreadCharQueNode* ptail;int size;
}MutiThreadCharQueue;
MutiThreadCharQueue TestMutiThreadQue;void MutiThreadCharQueueInit(MutiThreadCharQueue* pq)
{pq->phead=NULL; //将队列的头指针置为空pq->ptail = NULL;//将队列的尾指针置为空pq->size = 0;// 将队列的头指针置为空
}
bool MutiThreadCharQueueEmpty(MutiThreadCharQueue* pq)
{return pq->size == 0;
}
void MutiThreadCharQueueDestroy(MutiThreadCharQueue* pq)
{MutiThreadCharQueNode* cur = pq->phead;// 创建一个指针 cur,指向队列的头指针while (cur){MutiThreadCharQueNode* next = cur->next;// 创建一个指针 cur,指向队列的头指针free(cur);// 释放当前节点的内存cur = next;// 将指针 cur 移动到下一个节点}pq->phead = pq->ptail = NULL;// 将队列的头指针和尾指针置为空pq->size = 0;// 将队列的大小置为0
}
void MutiThreadCharQueuePush(MutiThreadCharQueue* pq, unsigned char x)
{MutiThreadCharQueNode* newnode = (MutiThreadCharQueNode*)malloc(sizeof(MutiThreadCharQueNode));// 创建一个新的节点if (newnode == NULL){return;}newnode->data = x;// 设置新节点的数据为传入的元素值newnode->next = NULL;// 将新节点的指针域置空//一个节点if (pq->ptail == NULL)// 判断队列是否为空{pq->phead = pq->ptail = newnode;// 将新节点同时设置为队列的头节点和尾节点}//多个节点else{pq->ptail->next = newnode;// 将新节点同时设置为队列的头节点和尾节点pq->ptail = newnode;// 更新队列的尾指针为新节点}pq->size++;// 增加队列的大小计数
}
unsigned char MutiThreadCharQueueFront(MutiThreadCharQueue* pq)
{
// assert(pq);// 检查指针是否为空
// assert(!QueueEmpty(pq));// 检查队列是否非空
// assert(pq->phead);// 检查队列的头指针是否存在
// if(QueueEmpty(pq))
// {
// return ;
// }return pq->phead->data;// 返回队列头节点的数据
}
void MutiThreadCharQueuePop(MutiThreadCharQueue* pq)
{//1.一个节点if (pq->phead->next == NULL) // 队列只有一个节点的情况{free(pq->phead); // 释放队列头节点的内存pq->phead = pq->ptail = NULL;// 将队列的头指针和尾指针置为空}//2.多个节点else{MutiThreadCharQueNode* next = pq->phead->next; //保存队列头节点的下一个节点指针free(pq->phead);// 释放队列头节点的内存pq->phead = next;// 更新队列的头指针为下一个节点}pq->size--;//减少队列的大小计数
}
void* thread_send(void* para)
{printf("hh\n");MutiThreadCharQueueInit(&TestMutiThreadQue);unsigned char sendChar=1;while(1){printf("send a\n");MutiThreadCharQueuePush(&TestMutiThreadQue,sendChar);usleep(1000000);}
}
void* thread_rev(void* para)
{printf("h2\n");unsigned char revChar;while(1){if(false==MutiThreadCharQueueEmpty(&TestMutiThreadQue)){revChar=MutiThreadCharQueueFront(&TestMutiThreadQue);printf("rev char= %d\n",(int)revChar);MutiThreadCharQueuePop(&TestMutiThreadQue);}usleep(1000000);}
}void create_c_thread_send()
{int ret;pthread_attr_t attr;ret = pthread_attr_init(&attr);if (ret != 0) {return ;}//2pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);int err;pthread_t tid;err = pthread_create(&tid, &attr, thread_send, (void*)NULL);}
void create_c_thread_rev()
{int ret;pthread_attr_t attr;ret = pthread_attr_init(&attr);if (ret != 0) {return ;}//2pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);int err;pthread_t tid;err = pthread_create(&tid, &attr, thread_rev, (void*)NULL);}
int main(int argc, char** argv)
{printf("hello\n");create_c_thread_send();create_c_thread_rev();while(1){usleep(10000);}return 0;
}
2.实现多线程队列入栈和出栈,加锁并使用信号量触发接收线程
在队列的结构体中加上锁,防止多线程冲突
#include "stdio.h"
#include <thread>
#include <unistd.h>
#include <pthread.h>typedef struct MutiThreadCharQueNode
{unsigned char data;struct MutiThreadCharQueNode* next;
}MutiThreadCharQueNode;typedef struct MutiThreadCharQueue
{MutiThreadCharQueNode* phead;MutiThreadCharQueNode* ptail;int size;pthread_mutex_t mutex;
}MutiThreadCharQueue;
MutiThreadCharQueue TestMutiThreadQue;void MutiThreadCharQueueInit(MutiThreadCharQueue* pq)
{pq->phead=NULL; //将队列的头指针置为空pq->ptail = NULL;//将队列的尾指针置为空pq->size = 0;// 将队列的头指针置为空pthread_mutex_init(&pq->mutex, NULL);
}
bool MutiThreadCharQueueEmpty(MutiThreadCharQueue* pq)
{pthread_mutex_lock(&pq->mutex);bool bEmpty=(bool) (pq->size == 0);pthread_mutex_unlock(&pq->mutex);return bEmpty;
}
void MutiThreadCharQueueDestroy(MutiThreadCharQueue* pq)
{pthread_mutex_lock(&pq->mutex);MutiThreadCharQueNode* cur = pq->phead;// 创建一个指针 cur,指向队列的头指针while (cur){MutiThreadCharQueNode* next = cur->next;// 创建一个指针 cur,指向队列的头指针free(cur);// 释放当前节点的内存cur = next;// 将指针 cur 移动到下一个节点}pq->phead = pq->ptail = NULL;// 将队列的头指针和尾指针置为空pq->size = 0;// 将队列的大小置为0pthread_mutex_unlock(&pq->mutex);
}
void MutiThreadCharQueuePush(MutiThreadCharQueue* pq, unsigned char x)
{pthread_mutex_lock(&pq->mutex);MutiThreadCharQueNode* newnode = (MutiThreadCharQueNode*)malloc(sizeof(MutiThreadCharQueNode));// 创建一个新的节点if (newnode == NULL){pthread_mutex_unlock(&pq->mutex);return;}newnode->data = x;// 设置新节点的数据为传入的元素值newnode->next = NULL;// 将新节点的指针域置空//一个节点if (pq->ptail == NULL)// 判断队列是否为空{pq->phead = pq->ptail = newnode;// 将新节点同时设置为队列的头节点和尾节点}//多个节点else{pq->ptail->next = newnode;// 将新节点同时设置为队列的头节点和尾节点pq->ptail = newnode;// 更新队列的尾指针为新节点}pq->size++;// 增加队列的大小计数pthread_mutex_unlock(&pq->mutex);
}
unsigned char MutiThreadCharQueueFront(MutiThreadCharQueue* pq)
{
// if(QueueEmpty(pq))
// {
// return ;
// }pthread_mutex_lock(&pq->mutex);char data=pq->phead->data;// 返回队列头节点的数据pthread_mutex_unlock(&pq->mutex);return data;
}
void MutiThreadCharQueuePop(MutiThreadCharQueue* pq)
{pthread_mutex_lock(&pq->mutex);//1.一个节点if (pq->phead->next == NULL) // 队列只有一个节点的情况{free(pq->phead); // 释放队列头节点的内存pq->phead = pq->ptail = NULL;// 将队列的头指针和尾指针置为空}//2.多个节点else{MutiThreadCharQueNode* next = pq->phead->next; //保存队列头节点的下一个节点指针free(pq->phead);// 释放队列头节点的内存pq->phead = next;// 更新队列的头指针为下一个节点}pq->size--;//减少队列的大小计数pthread_mutex_unlock(&pq->mutex);
}
void* thread_send(void* para)
{printf("hh\n");MutiThreadCharQueueInit(&TestMutiThreadQue);unsigned char sendChar=1;while(1){printf("send a\n");MutiThreadCharQueuePush(&TestMutiThreadQue,sendChar);usleep(1000000);}
}
void* thread_rev(void* para)
{printf("h2\n");unsigned char revChar;while(1){if(false==MutiThreadCharQueueEmpty(&TestMutiThreadQue)){revChar=MutiThreadCharQueueFront(&TestMutiThreadQue);printf("rev char= %d\n",(int)revChar);MutiThreadCharQueuePop(&TestMutiThreadQue);}usleep(1000000);}
}void create_c_thread_send()
{int ret;pthread_attr_t attr;ret = pthread_attr_init(&attr);if (ret != 0) {return ;}//2pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);int err;pthread_t tid;err = pthread_create(&tid, &attr, thread_send, (void*)NULL);}
void create_c_thread_rev()
{int ret;pthread_attr_t attr;ret = pthread_attr_init(&attr);if (ret != 0) {return ;}//2pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);int err;pthread_t tid;err = pthread_create(&tid, &attr, thread_rev, (void*)NULL);}
int main(int argc, char** argv)
{printf("hello\n");create_c_thread_send();create_c_thread_rev();while(1){usleep(10000);}return 0;
}
3.实现任意数据类型的多线程队列
以上的队列数据类型固定了,希望实现一个通用的多线程队列,并且数据可以得到释放。
#include "stdio.h"
#include <thread>
#include <unistd.h>
#include <pthread.h>
#include <string.h>typedef struct MutiThreadQueNode
{void* data;struct MutiThreadQueNode* next;
}MutiThreadQueNode;typedef struct MutiThreadQueue
{MutiThreadQueNode* phead;MutiThreadQueNode* ptail;int size;int data_mem_size;pthread_mutex_t mutex;
}MutiThreadQueue;typedef struct TestMyStructData
{int my_int_data;float my_float_data;
}TestMyStructData;MutiThreadQueue TestMutiThreadQue;void MutiThreadQueueInit(MutiThreadQueue* pq,int data_mem_size)
{pq->phead=NULL; //将队列的头指针置为空pq->ptail = NULL;//将队列的尾指针置为空pq->size = 0;// 将队列的头指针置为空pq->data_mem_size=data_mem_size;pthread_mutex_init(&pq->mutex, NULL);
}
bool MutiThreadQueueEmpty(MutiThreadQueue* pq)
{pthread_mutex_lock(&pq->mutex);bool bEmpty=(bool) (pq->size == 0);pthread_mutex_unlock(&pq->mutex);return bEmpty;
}
void MutiThreadQueueDestroy(MutiThreadQueue* pq)
{pthread_mutex_lock(&pq->mutex);MutiThreadQueNode* cur = pq->phead;// 创建一个指针 cur,指向队列的头指针while (cur){MutiThreadQueNode* next = cur->next;// 创建一个指针 cur,指向队列的头指针//!由于data是拷贝过来的,释放data内存free(cur->data);free(cur);// 释放当前节点的内存cur = next;// 将指针 cur 移动到下一个节点}pq->phead = pq->ptail = NULL;// 将队列的头指针和尾指针置为空pq->size = 0;// 将队列的大小置为0pthread_mutex_unlock(&pq->mutex);
}
void MutiThreadQueuePush(MutiThreadQueue* pq, void* data,int data_mem_size)
{pthread_mutex_lock(&pq->mutex);MutiThreadQueNode* newnode = (MutiThreadQueNode*)malloc(sizeof(MutiThreadQueNode));// 创建一个新的节点if (newnode == NULL){pthread_mutex_unlock(&pq->mutex);return;}if(pq->data_mem_size!=data_mem_size){printf("input data error\n");pthread_mutex_unlock(&pq->mutex);return;}void* queData=malloc(pq->data_mem_size);memcpy(queData,data,pq->data_mem_size);newnode->data = queData;// 设置新节点的数据为传入的元素值newnode->next = NULL;// 将新节点的指针域置空//一个节点if (pq->ptail == NULL)// 判断队列是否为空{pq->phead = pq->ptail = newnode;// 将新节点同时设置为队列的头节点和尾节点}//多个节点else{pq->ptail->next = newnode;// 将新节点同时设置为队列的头节点和尾节点pq->ptail = newnode;// 更新队列的尾指针为新节点}pq->size++;// 增加队列的大小计数pthread_mutex_unlock(&pq->mutex);
}
void MutiThreadQueueFront(MutiThreadQueue* pq,void* outData,int data_mem_size)
{
// if(QueueEmpty(pq))
// {
// return ;
// }pthread_mutex_lock(&pq->mutex);if(data_mem_size!=pq->data_mem_size){printf("input data_mem_size error\n");pthread_mutex_unlock(&pq->mutex);return ;}memcpy(outData,pq->phead->data,pq->data_mem_size);pthread_mutex_unlock(&pq->mutex);}
void MutiThreadQueuePop(MutiThreadQueue* pq)
{pthread_mutex_lock(&pq->mutex);//1.一个节点if (pq->phead->next == NULL) // 队列只有一个节点的情况{free(pq->phead); // 释放队列头节点的内存pq->phead = pq->ptail = NULL;// 将队列的头指针和尾指针置为空}//2.多个节点else{MutiThreadQueNode* next = pq->phead->next; //保存队列头节点的下一个节点指针//!由于data是拷贝过来的,释放data内存free(pq->phead->data);free(pq->phead);// 释放队列头节点的内存pq->phead = next;// 更新队列的头指针为下一个节点}pq->size--;//减少队列的大小计数pthread_mutex_unlock(&pq->mutex);
}
void* thread_send(void* para)
{printf("hh\n");TestMyStructData mySendData;mySendData.my_int_data=1;mySendData.my_float_data=2;MutiThreadQueueInit(&TestMutiThreadQue,sizeof(TestMyStructData));while(1){printf("send 1\n");MutiThreadQueuePush(&TestMutiThreadQue,&mySendData,sizeof(TestMyStructData));usleep(1000000);}
}
void* thread_rev(void* para)
{printf("h2\n");TestMyStructData myRevData;while(1){if(false==MutiThreadQueueEmpty(&TestMutiThreadQue)){MutiThreadQueueFront(&TestMutiThreadQue,&myRevData,sizeof(TestMyStructData));printf("rev intdata= %d float data=%f\n",myRevData.my_int_data,myRevData.my_float_data);MutiThreadQueuePop(&TestMutiThreadQue);}usleep(1000000);}
}void create_c_thread_send()
{int ret;pthread_attr_t attr;ret = pthread_attr_init(&attr);if (ret != 0) {return ;}//2pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);int err;pthread_t tid;err = pthread_create(&tid, &attr, thread_send, (void*)NULL);}
void create_c_thread_rev()
{int ret;pthread_attr_t attr;ret = pthread_attr_init(&attr);if (ret != 0) {return ;}//2pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);int err;pthread_t tid;err = pthread_create(&tid, &attr, thread_rev, (void*)NULL);}
int main(int argc, char** argv)
{printf("hello\n");create_c_thread_send();create_c_thread_rev();while(1){usleep(10000);}return 0;
}
4.队列其他操作
队列操作可以完善的点
1.加上队列最大限制,如果队列内数据大小超过阈值,清空队列