提示:文章写完后,目录可以自动生成,如何生成可参考右边的帮助文档
文章目录
- 1、用timerfd加epoll封装定时器的优点
- 2、代码实现
1、用timerfd加epoll封装定时器的优点
定时器为什么需要timerfd
在设计定时器时,我们首先想到的就是设置一个定时任务触发的时间,然后不断判断(死循环)当前时间是否大于等于定时任务触发的时间,如果是,那么就处理定时任务。这就是最为简单的设计,在我之前的博客中[定时器的简单实现],就是这么实现的,但是这样设计会存在诸多问题
- CPU资源浪费
使用死循环来检查时间意味着CPU必须不断地执行这段代码,即使大部分时间都是在做无用的比较。这会导致CPU资源的浪费,尤其是在高性能的服务器或多任务环境中。 - 响应性下降
由于CPU忙于执行定时器的检查,它可能无法及时响应其他重要的事件或任务,导致系统响应性下降。 - 不准确
依赖于系统的时钟分辨率和调度器延迟,使用死循环检查时间的方法可能无法实现精确的定时。例如,如果系统时钟的分辨率是毫秒级,而你尝试实现微秒级的定时,那么这种方法就无法满足需求。 - 不适合长时间等待
如果定时任务触发的时间间隔很长(例如几小时或几天),那么使用死循环来等待这段时间是非常低效的。
为解决上述问题,就产生了timerfd,当使用timerfd_create创建timerfd时,并设置了定时任务,当定时任务时间到达时,那么timerfd就变成了可读,经常与 select/poll/epoll搭配使用
这里我们不需要轮询这个timerfd,判断timerfd是否有数据(是否可读),因为这样做也会带来上述问题,因此我们需要将timerfd加入到select/poll/epoll中,让它们轮询,一般来说使用epoll更高效
- 统一的事件处理:epoll是Linux下多路复用IO接口select/poll的增强版本,它可以高效地处理大量的文件描述符和I/O事件。通过将timerfd的文件描述符加入epoll的监控集合中,可以将定时器超时事件与其他I/O事件进行统一处理,简化了事件驱动编程的复杂性。
- 提高并发性能:在高并发的网络服务器中,使用epoll可以监控多个套接字的I/O事件,而使用timerfd可以实现定时任务(如心跳检测、超时处理等)。这种结合使用的方式可以提高系统的并发性能和吞吐量。
- 减少系统调用开销:由于epoll采用I/O多路复用机制,并且只在有事件发生时才进行通知,因此可以减少不必要的系统调用开销。同时,由于timerfd的精度较高,可以减少因轮询而产生的额外开销。
2、代码实现
定时任务
//TimerEvent.h
#pragma once
#include <cstdint>
#include <functional>
#include <sys/time.h>
#include <memory>
class TimerEvent
{
public:using s_ptr = std::shared_ptr<TimerEvent>;template<typename F, typename... Args>TimerEvent(int interval, bool is_repeated, F&& f, Args&&... args):interval_(interval), is_repeated_(is_repeated){auto task = std::bind(std::forward<F>(f), std::forward<Args>(args)...);task_ = task;}int64_t getArriveTime() const{return arrive_time_;}void setCancler(bool flag){is_cancled_ = flag;}bool isCancle(){return is_cancled_;}bool isRepeated(){return is_repeated_;}std::function<void()> getCallBack(){return task_;}//重新设定任务到达时间void resetArriveTime();//获取当前时间static int64_t getNowMs();
private:int64_t arrive_time_;//ms 执行任务时毫秒级时间戳,达到对应的时间戳就执行对应的任务int64_t interval_;//ms 隔多少ms后执行bool is_repeated_{false};//是否为周期性的定时任务bool is_cancled_{false};//是否取消std::function<void()> task_;
};//TimerEvent.cpp
#include"TimerEvent.h"int64_t TimerEvent::getNowMs()
{timeval val;gettimeofday(&val, NULL);return val.tv_sec*1000 + val.tv_usec/1000;
}void TimerEvent::resetArriveTime()
{arrive_time_ = getNowMs() + interval_;
}
对timerfd的封装
//Timer.h
#pragma once
#include <map>
#include <vector>
#include <iostream>
#include "TimerEvent.h"class Timer
{
public:Timer();~Timer();int getFd(){return fd_;}void addTimerEvent(TimerEvent::s_ptr event);void deleteTimerEvent(TimerEvent::s_ptr event);//时间到达就触发void onTimer();std::vector<std::function<void()>> &getCallbacks(){return callbacks_;}//重新设置任务的到达时间void resetArriveTime();private:int fd_;std::multimap<int64_t, TimerEvent::s_ptr> pending_events_;std::vector<std::function<void()>> callbacks_;
};//Timer.cpp
#include <sys/timerfd.h>
#include <unistd.h>
#include <string.h>
#include <errno.h>
#include "Timer.h"Timer::Timer() : fd_(timerfd_create(CLOCK_MONOTONIC, TFD_CLOEXEC | TFD_NONBLOCK))
{
}Timer::~Timer()
{}void Timer::resetArriveTime()
{if (pending_events_.empty()){return;}int64_t now = TimerEvent::getNowMs();auto it = pending_events_.begin();int64_t inteval = 0;// 第一个任务的定时时间比当前时间大,则重新设置if (it->second->getArriveTime() > now){inteval = it->second->getArriveTime() - now;}else{// 第一个任务的定时时间比当前时间小或相等,说明第一个任务已经超时了,应该立马执行该任务inteval = 100; // ms}timespec ts;memset(&ts, 0, sizeof(ts));ts.tv_sec = inteval / 1000;//秒ts.tv_nsec = (inteval % 1000) * 1000000;//纳秒itimerspec value;memset(&value, 0, sizeof(value));value.it_value = ts;int result = timerfd_settime(fd_, 0, &value, NULL);if (result != 0){printf("timerfd_settime error, errno=%d, error=%s", errno, strerror(errno));}
}void Timer::addTimerEvent(TimerEvent::s_ptr event)
{bool is_reset_timerfd = false;if (pending_events_.empty()){is_reset_timerfd = true;}else{auto it = pending_events_.begin();// 当前需要插入的定时任务时间比已经存在的定时任务时间要早,那么就需要重新设定超时时间,防止任务延时if (it->first > event->getArriveTime()){is_reset_timerfd = true;}}pending_events_.emplace(event->getArriveTime(), event);if (is_reset_timerfd){resetArriveTime();}
}void Timer::deleteTimerEvent(TimerEvent::s_ptr event)
{event->setCancler(true);//pending_events_是multimap,key是时间,可能存在多个相同时间的event//将对应的event从pending_events_中删除auto begin = pending_events_.lower_bound(event->getArriveTime());auto end = pending_events_.upper_bound(event->getArriveTime());auto it = begin;for(;it != end; ++it){if(it->second == event){break;}}if(it != end){pending_events_.erase(it);}}void Timer::onTimer()
{char buf[8];for(;;){if((read(fd_, buf, 8) == -1) && errno == EAGAIN){break;}}int64_t now = TimerEvent::getNowMs();std::vector<TimerEvent::s_ptr> tmps;std::vector<std::function<void()>>& callbacks_ = getCallbacks();auto it = pending_events_.begin();for(; it != pending_events_.end(); ++it){// 任务已经到时或者超时,并且没有被取消,就需要执行if((it->first <= now) && !it->second->isCancle()){tmps.push_back(it->second);callbacks_.push_back(it->second->getCallBack());}else{break;// 因为定时任务是升序排的,只要第一个任务没到时,后面的都没到时}}//因为把任务已经保存好了,因此需要把m_pending_events中对应的定时任务删除,防止下次又执行了pending_events_.erase(pending_events_.begin(), it);// 需要把重复的TimerEvent再次添加进去for(auto i = tmps.begin(); i != tmps.end(); ++i){if(!(*i)->isCancle()){//std::cout<<"重新添加"<<std::endl;(*i)->resetArriveTime();addTimerEvent(*i);}}resetArriveTime();
}
对epoll的封装
//TimerPollPoller.h
#pragma once
#include <sys/epoll.h>
#include <cstring>
#include <unistd.h>
#include <fcntl.h>
#include <atomic>
#include <iostream>
#include "ThreadPool.h"
#include "Timer.h"class TimerPollPoller
{
public:TimerPollPoller(unsigned int num = std::thread::hardware_concurrency()):epollfd_(::epoll_create1(EPOLL_CLOEXEC)),thread_pool_(ThreadPool::instance()),stop_(true){timer_ = std::make_shared<Timer>();struct epoll_event event;memset(&event, 0, sizeof(event));event.data.ptr = reinterpret_cast<void*>(&timer_);event.events = EPOLLIN;::epoll_ctl(epollfd_, EPOLL_CTL_ADD, timer_->getFd(), &event);start();}~TimerPollPoller(){::close(epollfd_);stop();if(t.joinable()){std::cout << "主线程 join thread " << t.get_id() << std::endl;t.join();}}void start();void stop();void addTimerEvent(TimerEvent::s_ptr event);void cancelTimeEvent(TimerEvent::s_ptr event);void handleTimerfdInEpoll();
private:const int epollfd_;std::shared_ptr<Timer> timer_;std::thread t;//单独起一个线程,进行轮询epollThreadPool& thread_pool_;std::atomic<bool> stop_;
};//TimerPollPoller.cpp
#include "TimerPollPoller.h"void TimerPollPoller::start()
{t = std::move(std::thread(&TimerPollPoller::handleTimerfdInEpoll, this));
}
void TimerPollPoller::stop()
{stop_.store(true);
}
void TimerPollPoller::addTimerEvent(TimerEvent::s_ptr event)
{timer_->addTimerEvent(event);
}
void TimerPollPoller::cancelTimeEvent(TimerEvent::s_ptr event)
{timer_->deleteTimerEvent(event);
}
void TimerPollPoller::handleTimerfdInEpoll()
{struct epoll_event event;stop_.store(false);while(!stop_.load()){int numEvents = ::epoll_wait(epollfd_, &event, 1, 0);if(numEvents == 1){std::shared_ptr<Timer> timer_ptr = *reinterpret_cast<std::shared_ptr<Timer>*>(event.data.ptr);timer_ptr->onTimer();std::vector<std::function<void()>> callbacks = std::move(timer_ptr->getCallbacks());for(auto task:callbacks){thread_pool_.commit(task);}}}
}
处理任务的线程池
#pragma once
#include <atomic>
#include <condition_variable>
#include <future>
#include <iostream>
#include <mutex>
#include <queue>
#include <thread>
#include <vector>
#include <functional>
class ThreadPool {
public:static ThreadPool& instance() {static ThreadPool ins;return ins;}using Task = std::packaged_task<void()>;~ThreadPool() {stop();}template <class F, class... Args>auto commit(F&& f, Args&&... args) -> std::future<decltype(f(args...))> {using RetType = decltype(f(args...));if (stop_.load())return std::future<RetType>{};auto task = std::make_shared<std::packaged_task<RetType()>>(std::bind(std::forward<F>(f), std::forward<Args>(args)...));std::future<RetType> ret = task->get_future();{std::lock_guard<std::mutex> cv_mt(cv_mt_);//将任务放进任务队列中tasks_.emplace([task] { (*task)(); });}//唤醒一个线程cv_lock_.notify_one();return ret;}int idleThreadCount() {return thread_num_;}
private:ThreadPool(const ThreadPool&) = delete;ThreadPool& operator=(const ThreadPool&) = delete;ThreadPool(unsigned int num = std::thread::hardware_concurrency()): stop_(false) {{if (num < 1)thread_num_ = 1;elsethread_num_ = num;}start();}//启动所有线程void start() {for (int i = 0; i < thread_num_; ++i) {pool_.emplace_back([this]() {while (!this->stop_.load()) {Task task;{std::unique_lock<std::mutex> cv_mt(cv_mt_);this->cv_lock_.wait(cv_mt, [this] {//stop_为true或者tasks_不为空(return 返回true),则进行下一步,否则阻塞在条件变量上return this->stop_.load() || !this->tasks_.empty();});if (this->tasks_.empty())return;task = std::move(this->tasks_.front());this->tasks_.pop();}this->thread_num_--;task();this->thread_num_++;}});}}void stop() {stop_.store(true);cv_lock_.notify_all();for (auto& td : pool_) {if (td.joinable()) {std::cout << "join thread " << td.get_id() << std::endl;td.join();}}}
private:std::mutex cv_mt_;std::condition_variable cv_lock_;std::atomic_bool stop_;std::atomic_int thread_num_;std::queue<Task> tasks_;std::vector<std::thread> pool_;
};
测试代码
#include "TimerPollPoller.h"
#include <iostream>
void print()
{std::cout << "I love psy" << std::endl;
}
void print1()
{std::cout << "I love fl" << std::endl;
}int main()
{TimerPollPoller timerPollPoller;TimerEvent::s_ptr timer1 = std::make_shared<TimerEvent>(500, true, print);TimerEvent::s_ptr timer2 = std::make_shared<TimerEvent>(1000, true, print1);timerPollPoller.addTimerEvent(timer1);timerPollPoller.addTimerEvent(timer2);std::this_thread::sleep_for(std::chrono::seconds(2));timerPollPoller.cancelTimeEvent(timer1);std::this_thread::sleep_for(std::chrono::seconds(2));return 0;
}
makefile
PATH_SRC := .
PATH_BIN = bin
PATH_OBJ = objCXX := g++
CXXFLAGS := -g -O0 -std=c++11 -lpthread -Wall -Wno-deprecated -Wno-unused-but-set-variable
CXXFLAGS += -I./SRCS := $(wildcard $(PATH_SRC)/*.cpp)
OBJS := $(patsubst $(PATH_SRC)/%.cpp,$(PATH_OBJ)/%.o,$(SRCS)) TARGET := $(PATH_BIN)/main# 默认目标:生成可执行文件
all : $(TARGET)# 链接规则
$(TARGET): $(OBJS)$(CXX) $(CXXFLAGS) $(OBJS) -o $@ $(PATH_OBJ)/%.o: $(PATH_SRC)/%.cpp $(CXX) $(CXXFLAGS) -c $< -o $@ clean:rm -rf $(PATH_OBJ)/*.o $(TARGET).PHONY : clean
使用之间,在当前目录下需要创建bin目录和obj目录,然后再进行make,就能在bin目录下生产可执行程序main