1 啰嗦一番背景
这么多年,换着槽位做牛做马,没有什么钱途
手艺仍然很潮,唯有对于第一线的码农工作,孜孜不倦,其实没有啥进步,就是在不断地重复,刷熟练度,和同期的老兄弟们,那是千万不要有比较的想法,Q一把说各有各的活路。
业务线,技术线,管理线,通通都无成就。
唯有在一亩三分地上,辛勤耕耘,有了点成绩,聊以慰藉
拉回正题
当前部门的项目团队,并非纯粹嵌入式开发,偏向于ARM高性能芯片+unbuntu模式,中位机的模块代码加上UT code,前前后后加起来,11W行了,正式代码估计就4W左右,其他都工具或者UT。
UT的改善也是从加入部门后开始加速的,算是起到一定带领作用,新特性的merge同时,也要满足行覆盖率(分支覆盖率没有硬性要求,代码设计的够好,少一些if else就OK)。
目前软件中,一个比较大的问题是组件间只有API接口,一路开发下来,各个组件的API调用互相交织,解耦困难,千辛万苦将UT覆盖刷到超80个点,mock遍布,带来的副作用是UT文件的.o超大。
编出来的执行文件,超过1个多G,Windows上跑GCC交叉编译会OOM,只有VC+linux 两个环境上的UT可以编出来。
其实陷入了瓶颈。
接下来进入新一轮新特性开发期,老的架构已经到了差不多不得不优化的程度。
算是在当前团队的最后一个心愿吧,这不,解决方案除了在业务流程优化,软件架构改进上(主要就是SOLID原则),需要引入轻量级线程间异步消息架构。
并非线程池,单纯的异步消息,事件,定时器这三种基本的功能。
Java环境下,这都不是事,C++环境下,搜索了一遍,没有现成的。
boost有异步消息架构和定时器,简单的也玩过,感觉上我们用起来有些门槛,随着C++新标准不断引入新特性新功能,boost对于中小型代码,反而没有特别吃香的感觉。
Nokia开源的一款 event machine,牛逼是牛逼,门槛也高,代码行数都快抵得上我们当前的项目的一小半了,弃疗。
开源的一些找来找去,在简洁上,差点意思。
另外,线程间异步消息架构,是几乎所有稍微大型的一点的软件的刚需。
关键的关键,是没有现成的,这一点不得不口出脏话,tm什么年代了,这玩意还需要敝帚自珍,当个宝,就不能分享出来,让广大的农民工朋友有一个基本的参考或者可以鄙视的模版?
要自己开发了,回忆起了10多年前,Dopra的FID/PID初始化+消息框架的架构。
琢磨了一段时间,开动。
2 简单介绍线程间异步消息框架的概念
如下的讲述,懒得画图了,到处都可以搜索到,画出来也是知识垃圾。
a 异步消息很简单,
1对1,C + message broker(桥接)+ C
b 事件,为 1对N(0...:n),发布和订阅模型
P(publisher) + event broker + S(subscribers)
c 定时器
不搞callback这一套,和起临时线程(异步任务)没有太大区别
T(trigger) + timer engine + user(handler)
上述三种,全部共享一个handler(FID/PID框架中的MsgProc)
message broker + event broker + timer engine 归属到一个全局功能实例,齐活。
基础的架构设计完毕后,开始猛起,要做一个一天赚1000妹币的男人(混到最惨了有木有)
圈里面看到有老哥们6月27号(周四)拿了最后一次加班夜宵,岁月不饶人,一个有生命力的团队必然是一个不断推陈出新的团队。
老司机呆长的,不到一定层级,慢慢就成为重点优化对象。当时也是年轻气盛,13年拿了个杭研十佳程序员,年底被领导平衡绩效C,果断离了。
历程转了个大弯,这不,Dopra的一套消息架构,真香,试着东施效颦了。
3 和Dopra框架的一点区别
不出意外的必然有很大的区别,咱这个是缩水版(就开发了4天,连蒙带猜)
1 受限于个人水平,怎么简单怎么来,元编程玩不来,羞愧。
2 也没有单独的消息内存管理那一套(如果是大型项目,视情况而言,想要做好内存操作隔离,那还是需要的,memory sandbox概念也早就有了,有需要就加上)
消息也不用老式的 VOS_MsgHeader + paylaod这一套来管理(踩内存尾记忆尤深),通通简化,当前的项目并不需要多个coordinator之间搞起,咱就是单体。
用最土的union来描述基本消息结构(想要玩的开,搞成container的话,那还有很多备选,比如同OS内部 IPC,zmq,消息protobuf封装),想要怎么玩,几乎都玩得起了现在。
3 Dopra的FID/PID还负责启动时初始化(也可以参考RT-Thread一套初始化方式),笔者其实也准备用上,没有啥技术上的巧,接口预留,各位看官可以酌情自己添加。
当然得当然,都是tm指针在管理内存,修改上完全开放的。
10年了,该忘的不该忘的,差不多忘记,伴随着年级增加的,只有体重。
4 干货分享
声明: 笔者也是受益于互联网,各位看官下载源码后,可自由分发和修改,笔者天然放弃著作权(有部分源码参考自github cpptime/cpptime.h at master · eglimi/cpptime · GitHub)。
笔者当前分享的是初稿,只跑了几个UT case,后续看情况,待完善后会更新。
笔者相信,当前的代码离商业级别应该有一些差距,给大家带来的不便也敬请原谅,就开发了4天,中间还不断给各种杂事打断。
4.1 源代码目录
.
├── include
│ ├── VOS_Def.h
│ ├── VOS_IHandler.h
│ ├── VOS_Interface.h
│ └── private
│ ├── VOS_InterfaceImpl.h
│ ├── VOS_MsgQueue.h
│ └── VOS_TimerImpl.h
├── src
│ ├── VOS_IHandler.cpp
│ ├── VOS_InterfaceImpl.cpp
│ └── VOS_TimerImpl.cpp
└── tst└── VOS_IHandler_test.cpp
4.2 头文件介绍
4.2.1 VOS_Def.h
该文件由用户定制(有部分基础结构建议不要修改,比如VosMessage这种),比如消息基础结构,消息ID,timer 类型,event id以及对应的携带参数
#ifndef VOS_DEF_H_
#define VOS_DEF_H_
#include <chrono>namespace VOS
{enum class VOS_MessageType
{VOS_SYNC_MSG,VOS_TIMER,VOS_EVENT,VOS_BOTTOM = 255
};enum class VOS_CompName
{COMP_A = 0,COMP_B = 1,COMP_C = 2,COMP_D = 3,COMP_E = 4,COMP_F = 5,COMP_AR = 6,COMP_Bottom = 7 // the last one, if you add new one,COMP_Bottom must increase
};// user need define own EventID
enum class VOS_EventID
{SUB_SAHA_INIT_START = 0,SUB_SAHA_INIT_READY = 1,EVENT_SAHA_SOMETHING_OK,EVENT_SAHA_SOMETHING_FAIL};struct REP_ID_DATA
{int programId;int assayNumber;int repIdx;
};typedef union
{REP_ID_DATA repData;int reserve_1;
} VOSEventArg;enum class VOSTimerType
{VOSTimerType_1,VOSTimerType_2,VOSTimerType_3,VOSTimerType_BOTTOM = 255
};struct VOS_TIMER_MSG
{VOSTimerType timerType;
};/*
message id
VOS_ENV_XXX_REQ
VOS_ENV_XXX_RESP
*/struct VOS_SYNC_MSG
{int msgId;//add your own structure
};struct VOS_EVENT_MSG
{VOS_EventID eventId;VOSEventArg arg;
};/** 消息中携带的结构可能多种多样,要兼容各种结构在实现上难度太大* 这里的轻量级消息结构,基本元素为一段固定大小的内存块,选用union结构* unon结构有限制,不同编译器可能不一样,最好在基层结构中不要用用户自定义的构造函数* 可参看 https://learn.microsoft.com/zh-cn/cpp/cpp/unions?view=msvc-170** union的一大特征在于,一个Union类中的所有数据共享同一段内存。* 如果union类的成员包含自己的构造函数,析构函数,* 那么同一Union类的成员在初始化时,就有可能会执行不同的构造函数。* 这是无法预料的。所以,我们在定义Union类时要尽量避免成员变量是对象(含有自己的构造函数)* 子结构中,最好不要包含自定义的一些类结构* 看下面 std::string 的报错*/
typedef union
{VOS_SYNC_MSG xSyncMsg;VOS_TIMER_MSG xTimerMsg;VOS_EVENT_MSG xEventMsg;//std::string test; error C2280: “quaero::VOS::VosMsgData::VosMsgData(void)”: 尝试引用已删除的函数
} VosMsgData;struct VosMessage
{VosMessage(VOS_MessageType msgType) :msgType_(msgType){}VOS_MessageType msgType_;VosMsgData msgData_;
};using timer_id = std::size_t;
using clock = std::chrono::steady_clock;
using timestamp = std::chrono::time_point<clock>;}#endif // VOS_DEF_H_
4.2.2 VOS_Interface.h
API 总入口,单实例,getInstance()方式调用,笔者懒得private构造函数,形式而已
设计上不完美,这两个接口是内部框架调用的,这里也暴露给了用户
virtual void VOS_RegisterEvent(VOS_EventID eventId, VOS_CompName name) = 0;virtual void VOS_UnRegisterEvent(VOS_EventID eventId, VOS_CompName name) = 0;
#ifndef VOS_INTERFACE_H_
#define VOS_INTERFACE_H_
#include "VOS_Def.h"
#include <functional>namespace VOS
{class VOS_Interface
{
public:explicit VOS_Interface() = default;virtual ~VOS_Interface() = default;VOS_Interface(const VOS_Interface&) = delete;VOS_Interface(VOS_Interface&&) = delete;VOS_Interface& operator=(const VOS_Interface&) = delete;VOS_Interface& operator=(VOS_Interface&&) = delete;virtual void VOS_SendMsg(VOS_CompName name, VosMessage &message) = 0;virtual void VOS_PublishEvent(VOS_EventID eventId, VOSEventArg arg) = 0;virtual void VOS_RegisterEvent(VOS_EventID eventId, VOS_CompName name) = 0;virtual void VOS_UnRegisterEvent(VOS_EventID eventId, VOS_CompName name) = 0;virtual void VOS_RegisterInitCB(VOS_CompName name) = 0;using MsgHandler = std::function<void(const VosMessage &msg)>;// warning: called by inner framework, user should not call thisvirtual void VOS_AddMsgHandler(VOS_CompName name, MsgHandler&& eh) = 0;virtual void VOS_RemoveMsgHandler(VOS_CompName name) = 0;virtual timer_id StartTimer(VOS_CompName name, VOSTimerType timerType, const timestamp &when, unsigned int interval = 0)= 0;virtual bool Canceltimer(timer_id id)= 0;static VOS_Interface& getInstance();};
}#endif // VOS_INTERFACE_H_
4.2.3 VOS_IHandler.h
每个需要处理消息的组件各自创建一个,参数就是VOS_CompName name
用户基本上只要继承,重载
virtual void handleMessage(const VosMessage &msg) = 0;
初始化完毕调用start() ,参看UT调用,并没有明确限制,可自行修改
void start();
具体如下
#ifndef VOS_IHANDLER_H_
#define VOS_IHANDLER_H_#include <memory>
#include "VOS_Def.h"namespace VOS
{class VOS_IHandler {
public:explicit VOS_IHandler(VOS_CompName name);virtual ~VOS_IHandler() = default;VOS_IHandler() = delete;VOS_IHandler(const VOS_IHandler&) = delete;VOS_IHandler(VOS_IHandler&&) = delete;VOS_IHandler& operator=(const VOS_IHandler&) = delete;VOS_IHandler& operator=(VOS_IHandler&&) = delete;void start();virtual void handleMessage(const VosMessage &msg) = 0;/* void VOS_IHandler::handleMessage(const VosMessage &msg){switch (msg.msgType_){case VOS_MessageType::VOS_TIMER://do somethingbreak;case VOS_MessageType::VOS_BOTTOM:default:break;}}*/protected:class Impl; std::shared_ptr<Impl> pimpl_;
};}
#endif /* VOS_IHANDLER_H_ */
接下来是内部实现
4.2.4 VOS_InterfaceImpl.h
该内部头文件为VOS_Interface.h 接口的具体实现类
#ifndef VOS_INTERFACE_IMPL_H_
#define VOS_INTERFACE_IMPL_H_
#include <memory>
#include <map>
#include <vector>
#include <thread>
#include <mutex>
#include "VOS_Interface.h"
#include "VOS_TimerImpl.h"namespace VOS
{
class VOS_InterfaceImpl : public VOS_Interface
{
public:VOS_InterfaceImpl();virtual ~VOS_InterfaceImpl();VOS_InterfaceImpl(const VOS_InterfaceImpl&) = delete;VOS_InterfaceImpl(VOS_InterfaceImpl&&) = delete;VOS_InterfaceImpl& operator=(const VOS_InterfaceImpl&) = delete;VOS_InterfaceImpl& operator=(VOS_InterfaceImpl&&) = delete;void VOS_SendMsg(VOS_CompName name, VosMessage &message) override;void VOS_PublishEvent(VOS_EventID eventId, VOSEventArg arg) override;void VOS_RegisterEvent(VOS_EventID eventId, VOS_CompName name) override;void VOS_UnRegisterEvent(VOS_EventID eventId, VOS_CompName name) override;void VOS_RegisterInitCB(VOS_CompName name) override;void VOS_AddMsgHandler(VOS_CompName name, MsgHandler&& eh) override;void VOS_RemoveMsgHandler(VOS_CompName name) override;timer_id StartTimer(VOS_CompName name, VOSTimerType timerType, const timestamp &when, unsigned int interval) override;bool Canceltimer(timer_id id) override;void VOS_SendTimerMsg(VOS_CompName name, VOSTimerType timerType);private:std::map<VOS_CompName, MsgHandler> handlers_;std::map<VOS_EventID, std::vector<VOS_CompName>> event_map_;std::shared_ptr<VOS_Timer> timers_;std::mutex interface_lock_;};}#endif // VOS_INTERFACE_IMPL_H_
4.2.5 VOS_MsgQueue.h
为安全队列的基本封装,有些冗余的代码,部分原子变量操作时不完全规范,大家可以用原生的atomic操作自行修改
#ifndef VOS_MSGQUEUE_H_
#define VOS_MSGQUEUE_H_#include <memory>
#include <atomic>
#include <queue>
#include <functional>
#include <iostream>
#include <condition_variable>#include "VOS_Def.h"namespace VOS
{using VOSQueueElementPtr = std::unique_ptr<VosMessage>;
using VOSQueueElements = std::queue<VOSQueueElementPtr>;using SubscriberCb = std::function<void(const VosMessage &msg )>;class VOS_MsgQueue
{
public:explicit VOS_MsgQueue(){};~VOS_MsgQueue(){notify();};VOS_MsgQueue(const VOS_MsgQueue&) = delete;VOS_MsgQueue(VOS_MsgQueue&&) = delete;VOS_MsgQueue& operator=(const VOS_MsgQueue&) = delete;VOS_MsgQueue& operator=(VOS_MsgQueue&&) = delete;void addSubscriberCb(SubscriberCb Cb){subScriberCb = Cb;}void sendMessage(VOSQueueElementPtr msg){std::unique_lock<std::mutex> lck(VOSQueue_message_lock_);// if queue size > MAX_QUEUE_SIZE, that means VOS task might have some issue or might be blocked if(getRunningFlag() && (VOSQueue_messages_.size() < MAX_QUEUE_SIZE)){VOSQueue_messages_.push(std::move(msg));ready = true;lck.unlock();notify();}}void handleMessage(){std::unique_lock<std::mutex> lck(VOSQueue_message_lock_);cv.wait(lck, [this]() { return this->ready; });VOSQueueElements tmpVOSQueue;while(VOSQueue_messages_.size()){tmpVOSQueue.push(std::move(VOSQueue_messages_.front()));VOSQueue_messages_.pop();}ready = false;lck.unlock();while(tmpVOSQueue.size()){auto msg = std::move(tmpVOSQueue.front());tmpVOSQueue.pop();//do somethingsubScriberCb(*msg);}}int getQueueSize(){std::unique_lock<std::mutex> lock(VOSQueue_message_lock_);return static_cast<int>(VOSQueue_messages_.size());}void Stop(){{std::lock_guard<std::mutex> lk{VOSQueue_message_lock_};ready = true;}// set atomic flag to true and notify event handler threadnotify();}void clear(){notify();}void notify(){cv.notify_all();}void clearQueue(){std::unique_lock<std::mutex> lck(VOSQueue_message_lock_);while(VOSQueue_messages_.size()){VOSQueue_messages_.pop();}interfaceRunningFlag_ = false;}void setRunningFlag(bool flag){std::unique_lock<std::mutex> lck(VOSQueue_message_lock_);interfaceRunningFlag_ = flag;}bool getRunningFlag(){return interfaceRunningFlag_;}private:const int MAX_QUEUE_SIZE = 500;VOSQueueElements VOSQueue_messages_;SubscriberCb subScriberCb;std::mutex VOSQueue_message_lock_;std::condition_variable cv;std::atomic<bool> interfaceRunningFlag_ = false;bool ready = false;std::atomic_flag synFlag_ = ATOMIC_FLAG_INIT;
};}#endif // VOS_MSGQUEUE_H_
4.2.6 VOS_TimerImpl.h
这个稍微复杂,参考代码来自网页cpptime/cpptime.h at master · eglimi/cpptime · GitHub
#ifndef VOS_TIMERIMPL_H_
#define VOS_TIMERIMPL_H_#include "VOS_Def.h"
#include <algorithm>
#include <chrono>
#include <condition_variable>
#include <functional>
#include <memory>
#include <mutex>
#include <set>
#include <stack>
#include <thread>
#include <vector>namespace VOS
{using TimerSubscriberCb = std::function<void(VOS_CompName name, VOSTimerType timerType)>;// Public typesusing duration = std::chrono::microseconds;// The event structure that holds the information about a timer.
struct Event
{timer_id id;timestamp start;duration period;VOS_CompName name;VOSTimerType timerType;bool valid;Event(): id(0), start(duration::zero()),period(duration::zero()),name(VOS_CompName::COMP_Bottom),timerType(VOSTimerType::VOSTimerType_BOTTOM),valid(false){}Event(timer_id id,timestamp start,duration period,VOS_CompName name,VOSTimerType timerType): id(id),start(start),period(period),name(name),timerType(timerType),valid(true){}Event(Event &&r) = default;Event &operator=(Event &&ev) = default;Event(const Event &r) = delete;Event &operator=(const Event &r) = delete;
};// A time event structure that holds the next timeout and a reference to its
// Event struct.
struct Time_event
{timestamp next;timer_id ref;
};inline bool operator<(const Time_event &l, const Time_event &r)
{return l.next < r.next;
}class VOS_Timer
{using scoped_m = std::unique_lock<std::mutex>;public:VOS_Timer();~VOS_Timer();// user should register first and then startvoid addSubscriberCb(TimerSubscriberCb Cb){subScriberCb = Cb;}void start(){setRunningFlag(true);threadForTimer();}timer_id StartTimer(VOS_CompName name, VOSTimerType timerType, const timestamp &when, unsigned int interval = 0){std::chrono::duration<int, std::ratio<1, 1>> period(interval);return add(when,name,timerType,period);}bool Canceltimer(timer_id id){return remove(id);}/*** Add a new timer.** \param when The time at which the handler is invoked.* \param handler The callable that is invoked when the timer fires.* \param period The periodicity at which the timer fires. Only used for* periodic timers.*/timer_id add(const timestamp &when, VOS_CompName name, VOSTimerType timerType,const duration &period = duration::zero()){scoped_m lock(m);timer_id id = 0;// Add a new event. Prefer an existing and free id. If none is// available, add a new one.if (free_ids.empty()){id = events.size();Event e(id, when, period, name, timerType);events.push_back(std::move(e));}else{id = free_ids.top();free_ids.pop();Event e(id, when, period, name, timerType);events[id] = std::move(e);}time_events.insert(Time_event{when, id});lock.unlock();cond.notify_all();return id;}/*** Overloaded `add` function that uses a `std::chrono::duration` instead of* a `time_point` for the first timeout.*/template <class Rep, class Period>inline timer_id add(const std::chrono::duration<Rep, Period> &when,VOS_CompName name, VOSTimerType timerType,const duration &period = duration::zero()){return add(clock::now() +std::chrono::duration_cast<std::chrono::microseconds>(when),name, timerType, period);}/*** Overloaded `add` function that uses a uint64_t instead of a `time_point`* for the first timeout and the period.*/inline timer_id add(const uint64_t when, VOS_CompName name, VOSTimerType timerType,const uint64_t period = 0){return add(duration(when), name, timerType, duration(period));}/*** Removes the timer with the given id.*/bool remove(timer_id id){scoped_m lock(m);if (events.size() == 0 || events.size() <= id){return false;}events[id].valid = false;events[id].name = VOS_CompName::COMP_Bottom;events[id].timerType = VOSTimerType::VOSTimerType_BOTTOM;auto it =std::find_if(time_events.begin(), time_events.end(),[&](const Time_event &te) { return te.ref == id; });if (it != time_events.end()){free_ids.push(it->ref);time_events.erase(it);}lock.unlock();cond.notify_all();return true;}protected:TimerSubscriberCb subScriberCb;void setRunningFlag(bool flag);void release();void timerEngine();void threadForTimer();bool loopCondition();
private: // Thread and locking variables.std::mutex m;std::condition_variable cond;// The vector that holds all active events.std::vector<Event> events;// Sorted queue that has the next timeout at its top.std::multiset<Time_event> time_events;// A list of ids to be re-used. If possible, ids are used from this pool.std::stack<timer_id> free_ids;std::atomic<bool> runningFlag_ = true;std::thread timer_thread_;};} // namespace VOS#endif /* VOS_TIMERIMPL_H_ */
4.3 src下 cpp实现
4.3.1 VOS_IHandler.cpp
#include <iostream>
#include "VOS_IHandler.h"
#include "VOS_MsgQueue.h"
#include "VOS_Interface.h"namespace VOS
{class VOS_IHandler::Impl {
public:explicit Impl(VOS_CompName name): Impl(name, std::make_shared<VOS_MsgQueue>(), VOS_Interface::getInstance()){}Impl(VOS_CompName name,std::shared_ptr<VOS_MsgQueue> msgQueue,VOS_Interface& vos_instance): name_(name),msgQueue_(msgQueue),vos_instance_(vos_instance){}~Impl(){release();}Impl() = delete;Impl(const Impl&) = delete;Impl(Impl&&) = delete;Impl& operator=(const Impl&) = delete;Impl& operator=(Impl&&) = delete;void sendMsg(const VosMessage &msg){auto element = std::make_unique<VosMessage>(msg);msgQueue_->sendMessage(std::move(element));}void start(SubscriberCb&& Cb){msgQueue_->addSubscriberCb(std::forward<SubscriberCb>(Cb));msgQueue_->setRunningFlag(true);initAsyncMessageTask();vos_instance_.VOS_AddMsgHandler(name_, [=](const VosMessage &msg){this->sendMsg(msg);});}protected:void initAsyncMessageTask();void release();void setRunningFlag(bool flag);bool loopCondition();void threadForEventHandler();void eventHandler();bool runningFlag_ = false;std::thread event_handler_thread_;VOS_CompName name_;std::shared_ptr<VOS_MsgQueue> msgQueue_;VOS_Interface& vos_instance_;
};bool VOS_IHandler::Impl::loopCondition()
{return runningFlag_;
}void VOS_IHandler::Impl::eventHandler()
{ while (loopCondition()){try{msgQueue_->handleMessage();}catch(const std::exception& e){return;}catch (...){return;}}
}void VOS_IHandler::Impl::threadForEventHandler()
{if(!event_handler_thread_.joinable()){event_handler_thread_ = std::thread([this](){this->eventHandler();});}
}void VOS_IHandler::Impl::setRunningFlag(bool flag)
{runningFlag_ = flag;
}
void VOS_IHandler::Impl::initAsyncMessageTask()
{setRunningFlag(true);threadForEventHandler();
}void VOS_IHandler::Impl::release()
{vos_instance_.VOS_RemoveMsgHandler(name_);setRunningFlag(false);if(event_handler_thread_.joinable()){msgQueue_->Stop();event_handler_thread_.join();}
}VOS_IHandler::VOS_IHandler(VOS_CompName name): pimpl_( std::make_unique<VOS_IHandler::Impl>(name) )
{}void VOS_IHandler::start()
{auto handler = [this](const VosMessage &msg){this->handleMessage(msg);};pimpl_->start(handler);
}}
4.3.2 VOS_InterfaceImpl.cpp
#include "VOS_InterfaceImpl.h"
#include <algorithm>namespace VOS
{VOS_InterfaceImpl::VOS_InterfaceImpl(): timers_(std::make_shared<VOS_Timer>())
{timers_->addSubscriberCb([this](VOS_CompName name, VOSTimerType timerType){this->VOS_SendTimerMsg(name, timerType);});timers_->start();
}
VOS_InterfaceImpl:: ~VOS_InterfaceImpl()
{timers_.reset();
}void VOS_InterfaceImpl::VOS_AddMsgHandler(VOS_CompName name, MsgHandler&& eh)
{if(!eh){return;}std::unique_lock<std::mutex> lck(interface_lock_);if (handlers_.find(name) != handlers_.end()){handlers_[name] = std::move(eh);}else{handlers_.insert(std::make_pair(name, std::forward<MsgHandler>(eh)));}
}void VOS_InterfaceImpl::VOS_RemoveMsgHandler(VOS_CompName name)
{std::unique_lock<std::mutex> lck(interface_lock_);auto handler = [](const VosMessage &){};if (handlers_.find(name) != handlers_.end()){handlers_[name] = std::move(handler);}}void VOS_InterfaceImpl::VOS_SendMsg(VOS_CompName name, VosMessage &message)
{std::unique_lock<std::mutex> lck(interface_lock_);auto iter = handlers_.find(name);if(iter != handlers_.end()){iter->second(message);}//handlers_[name](message);
}void VOS_InterfaceImpl::VOS_SendTimerMsg(VOS_CompName name, VOSTimerType timerType)
{VosMessage message(VOS_MessageType::VOS_TIMER);message.msgData_.xTimerMsg.timerType = timerType;VOS_SendMsg(name, message);
}void VOS_InterfaceImpl::VOS_PublishEvent(VOS_EventID eventId, VOSEventArg arg)
{std::unique_lock<std::mutex> lck(interface_lock_);auto map_iter = event_map_.find(eventId);if(map_iter != event_map_.end()){std::for_each(map_iter->second.begin(), map_iter->second.end(), [eventId, arg, this](VOS_CompName name){VosMessage message(VOS_MessageType::VOS_EVENT);message.msgData_.xEventMsg.eventId = static_cast<VOS_EventID> (eventId);message.msgData_.xEventMsg.arg = arg;this->VOS_SendMsg(name, message);});}
}void VOS_InterfaceImpl::VOS_RegisterEvent(VOS_EventID eventId, VOS_CompName name)
{std::unique_lock<std::mutex> lck(interface_lock_);auto map_iter = event_map_.find(eventId);if(map_iter != event_map_.end()){auto vec_iter = std::find(map_iter->second.begin(), map_iter->second.end(), name);//already exist, returnif(vec_iter != map_iter->second.end()){return;}// push back new oneelse{map_iter->second.push_back(name);}}else{event_map_[eventId].push_back(name);}return;
}void VOS_InterfaceImpl::VOS_UnRegisterEvent(VOS_EventID eventId, VOS_CompName name)
{std::unique_lock<std::mutex> lck(interface_lock_);auto map_iter = event_map_.find(eventId);if(map_iter != event_map_.end()){auto vec_iter = std::find(map_iter->second.begin(), map_iter->second.end(), name);//already exist, returnif(vec_iter != map_iter->second.end()){map_iter->second.erase(vec_iter);return;}}return;
}void VOS_InterfaceImpl::VOS_RegisterInitCB(VOS_CompName name)
{}timer_id VOS_InterfaceImpl::StartTimer(VOS_CompName name, VOSTimerType timerType, const timestamp &when, unsigned int interval)
{std::unique_lock<std::mutex> lck(interface_lock_);return timers_->StartTimer(name, timerType, when, interval );
}bool VOS_InterfaceImpl::Canceltimer(timer_id id)
{std::unique_lock<std::mutex> lck(interface_lock_);return timers_->Canceltimer(id);
}VOS_Interface& VOS_Interface::getInstance()
{static VOS_InterfaceImpl instance;return instance;
}}
4..3.3 VOS_TimerImpl.cpp
#include <algorithm>
#include "VOS_TimerImpl.h"namespace VOS
{VOS_Timer::VOS_Timer() : m{}, cond{}, events{}, time_events{}, free_ids{}
{}VOS_Timer:: ~VOS_Timer()
{release();
}void VOS_Timer::setRunningFlag(bool flag)
{runningFlag_ = flag;
}
void VOS_Timer::release()
{setRunningFlag(false);if(timer_thread_.joinable()){cond.notify_all();timer_thread_.join();}events.clear();time_events.clear();while (!free_ids.empty()){free_ids.pop();}
}bool VOS_Timer::loopCondition()
{return runningFlag_;
}void VOS_Timer::timerEngine()
{int i = 0; while (loopCondition()){scoped_m lock(m);if (time_events.empty()){// Wait for workcond.wait(lock);}else{Time_event te = *time_events.begin();if (clock::now() >= te.next){// Remove time eventtime_events.erase(time_events.begin());// this is vital, remmber to addSubscriberCb firstlock.unlock();if(subScriberCb){subScriberCb(events[te.ref].name,events[te.ref].timerType);}lock.lock();if (events[te.ref].valid &&events[te.ref].period.count() > 0){// The event is valid and a periodic timer.te.next += events[te.ref].period;time_events.insert(te);}else{// The event is either no longer valid because it was// removed in the callback, or it is a one-shot timer.events[te.ref].valid = false;events[te.ref].name = VOS_CompName::COMP_Bottom;events[te.ref].timerType = VOSTimerType::VOSTimerType_BOTTOM;free_ids.push(te.ref);}}else{cond.wait_until(lock, te.next);}}}}void VOS_Timer::threadForTimer()
{if(!timer_thread_.joinable()){timer_thread_ = std::thread([this](){this->timerEngine();});}
}}
简单党员测试代码(VC2022 上 google test的配置见之前文章)
4.4 UT 简单测试
VOS_IHandler_test.cpp
#include <iostream>
#include <thread>
#include <string>#include "VOS_IHandler.h"
#include "VOS_Interface.h"#include "gmock/gmock.h"
#include "gtest/gtest.h"using ::testing::_;
using ::testing::NiceMock;
using ::testing::Return;
using namespace testing;namespace VOS
{class VOS_IHandlerWrapper : public VOS_IHandler
{
public:VOS_IHandlerWrapper(VOS_CompName name):VOS_IHandler(name){}void handleMessage(const VosMessage &msg){messagenum++;switch (msg.msgType_){case VOS_MessageType::VOS_SYNC_MSG:sync_msg = msg.msgData_.xSyncMsg;break;case VOS_MessageType::VOS_TIMER:timer_msg = msg.msgData_.xTimerMsg;break;case VOS_MessageType::VOS_BOTTOM:break;case VOS_MessageType::VOS_EVENT:event_msg = msg.msgData_.xEventMsg;break;default:break;}}void SleepForPeriod(int numberOfseconds){std::this_thread::sleep_for(std::chrono::seconds(numberOfseconds));}int messagenum = 0;VOS_EVENT_MSG event_msg;VOS_SYNC_MSG sync_msg;VOS_TIMER_MSG timer_msg;};class VOS_IHandlerTest : public ::testing::Test
{public:void SetUp() override{messageHandler_ = std::make_shared<VOS_IHandlerWrapper>(VOS_CompName::COMP_AR);messageHandler_->start();}std::shared_ptr<VOS_IHandlerWrapper> messageHandler_;};TEST_F(VOS_IHandlerTest, VOS_SYNC_MSG_Process_OK)
{VosMessage message(VOS_MessageType::VOS_SYNC_MSG);message.msgData_.xSyncMsg.reserve_1 = 999;VOS_Interface::getInstance().VOS_SendMsg(VOS_CompName::COMP_AR, message);messageHandler_->SleepForPeriod(2);if(messageHandler_->messagenum != 0){EXPECT_EQ(messageHandler_->messagenum, 1);EXPECT_EQ(messageHandler_->sync_msg.reserve_1, 999);}
}TEST_F(VOS_IHandlerTest, VOS_PublishEvent_OK)
{VOS_Interface::getInstance().VOS_RegisterEvent( static_cast<VOS_EventID>(5), VOS_CompName::COMP_AR);VosMessage message(VOS_MessageType::VOS_EVENT);message.msgData_.xEventMsg.eventId = static_cast<VOS_EventID>(5);message.msgData_.xEventMsg.arg.repData={1,2,3};VOS_Interface::getInstance().VOS_SendMsg(VOS_CompName::COMP_AR, message);messageHandler_->SleepForPeriod(2);if(messageHandler_->messagenum != 0){EXPECT_EQ(messageHandler_->messagenum, 1);EXPECT_EQ(messageHandler_->event_msg.eventId, static_cast<VOS_EventID>(5));EXPECT_EQ(messageHandler_->event_msg.arg.repData.programId, 1);EXPECT_EQ(messageHandler_->event_msg.arg.repData.assayNumber, 2);EXPECT_EQ(messageHandler_->event_msg.arg.repData.repIdx, 3);}VOS_Interface::getInstance().VOS_UnRegisterEvent( static_cast<VOS_EventID>(5), VOS_CompName::COMP_AR);
}TEST_F(VOS_IHandlerTest, VOS_TIMER_MSG_ONCE_TIMER_OK)
{auto timer_id = VOS_Interface::getInstance().StartTimer(VOS_CompName::COMP_AR, VOSTimerType::VOSTimerType_BOTTOM, clock::now());messageHandler_->SleepForPeriod(2);if(messageHandler_->messagenum != 0){EXPECT_EQ(messageHandler_->messagenum, 1);EXPECT_EQ(messageHandler_->timer_msg.timerType, VOSTimerType::VOSTimerType_BOTTOM);}
}TEST_F(VOS_IHandlerTest, VOS_TIMER_MSG_PERIODIC_TIMER_OK)
{auto timer_id = VOS_Interface::getInstance().StartTimer(VOS_CompName::COMP_AR, VOSTimerType::VOSTimerType_BOTTOM, clock::now(), 1);messageHandler_->SleepForPeriod(2);if(messageHandler_->messagenum != 0){EXPECT_EQ(messageHandler_->timer_msg.timerType, VOSTimerType::VOSTimerType_BOTTOM);}VOS_Interface::getInstance().Canceltimer(timer_id);
}}
先写到这里,无烟无酒无故事。