完善博文 共享内存一写多读无锁实现的代码逻辑部分

使用共享内存(内存映射)实现发布订阅模式

  • 多进程实现PubSub发布订阅模式,从而实现进程间的通信。
  • 通信方式可以是TCP/UDP,管道Pipe/消息队列,共享内存shared memory等等。其中TCP/UDP的方式是可以用作局域网以及跨平台的通信,Pipe/消息队列是进程间基于系统实现比较基础的通信,这两者有大量优秀的第三方库支持,如ZeroMQ,只要加入我们自定义数据的转换方式即可方便实现;而共享内存是实现进程间通信最快的方式,但因为共享内存的设计并不是用来做类似PubSub这种模式的实现的,并且共享内存实质上就是一段进程间共享的内存空间,使用自由度是极高的,所以也很少有第三方库来实现共享内存方式的进程间通信。
  • 因此本文的重点是如何使用共享内存shared memory来实现高效的PubSub发布订阅模式。

需求

  • 消息通过事先分配好的共享内存空间来传递
  • 需要有一定的机制来管理消息的发送(写)和接收(读)
  • 需要实现发布订阅模式,也就是一个发布者(一写)多个订阅者(多读)
  • 考虑到平台的原因,最后采用了文件映射内存的这种方式,在各种系统中都有比较通用的实现

逻辑分析

  • 显然,只要创建了一个文件并且设置好需要的大小,即可以使用mmap映射到进程的内存空间,并且在退出时可以用munmap将映射释放掉。但是空间真正的释放是要把文件删掉的,因此我们需要一个计数器来记录使用这块共享内存的进程数,类似共享指针shared_ptr的实现,在计数为零时把文件删掉。在修改这个计数的时候还需要一把进程间读写锁:
  • 对于只有单个订阅者,数据之后包含一个标志位,发布者写完后置为true,订阅者读完之后置为false,可能再加上一个信号灯的控制,来避免频繁读写;
  • 对于多个订阅者,数据中的这个标志位变成一个计数,发布者写完之后将计数器置为订阅者的数量,订阅者读完之后将计数器减1,再加上一个进程条件变量的控制,来避免频繁读写。
  • 这两种方案都有一定的弊端,最大的问题在于,订阅者还需要修改共享内存的内容,这样就发挥不出读写锁支持多读的优势了。我们需要一个更好的机制。
  • 一个简单的实现是数据中带有一个单调递增的标签,订阅者读到数据后本地保存一下这个标签的值,如果下次读到的这个值不比保存的值大,就认为读到了旧数据,忽略之。这个标签比较好的实现是用当前的系统时间而不是计数,因为发布者可能会重启清零,就算重启后可以从已经写入的数据中读取,但后面为了实现无锁队列会让这个事情变得麻烦。这样还有一个问题是,依然会频繁地去读取这个标签。因此需要加入进程条件变量的控制来减少这种频繁。接下来是2,实现消息发送(写)和接收(读)的管理。因为我们已经有了一把读写锁,很自然地想到可以用它来管理读写啊。事实上并不是这样,因为发布者写完数据之后可能会有一段时间不会占有写锁,这时候就要一种机制来限制订阅者不会重复来读这个数据。对于这个实现,已有的方案有:
  • 对于每一个订阅者都开辟一块共享内存,可以按一对一的方式同时复制多份数据;
  • 使用生产消费模式,使用循环队列来实现读写分离。
  • 第1种方案是解决了读写锁争抢的问题,但是增加了内存复制的开销,反而没有第2种方案好。但是我们要稍微修改一下传统的生产消费模式的实现,只用一个指针来指向最新的数据。之所以这样做是因为内存是事先分配好的,我们把它改造成环形的内存缓冲区,很难保证数据读取的序列性;再者就是循环的尾指针应该由订阅者自己来维护,因为每个订阅者处理的速度是不一样的。
  • 如此一来,所有数据的修改完全是由发布者来做的,也就是说对于订阅者来说,这是个无锁队列:

代码实现

