C++轻量级 线程间异步消息架构(向曾经工作的ROSA-RB以及共事的DOPRA的老兄弟们致敬)

1 啰嗦一番背景

这么多年,换着槽位做牛做马,没有什么钱途

手艺仍然很潮,唯有对于第一线的码农工作,孜孜不倦,其实没有啥进步,就是在不断地重复,刷熟练度,和同期的老兄弟们,那是千万不要有比较的想法,Q一把说各有各的活路。

业务线,技术线,管理线,通通都无成就。

唯有在一亩三分地上,辛勤耕耘,有了点成绩,聊以慰藉

拉回正题

当前部门的项目团队,并非纯粹嵌入式开发,偏向于ARM高性能芯片+unbuntu模式,中位机的模块代码加上UT code,前前后后加起来,11W行了,正式代码估计就4W左右,其他都工具或者UT。

UT的改善也是从加入部门后开始加速的,算是起到一定带领作用,新特性的merge同时,也要满足行覆盖率(分支覆盖率没有硬性要求,代码设计的够好,少一些if else就OK)。

目前软件中,一个比较大的问题是组件间只有API接口,一路开发下来,各个组件的API调用互相交织,解耦困难,千辛万苦将UT覆盖刷到超80个点,mock遍布,带来的副作用是UT文件的.o超大。

编出来的执行文件,超过1个多G,Windows上跑GCC交叉编译会OOM,只有VC+linux 两个环境上的UT可以编出来。

其实陷入了瓶颈。

接下来进入新一轮新特性开发期,老的架构已经到了差不多不得不优化的程度。

算是在当前团队的最后一个心愿吧,这不,解决方案除了在业务流程优化,软件架构改进上(主要就是SOLID原则),需要引入轻量级线程间异步消息架构。

并非线程池,单纯的异步消息,事件,定时器这三种基本的功能。

Java环境下,这都不是事,C++环境下,搜索了一遍,没有现成的。

boost有异步消息架构和定时器,简单的也玩过,感觉上我们用起来有些门槛,随着C++新标准不断引入新特性新功能,boost对于中小型代码,反而没有特别吃香的感觉。

Nokia开源的一款 event machine,牛逼是牛逼,门槛也高,代码行数都快抵得上我们当前的项目的一小半了,弃疗。

开源的一些找来找去,在简洁上,差点意思。

另外,线程间异步消息架构,是几乎所有稍微大型的一点的软件的刚需。

关键的关键,是没有现成的,这一点不得不口出脏话,tm什么年代了,这玩意还需要敝帚自珍,当个宝,就不能分享出来,让广大的农民工朋友有一个基本的参考或者可以鄙视的模版?

要自己开发了,回忆起了10多年前,Dopra的FID/PID初始化+消息框架的架构。

琢磨了一段时间,开动。

2 简单介绍线程间异步消息框架的概念

如下的讲述,懒得画图了,到处都可以搜索到,画出来也是知识垃圾。

a 异步消息很简单,

    1对1,C + message broker(桥接)+ C

b 事件,为 1对N(0...:n),发布和订阅模型

    P(publisher) + event broker + S(subscribers)

c 定时器

不搞callback这一套,和起临时线程(异步任务)没有太大区别

T(trigger) + timer engine + user(handler)

上述三种,全部共享一个handler(FID/PID框架中的MsgProc)

message broker + event broker + timer engine 归属到一个全局功能实例,齐活。

基础的架构设计完毕后,开始猛起,要做一个一天赚1000妹币的男人(混到最惨了有木有)

圈里面看到有老哥们6月27号(周四)拿了最后一次加班夜宵,岁月不饶人,一个有生命力的团队必然是一个不断推陈出新的团队。

老司机呆长的,不到一定层级,慢慢就成为重点优化对象。当时也是年轻气盛,13年拿了个杭研十佳程序员,年底被领导平衡绩效C,果断离了。

历程转了个大弯,这不,Dopra的一套消息架构,真香,试着东施效颦了。

3 和Dopra框架的一点区别

不出意外的必然有很大的区别,咱这个是缩水版(就开发了4天,连蒙带猜)

1 受限于个人水平,怎么简单怎么来,元编程玩不来,羞愧。

2 也没有单独的消息内存管理那一套(如果是大型项目,视情况而言,想要做好内存操作隔离,那还是需要的,memory sandbox概念也早就有了,有需要就加上)

消息也不用老式的 VOS_MsgHeader + paylaod这一套来管理(踩内存尾记忆尤深),通通简化,当前的项目并不需要多个coordinator之间搞起,咱就是单体。

用最土的union来描述基本消息结构(想要玩的开,搞成container的话,那还有很多备选,比如同OS内部 IPC,zmq,消息protobuf封装),想要怎么玩,几乎都玩得起了现在。

3 Dopra的FID/PID还负责启动时初始化(也可以参考RT-Thread一套初始化方式),笔者其实也准备用上,没有啥技术上的巧,接口预留,各位看官可以酌情自己添加。

当然得当然,都是tm指针在管理内存,修改上完全开放的。

10年了,该忘的不该忘的,差不多忘记,伴随着年级增加的,只有体重。

4 干货分享

声明: 笔者也是受益于互联网,各位看官下载源码后,可自由分发和修改,笔者天然放弃著作权(有部分源码参考自github  cpptime/cpptime.h at master · eglimi/cpptime · GitHub)。

笔者当前分享的是初稿,只跑了几个UT case,后续看情况,待完善后会更新。

笔者相信,当前的代码离商业级别应该有一些差距,给大家带来的不便也敬请原谅,就开发了4天,中间还不断给各种杂事打断。

