文章目录
- 非并发的一写一读环形队列
- 多读多写环形队列
非并发的一写一读环形队列
读指针:
1、先判断是否有数据
2、读取数据
3、操作指针
写指针:
1、先判断空间是否足够
2、写入数据
3、操作指针·
所以代码也十分简单:
bool putqueue(void* pData)
{ST_NODE* ptr = NULL;do {ptr = pWrite;if ((uiQueueSize - ((pWrite + uiQueueSize - pRead) % uiQueueSize) - 1 > 0) || (NULL != *ptr)) // 队列满了 {return false;}} while(!_sync_bool_compare_and_swap(pWrite, ptr, ptr + 1)) // 竞争到写指针if (pWrite >= pstBegin) {pWrite = pstBegin;}*ptr = pData;return true;
}
那么在多线程多读多写的情况下,该如何设计呢?
多读多写环形队列
核心问题是:
1、多个线程如何竞争操作一个指针?
思路:利用CAS(compare & swap)确保只有一个线程能把指针从当前位置指向下一个位置
2、先操作指针还是先操作数据?
- 先操作指针,有可能导致数据还没读,就被写入方覆盖
- 先读/写数据,可能无法竞争到指针导致错误
- 解决方案:标记法,已读取得数据置为NULL,未读数据为实际数据得指针,读写前先判断标记。
void* getqueue()
{ST_NODE* ptr = NULL;ST_NODE* current = NULL;do {ptr = pRead;if (((pWrite + uiQueueSize - pRead) % uiQueueSize) > 0 || (NULL != ptr)) // 队列空{return NULL;}} while(!_sync_bool_compare_and_swap(pRead, ptr, ptr + 1)) // 竞争到读指针if (pRead >= pstEnd) {pRead = pstEnd;}current = *ptr;*ptr = NULL;return *current;
}
此时也会出现一些极端的问题:
1、CAS指令的ABA问题:两个线程同时读/写同一个位置,第一个线程获取读/写权限后,第二个线程挂起。
指针有可能转一圈回到原来位置,导致第二个线程恢复运行,从而第二个线程CAS成功。极端情况下会导致读指针越过写指针。
解决方案:通过一个唯一id:seq替换指针,seq为64位整数,自增且永不重复。指针 = 队列首地址 + seq % 队列长度
class mqueue
{
public:mqueue() {read_seq_ = write_seq_ = 0;memset(queue_arr_, 0, sizeof(queue_arr_));}bool push_back(void* data); // 插入元素void* pop_front(); // 取出元素
private:void* queue_arr_[MAXN];volatile uint64_t read_seq_;volatile uint64_t write_seq_;
};bool mqueue::push_back(void* data)
{do {uint64_t cur_seq = write_seq_; // 保留原始值,避免处理过程被其他线程改变if (cur_seq >= read_seq_ + MAXN || queue_arr_[cur_seq % MAXN]){return false; // 队列满了,等读线程读取}if (_sync_bool_compare_and_swap(&write_seq_, cur_seq, cur_seq + 1)){queue_arr_[cur_seq % MAXN] = data;return true;}} while (true);
}void* mqueue::pop_front()
{do{uint64_t cur_seq = read_seq_; // 保留原始值,避免处理过程被其他线程改变if (cur_seq >= write_seq_ || queue_arr_[cur_seq % MAXN] == NULL){return NULL; // 队列空,等待写线程写入 }if (_sync_bool_compare_and_swap(&read_seq_, cur_seq, cur_seq + 1)){void* data = queue_arr_[cur_seq % MAXN];queue_arr_[cur_seq % MAXN] = NULL;return data;}} while (true);
}