#include <iostream>
#include <cstring>
#include <vector>
#include <functional>
#include <memory>
#include <sys/mman.h>
#include <atomic>
#include <thread>
#include <sys/stat.h>
#include <unistd.h>
#include <sys/types.h>
#include <fcntl.h>struct ShmData{bool written_;long timestamp_;size_t size_;char data_[1];ShmData():written_(false){}void Write(const char *data,const size_t len){written_ = false;memcpy(data_,data,len);size_ = len;timestamp_ = GetTimestamp();written_ = true;}bool Read(std::vector<char>* data,long* time = nullptr){if (!written_){return false;}if (time){*time = timestamp_;}data->resize(size_);memcpy(data->data(),data_,size_);return true;}static long GetTimestamp(){struct timespec ts;clock_gettime(CLOCK_REALTIME,&ts);return ts.tv_sec * 1000000 + ts.tv_nsec / 1000;}
};struct ShmQueue{size_t size_;int count_;int head_;char data_[1];ShmQueue(const size_t size,const int count):size_(sizeof(ShmData) + size),count_(count),head_(0){new(data_)ShmData;}void Write(const char* data,const size_t len){const int next = (head_ + 1) % count_;(reinterpret_cast<ShmData *>(data_ + next * size_))->Write(data,len);head_ = next;}bool Read(std::vector<char>*data,long* time){return (reinterpret_cast<ShmData *>(data_ + head_ * size_))->Read(data,time);}
};struct ShmSlice{int attached_;pthread_rwlock_t rwlock_;pthread_mutex_t mutex_;pthread_cond_t cond_;char data_[1];ShmSlice(const size_t size,const int count,const bool init = false){if (init){//init rwlockpthread_rwlockattr_t rwattr;pthread_rwlockattr_init(&rwattr);pthread_rwlockattr_setpshared(&rwattr,PTHREAD_PROCESS_SHARED);pthread_rwlock_init(&rwlock_,&rwattr);//init mutexpthread_mutexattr_t mattr;pthread_mutexattr_init(&mattr);pthread_mutexattr_setpshared(&mattr,PTHREAD_PROCESS_SHARED);pthread_mutex_init(&mutex_,&mattr);//init condition variablepthread_condattr_t cattr;pthread_condattr_init(&cattr);pthread_condattr_setpshared(&cattr,PTHREAD_PROCESS_SHARED);pthread_cond_init(&cond_,&cattr);//init shm queuenew(data_)ShmQueue(size,count);}LockWrite();if (init){attached_ = 1;} else{++attached_;}UnLockWrite();}~ShmSlice(){LockWrite();UnLockWrite();if (0 == attached_){pthread_cond_destroy(&cond_);pthread_mutex_destroy(&mutex_);pthread_rwlock_destroy(&rwlock_);}}int count(){LockRead();const int count = attached_;UnlockRead();return count;}void Write(const char* data,const size_t len){LockWrite();(reinterpret_cast<ShmQueue*>(data_))->Write(data,len);UnLockWrite();}bool Read(std::vector<char> *data,long* time){return (reinterpret_cast<ShmQueue *>(data_))->Read(data,time);}void LockWrite(){pthread_rwlock_wrlock(&rwlock_);}void UnLockWrite(){pthread_rwlock_unlock(&rwlock_);}void LockRead(){pthread_rwlock_rdlock(&rwlock_);}void UnlockRead(){pthread_rwlock_unlock(&rwlock_);}void LockMutex(){while (EOWNERDEAD == pthread_mutex_lock(&mutex_)){UnlockMutex();}}void UnlockMutex(){pthread_mutex_unlock(&mutex_);}void NotifyOne(){pthread_cond_signal(&cond_);}void NotifyAll(){pthread_cond_broadcast(&cond_);}void wait(){LockMutex();pthread_cond_wait(&cond_,&mutex_);UnlockMutex();}bool WaitFor(struct timespec *ts,const std::function<bool()>&cond){if (cond && cond()){return true;}LockMutex();pthread_cond_timedwait(&cond_,&mutex_,ts);UnlockMutex();bool ret;if (cond){ret = cond();} else{struct timespec now;clock_gettime(CLOCK_REALTIME,&now);ret = now.tv_sec < ts->tv_sec || (now.tv_sec == ts->tv_sec && now.tv_nsec <= ts->tv_nsec);}return ret;}
};class ShmManger{
public:ShmManger(std::string file_name,const int size): name_(std::move(file_name)),size_(sizeof(ShmSlice) + sizeof(ShmQueue) + 3 * (sizeof(ShmData) + size)){bool init = false;//open file descriptorint fd = open(name_.c_str(),O_RDWR | O_CREAT | O_EXCL,0600);if(fd < 0){fd = open(name_.c_str(),O_RDWR,0600);}else{//set file sizestruct stat fs;fstat(fd,&fs);if (fs.st_size < 1){ftruncate(fd,size_);}init = true;}//mmapvoid *shmaddr = mmap(NULL,size_,PROT_READ | PROT_WRITE,MAP_SHARED,fd,0);new (shmaddr) ShmSlice(size,3,init);auto deleter = [](ShmSlice *ptr){ptr->~ShmSlice();};slice_ = std::shared_ptr<ShmSlice>(reinterpret_cast<ShmSlice *>(shmaddr),deleter);close(fd);}~ShmManger(){running_ = false;slice_->NotifyAll();if (read_thread_.joinable()){read_thread_.join();}const int count = slice_->count();auto ptr = slice_.get();slice_.reset();if(count > 1){//unmapmunmap(ptr,size_);} else{//remove fileremove(name_.c_str());}}void Publish(const std::vector<char> &data){slice_->Write(data.data(),data.size());slice_->NotifyAll();}void Subscribe(std::function<void (const std::vector<char>&)>callback){callback = std::move(callback);running_ = true;read_thread_ = std::thread(&ShmManger::ReadThread,this);}private:void ReadThread(){long read_time = 0;while (running_){std::vector<char> data;long time;struct timespec ts;clock_gettime(CLOCK_REALTIME,&ts);ts.tv_sec += 5;if (!slice_->WaitFor(&ts,[&]{return slice_->Read(&data,&time) && time > read_time;})){continue;}read_time = time;//deal with datacallback_(data);}}std::string name_;int size_;std::shared_ptr<ShmSlice>slice_;std::function<void(const std::vector<char>&)>callback_;std::atomic_bool running_;std::thread read_thread_;
};
int main() {std::cout << "Hello, World!" << std::endl;return 0;
}

参考链接

  • 共享内存一写多读无锁实现
  • 共享内存消息队列
  • 【转载】同步和互斥的POSIX支持(互斥锁,条件变量,自旋锁)
  • Linux线程-互斥锁pthread_mutex_t
  • C语言open()函数:打开文件函数
  • C语言mmap()函数:建立内存映射

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/446803.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

想对你说的话,就在这里!

甜(Tu)言(Wei)蜜(Qing)语(Hua)最近在github上看到了一个朋友开发的 土味情话在线生成器 &#xff0c;感觉还不错&#xff0c;在这里推荐一下。 github地址&#xff1a;在线生成土味情话

linux读写文件 简单版

代码 //write void write_file(const std::string file_name){FILE *fp nullptr;fp fopen(file_name.c_str(),"w");fprintf(fp,"This is testing for mutex\n");fclose(fp); } //read void read_file(const std::string file_name){std::ifstream fp(fi…

具有中国风的传统颜色(炫酷)

一个小小的中国风的传统颜色&#xff0c;你觉得应该是什么样子的呢&#xff1f; 看了下面这个&#xff0c;我一个搞移动开发的都想去搞前端开发了。 废话不多说了&#xff0c;直接看效果&#xff1a; 访问地址&#xff1a;中国传统颜色手册 github地址&#xff1a;Chinese…

Android Studio安装问题及填坑

安装过程 安装Android Studio 其他问题 1.Android Studio出现Error:Unable to tunnel through proxy. Proxy returns “HTTP/1.1 400 Bad Request” 2.Could not resolve all artifacts for configuration :classpath 3.!No cached version of com.android.tools.build:gr…

Linux strtol将十六进制转化为十进制

代码 #include <iostream> #include "crypto_util.h"int get_file(const std::string file_name){size_t get_file_id 0;std::cout << hsm::common::get_md5_digest_hex(file_name) << std::endl;get_file_id strtol(reinterpret_cast<const…

Android WebView使用攻略

目录前言一、简介二、作用三、使用介绍1、Webview类常用方法1.1、加载url1.2、WebView的状态1.3、关于前进 / 后退网页1.4、清除缓存数据2、常用工具类2.1、WebSettings类2.2、WebViewClient类2.3、WebChromeClient类3、注意事项&#xff1a;如何避免WebView内存泄露&#xff1…

C++If与Switch语句

IF if语句不加括号就只是一个语句 举例: int a5,b2; if(a)//按逻辑值来理解,0为假,其他为真,这里等价于a!0—>a为真时 ab; else ba; 计算三角形面积代码 #include<iostream> #include<cmath>//数学公式库 #include<iomanip> //格式控制 using namesp…

linux fork多进程 demo

注释 使用系统调用fork()创建三个子进程&#xff1b;各个子进程显示和输出一些提示信息和自己的进程标识符&#xff1b;父进程显示自己的进程ID和一些提示信息&#xff0c;然后调用waitpid()等待多个子进程结束&#xff0c;并在子进程结束后显示输出提示信息表示程序结束。 代…

Android WebView 与 JS 交互

目录二、具体分析2.1 Android通过WebView调用 JS 代码方式1&#xff1a;通过WebView的loadUrl()方式2&#xff1a;通过WebView的evaluateJavascript()方法对比使用建议2.2、JS通过WebView调用 Android 代码2.2.1、方法分析方式1&#xff1a;通过 WebView的addJavascriptInterfa…

关于锁的注意事项

文件锁 Linux 提供了 fcntl 系统调用&#xff0c;可以锁定文件但是文件锁是和进程相关联的&#xff0c;一个进程中的多个线程/协程对同一个文件进行的锁操作会互相覆盖掉&#xff0c;从而无效。fcntl 创建的锁是建议性锁&#xff0c;只有写入的进程和读取的进程都遵循建议才有效…

安卓实现登录与注册界面

使用Intent与Bundle传递数据 登录界面login.xml 1.使用Relativelayout相对布局 <?xml version"1.0" encoding"utf-8"?> <RelativeLayout xmlns:android"http://schemas.android.com/apk/res/android"android:layout_width"mat…

Android Button字母自动全部大写的问题

两种解决方案&#xff1a; 方法一&#xff1a; 在 xml 布局中设置属性 android:textAllCaps"false" <Buttonandroid:layout_width"wrap_content"android:layout_height"match_parent"android:text"添加动作组"android:textAllCap…

安卓Activity与intent跳转

Activity生命周期 Activity启动模式 Intent跳转 _________startActivity() 1.Intent intentnew Intent(A.this,B.class); startActivity(intent); 2.startActivity(new Intent(A.this,B.class)); _________startActivityForResult() Intent intentnew Intent(A.this,B.class…

将读写锁放到共享内存中,实现进程之间对数据的读写访问控制

代码 #include <unistd.h> #include <sys/types.h> #include <sys/ipc.h> #include <sys/shm.h> #include <assert.h> #include <pthread.h> #include <stdio.h> #include <stdlib.h> #include <fstream> #include <…

Android WebView 使用漏洞

目录一、类型二、具体分析2.1、WebView任意代码执行漏洞2.1.1、addJavascriptInterface 接口引起远程代码执行漏洞漏洞产生原因解决方案关于该方法的其他细节总结2.1.2、searchBoxJavaBridge_接口引起远程代码执行漏洞漏洞产生原因解决方案2.1.3、accessibility和 accessibilit…

将读写锁放到共享内存,实现进程之间对于同一文件的读写操作

思路 将读写锁和读写锁的属性以及一个用于存储共享内存的地址的int型变量三者封装成一个struct结构将这个结构体放到共享内存中&#xff0c;以及将读写锁的属性设置成全局性质&#xff0c;然后使用这个属性初始化锁&#xff0c;以及将锁的地址关联到结构体的内存地址这个变量定…

Android Studio 查看页面布局层次结构

Android Studio有个可以查看手机上app页面布局层次结构的工具。可以协助我们对布局进行优化&#xff0c;去掉没有必要的节点等&#xff0c;通过这个工具可以清晰的看见页面整个结构&#xff1b;废话少说直接上图&#xff0c;再说过程。 这就是我们想要看到的&#xff0c;每个节…

Java web后端 第一章框架搭建

Redis 通用Mapper 通用Mapper->MyBatis动态SQL封装包,增删改查 0 SQL语句 PageHelper PageHelper–>实现分页操作,不需要limit,直接使用静态方法 电商系统技术特点 分布式(数据很多,一台电脑存储一部分数据) 高并发,集群(并发量很高,后台不只一个电脑) ,海量数据 主…

进程锁 读写文件的小例子 C++代码

代码 #include <unistd.h> #include <sys/types.h> #include <sys/ipc.h> #include <sys/shm.h> #include <cassert> #include <pthread.h> #include <cstdio> #include <cstdlib> #include <fstream> #include <io…

Java 中sleep()与wait()的区别

目录一、原理不同二、锁的处理机制不同三、使用的区域不同四、异常捕获不同五、总结一、原理不同 sleep()是Thread类的静态方法&#xff0c;是线程用来控制自身流程的&#xff0c;它会使此线程暂停执行指定的时间&#xff0c;而把执行机会让给其他的线程&#xff0c;等到计时时…