【Poco库源码解析】Poco库中的通知

1、介绍

PocoPocoPoco 中的通知,是消息源通过中间载体将消息发送给观察者,通知可以分为
同步通知和异步通知。
下图是同步通知,消息发送流程:
在这里插入图片描述

2.同步通知

2.1 消息

class Notification: public RefCountedObject
{
public:typedef AutoPtr<Notification> Ptr;Notification();virtual std::string name() const;
protected:virtual ~Notification();
};

2.2 消息的发送者 source

类 NotificationCenter 类扮演了一个消息源的角色。下面是它的定义:

class  NotificationCenter	
{
public:NotificationCenter();/// Creates the NotificationCenter.~NotificationCenter();/// Destroys the NotificationCenter.void addObserver(const AbstractObserver& observer);/// Registers an observer with the NotificationCenter./// Usage:///     Observer<MyClass, MyNotification> obs(*this, &MyClass::handleNotification);///     notificationCenter.addObserver(obs);////// Alternatively, the NObserver template class can be used instead of Observer.void removeObserver(const AbstractObserver& observer);/// Unregisters an observer with the NotificationCenter.bool hasObserver(const AbstractObserver& observer) const;/// Returns true if the observer is registered with this NotificationCenter.void postNotification(Notification::Ptr pNotification);/// Posts a notification to the NotificationCenter./// The NotificationCenter then delivers the notification/// to all interested observers./// If an observer throws an exception, dispatching terminates/// and the exception is rethrown to the caller./// Ownership of the notification object is claimed and the/// notification is released before returning. Therefore,/// a call like///    notificationCenter.postNotification(new MyNotification);/// does not result in a memory leak.bool hasObservers() const;/// Returns true iff there is at least one registered observer.////// Can be used to improve performance if an expensive notification/// shall only be created and posted if there are any observers.std::size_t countObservers() const;/// Returns the number of registered observers.static NotificationCenter& defaultCenter();/// Returns a reference to the default/// NotificationCenter.private:typedef SharedPtr<AbstractObserver> AbstractObserverPtr;typedef std::vector<AbstractObserverPtr> ObserverList;ObserverList  _observers;mutable Mutex _mutex;
};

通过定义可以看出它是目标对象的集合std::vector<AbstractObserverPtr> _observers
通过调用函数 addObserver(const AbstractObserver& observer),可以完成目标对象
的注册过程。调用函数 removeObserver()则可以完成注销。函数 postNotification 是一个消息传递的过程,其定义如下:

void NotificationCenter::postNotification(Notification::Ptr pNotification)
{poco_check_ptr (pNotification);ScopedLockWithUnlock<Mutex> lock(_mutex);ObserverList observersToNotify(_observers);lock.unlock();for (ObserverList::iterator it = observersToNotify.begin(); it != observersToNotify.end(); ++it){(*it)->notify(pNotification);}
}

可以看到它是向所有观察者发送消息。这里为了避免长期占用_observers,在发送时复制了一份。

2.3 消息的接收者 target

class  AbstractObserver
{
public:AbstractObserver();AbstractObserver(const AbstractObserver& observer);virtual ~AbstractObserver();AbstractObserver& operator = (const AbstractObserver& observer);virtual void notify(Notification* pNf) const = 0;virtual bool equals(const AbstractObserver& observer) const = 0;virtual bool accepts(Notification* pNf, const char* pName = 0) const = 0;virtual AbstractObserver* clone() const = 0;virtual void disable() = 0;
};

所有接收类都需要继承AbstractObserver

Observer类