4.1 源代码目录

.
├── include
│   ├── VOS_Def.h
│   ├── VOS_IHandler.h
│   ├── VOS_Interface.h
│   └── private
│       ├── VOS_InterfaceImpl.h
│       ├── VOS_MsgQueue.h
│       └── VOS_TimerImpl.h
├── src
│   ├── VOS_IHandler.cpp
│   ├── VOS_InterfaceImpl.cpp
│   └── VOS_TimerImpl.cpp
└── tst└── VOS_IHandler_test.cpp

4.2 头文件介绍

4.2.1 VOS_Def.h

该文件由用户定制(有部分基础结构建议不要修改,比如VosMessage这种),比如消息基础结构,消息ID,timer 类型,event id以及对应的携带参数

#ifndef VOS_DEF_H_
#define VOS_DEF_H_
#include <chrono>namespace VOS
{enum class VOS_MessageType
{VOS_SYNC_MSG,VOS_TIMER,VOS_EVENT,VOS_BOTTOM = 255
};enum class VOS_CompName
{COMP_A = 0,COMP_B = 1,COMP_C = 2,COMP_D = 3,COMP_E = 4,COMP_F = 5,COMP_AR = 6,COMP_Bottom = 7 // the last one, if you add new one,COMP_Bottom must increase
};// user need define own EventID
enum class VOS_EventID
{SUB_SAHA_INIT_START = 0,SUB_SAHA_INIT_READY = 1,EVENT_SAHA_SOMETHING_OK,EVENT_SAHA_SOMETHING_FAIL};struct REP_ID_DATA
{int programId;int assayNumber;int repIdx;
};typedef union
{REP_ID_DATA      repData;int              reserve_1;
} VOSEventArg;enum class VOSTimerType
{VOSTimerType_1,VOSTimerType_2,VOSTimerType_3,VOSTimerType_BOTTOM = 255
};struct VOS_TIMER_MSG
{VOSTimerType timerType;
};/*
message id
VOS_ENV_XXX_REQ
VOS_ENV_XXX_RESP
*/struct VOS_SYNC_MSG
{int msgId;//add your own structure
};struct VOS_EVENT_MSG
{VOS_EventID eventId;VOSEventArg arg;
};/** 消息中携带的结构可能多种多样,要兼容各种结构在实现上难度太大* 这里的轻量级消息结构,基本元素为一段固定大小的内存块,选用union结构* unon结构有限制,不同编译器可能不一样,最好在基层结构中不要用用户自定义的构造函数* 可参看 https://learn.microsoft.com/zh-cn/cpp/cpp/unions?view=msvc-170** union的一大特征在于,一个Union类中的所有数据共享同一段内存。* 如果union类的成员包含自己的构造函数,析构函数,* 那么同一Union类的成员在初始化时,就有可能会执行不同的构造函数。* 这是无法预料的。所以,我们在定义Union类时要尽量避免成员变量是对象(含有自己的构造函数)* 子结构中,最好不要包含自定义的一些类结构* 看下面 std::string 的报错*/
typedef union
{VOS_SYNC_MSG      xSyncMsg;VOS_TIMER_MSG     xTimerMsg;VOS_EVENT_MSG     xEventMsg;//std::string       test; error C2280: “quaero::VOS::VosMsgData::VosMsgData(void)”: 尝试引用已删除的函数
} VosMsgData;struct VosMessage
{VosMessage(VOS_MessageType msgType) :msgType_(msgType){}VOS_MessageType   msgType_;VosMsgData        msgData_;
};using timer_id = std::size_t;
using clock = std::chrono::steady_clock;
using timestamp = std::chrono::time_point<clock>;}#endif // VOS_DEF_H_

4.2.2 VOS_Interface.h

API 总入口,单实例,getInstance()方式调用,笔者懒得private构造函数,形式而已

设计上不完美,这两个接口是内部框架调用的,这里也暴露给了用户

    virtual void VOS_RegisterEvent(VOS_EventID eventId, VOS_CompName name) = 0;virtual void VOS_UnRegisterEvent(VOS_EventID eventId, VOS_CompName name) = 0;

