C++简单实现消息队列

简介

消息队列是一种应用间的通讯方式,消息发送后可以立即放回,由消息系统来确保消息的可靠传递。消息发布者只需要将消息发布到消息队列中,而不需要管谁来取。消息使用者只管从消息队列中取消息而不管谁发布的。这样发布者和使用者都不同知道对方的存在。
消息队列普遍使用在生产者和消费者模型中。

  • 优点
  1. 应用解耦: 应用之间不用那么多的同步调用,发消息到消息队列就行,消费者可以自己消费,消费生产者不用管了,降低应用之间的耦合。
  2. 降低延时: 应用之间用同步调用,需要等待对方响应,等待时间比较长,用消息之后,发送消息到消息队列就行,应用就可以返回了,对客户来讲降低了应用延时。
  3. 削峰填谷:请求比较多的时候,应用处理不过来,会丢弃请求;请求比较少时,应用不饱和。
    请求比较多时,把请求放到消息队列,消费者按特定处理速度来处理,请求少时,也让应用有事情可以做;能做到忙时不丢请求,闲时不闲置应用资源。

主流的消息队列:Kafka、ActiveMQ、RabbitMQ、RocketMQ

下面使用C++实现一个简单的消息队列。

具体实现

  • 消息队列类
    里面主要包含一个数组和一个队列,都保存消息。
    应用从数组中拿消息处理,当数组满的时候,消息保存到队列中,当数组中消息处理完,从队列中取消息,再处理。
    数组实现的是一个环形数组,记录下消息写游标和读游标,超过数组大小,对数组大小取余。
//消息长度和消息
typedef std::pair<size_t, char*> msgPair;
#define MsgQueueSize 102400class zMsgQueue
{//消息对,first表示是否存放消息typedef std::pair<bool, msgPair> msgQueue;
public:zMsgQueue();~zMsgQueue();void* msgMalloc(const size_t len);void msgFree(void* p);//获得一个消息msgPair* get();//放入一个消息bool put(const void* msg, size_t msgLen);//将队列中的消息放到消息数组中bool putMsgQueue2Arr();//删除一个消息void erase();bool empty();bool msgArrEmpty();
private:void clear();// 保存正在处理的消息msgQueue msgArr_[MsgQueueSize];// 保存等待处理的消息std::queue<msgPair> msgQueue_;//消息写游标size_t queueWrite_;//消息读游标size_t queueRead_;
};

实现:

