今天调休,抓住年假的最后一天,将构思多日适合将并行任务串行执行的框架实现出来。
核心思想:
- 将各个独立的功能模块作为周期性的任务。
- 在主循环集中调度所有任务,让各个功能模块依次有处理事项的机会。
- 如果处理事项较为耗时,可以借助 std::async() 的方式通过新线程处理。
设计优点:
- 可以设置调度间隔时长,便于用于需要定时执行某些事项的模块。
- 各个任务可以动态添加或移除,实现可动态开启或关闭指定的功能模块。
- 各个任务可以动态调整间隔时长,实现可动态调整功能模块的执行周期。
- 采用继承接口的方式,将调度执行和增加事项分离,很方便的实现缓存执行的任务。
如需要下载多个文件,可以将下载请求放入列表中,在每次被调度时从列表中取出一条请求执行。
实现的代码很简单:
/**
******************************************************************************
* @文件 PeriodTask.hpp
* @版本 V1.0.0
* @日期
* @概要 管理和执行需要周期性调度的工作任务
* @原作 lmx 1007566569@qq.com
******************************************************************************
* @注意
*
******************************************************************************
*/#ifndef __PERIOD_TASK_H__
#define __PERIOD_TASK_H__#include <list>
#include <mutex>
#include <memory>
#include <algorithm>
#include "MeasureTime.hpp"class PeriodTask // 用于需要被周期性调度的对象继承
{
public:virtual ~PeriodTask() {};/*********************************************************************** 函数: addPeriodTask* 功能: 被加入调度时的回调, 可初始化所需要的资源* 参数: 无需参数* 返回: 无返回值* 注意: 执行时间过长会影响调度, 因此执行时间需要尽量短**********************************************************************/ virtual void addPeriodTask(void){}; /*********************************************************************** 函数: delPeriodTask* 功能: 被移除调度时的回调, 可以释放所占用的资源* 参数: 无需参数* 返回: 无返回值**********************************************************************/ virtual void delPeriodTask(void){};/*********************************************************************** 函数: runPeriodTask* 功能: 被调度时的回调, 必须被实现, 执行时间需要尽可能短* 参数: 无需参数* 返回: 无返回值**********************************************************************/ virtual void runPeriodTask(void) = 0; // 被调度执行
};class PeriodTasks // 用于管理和调度对象
{
private:class Task { // 用于组织调度对象, 为调度对象支持新特性private:std::shared_ptr<PeriodTask> sp_task_;std::shared_ptr<MeasureTime> sp_time_;public:/*********************************************************************** 函数: 构造方法* 功能: 组合调度对象和超时参数* 参数: sp_task:调度对象 interval_msec:调度间隔时长(毫秒)* 返回: 无返回值**********************************************************************/ Task(std::shared_ptr<PeriodTask> sp_task, unsigned long long interval_msec = 0){sp_task_ = sp_task;sp_time_ = interval_msec ? std::make_shared<MeasureTime>(interval_msec) : nullptr;}/*********************************************************************** 函数: getTask* 功能: 获取调度对象* 参数: 无需参数* 返回: 返回调度对象**********************************************************************/ std::shared_ptr<PeriodTask> getTask(){ return sp_task_;}/*********************************************************************** 函数: getIntervalTime* 功能: 获取间隔时间* 参数: 无需参数* 返回: 返回间隔时间**********************************************************************/ unsigned long long getIntervalTime(){return sp_time_ ? sp_time_->getTimeout() : 0;}/*********************************************************************** 函数: isReady* 功能: 该调度对象是否准备好被调度* 参数: 无需参数* 返回: true:已经准备就绪 false:指定间隔时间还未到**********************************************************************/ bool isReady(){if(sp_time_ && sp_time_->isTimeout()){sp_time_->update();return true;}return nullptr != sp_time_ ? false : true;}};private:std::mutex mutex_task_;std::list<std::shared_ptr<Task>> task_list_;public:/*********************************************************************** 函数: ~PeriodTasks* 功能: 析构方法, 清除所有任务* 参数: 无需参数* 返回: 无返回值**********************************************************************/ ~PeriodTasks(){std::unique_lock< std::mutex > lock(mutex_task_);for (auto task = task_list_.begin(); task != task_list_.end();) {(*task)->getTask()->delPeriodTask();task = task_list_.erase(task);}}/*********************************************************************** 函数: isTask* 功能: 判断指定调度对象是否在调度列表中* 参数: sp_task:调度对象* 返回: true:已在列表 false:未在列表**********************************************************************/ bool isTask(std::shared_ptr<PeriodTask> sp_task){std::unique_lock< std::mutex > lock(mutex_task_);if(std::find_if(task_list_.begin(), task_list_.end(), [sp_task](std::shared_ptr<Task> t) { return t->getTask() == sp_task; }) != task_list_.end()){return true;}return false;}/*********************************************************************** 函数: addTask* 功能: 将指定调度对象加入调度列表中, 会触发 addPeriodTask()* 参数: sp_task:调度对象 interval_msec:调度间隔时长(毫秒)* 返回: true:加入成功 false:加入失败,列表已有**********************************************************************/ bool addTask(std::shared_ptr<PeriodTask> sp_task, unsigned long long interval_msec = 0) {std::unique_lock< std::mutex > lock(mutex_task_);if(std::find_if(task_list_.begin(), task_list_.end(), [sp_task](std::shared_ptr<Task> t) { return t->getTask() == sp_task; }) != task_list_.end()){return false;}sp_task->addPeriodTask();task_list_.push_back(std::make_shared<Task>(sp_task, interval_msec));task_list_.sort([](std::shared_ptr<Task> a, std::shared_ptr<Task> b){return a->getIntervalTime() < b->getIntervalTime();});return true;}/*********************************************************************** 函数: delTask* 功能: 从调度列表中删除指定调度对象, 会触发 delPeriodTask()* 参数: sp_task:调度对象* 返回: true:删除成功 false:删除失败,列表未找到**********************************************************************/ bool delTask(std::shared_ptr<PeriodTask> sp_task) {std::unique_lock< std::mutex > lock(mutex_task_);for (auto task = task_list_.begin(); task != task_list_.end(); task++) {if ((*task)->getTask() == sp_task) {task_list_.erase(task);lock.unlock();sp_task->delPeriodTask();return true;}}return false;}/*********************************************************************** 函数: modTask* 功能: 从调度列表中修改指定调度对象的间隔时间* 参数: sp_task:调度对象 interval_msec:新的间隔时间(毫秒级)* 返回: true:修改成功 false:修改失败,列表未找到**********************************************************************/ bool modTask(std::shared_ptr<PeriodTask> sp_task, unsigned long long interval_msec){if(isTask(sp_task) == true){std::unique_lock< std::mutex > lock(mutex_task_);task_list_.remove_if([sp_task](std::shared_ptr<Task> &t) { return t->getTask() == sp_task; });task_list_.push_back(std::make_shared<Task>(sp_task, interval_msec));task_list_.sort([](std::shared_ptr<Task> a, std::shared_ptr<Task> b){return a->getIntervalTime() < b->getIntervalTime();});return true;}return false;}/*********************************************************************** 函数: runTask* 功能: 开始调度所有的对象* 参数: 无需参数* 返回: 无返回值**********************************************************************/ void runTasks(void) {std::unique_lock< std::mutex > lock(mutex_task_);for (auto task = task_list_.begin(); task != task_list_.end(); task++) {if((*task)->isReady()){(*task)->getTask()->runPeriodTask();}}}
};#endif
为了支持任务可指定间隔时长,实现时间测量的工具类,独立出来也方便需要的时候引用。
/**
******************************************************************************
* @文件 MarkTime.hpp
* @版本 V1.0.0
* @日期
* @概要 用于标记和测量时间
* @原作 lmx 1007566569@qq.com
******************************************************************************
* @注意
*
******************************************************************************
*/#ifndef __MARK_TIME_H__
#define __MARK_TIME_H__#include <mutex>
#include <time.h>class MarkTime
{
private:std::mutex mutex_data_;struct timespec timespec_;public:/*********************************************************************** 函数: MarkTime* 功能: 构造方法, 初始化时间基准* 参数: 无需参数* 返回: 无返回值**********************************************************************/MarkTime(){clock_gettime(CLOCK_MONOTONIC_RAW, ×pec_);}/*********************************************************************** 函数: mark* 功能: 记录当前时间* 参数: 无需参数* 返回: 无返回值**********************************************************************/inline void mark(){std::unique_lock< std::mutex > lock(mutex_data_);clock_gettime(CLOCK_MONOTONIC_RAW, ×pec_); }/*********************************************************************** 函数: getMicrosecond* 功能: 获取自标记起, 调用此接口时的时间* 参数: 无需参数* 返回: 返回自标记到调用此接口时的时间(微秒级)**********************************************************************/ inline unsigned long long getMicrosecond(){struct timespec timespec;clock_gettime(CLOCK_MONOTONIC_RAW, ×pec);std::unique_lock< std::mutex > lock(mutex_data_);return ((timespec.tv_sec * 1000000) + (timespec.tv_nsec / 1000.0)) - ((timespec_.tv_sec * 1000000) + (timespec_.tv_nsec / 1000.0));}/*********************************************************************** 函数: getMillisecond* 功能: 获取自标记起, 调用此接口时的时间* 参数: 无需参数* 返回: 返回自标记到调用此接口时的时间(毫秒级)**********************************************************************/ inline unsigned long long getMillisecond(){struct timespec timespec;clock_gettime(CLOCK_MONOTONIC_RAW, ×pec);std::unique_lock< std::mutex > lock(mutex_data_);return ((timespec.tv_sec * 1000) + (timespec.tv_nsec / 1000000.0)) - ((timespec_.tv_sec * 1000) + (timespec_.tv_nsec / 1000000.0));}
};#endif
/**
******************************************************************************
* @文件 MeasureTime.hpp
* @版本 V1.0.0
* @日期
* @概要 用于计时
* @原作 lmx 1007566569@qq.com
******************************************************************************
* @注意
*
******************************************************************************
*/#ifndef __MEASURE_TIME_H__
#define __MEASURE_TIME_H__#include <mutex>
#include <memory>
#include "base/MarkTime.hpp"class MeasureTime
{
private:bool is_activate_;std::mutex mutex_data_;unsigned long long timeout_;std::shared_ptr<MarkTime> sp_marktime_;public:/*********************************************************************** 函数: MeasureTime* 功能: 初始化超时时间* 参数: timeout: 超时时间(毫秒级)* 返回: 无返回值**********************************************************************/ MeasureTime(unsigned long long timeout){timeout_ = timeout;is_activate_ = false;sp_marktime_ = std::make_shared<MarkTime>();}/*********************************************************************** 函数: get* 功能: 获取标记时间对象* 参数: 无需参数* 返回: 返回标记时间对象**********************************************************************/ std::shared_ptr<MarkTime> get(){return sp_marktime_;}/*********************************************************************** 函数: setTimeout* 功能: 设置超时时间* 参数: timeout:超时时间* 返回: 无返回值**********************************************************************/ void setTimeout(unsigned long long timeout){timeout_ = timeout;}/*********************************************************************** 函数: getTimeout* 功能: 获取超时时间* 参数: 无需参数* 返回: 返回设置的超时时间**********************************************************************/ unsigned long long getTimeout(){return timeout_;}/*********************************************************************** 函数: update* 功能: 更新时间* 参数: 无需参数* 返回: 无返回值**********************************************************************/ inline void update(){sp_marktime_->mark();}/*********************************************************************** 函数: isTimeout* 功能: 判断当前是否超时* 参数: 无需参数* 返回: true:已经超出指定超时时间 false:尚未超出指定超时时间**********************************************************************/ inline bool isTimeout(){return sp_marktime_->getMillisecond() >= timeout_;}/*********************************************************************** 函数: isNotTimeout* 功能: 判断当前是否没有超时* 参数: 无需参数* 返回: true:尚未超出指定超时时间 false:已经超出指定超时时间**********************************************************************/ inline bool isNotTimeout(){return sp_marktime_->getMillisecond() < timeout_;}/*********************************************************************** 函数: isActivate* 功能: 判断当前计时器是否激活* 参数: 无需参数* 返回: true:计时器已经被激活 false:计时器未被激活**********************************************************************/ inline bool isActivate(){ std::unique_lock< std::mutex > lock(mutex_data_);return is_activate_; }/*********************************************************************** 函数: setActivate* 功能: 激活计时器* 参数: 无需参数* 返回: 无返回值**********************************************************************/ inline void setActivate() { std::unique_lock< std::mutex > lock(mutex_data_);is_activate_ = true; }/*********************************************************************** 函数: setActivateAndUpdate* 功能: 激活计时器, 并更新时间* 参数: 无需参数* 返回: 无返回值**********************************************************************/ inline void setActivateAndUpdate() { std::unique_lock< std::mutex > lock(mutex_data_);is_activate_ = true; sp_marktime_->mark();}/*********************************************************************** 函数: setCancelActivate* 功能: 取消激活计时器* 参数: 无需参数* 返回: 禁用定时器前的计时器状态**********************************************************************/inline bool setCancelActivate(){std::unique_lock< std::mutex > lock(mutex_data_);bool is_activate = is_activate_;is_activate_ = false;return is_activate;}/*********************************************************************** 函数: isActivateAndTimeout* 功能: 判断当前计时器是否被激活,如果被激活是否已经超时* 参数: 无需参数* 返回: true:计时器已被激活且已经超时 false:计时器未激活或者没有超时**********************************************************************/ inline bool isActivateAndTimeout(){std::unique_lock< std::mutex > lock(mutex_data_);return is_activate_ ? isTimeout() : false;}/*********************************************************************** 函数: isActivateAndNotTimeout* 功能: 判断当前计时器是否被激活,如果被激活是否没有超时* 参数: 无需参数* 返回: true:计时器已被激活且没有超时 false:计时器未激活或者已经超时**********************************************************************/ inline bool isActivateAndNotTimeout(){std::unique_lock< std::mutex > lock(mutex_data_);return is_activate_ ? isNotTimeout() : false;}/*********************************************************************** 函数: setIfActivateAndTimeoutToCancel* 功能: 如果定时器激活并且超时了, 则取消激活定时器, 返回 true* 参数: 无需参数* 返回: true:定时器被激活且超时了(会禁用了定时器) false:计时器未激活或者没有超时**********************************************************************/inline bool setIfActivateAndTimeoutToCancel(){std::unique_lock< std::mutex > lock(mutex_data_);bool is_activate = is_activate_ && isTimeout();is_activate_ = is_activate ? false : is_activate_;return is_activate;}};#endif
测试用例:
#include <future>
#include <unistd.h>
#include <functional>
#include "PeriodTask.hpp"
#include "base/MarkTime.hpp"class Task1 : public PeriodTask { protected:virtual void addPeriodTask() {printf("Task1.addPeriodTask()...\n");}virtual void delPeriodTask() {printf("Task1.delPeriodTask()...\n");}virtual void runPeriodTask() {printf("Task1.runPeriodTask()...\n");}
};class Task2 : public PeriodTask { protected:virtual void addPeriodTask() {printf("Task2.addPeriodTask()...\n");}virtual void delPeriodTask() {printf("Task2.delPeriodTask()...\n");}virtual void runPeriodTask() {printf("Task2.runPeriodTask()...\n");}
};class Task3 : public PeriodTask { private:std::future<bool> future;bool futureExec(){// getDownload() 取出一条下载链接printf("futureExec begin...\n");sleep(3); // 模拟下载耗时动作printf("futureExec done...\n");}bool getDownload(std::string &url){// 从队列中取出一条下载链接return true;}size_t getDownloadSize(){return 10; // 返回下载队列数量}protected:virtual void addPeriodTask() {printf("Task3.addPeriodTask()...\n");}virtual void delPeriodTask() {printf("Task3.delPeriodTask() begin...\n");if(future.valid() == true){printf("wait async exit ing...");future.get();}printf("Task3.delPeriodTask() done...\n");}virtual void runPeriodTask() {printf("Task3.runPeriodTask() ...\n");if(future.valid() == true){if(future.wait_for(std::chrono::milliseconds(1)) != std::future_status::ready){printf("Task3.wait_for() ...\n");return ;}future.get();}// 查看是否有需要下载的工作if(getDownloadSize() > 0){printf("Task3.runPeriodTask() create async ...\n");future = std::async(std::launch::async, std::bind(&Task3::futureExec, this));}}public:bool addDownload(const char *url){// 将下载链接加入队列中return true;}
};int main()
{std::shared_ptr<MarkTime> sp_marktime;std::shared_ptr<PeriodTasks> sp_tasks;std::shared_ptr<Task1> sp_task1;std::shared_ptr<Task2> sp_task2;std::shared_ptr<Task3> sp_task3;sp_marktime = std::make_shared<MarkTime>();sp_tasks = std::make_shared<PeriodTasks>();sp_task1 = std::make_shared<Task1>();sp_task2 = std::make_shared<Task2>();sp_task3 = std::make_shared<Task3>();sp_tasks->addTask(sp_task1, 1000);sp_tasks->addTask(sp_task2, 2000);sp_tasks->addTask(sp_task3, 2000);sp_tasks->addTask(sp_task3, 5000); // 重复添加会失败// 可以在其它线程调用sp_task3->addDownload("https://.....");sp_tasks->modTask(sp_task3, 500);sp_marktime->mark();while(sp_marktime->getMillisecond() < 20 * 1000){if(sp_marktime->getMillisecond() > 10 * 1000){sp_tasks->delTask(sp_task3); // 可以在其它线程调用}sp_tasks->runTasks();}return 0;
}