#ifndef VOS_INTERFACE_H_
#define VOS_INTERFACE_H_
#include "VOS_Def.h"
#include <functional>namespace VOS
{class VOS_Interface
{
public:explicit VOS_Interface() = default;virtual ~VOS_Interface() = default;VOS_Interface(const VOS_Interface&) = delete;VOS_Interface(VOS_Interface&&) = delete;VOS_Interface& operator=(const VOS_Interface&) = delete;VOS_Interface& operator=(VOS_Interface&&) = delete;virtual void VOS_SendMsg(VOS_CompName name, VosMessage &message) = 0;virtual void VOS_PublishEvent(VOS_EventID eventId, VOSEventArg arg) = 0;virtual void VOS_RegisterEvent(VOS_EventID eventId, VOS_CompName name) = 0;virtual void VOS_UnRegisterEvent(VOS_EventID eventId, VOS_CompName name) = 0;virtual void VOS_RegisterInitCB(VOS_CompName name) = 0;using MsgHandler = std::function<void(const VosMessage &msg)>;// warning: called by inner framework, user should not call thisvirtual void VOS_AddMsgHandler(VOS_CompName name, MsgHandler&& eh) = 0;virtual void VOS_RemoveMsgHandler(VOS_CompName name)  = 0;virtual timer_id StartTimer(VOS_CompName name, VOSTimerType timerType, const timestamp &when, unsigned int interval = 0)= 0;virtual bool Canceltimer(timer_id id)= 0;static VOS_Interface& getInstance();};
}#endif // VOS_INTERFACE_H_

4.2.3 VOS_IHandler.h

每个需要处理消息的组件各自创建一个,参数就是VOS_CompName name

用户基本上只要继承,重载

virtual void handleMessage(const VosMessage &msg) = 0;

初始化完毕调用start() ,参看UT调用,并没有明确限制,可自行修改

void start();

具体如下


#ifndef VOS_IHANDLER_H_
#define VOS_IHANDLER_H_#include <memory>
#include "VOS_Def.h"namespace VOS
{class VOS_IHandler {
public:explicit VOS_IHandler(VOS_CompName name);virtual ~VOS_IHandler() = default;VOS_IHandler() = delete;VOS_IHandler(const VOS_IHandler&) = delete;VOS_IHandler(VOS_IHandler&&) = delete;VOS_IHandler& operator=(const VOS_IHandler&) = delete;VOS_IHandler& operator=(VOS_IHandler&&) = delete;void start();virtual void handleMessage(const VosMessage &msg) = 0;/* void VOS_IHandler::handleMessage(const VosMessage &msg){switch (msg.msgType_){case VOS_MessageType::VOS_TIMER://do somethingbreak;case VOS_MessageType::VOS_BOTTOM:default:break;}}*/protected:class Impl; std::shared_ptr<Impl> pimpl_;
};}
#endif /* VOS_IHANDLER_H_ */

接下来是内部实现

4.2.4 VOS_InterfaceImpl.h

该内部头文件为VOS_Interface.h 接口的具体实现类

#ifndef VOS_INTERFACE_IMPL_H_
#define VOS_INTERFACE_IMPL_H_
#include <memory>
#include <map>
#include <vector>
#include <thread>
#include <mutex>
#include "VOS_Interface.h"
#include "VOS_TimerImpl.h"namespace VOS
{
class VOS_InterfaceImpl : public VOS_Interface
{
public:VOS_InterfaceImpl();virtual ~VOS_InterfaceImpl();VOS_InterfaceImpl(const VOS_InterfaceImpl&) = delete;VOS_InterfaceImpl(VOS_InterfaceImpl&&) = delete;VOS_InterfaceImpl& operator=(const VOS_InterfaceImpl&) = delete;VOS_InterfaceImpl& operator=(VOS_InterfaceImpl&&) = delete;void VOS_SendMsg(VOS_CompName name, VosMessage &message) override;void VOS_PublishEvent(VOS_EventID eventId, VOSEventArg arg) override;void VOS_RegisterEvent(VOS_EventID eventId, VOS_CompName name) override;void VOS_UnRegisterEvent(VOS_EventID eventId, VOS_CompName name) override;void VOS_RegisterInitCB(VOS_CompName name) override;void VOS_AddMsgHandler(VOS_CompName name, MsgHandler&& eh) override;void VOS_RemoveMsgHandler(VOS_CompName name) override;timer_id StartTimer(VOS_CompName name, VOSTimerType timerType, const timestamp &when, unsigned int interval) override;bool Canceltimer(timer_id id) override;void VOS_SendTimerMsg(VOS_CompName name, VOSTimerType timerType);private:std::map<VOS_CompName, MsgHandler> handlers_;std::map<VOS_EventID, std::vector<VOS_CompName>> event_map_;std::shared_ptr<VOS_Timer> timers_;std::mutex interface_lock_;};}#endif // VOS_INTERFACE_IMPL_H_
4.2.5 VOS_MsgQueue.h

为安全队列的基本封装,有些冗余的代码,部分原子变量操作时不完全规范,大家可以用原生的atomic操作自行修改

#ifndef VOS_MSGQUEUE_H_
#define VOS_MSGQUEUE_H_#include <memory>
#include <atomic>
#include <queue>
#include <functional>
#include <iostream>
#include <condition_variable>#include "VOS_Def.h"namespace VOS
{using VOSQueueElementPtr = std::unique_ptr<VosMessage>;
using VOSQueueElements   = std::queue<VOSQueueElementPtr>;using SubscriberCb = std::function<void(const VosMessage &msg )>;class VOS_MsgQueue
{
public:explicit VOS_MsgQueue(){};~VOS_MsgQueue(){notify();};VOS_MsgQueue(const VOS_MsgQueue&) = delete;VOS_MsgQueue(VOS_MsgQueue&&) = delete;VOS_MsgQueue& operator=(const VOS_MsgQueue&) = delete;VOS_MsgQueue& operator=(VOS_MsgQueue&&) = delete;void addSubscriberCb(SubscriberCb Cb){subScriberCb = Cb;}void sendMessage(VOSQueueElementPtr msg){std::unique_lock<std::mutex> lck(VOSQueue_message_lock_);// if queue size > MAX_QUEUE_SIZE, that means VOS task might have some issue or might be blocked if(getRunningFlag() && (VOSQueue_messages_.size() < MAX_QUEUE_SIZE)){VOSQueue_messages_.push(std::move(msg));ready = true;lck.unlock();notify();}}void handleMessage(){std::unique_lock<std::mutex> lck(VOSQueue_message_lock_);cv.wait(lck, [this]() { return this->ready; });VOSQueueElements tmpVOSQueue;while(VOSQueue_messages_.size()){tmpVOSQueue.push(std::move(VOSQueue_messages_.front()));VOSQueue_messages_.pop();}ready = false;lck.unlock();while(tmpVOSQueue.size()){auto msg = std::move(tmpVOSQueue.front());tmpVOSQueue.pop();//do somethingsubScriberCb(*msg);}}int getQueueSize(){std::unique_lock<std::mutex> lock(VOSQueue_message_lock_);return static_cast<int>(VOSQueue_messages_.size());}void Stop(){{std::lock_guard<std::mutex> lk{VOSQueue_message_lock_};ready = true;}// set atomic flag to true and notify event handler threadnotify();}void clear(){notify();}void notify(){cv.notify_all();}void clearQueue(){std::unique_lock<std::mutex> lck(VOSQueue_message_lock_);while(VOSQueue_messages_.size()){VOSQueue_messages_.pop();}interfaceRunningFlag_ = false;}void setRunningFlag(bool flag){std::unique_lock<std::mutex> lck(VOSQueue_message_lock_);interfaceRunningFlag_ = flag;}bool getRunningFlag(){return interfaceRunningFlag_;}private:const int MAX_QUEUE_SIZE = 500;VOSQueueElements VOSQueue_messages_;SubscriberCb subScriberCb;std::mutex VOSQueue_message_lock_;std::condition_variable cv;std::atomic<bool> interfaceRunningFlag_ = false;bool ready = false;std::atomic_flag synFlag_ = ATOMIC_FLAG_INIT;
};}#endif // VOS_MSGQUEUE_H_
4.2.6 VOS_TimerImpl.h

这个稍微复杂,参考代码来自网页cpptime/cpptime.h at master · eglimi/cpptime · GitHub


#ifndef VOS_TIMERIMPL_H_
#define VOS_TIMERIMPL_H_#include "VOS_Def.h"
#include <algorithm>
#include <chrono>
#include <condition_variable>
#include <functional>
#include <memory>
#include <mutex>
#include <set>
#include <stack>
#include <thread>
#include <vector>namespace VOS
{using TimerSubscriberCb = std::function<void(VOS_CompName name, VOSTimerType timerType)>;// Public typesusing duration = std::chrono::microseconds;// The event structure that holds the information about a timer.
struct Event
{timer_id id;timestamp start;duration period;VOS_CompName name;VOSTimerType timerType;bool valid;Event(): id(0), start(duration::zero()),period(duration::zero()),name(VOS_CompName::COMP_Bottom),timerType(VOSTimerType::VOSTimerType_BOTTOM),valid(false){}Event(timer_id id,timestamp start,duration period,VOS_CompName name,VOSTimerType timerType): id(id),start(start),period(period),name(name),timerType(timerType),valid(true){}Event(Event &&r) = default;Event &operator=(Event &&ev) = default;Event(const Event &r) = delete;Event &operator=(const Event &r) = delete;
};// A time event structure that holds the next timeout and a reference to its
// Event struct.
struct Time_event
{timestamp next;timer_id ref;
};inline bool operator<(const Time_event &l, const Time_event &r)
{return l.next < r.next;
}class VOS_Timer
{using scoped_m = std::unique_lock<std::mutex>;public:VOS_Timer();~VOS_Timer();// user should register first and then startvoid addSubscriberCb(TimerSubscriberCb Cb){subScriberCb = Cb;}void start(){setRunningFlag(true);threadForTimer();}timer_id StartTimer(VOS_CompName name, VOSTimerType timerType, const timestamp &when, unsigned int interval = 0){std::chrono::duration<int, std::ratio<1, 1>> period(interval);return add(when,name,timerType,period);}bool Canceltimer(timer_id id){return remove(id);}/*** Add a new timer.** \param when The time at which the handler is invoked.* \param handler The callable that is invoked when the timer fires.* \param period The periodicity at which the timer fires. Only used for* periodic timers.*/timer_id add(const timestamp &when,  VOS_CompName name, VOSTimerType timerType,const duration &period = duration::zero()){scoped_m lock(m);timer_id id = 0;// Add a new event. Prefer an existing and free id. If none is// available, add a new one.if (free_ids.empty()){id = events.size();Event e(id, when, period, name, timerType);events.push_back(std::move(e));}else{id = free_ids.top();free_ids.pop();Event e(id, when, period, name, timerType);events[id] = std::move(e);}time_events.insert(Time_event{when, id});lock.unlock();cond.notify_all();return id;}/*** Overloaded `add` function that uses a `std::chrono::duration` instead of* a `time_point` for the first timeout.*/template <class Rep, class Period>inline timer_id add(const std::chrono::duration<Rep, Period> &when,VOS_CompName name, VOSTimerType timerType,const duration &period = duration::zero()){return add(clock::now() +std::chrono::duration_cast<std::chrono::microseconds>(when),name, timerType, period);}/*** Overloaded `add` function that uses a uint64_t instead of a `time_point`* for the first timeout and the period.*/inline timer_id add(const uint64_t when, VOS_CompName name, VOSTimerType timerType,const uint64_t period = 0){return add(duration(when), name, timerType, duration(period));}/*** Removes the timer with the given id.*/bool remove(timer_id id){scoped_m lock(m);if (events.size() == 0 || events.size() <= id){return false;}events[id].valid = false;events[id].name = VOS_CompName::COMP_Bottom;events[id].timerType = VOSTimerType::VOSTimerType_BOTTOM;auto it =std::find_if(time_events.begin(), time_events.end(),[&](const Time_event &te) { return te.ref == id; });if (it != time_events.end()){free_ids.push(it->ref);time_events.erase(it);}lock.unlock();cond.notify_all();return true;}protected:TimerSubscriberCb subScriberCb;void setRunningFlag(bool flag);void release();void timerEngine();void threadForTimer();bool loopCondition();
private: // Thread and locking variables.std::mutex m;std::condition_variable cond;// The vector that holds all active events.std::vector<Event> events;// Sorted queue that has the next timeout at its top.std::multiset<Time_event> time_events;// A list of ids to be re-used. If possible, ids are used from this pool.std::stack<timer_id> free_ids;std::atomic<bool> runningFlag_ = true;std::thread timer_thread_;};} // namespace VOS#endif /* VOS_TIMERIMPL_H_ */

4.3 src下 cpp实现

4.3.1 VOS_IHandler.cpp

#include <iostream>
#include "VOS_IHandler.h"
#include "VOS_MsgQueue.h"
#include "VOS_Interface.h"namespace VOS
{class VOS_IHandler::Impl {
public:explicit Impl(VOS_CompName name): Impl(name, std::make_shared<VOS_MsgQueue>(), VOS_Interface::getInstance()){}Impl(VOS_CompName name,std::shared_ptr<VOS_MsgQueue> msgQueue,VOS_Interface& vos_instance): name_(name),msgQueue_(msgQueue),vos_instance_(vos_instance){}~Impl(){release();}Impl() = delete;Impl(const Impl&) = delete;Impl(Impl&&) = delete;Impl& operator=(const Impl&) = delete;Impl& operator=(Impl&&) = delete;void sendMsg(const VosMessage &msg){auto element = std::make_unique<VosMessage>(msg);msgQueue_->sendMessage(std::move(element));}void start(SubscriberCb&& Cb){msgQueue_->addSubscriberCb(std::forward<SubscriberCb>(Cb));msgQueue_->setRunningFlag(true);initAsyncMessageTask();vos_instance_.VOS_AddMsgHandler(name_, [=](const VosMessage &msg){this->sendMsg(msg);});}protected:void initAsyncMessageTask();void release();void setRunningFlag(bool flag);bool loopCondition();void threadForEventHandler();void eventHandler();bool runningFlag_ = false;std::thread event_handler_thread_;VOS_CompName name_;std::shared_ptr<VOS_MsgQueue> msgQueue_;VOS_Interface& vos_instance_;
};bool VOS_IHandler::Impl::loopCondition()
{return runningFlag_;
}void VOS_IHandler::Impl::eventHandler()
{    while (loopCondition()){try{msgQueue_->handleMessage();}catch(const std::exception& e){return;}catch (...){return;}}
}void VOS_IHandler::Impl::threadForEventHandler()
{if(!event_handler_thread_.joinable()){event_handler_thread_ = std::thread([this](){this->eventHandler();});}
}void VOS_IHandler::Impl::setRunningFlag(bool flag)
{runningFlag_ = flag;
}
void VOS_IHandler::Impl::initAsyncMessageTask()
{setRunningFlag(true);threadForEventHandler();
}void VOS_IHandler::Impl::release()
{vos_instance_.VOS_RemoveMsgHandler(name_);setRunningFlag(false);if(event_handler_thread_.joinable()){msgQueue_->Stop();event_handler_thread_.join();}
}VOS_IHandler::VOS_IHandler(VOS_CompName name): pimpl_( std::make_unique<VOS_IHandler::Impl>(name)  )
{}void  VOS_IHandler::start()
{auto handler = [this](const VosMessage &msg){this->handleMessage(msg);};pimpl_->start(handler);
}}
4.3.2 VOS_InterfaceImpl.cpp
#include "VOS_InterfaceImpl.h"
#include <algorithm>namespace VOS
{VOS_InterfaceImpl::VOS_InterfaceImpl(): timers_(std::make_shared<VOS_Timer>())
{timers_->addSubscriberCb([this](VOS_CompName name, VOSTimerType timerType){this->VOS_SendTimerMsg(name, timerType);});timers_->start();
}
VOS_InterfaceImpl:: ~VOS_InterfaceImpl()
{timers_.reset();
}void VOS_InterfaceImpl::VOS_AddMsgHandler(VOS_CompName name, MsgHandler&& eh)
{if(!eh){return;}std::unique_lock<std::mutex> lck(interface_lock_);if (handlers_.find(name) != handlers_.end()){handlers_[name] = std::move(eh);}else{handlers_.insert(std::make_pair(name, std::forward<MsgHandler>(eh)));}    
}void VOS_InterfaceImpl::VOS_RemoveMsgHandler(VOS_CompName name)
{std::unique_lock<std::mutex> lck(interface_lock_);auto handler = [](const VosMessage &){};if (handlers_.find(name) != handlers_.end()){handlers_[name] = std::move(handler);}}void VOS_InterfaceImpl::VOS_SendMsg(VOS_CompName name, VosMessage &message)
{std::unique_lock<std::mutex> lck(interface_lock_);auto iter = handlers_.find(name);if(iter != handlers_.end()){iter->second(message);}//handlers_[name](message);
}void VOS_InterfaceImpl::VOS_SendTimerMsg(VOS_CompName name, VOSTimerType timerType)
{VosMessage message(VOS_MessageType::VOS_TIMER);message.msgData_.xTimerMsg.timerType = timerType;VOS_SendMsg(name, message);
}void VOS_InterfaceImpl::VOS_PublishEvent(VOS_EventID eventId, VOSEventArg arg)
{std::unique_lock<std::mutex> lck(interface_lock_);auto map_iter = event_map_.find(eventId);if(map_iter != event_map_.end()){std::for_each(map_iter->second.begin(),  map_iter->second.end(), [eventId, arg, this](VOS_CompName name){VosMessage message(VOS_MessageType::VOS_EVENT);message.msgData_.xEventMsg.eventId = static_cast<VOS_EventID> (eventId);message.msgData_.xEventMsg.arg = arg;this->VOS_SendMsg(name, message);});}
}void VOS_InterfaceImpl::VOS_RegisterEvent(VOS_EventID eventId, VOS_CompName name)
{std::unique_lock<std::mutex> lck(interface_lock_);auto map_iter = event_map_.find(eventId);if(map_iter != event_map_.end()){auto vec_iter = std::find(map_iter->second.begin(),  map_iter->second.end(), name);//already exist, returnif(vec_iter != map_iter->second.end()){return;}// push back new oneelse{map_iter->second.push_back(name);}}else{event_map_[eventId].push_back(name);}return;
}void VOS_InterfaceImpl::VOS_UnRegisterEvent(VOS_EventID eventId, VOS_CompName name)
{std::unique_lock<std::mutex> lck(interface_lock_);auto map_iter = event_map_.find(eventId);if(map_iter != event_map_.end()){auto vec_iter = std::find(map_iter->second.begin(),  map_iter->second.end(), name);//already exist, returnif(vec_iter != map_iter->second.end()){map_iter->second.erase(vec_iter);return;}}return;
}void VOS_InterfaceImpl::VOS_RegisterInitCB(VOS_CompName name)
{}timer_id VOS_InterfaceImpl::StartTimer(VOS_CompName name, VOSTimerType timerType, const timestamp &when, unsigned int interval)
{std::unique_lock<std::mutex> lck(interface_lock_);return timers_->StartTimer(name, timerType, when, interval );
}bool VOS_InterfaceImpl::Canceltimer(timer_id id)
{std::unique_lock<std::mutex> lck(interface_lock_);return timers_->Canceltimer(id);
}VOS_Interface& VOS_Interface::getInstance()
{static VOS_InterfaceImpl instance;return instance;
}}
4..3.3 VOS_TimerImpl.cpp
#include <algorithm>
#include "VOS_TimerImpl.h"namespace VOS
{VOS_Timer::VOS_Timer() : m{}, cond{},  events{}, time_events{}, free_ids{}
{}VOS_Timer:: ~VOS_Timer()
{release();
}void VOS_Timer::setRunningFlag(bool flag)
{runningFlag_ = flag;
}
void VOS_Timer::release()
{setRunningFlag(false);if(timer_thread_.joinable()){cond.notify_all();timer_thread_.join();}events.clear();time_events.clear();while (!free_ids.empty()){free_ids.pop();}
}bool VOS_Timer::loopCondition()
{return runningFlag_;
}void VOS_Timer::timerEngine()
{int i = 0;    while (loopCondition()){scoped_m lock(m);if (time_events.empty()){// Wait for workcond.wait(lock);}else{Time_event te = *time_events.begin();if (clock::now() >= te.next){// Remove time eventtime_events.erase(time_events.begin());// this is vital, remmber to addSubscriberCb firstlock.unlock();if(subScriberCb){subScriberCb(events[te.ref].name,events[te.ref].timerType);}lock.lock();if (events[te.ref].valid &&events[te.ref].period.count() > 0){// The event is valid and a periodic timer.te.next += events[te.ref].period;time_events.insert(te);}else{// The event is either no longer valid because it was// removed in the callback, or it is a one-shot timer.events[te.ref].valid = false;events[te.ref].name = VOS_CompName::COMP_Bottom;events[te.ref].timerType = VOSTimerType::VOSTimerType_BOTTOM;free_ids.push(te.ref);}}else{cond.wait_until(lock, te.next);}}}}void VOS_Timer::threadForTimer()
{if(!timer_thread_.joinable()){timer_thread_ = std::thread([this](){this->timerEngine();});}
}}

简单党员测试代码(VC2022 上 google test的配置见之前文章)

4.4  UT 简单测试

VOS_IHandler_test.cpp
#include <iostream>
#include <thread>
#include <string>#include "VOS_IHandler.h"
#include "VOS_Interface.h"#include "gmock/gmock.h"
#include "gtest/gtest.h"using ::testing::_;
using ::testing::NiceMock;
using ::testing::Return;
using namespace testing;namespace VOS
{class VOS_IHandlerWrapper : public VOS_IHandler
{
public:VOS_IHandlerWrapper(VOS_CompName name):VOS_IHandler(name){}void handleMessage(const VosMessage &msg){messagenum++;switch (msg.msgType_){case VOS_MessageType::VOS_SYNC_MSG:sync_msg = msg.msgData_.xSyncMsg;break;case VOS_MessageType::VOS_TIMER:timer_msg = msg.msgData_.xTimerMsg;break;case VOS_MessageType::VOS_BOTTOM:break;case VOS_MessageType::VOS_EVENT:event_msg = msg.msgData_.xEventMsg;break;default:break;}}void SleepForPeriod(int numberOfseconds){std::this_thread::sleep_for(std::chrono::seconds(numberOfseconds));}int messagenum = 0;VOS_EVENT_MSG event_msg;VOS_SYNC_MSG  sync_msg;VOS_TIMER_MSG timer_msg;};class VOS_IHandlerTest : public ::testing::Test
{public:void SetUp() override{messageHandler_ = std::make_shared<VOS_IHandlerWrapper>(VOS_CompName::COMP_AR);messageHandler_->start();}std::shared_ptr<VOS_IHandlerWrapper> messageHandler_;};TEST_F(VOS_IHandlerTest, VOS_SYNC_MSG_Process_OK)
{VosMessage message(VOS_MessageType::VOS_SYNC_MSG);message.msgData_.xSyncMsg.reserve_1 = 999;VOS_Interface::getInstance().VOS_SendMsg(VOS_CompName::COMP_AR, message);messageHandler_->SleepForPeriod(2);if(messageHandler_->messagenum != 0){EXPECT_EQ(messageHandler_->messagenum, 1);EXPECT_EQ(messageHandler_->sync_msg.reserve_1,  999);}
}TEST_F(VOS_IHandlerTest, VOS_PublishEvent_OK)
{VOS_Interface::getInstance().VOS_RegisterEvent( static_cast<VOS_EventID>(5), VOS_CompName::COMP_AR);VosMessage message(VOS_MessageType::VOS_EVENT);message.msgData_.xEventMsg.eventId = static_cast<VOS_EventID>(5);message.msgData_.xEventMsg.arg.repData={1,2,3};VOS_Interface::getInstance().VOS_SendMsg(VOS_CompName::COMP_AR, message);messageHandler_->SleepForPeriod(2);if(messageHandler_->messagenum != 0){EXPECT_EQ(messageHandler_->messagenum, 1);EXPECT_EQ(messageHandler_->event_msg.eventId,  static_cast<VOS_EventID>(5));EXPECT_EQ(messageHandler_->event_msg.arg.repData.programId,  1);EXPECT_EQ(messageHandler_->event_msg.arg.repData.assayNumber,  2);EXPECT_EQ(messageHandler_->event_msg.arg.repData.repIdx,  3);}VOS_Interface::getInstance().VOS_UnRegisterEvent( static_cast<VOS_EventID>(5), VOS_CompName::COMP_AR);
}TEST_F(VOS_IHandlerTest, VOS_TIMER_MSG_ONCE_TIMER_OK)
{auto timer_id = VOS_Interface::getInstance().StartTimer(VOS_CompName::COMP_AR, VOSTimerType::VOSTimerType_BOTTOM, clock::now());messageHandler_->SleepForPeriod(2);if(messageHandler_->messagenum != 0){EXPECT_EQ(messageHandler_->messagenum, 1);EXPECT_EQ(messageHandler_->timer_msg.timerType,  VOSTimerType::VOSTimerType_BOTTOM);}
}TEST_F(VOS_IHandlerTest, VOS_TIMER_MSG_PERIODIC_TIMER_OK)
{auto timer_id = VOS_Interface::getInstance().StartTimer(VOS_CompName::COMP_AR, VOSTimerType::VOSTimerType_BOTTOM, clock::now(), 1);messageHandler_->SleepForPeriod(2);if(messageHandler_->messagenum != 0){EXPECT_EQ(messageHandler_->timer_msg.timerType,  VOSTimerType::VOSTimerType_BOTTOM);}VOS_Interface::getInstance().Canceltimer(timer_id);
}}

先写到这里,无烟无酒无故事。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/bicheng/37647.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

[OtterCTF 2018]Recovery

里克必须找回他的文件&#xff01;用于加密文件的随机密码是什么 恢复他的文件 &#xff0c;感染的文件 &#xff1f; vmware-tray.ex 前面导出的3720.dmp 查找一下 搜索主机 strings -e l 3720.dmp | grep “WIN-LO6FAF3DTFE” 主机名 后面跟着一串 代码 aDOBofVYUNVnmp7 是不…

快速应用开发(RAD):加速软件开发的关键方法

目录 前言1. 快速应用开发的概念1.1 什么是快速应用开发&#xff1f;1.2 RAD与传统开发方法的对比 2. 快速应用开发的实施步骤2.1 需求分析与规划2.2 快速原型开发2.3 用户评估与反馈2.4 迭代开发与改进2.5 最终交付与维护 3. 快速应用开发的优点与应用场景3.1 优点3.2 应用场景…

微调Llama2自我认知

一、概述 最近在学习了解大模型微调相关的内容&#xff0c;在学习的过程中也遇到了很多问题&#xff0c;所以将自己的学习过程记录下来&#xff0c;希望对大模型微调感兴趣的小伙伴提供一点帮助&#xff0c;本文主要介绍一下如何通过SFT微调Llama2的自我认知&#xff0c;先看一…

Summaries

摘要是网格项&#xff0c;它利用聚合函数来显示有关所显示数据的摘要信息&#xff1a;总记录计数、最小值等。 GridControl-Grid View Summary Types 汇总 汇总总数&#xff08;GridSummaryItem&#xff09;是根据所有数据网格记录计算的&#xff0c;并显示在视图页脚中。启…

学懂C#编程:常用高级技术——学会泛型与集合(一)

C# 中的泛型和集合是两个非常重要的概念&#xff0c;它们极大地增强了代码的灵活性和可重用性。下面我将详细讲解这两个概念。 一、泛型 (Generics) 泛型允许你在定义类、接口、方法或委托时使用类型参数&#xff0c;从而使这些类型或方法可以在不指定具体类型的情况下…

【ACM出版-EI稳检索】第三届金融创新、金融科技与信息技术国际学术会议(FFIT 2024,7月26-28)

第三届金融创新、科技与信息技术国际学术会议&#xff08;FFIT 2024&#xff09;将于2024年07月26-28日于重庆举行。 FFIT2024 将围绕“金融创新”、"金融科技”与“信息技术”等相关最新研究领域&#xff0c;为来自国内外高等院校、科学研究所、企事业单位的专家、教授、…

第三阶段Spark

Spark和PySpark的介绍 PySpark的相关设置 安装PySpark库 pip install pyspark pip install -i https://pypi.tuna.tsinghua.edu.cn/simple pyspark 构建PySpark执行环境入口对象 # 导包 from pyspark import SparkConf, SparkContext# 创建SparkConf类对象 conf SparkConf()…

【代码随想录算法训练营第五十三天|739.每日温度、496.下一个更大元素I、503.下一个更大元素II】

文章目录 739.每日温度496.下一个更大元素I503.下一个更大元素II 739.每日温度 在这里单调栈相当于是在遍历数组的同时记录下了额外的大小关系的信息。因为这里需要的是每个数组元素右边第一个比他大的元素的间隔&#xff0c;因此单调栈是用来在遍历数组的时候存放还没有找到比…

算法题--华为od机试考试(整数对最小和、素数之积、找城市)

目录 整数对最小和 题目描述 注意 输出描述 示例1 输入 输出 说明 解析 答案 素数之积 题目描述 输入描述 输出描述 示例1 输入 输出 说明 示例2 输入 输出 说明 解析 找城市 题目描述 输入 输出 示例1 输入 输出 示例2 输入 输出 说明 解析…

PyTorch学习之torch.nn.functional.conv2d函数

PyTorch学习之torch.nn.functional.conv2d函数 一、简介 torch.nn.functional.conv2d 是 PyTorch 中用于进行二维卷积操作的函数。卷积操作是深度学习中卷积神经网络&#xff08;CNN&#xff09;的核心部分&#xff0c;用于提取图像特征&#xff0c;常见于图像分类、目标检测和…

Nvidia显卡GeForce Experience录屏操作流程

安装软件 首先我们从英伟达官网下载GeForce Experience程序&#xff0c;安装在电脑中GeForce Experience&#xff08;简称 GFE&#xff09;自动更新驱动并优化游戏设置 | NVIDIA 登录软件 安装完成后登录 开启录屏功能 登录后点击右上角的设置&#xff08;小齿轮图标&#x…

MT1595 点到矩形

题目 用下面的数据类型分别表示点和矩形&#xff1a; struct POINT { //点 int x, y; //坐标值x和y } ; struct RECT { //矩形 POINT lt, rb; //矩形的左上角和右下角 } ; 输入矩形两个点的坐标值x和y&#xff0c;再输入第3个点的坐标&#xff0c;计算第3个点距这个矩形的最近…

SQL面试真题解答 数据统计分析,求“同比、环比”等(SQL窗口函数使用)

SQL面试真题解答 数据统计分析&#xff0c;求“同比、环比”等&#xff08;SQL窗口函数使用&#xff09; 环比、环比增长率、同比、同比增长率&#xff0c;根据百度百科上的 说明&#xff1a; 环比增长率 环比增长率&#xff0c;一般是指和上期相比较的增长率。 环比增长率&a…

【linux】vim的使用

目录 一、Vim的基本模式 二、Vim的常见命令 三、Vim的高级用法 四、Vim的进阶使用技巧 在Linux系统中&#xff0c;Vim是一款功能强大的文本编辑器&#xff0c;特别适用于程序员的代码编辑和修改。以下是Vim的详细使用教程&#xff0c;包括其基本模式、常见命令和高级用法。…

什么是区块链与去中心化技术?

区块链和去中心化技术代表了当今数字世界中最前沿的创新。这些技术不仅重新定义了数据的管理和交换方式&#xff0c;还开启了全新的应用场景。本文将详细介绍区块链和去中心化技术&#xff0c;探讨它们的原理、特点以及应用。 一、区块链技术概述 1. 区块链的定义 区块链是一…

隐藏Python运行产生的缓存文件(__pycache__)

不少同学使用VScode 提交或运行python代码的时候&#xff0c;出现一些缓存文件 类似于(__pycache__) 这种&#xff0c;对于我这种有一丢丢强迫症的人来说&#xff0c;运行一次就得删除一次&#xff0c;那有没有什么办法将其隐藏的&#xff1f; 在vscode编辑器中打开设置&#…

HarmonyOS Next开发学习手册——创建轮播 (Swiper)

Swiper 组件提供滑动轮播显示的能力。Swiper本身是一个容器组件&#xff0c;当设置了多个子组件后&#xff0c;可以对这些子组件进行轮播显示。通常&#xff0c;在一些应用首页显示推荐的内容时&#xff0c;需要用到轮播显示的能力。 针对复杂页面场景&#xff0c;可以使用 Sw…

lua5.3.4的Linux的库文件下载地址

从这个链接选lua5.3.4 Lua Binaries (sourceforge.net) 进入-> 这个页面 LuaBinaries - Browse /5.3.4/Linux Libraries at SourceForge.net 之后就可以下载了。

第2章_开发板使用

文章目录 第2章 开发板使用2.1 硬件连接2.1.1 连接 ST-Link2.1.2 连接 USB 串口2.1.3 连接 SPI 屏 2.2 运行测试程序验证硬件2.2.1 硬件接线&#xff08;RS485、CAN&#xff09;2.2.2 编译工程2.2.3 配置调试器2.2.4 烧录运行 2.3 创建第 1 个工程2.3.1 创建工程2.3.2 选择调试…

深入理解Spring Boot中的自动配置原理

深入理解Spring Boot中的自动配置原理 大家好&#xff0c;我是免费搭建查券返利机器人省钱赚佣金就用微赚淘客系统3.0的小编&#xff0c;也是冬天不穿秋裤&#xff0c;天冷也要风度的程序猿&#xff01; Spring Boot 的自动配置是其核心特性之一&#xff0c;它极大地简化了 S…