zMsgQueue::zMsgQueue()
{bzero(msgArr_, sizeof(msgArr_));queueWrite_ = 0;queueRead_ = 0;
}zMsgQueue::~zMsgQueue()
{clear();
}void* zMsgQueue::msgMalloc(const size_t len)
{char* p = (char*)malloc(len + 1);return (void*)(p + 1);
}void zMsgQueue::msgFree(void* p)
{free((char*)p - 1);
}//获得一个消息
msgPair* zMsgQueue::get()
{if(queueRead_ >= MsgQueueSize)return NULL;if(msgArrEmpty())putMsgQueue2Arr();msgPair* ret = NULL;if(msgArr_[queueRead_].first)ret = &msgArr_[queueRead_].second;return ret;
}//放入一个消息
bool zMsgQueue::put(const void* msg, size_t msgLen)
{char* buf = (char*)msgMalloc(msgLen);if(buf){bcopy(msg, buf, msgLen);//先将队列中的消息放到数组中//数组中还有位置直接放到数组中//没有位置放到队列中if(!putMsgQueue2Arr() && !msgArr_[queueWrite_].first){msgArr_[queueWrite_].first = true;msgArr_[queueWrite_].second.first = msgLen;msgArr_[queueWrite_].second.second = buf;queueWrite_++;queueWrite_ %= MsgQueueSize;}else {msgQueue_.push(std::make_pair(msgLen, buf));}return true;} return false;
}//将队列中的消息放到消息数组中
bool zMsgQueue::putMsgQueue2Arr()
{bool isLeft = false;while(!msgQueue_.empty()){if(!msgArr_[queueWrite_].first){msgArr_[queueWrite_].first = true;msgArr_[queueWrite_].second = msgQueue_.front();queueWrite_++;queueWrite_ %= MsgQueueSize;msgQueue_.pop();}else {isLeft = true;break;}}return isLeft;
}//删除一个消息
void zMsgQueue::erase()
{if(!msgArr_[queueRead_].first)return;msgFree(msgArr_[queueRead_].second.second);msgArr_[queueRead_].second.second = NULL;msgArr_[queueRead_].second.first = 0;msgArr_[queueRead_].first = false;queueRead_++;queueRead_ %= MsgQueueSize;
}void zMsgQueue::clear()
{//队列中还有消息while(putMsgQueue2Arr()){//数组中还有消息while(get()){erase();}}//数组中还有消息while(get()){erase();}
}bool zMsgQueue::empty()
{if(putMsgQueue2Arr()) return false;return msgArrEmpty();
}bool zMsgQueue::msgArrEmpty()
{if(queueRead_ == queueWrite_ && !msgArr_[queueRead_].first){return true;}return false;
}
  • 消息队列的封装
    对消息队列的封装主要是为了对消息进行解析和处理。
    消息解析和处理函数定义成了虚函数,当需要使用消息队列并处理消息时,只需要继承消息队列,然后重写虚函数,进行对应处理即可。
    类中还使用到了读写锁,当多线程的情况下,消息队列是一个临界资源,线程共享,需要进行上锁。单线程的情况下不需要加锁。
//T表示使用的消息队列
//msgT表示消息的类型,有的需要消息头,消息正文等,需要解析,这里是直接使用
template<class T=zMsgQueue, class msgT=char>
class messageQueue : public rwLocker
{
public:messageQueue(){}~messageQueue(){}bool putMsg(const msgT* msg, const size_t msgLen){rwLocker::wlock();msgQueue_.put(msg, msgLen);rwLocker::unlock();return true;}//解析消息,处理消息virtual bool msgParse(const msgT* msg, const size_t msgLen) = 0;//获取消息,解析消息,处理消息bool doCmd(){rwLocker::wlock();msgPair* msg = msgQueue_.get();while(msg){msgParse(msg->second, msg->first);  msgQueue_.erase();msg = msgQueue_.get();}rwLocker::unlock();return true;}bool empty(){return msgQueue_.empty();}private:T msgQueue_;
};
  • 读写锁的封装
    读写锁:可以多个线程进行读,只能一个线程进行写。写时独享资源,读时共享资源。写锁的优先级高。
  • 为什么读写锁需要读锁?

为了防止其他线程请求写锁。一个线程请求了读锁,其他线程在请求写锁会阻塞,但是请求读锁不会阻塞。一个线程请求了写锁,其他线程请求读锁和写锁都会阻塞。

#include <pthread.h>class rwLock 
{
public:rwLock(){pthread_rwlock_init(&rwlc_, NULL);}~rwLock(){pthread_rwlock_destroy(&rwlc_);}void rlock(){pthread_rwlock_rdlock(&rwlc_);}void wlock(){pthread_rwlock_wrlock(&rwlc_);}void unlock(){pthread_rwlock_unlock(&rwlc_);}private:pthread_rwlock_t rwlc_;
};class rwLocker
{
public:void rlock(){rwlc_.rlock();}void wlock(){rwlc_.wlock();}void unlock(){rwlc_.unlock();}private:rwLock rwlc_;
};
  • Makefile:
# ini1=main.cpp 
# in2=messageQueue.cpp
out=main 
cc=g++
std=-std=c++11 -lpthread#$(out):$(in1) $(in2)
$(out): main.cpp messageQueue.cpp rwlock.h$(cc) $^ -o $@ $(std).PHONY:clean
clean:rm -rf $(out)
  • 代码测试
    实现一个类继承消息队列,重写消息处理函数。
    定义对象,调用doCmd函数即可。
#include "messageQueue.h"class test : public messageQueue<>
{bool msgParse(const char* msg, const size_t msgLen){std::cout << msgLen << ":" << msg << std::endl;return true;}
};int main()
{//模拟客户端发送消息char buf[256] = "hello world!";test t;//消息队列放消息t.putMsg(buf, strlen(buf));//处理消息t.doCmd();return 0;
}

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/pingmian/51243.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

21.发布确认模式-高级

问题 生产环境中由于一些不明原因&#xff0c;导致rabbitmq重启&#xff0c;在重启的期间生产者消息投递失败&#xff0c;导致消息丢失&#xff0c;需要手动处理恢复。那么如何才能进行rabbitmq的消息可靠性投递&#xff1f;特别是在极端的情况&#xff0c;rabbitmq集群不可用…

Python 教程(六):函数式编程

目录 专栏列表前言函数定义参数返回值 示例函数类型普通函数空函数匿名函数&#xff08;Lambda 函数&#xff09;嵌套函数函数装饰器高阶函数 函数参数位置参数默认参数可变位置参数可变关键字参数 函数属性和方法__name____doc__func.__dict__func.__defaults__func.__annotat…

黑马头条Day11- 实时计算热点文章、KafkaStream

一、今日内容 1. 定时计算与实时计算 2. 今日内容 KafkaStream 什么是流式计算KafkaStream概述KafkaStream入门案例SpringBoot集成KafkaStream 实时计算 用户行为发送消息KafkaStream聚合处理消息更新文章行为数量替换热点文章数据 二、实时流式计算 1. 概念 一般流式计…

4、Python+MySQL+Flask的文件管理系统【附源码,运行简单】

4、PythonMySQLFlask的文件管理系统【附源码&#xff0c;运行简单】 总览 1、《文件管理系统》1.1 方案设计说明书设计目标工具列表 2、详细设计2.1 登录2.2 注册2.3 个人中心界面2.4 文件上传界面2.5 其他功能贴图 3、下载 总览 自己做的项目&#xff0c;禁止转载&#xff0c…

UART 通信协议

文章目录 一 简介二 电平标准三 引脚定义四 数据格式五 波特率 一 简介 ​ UART (Universal Asynchronous Receiver/Transmitter)&#xff0c;通用异步收发器&#xff0c;是一种串行、异步、全双工通信协议。 串行&#xff1a;利用一条传输线&#xff0c;将数据一位一位地传送…

【七】Hadoop3.3.4基于ubuntu24的分布式集群安装

文章目录 1. 下载和准备工作1.1 安装包下载1.2 前提条件 2. 安装过程STEP 1: 解压并配置Hadoop选择环境变量添加位置的原则检查环境变量是否生效 STEP 2: 配置Hadoop2.1. 修改core-site.xml2.2. 修改hdfs-site.xml2.3. 修改mapred-site.xml2.4. 修改yarn-site.xml2.5. 修改hado…

引用的项目“xxxx/tsconfig.node.json”可能不会禁用发出。

vue3 报错&#xff1a; 引用的项目“xxxx/tsconfig.node.json”可能不会禁用发出。 解决&#xff1a; 进入对应的 json 文件&#xff1a; 修改&#xff1a; "noEmit": false 当 noEmit 设置为 false 时&#xff0c;TypeScript 编译器将根据项目配置生成相应的输出文…

基于Java的微博传播分析系统的设计与实现

1 项目介绍 1.1 摘要 本文致力于展示一项创新的微博传播分析系统设计与应用研究&#xff0c;该系统基于Java技术&#xff0c;巧妙利用大数据环境下的社交媒体——微博的庞大用户群及高度活跃特性&#xff0c;旨在深度探索信息传播的内在逻辑与社会影响机制。研究开篇明确定了…

OpenCV 灰度直方图

一 直方图的定义&#xff0c;意义和特征 1 定义 在统计学中&#xff0c;直方图是一种对数据分布情况的图形表示&#xff0c;是一种二维统计图表&#xff0c;他的两个坐标分别是统计样本&#xff08;图像、视频帧&#xff09;和样本的某种属性&#xff08;亮度&#xff0c;像素…

bugku-web-cookies

进来以后看到一个巨长的字符串, 源码同样,发现url后面是base64编码解码得keys.txt 还有一个line参数&#xff0c;修改并没有发生任何变化。我想不到要改keys.txt成index.php&#xff08;base64加密格式&#xff1a;aW5kZXgucGhw&#xff09; line1时&#xff1a; line2时&…

AcWing 802. 区间和

var说明add存储了插入操作&#xff0c;在指定 x x x下标所在位置 a [ x ] c a[x]c a[x]cquery是求 [ L , R ] [L,R] [L,R]区间和用到的数组,最后才用到alls 是存储离散化之后的值 , 对于会访问到的每个下标&#xff0c;统统丢到 a l l s 里面 &#xff0c;会把 x 和 [ L , R …

【Golang 面试 - 基础题】每日 5 题(七)

✍个人博客&#xff1a;Pandaconda-CSDN博客 &#x1f4e3;专栏地址&#xff1a;http://t.csdnimg.cn/UWz06 &#x1f4da;专栏简介&#xff1a;在这个专栏中&#xff0c;我将会分享 Golang 面试中常见的面试题给大家~ ❤️如果有收获的话&#xff0c;欢迎点赞&#x1f44d;收藏…

数据结构----算法复杂度

1.数据结构前言 数据是杂乱无章的&#xff0c;我们要借助结构将数据管理起来 1.1 数据结构 数据结构(Data Structure)是计算机存储、组织数据的⽅式&#xff0c;指相互之间存在⼀种或多种特定关系的数 据元素的集合。没有⼀种单⼀的数据结构对所有⽤途都有⽤&#xff0c;所…

查看路由表 netstat -r

“Kernel IP routing table” 是Linux系统中用于展示和配置IP路由的表。它告诉操作系统如何将数据包从一个网络接口发送到另一个网络或主机。下面是对您给出的路由表条目的解释&#xff1a; Destination&#xff1a;目的地地址&#xff0c;可以是具体的IP地址&#xff0c;也可…

ctfshow解题方法

171 172 爆库名->爆表名->爆字段名->爆字段值 -1 union select 1,database() ,3 -- //返回数据库名 -1 union select 1,2,group_concat(table_name) from information_schema.tables where table_schema库名 -- //获取数据库里的表名 -1 union select 1,group_concat(…

Python爬虫入门02:Fiddler下载使用教程

文章目录 手机抓包全攻略&#xff1a;Fiddler 工具深度解析引言Fiddler 工具简介为什么选择 Fiddler&#xff1f; 安装与配置 Fiddler步骤一&#xff1a;下载与安装步骤二&#xff1a;配置浏览器代理步骤三&#xff1a;安装 HTTPS 证书 配置手机以使用 Fiddler步骤一&#xff1…

操作系统面试知识点总结5

#来自ウルトラマンメビウス&#xff08;梦比优斯&#xff09; 1 IO管理概述 1.1 I/O 设备 I/O 设备的类型分类。 1.1.1 按使用特性 人机交互类外部设备&#xff0c;例如打印机、显示器等。存储设备&#xff0c;例如磁盘、光盘等。网络通信设备&#xff0c;例如网络接口等。 1…

【计算机网络】IP地址和子网掩码(IP地址篇)

个人主页&#xff1a;【&#x1f60a;个人主页】 系列专栏&#xff1a;【❤️计算机网络】 文章目录 前言IP地址网络地址网络地址的定义与组成作用分类网络地址的分配与管理 广播地址&#xff08;Broadcast Address&#xff09;定义构成类型作用注意事项 广播地址功能 组播地址…

HiveSQL题——炸裂+开窗

一、每个学科的成绩第一名是谁&#xff1f; 0 问题描述 基于学生成绩表输出每个科目的第一名是谁呢&#xff1f; 1 数据准备 with t1 as(selectzs as name,[{"Chinese":80},{"Math":70}],{"English"…

CompletableFuture使用详解

简单的任务&#xff0c;用Future获取结果还好&#xff0c;但我们并行提交的多个异步任务&#xff0c;往往并不是独立的&#xff0c;很多 时候业务逻辑处理存在串行[依赖]、并行、聚合的关系。如果要我们手动用 Future 实现&#xff0c;是非常麻烦的。 CompletableFutur…