template <class C, class N>
class Observer: public AbstractObserver
{
public:typedef void (C::*Callback)(N*);Observer(C& object, Callback method): _pObject(&object), _method(method){}Observer(const Observer& observer):AbstractObserver(observer),_pObject(observer._pObject), _method(observer._method){}~Observer(){}Observer& operator = (const Observer& observer){if (&observer != this){_pObject = observer._pObject;_method  = observer._method;}return *this;}void notify(Notification* pNf) const{Poco::Mutex::ScopedLock lock(_mutex);if (_pObject){N* pCastNf = dynamic_cast<N*>(pNf);if (pCastNf){pCastNf->duplicate();(_pObject->*_method)(pCastNf);}}}bool equals(const AbstractObserver& abstractObserver) const{const Observer* pObs = dynamic_cast<const Observer*>(&abstractObserver);return pObs && pObs->_pObject == _pObject && pObs->_method == _method;}bool accepts(Notification* pNf) const{return dynamic_cast<N*>(pNf) != 0;}AbstractObserver* clone() const{return new Observer(*this);}void disable(){Poco::Mutex::ScopedLock lock(_mutex);_pObject = 0;}private:Observer();C*       _pObject;Callback _method;mutable Poco::Mutex _mutex;
};

Observer 中存在一个类实例对象的指针_pObject,以及对应函数入口地址_method。

NObserver类

template <class C, class N>
class NObserver: public AbstractObserver
{
public:typedef AutoPtr<N> NotificationPtr;typedef void (C::*Callback)(const NotificationPtr&);NObserver(C& object, Callback method): _pObject(&object), _method(method){}NObserver(const NObserver& observer):AbstractObserver(observer),_pObject(observer._pObject), _method(observer._method){}~NObserver(){}NObserver& operator = (const NObserver& observer){if (&observer != this){_pObject = observer._pObject;_method  = observer._method;}return *this;}void notify(Notification* pNf) const{Poco::Mutex::ScopedLock lock(_mutex);if (_pObject){N* pCastNf = dynamic_cast<N*>(pNf);if (pCastNf){NotificationPtr ptr(pCastNf, true);(_pObject->*_method)(ptr);}}}bool equals(const AbstractObserver& abstractObserver) const{const NObserver* pObs = dynamic_cast<const NObserver*>(&abstractObserver);return pObs && pObs->_pObject == _pObject && pObs->_method == _method;}bool accepts(Notification* pNf) const{return dynamic_cast<N*>(pNf) != 0;}AbstractObserver* clone() const{return new NObserver(*this);}void disable(){Poco::Mutex::ScopedLock lock(_mutex);_pObject = 0;}private:NObserver();C*       _pObject;Callback _method;mutable Poco::Mutex _mutex;
};

2.4 使用例子

#include "Poco/NotificationCenter.h"
#include "Poco/Notification.h"
#include "Poco/Observer.h"
#include "Poco/NObserver.h"
#include "Poco/AutoPtr.h"
#include <iostream>
using Poco::NotificationCenter;
using Poco::Notification;
using Poco::Observer;
using Poco::NObserver;
using Poco::AutoPtr;
class BaseNotification: public Notification
{
};
class SubNotification: public BaseNotification
{
};class Target
{
public:void handleBase(BaseNotification* pNf){std::cout << "handleBase: " << pNf->name() << std::endl;pNf->release(); // we got ownership, so we must release}void handleSub(const AutoPtr<SubNotification>& pNf){std::cout << "handleSub: " << pNf->name() << std::endl;}
};int main(int argc, char** argv)
{NotificationCenter nc;Target target;nc.addObserver(Observer<Target, BaseNotification>(target, &Target::handleBase));nc.addObserver(NObserver<Target, SubNotification>(target, &Target::handleSub));nc.postNotification(new BaseNotification);nc.postNotification(new SubNotification);nc.removeObserver(Observer<Target, BaseNotification>(target, &Target::handleBase));nc.removeObserver(NObserver<Target, SubNotification>(target, &Target::handleSub));return 0;
}

总结:
类似于观察者模式,通过创建观察者,增加到NotificationCenter,通知中心遍历所有观察者,调用到观察者,的notify函数,然后回调到用户的函数。

3. 异步通知

3.1 介绍

Poco 中的异步通知是通过 NotificationQueue 类来实现的,同它功能类似还有类PriorityNotificationQueue 和 TimedNotificationQueue。不同的是 PriorityNotificationQueue类中对消息分了优先级,对优先级高的消息优先处理;而 TimedNotificationQueue 对消息给了时间戳,时间戳早的优先处理,而和其压入队列的时间无关。所以接下来我们主要关注NotificationQueue 的实现。

class Foundation_API NotificationQueue/// A NotificationQueue object provides a way to implement asynchronous/// notifications. This is especially useful for sending notifications/// from one thread to another, for example from a background thread to /// the main (user interface) thread. /// /// The NotificationQueue can also be used to distribute work from/// a controlling thread to one or more worker threads. Each worker thread/// repeatedly calls waitDequeueNotification() and processes the/// returned notification. Special care must be taken when shutting/// down a queue with worker threads waiting for notifications./// The recommended sequence to shut down and destroy the queue is to///   1. set a termination flag for every worker thread///   2. call the wakeUpAll() method///   3. join each worker thread///   4. destroy the notification queue.
{
public:NotificationQueue();/// Creates the NotificationQueue.~NotificationQueue();/// Destroys the NotificationQueue.void enqueueNotification(Notification::Ptr pNotification);/// Enqueues the given notification by adding it to/// the end of the queue (FIFO)./// The queue takes ownership of the notification, thus/// a call like///     notificationQueue.enqueueNotification(new MyNotification);/// does not result in a memory leak.void enqueueUrgentNotification(Notification::Ptr pNotification);/// Enqueues the given notification by adding it to/// the front of the queue (LIFO). The event therefore gets processed/// before all other events already in the queue./// The queue takes ownership of the notification, thus/// a call like///     notificationQueue.enqueueUrgentNotification(new MyNotification);/// does not result in a memory leak.Notification* dequeueNotification();/// Dequeues the next pending notification./// Returns 0 (null) if no notification is available./// The caller gains ownership of the notification and/// is expected to release it when done with it.////// It is highly recommended that the result is immediately/// assigned to a Notification::Ptr, to avoid potential/// memory management issues.Notification* waitDequeueNotification();/// Dequeues the next pending notification./// If no notification is available, waits for a notification/// to be enqueued. /// The caller gains ownership of the notification and/// is expected to release it when done with it./// This method returns 0 (null) if wakeUpWaitingThreads()/// has been called by another thread.////// It is highly recommended that the result is immediately/// assigned to a Notification::Ptr, to avoid potential/// memory management issues.Notification* waitDequeueNotification(long milliseconds);/// Dequeues the next pending notification./// If no notification is available, waits for a notification/// to be enqueued up to the specified time./// Returns 0 (null) if no notification is available./// The caller gains ownership of the notification and/// is expected to release it when done with it.////// It is highly recommended that the result is immediately/// assigned to a Notification::Ptr, to avoid potential/// memory management issues.void dispatch(NotificationCenter& notificationCenter);/// Dispatches all queued notifications to the given/// notification center.void wakeUpAll();/// Wakes up all threads that wait for a notification.bool empty() const;/// Returns true iff the queue is empty.int size() const;/// Returns the number of notifications in the queue.void clear();/// Removes all notifications from the queue.bool hasIdleThreads() const;	/// Returns true if the queue has at least one thread waiting /// for a notification.static NotificationQueue& defaultQueue();/// Returns a reference to the default/// NotificationQueue.protected:Notification::Ptr dequeueOne();private:typedef std::deque<Notification::Ptr> NfQueue;struct WaitInfo{Notification::Ptr pNf;Event             nfAvailable;};typedef std::deque<WaitInfo*> WaitQueue;NfQueue           _nfQueue;WaitQueue         _waitQueue;mutable FastMutex _mutex;
};

定义可以看到 NotificationQueue 类管理了两个 deque 容器。其中一个是 WaitInfo对象的 deque,另一个是 Notification 对象的 deque。而 WaitInfo 一对一的对应了一个消息对象 pNf 和事件对象 nfAvailable,毫无疑问 Event 对象是用来同步多线程的。

Notification* NotificationQueue::waitDequeueNotification()
{Notification::Ptr pNf;WaitInfo* pWI = 0;{FastMutex::ScopedLock lock(_mutex);pNf = dequeueOne();if (pNf) return pNf.duplicate();pWI = new WaitInfo;_waitQueue.push_back(pWI);}pWI->nfAvailable.wait();pNf = pWI->pNf;delete pWI;return pNf.duplicate();
}

消费者线程首先从 Notification 对象的 deque 中获取消息,如果消息获取不为空,则接返回处理,如果消息为空,则创建一个新的 WaitInfo 对象,并压入 WaitInfo 对象的deque。 消费者线程开始等待,直到生产者通知有消息的存在,然后再从 WaitInfo 对象中取出消息,返回处理。当消费者线程能从 Notification 对象的 deque 中获取到消息时,说明消费者处理消息的速度要比生成者低;反之则说明消费者处理消息的速度要比生成者高。

void NotificationQueue::enqueueNotification(Notification::Ptr pNotification)
{poco_check_ptr (pNotification);FastMutex::ScopedLock lock(_mutex);if (_waitQueue.empty()){_nfQueue.push_back(pNotification);}else{WaitInfo* pWI = _waitQueue.front();_waitQueue.pop_front();pWI->pNf = pNotification;pWI->nfAvailable.set();}	
}

生产者线程首先判断 WaitInfo 对象的 deque 是否为空,如果不为空,说明存在消费者线程等待,则从 deque 中获取一个 WaitInfo 对象,灌入 Notification 消息,释放信号量激活消费者线程;而如果为空,说明目前说有的消费者线程都在工作,则把消息暂时存入Notification 对象的 deque,等待消费者线程有空时处理。

3.2 例子

#include "Poco/Notification.h"
#include "Poco/NotificationQueue.h"
#include "Poco/ThreadPool.h"
#include "Poco/Runnable.h"
#include "Poco/AutoPtr.h"
using Poco::Notification;
using Poco::NotificationQueue;
using Poco::ThreadPool;
using Poco::Runnable;
using Poco::AutoPtr;
class WorkNotification: public Notification
{public:WorkNotification(int data): _data(data) {}int data() const{return _data;}private:int _data;
};class Worker: public Runnable
{
public:Worker(NotificationQueue& queue): _queue(queue) {}void run(){AutoPtr<Notification> pNf(_queue.waitDequeueNotification());while (pNf){WorkNotification* pWorkNf = dynamic_cast<WorkNotification*>(pNf.get());if (pWorkNf){// do some work}pNf = _queue.waitDequeueNotification();}}private:NotificationQueue& _queue;
};int main(int argc, char** argv)
{NotificationQueue queue;Worker worker1(queue); // create worker threadsWorker worker2(queue);ThreadPool::defaultPool().start(worker1); // start workersThreadPool::defaultPool().start(worker2);// create some workfor (int i = 0; i < 100; ++i){queue.enqueueNotification(new WorkNotification(i));}while (!queue.empty()) // wait until all work is donePoco::Thread::sleep(100);queue.wakeUpAll(); // tell workers they're doneThreadPool::defaultPool().joinAll();return 0;
}

总结
生产者入队列,消费者出队列,实现消息的异步处理。

4、参考链接

https://pocoproject.org/slides/090-NotificationsEvents.pdf
https://blog.csdn.net/arau_sh/article/details/8673459

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/631569.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

生产环境LVM磁盘扩容

使用df -Th 命令查看磁盘信息 ,可以看到当前LVM逻辑卷容量是38G [rootZ ~]# df -TH 文件系统 类型 容量 已用 可用 已用% 挂载点 /dev/mapper/centos-root xfs 38G 2.4G 36G 7% / devtmpfs devtmpfs 1.1G 0 1.1G …

使用屏幕捕捉API:一站式解决屏幕录制需求

随着科技的发展&#xff0c;屏幕捕捉API技术逐渐成为一种热门的录屏方法。本文将详细介绍屏幕捕捉API技术的原理、应用场景以及如何利用这一技术为用户提供便捷、高效的录屏体验。 在线录屏 | 一个覆盖广泛主题工具的高效在线平台(amd794.com) https://amd794.com/recordscre…

win10系统的hiberfil.sys如何删除

C盘莫名其妙地出现了一个叫hiberfil.sys的文件。我一看&#xff0c;好家伙&#xff0c;6个多G&#xff0c;让我本就所剩无几的C盘空间再次雪上加霜&#xff01; 然后我就研究了一下。 hiberfil.sys是什么&#xff1f; 该文件用于将计算机处于休眠状态时的所有内容保存到硬盘…

Windows安装WSL2精简版教程

文章目录 一、安装WSL二、更改WSL的存放路径/备份WSL三、安装WSL Terminall四、WSL界面&#xff1a;xlaunch五、WSL1升级WSL2六、WSL2与VMware兼容问题七、更改手动导入的wsl的默认登录用户参考 一、安装WSL 步骤1 - 启用适用于 Linux 的 Windows 子系统&#xff1a; 需要先启…

.NET分库分表:高性能分页(mycat之外的选择)

&#x1f3c6;作者&#xff1a;科技、互联网行业优质创作者 &#x1f3c6;专注领域&#xff1a;.Net技术、软件架构、人工智能、数字化转型、DeveloperSharp、微服务、工业互联网、智能制造 &#x1f3c6;欢迎关注我&#xff08;Net数字智慧化基地&#xff09;&#xff0c;里面…

【USTC】verilog 习题练习 21-25

21 基于端口名称的实例化 题目描述 创建一 verilog 电路&#xff0c;实现对模块 mod_a 基于端口名称的实例化&#xff0c;如下图所示&#xff1a; 其中mod_a模块的代码为&#xff1a; module mod_a (output out1,output out2,input in1,input in2,input in3,in…

边缘计算AI智能分析网关V4客流统计算法的概述

客流量统计AI算法是一种基于人工智能技术的数据分析方法&#xff0c;通过机器学习、深度学习等算法&#xff0c;实现对客流量的实时监测和统计。该算法主要基于机器学习和计算机视觉技术&#xff0c;其基本流程包括图像采集、图像预处理、目标检测、目标跟踪和客流量统计等步骤…

【架构】docker实现3主3从架构配置【案例1/4】

一&#xff0c;集群规划及准备工作 架构实现&#xff1a;Redis3主3从 二&#xff0c;搭建命令 第一步&#xff0c;创建6台服务&#xff1a; docker run -d --name redis-node-1 --net host --privilegedtrue -v /data/redis/share/redis-node-1:/data redis:6.0.8 --clust…

基于Springboot+vue图书管理系统(前后端分离)

该项目完全免费 项目技术栈前后端分离&#xff1a; 后端&#xff1a;Springboot Mybatis-plus 前端&#xff1a;Vue ElementUI 数据库&#xff1a; MySQL 项目功能描述 管理员&#xff1a; 登录、个人信息、修改密码、管理后台管理系统所有数据 首页统计&#xff1a;…

Python(37):使用logging的配置文件配置日志

Python(37):使用logging的配置文件配置日志 输出日志到控制台和日志文件方法&#xff1a; 创建一个日志配置文件&#xff0c;然后使用fileConfig()函数来读取该文件的内容。 方法1&#xff1a;输出日志到文件&#xff0c;文件是固定的 方法2&#xff1a;输出日志到文件&…

【机器学习】机器学习四大类第01课

一、机器学习四大类 有监督学习 (Supervised Learning) 有监督学习是通过已知的输入-输出对&#xff08;即标记过的训练数据&#xff09;来学习函数关系的过程。在训练阶段&#xff0c;模型会根据这些示例调整参数以尽可能准确地预测新的、未见过的数据点的输出。 实例&#x…

docker安装 unexpected wsl error

docker unexpected wsl error 问题描述&#xff1a; 很诡异的一个问题 大概现象和这个帖子很像 https://developer.aliyun.com/article/1395485 docker版本4.26.1 系统&#xff1a; windows 10 winR 输入winver可以看见自己的版本号 华为matebook 16s 重装的Win10 解决流程…

代码随想录算法训练营29期|day 23 任务以及具体安排

669. 修剪二叉搜索树 class Solution {public TreeNode trimBST(TreeNode root, int low, int high) {if (root null) {return null;}if (root.val < low) {return trimBST(root.right, low, high);}if (root.val > high) {return trimBST(root.left, low, high);}// ro…

农用拖拉机市场调研:预计2029年将达到171亿美元

由于近些年来新兴市场的迅速崛起和技术创新的不断涌现&#xff0c;全球农用拖拉机市场的竞争也日趋激烈。生产商不仅需要提供质量可靠的产品&#xff0c;还需要提供良好的售后服务以赢得客户的信赖。 农业是支撑国民经济建设与发展的基础产业&#xff0c;而农业机械化是建设现代…

2024杭州国际智慧城市,人工智能,安防展览会(杭州智博会)

在智能化浪潮的冲击下&#xff0c;我们的生活与环境正在经历一场深刻的变革。这是一场前所未有的技术革命&#xff0c;它以前所未有的速度和广度&#xff0c;改变着我们的生活方式、工作方式、思维方式和社会结构。在这场变革中&#xff0c;有的人选择激流勇进&#xff0c;拥抱…

ACL实验

一&#xff1a;实验要求 二&#xff1a;实验分析 PC1可以telnet R1但不能ping通R1 PC1可以ping通R2但不能telnet R2 PC2可以ping通R1但不能telnet R1 PCR可以telnet R2但不能ping通R2 三&#xff1a;实验过程 配置IP 配置静态路由 检查是否全网可通 配置Telnet r1创建…

Xshell无法ssh连接虚拟机问题或主机无法ping通虚拟机。

常见报错如下&#xff1a; 1&#xff0c;Could not connect to ‘&#xff1f;&#xff1f;&#xff1f;’ (port 22): Connection failed. 2&#xff0c;卡在To escape to local shell, press ‘CtrlAlt]’. 3&#xff0c;Connection closing…Socket close. Connection clos…

一款轻量级、基于Java语言开发的低代码开发框架,开箱即用!

数字化时代&#xff0c;企业对于灵活、高效和安全的软件开发需求日益旺盛。为了满足这些需求&#xff0c;许多组织转向低代码技术&#xff0c;以寻求更具成本效益和创新性的解决方案。JNPF基础框架正是在这一背景下应运而生&#xff0c;凭借其私有化部署和100%源码交付的特性&a…

Unity之铰链关节和弹簧组件

《今天闪电侠他回来了&#xff0c;这一次他要拿回属于他的一切》 目录 &#x1f4d5;一、铰链关节组件HingeJoint 1. 实例 2. 铰链关节的坐标属性 ​3.铰链关节的马达属性Motor &#x1f4d5;二、弹簧组件 &#x1f4d5;三、杂谈 一、铰链关节组件HingeJoint 1. 实例 说…

【STM32调试】寄存器调试不良问题记录持续版

STM32寄存器调试不良问题记录 NVIC&#xff08;内嵌的中断向量控制器&#xff09;EXTI&#xff08;外部中断/事件&#xff09; 记录一些stm32调试过程中&#xff1a;不易被理解、存在使用误区、不清不楚、是坑、使用常识等方面的一些记录。本记录只包含stm32的内核以及外设等寄…