Redis如何高效实现定时任务

写在文章开头

redis通过单线程结合非阻塞事件轮询机制实现高效的网络IO和时间事件处理,这篇文章我们将从源码的角度深入分析一下redis时间事件的设计与实现。

在这里插入图片描述

Hi,我是 sharkChili ,是个不断在硬核技术上作死的 java coder ,是 CSDN的博客专家 ,也是开源项目 Java Guide 的维护者之一,熟悉 Java 也会一点 Go ,偶尔也会在 C源码 边缘徘徊。写过很多有意思的技术博客,也还在研究并输出技术的路上,希望我的文章对你有帮助,非常欢迎你关注我的公众号: 写代码的SharkChili

因为近期收到很多读者的私信,所以也专门创建了一个交流群,感兴趣的读者可以通过上方的公众号获取笔者的联系方式完成好友添加,点击备注 “加群” 即可和笔者和笔者的朋友们进行深入交流。

在这里插入图片描述

详解redis中的时间事件

时间事件的定义

时间事件可以是单次到期执行销毁,也可以是定时任务,对此redis对于时间事件统一封装为aeTimeEvent对象,通过id来唯一标识一个事件,结合when_secwhen_ms记录任务到期执行的秒和分,而执行时间事件的函数也是交由timeProc指针所指向的函数执行。
我们以一个redis定时执行的任务为例,如下所示,该结果通过when_secwhen_ms记录秒之前的时间和毫秒的时间,一旦这个时间到了就会执行timeProc这个函数指针所指向的方法serverCron,该函数会定期执行各种任务,这一点笔者会在后文展开:

在这里插入图片描述

对应的我们给出时间事件的代码描述,即位于ae.h这个头文件中的aeTimeEvent 结构体,这就是对时间事件的封装结构体,可以看到它除了笔者上述提到的核心字段以外,还有一个next指针用于连接下一个注册的时间事件:

//时间事件
typedef struct aeTimeEvent {//时间事件的id全局递增long long id; /* time event identifier. */long when_sec; /* seconds *///时间到达的时间long when_ms; /* milliseconds *///对应时间时间的处理器aeTimeProc *timeProc;//......//连接下一个时间时间struct aeTimeEvent *next;
} aeTimeEvent;

上文提到redis的时间事件是以链表的形式关联起来,这里我们也给出时间时间统一管理对象,即时间轮询器aeEventLoop ,它通过timeEventHead记录第一个时间时间而后续的时间时间统一用时间时间的next指针进行管理:

在这里插入图片描述

对应我们也给出这段时间代码的定义,即位于ae.haeEventLoop 的定义:

typedef struct aeEventLoop {//......//管理时间事件的列表aeTimeEvent *timeEventHead;//......
} aeEventLoop;

注册时间事件

redis在服务器初始化阶段,会注册一个定时的时间事件,大约每1毫秒触发一次,该事件主要做的是:

  1. 更新redis全局时钟,该时钟用于全局变量获取时间用的。
  2. 随机抽取redis内存数据库中的样本删除过期的键值对。
  3. 如果检查到aof重写完成,则进行刷盘操作。
  4. 如果发现当前aof大小过大,则fork子进程进行aof重写操作。
  5. …。

在这里插入图片描述

对应我们给出时间事件注册的源码段,即redis初始化时调用的方法initServer中的aeCreateTimeEvent,可以看到它将定时任务封装为时间事件timeEvent,并设置时间间隔为1毫秒一次:

void initServer(void) {//....../* Create the serverCron() time event, that's our main way to process* background operations. *///创建时间事件注册到eventLoop->timeEventHead中if(aeCreateTimeEvent(server.el, 1, serverCron, NULL, NULL) == AE_ERR) {redisPanic("Can't create the serverCron time event.");exit(1);}//......
}

轮询处理时间事件

redis每次处理完所有用户的请求之后,都会调用一次时间时间处理函数processTimeEvents,轮询并处理就绪的时间事件,由此保证尽可能准时执行时间事件,如果事件时间非定时任务则执行完成直接删除,反之设置下一次执行时间。这些步骤全部完成之后,返回本次处理的时间事件数:

在这里插入图片描述

我们给出处理时间循环的入口aeMain,可以看到该函数就是redis核心函数所在,它会循环调用aeProcessEvents处理各种事件:

void aeMain(aeEventLoop *eventLoop) {eventLoop->stop = 0;while (!eventLoop->stop) {if (eventLoop->beforesleep != NULL)eventLoop->beforesleep(eventLoop);//处理各种事件    aeProcessEvents(eventLoop, AE_ALL_EVENTS);}
}

不如aeProcessEvents可以看到该函数执行完所有用户请求之后调用processTimeEvents方法获取并执行就绪的时间事件:

int aeProcessEvents(aeEventLoop *eventLoop, int flags)
{//......//处理就绪的客户端事件numevents = aeApiPoll(eventLoop, tvp);for (j = 0; j < numevents; j++) {aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd];int mask = eventLoop->fired[j].mask;int fd = eventLoop->fired[j].fd;int rfired = 0;/* note the fe->mask & mask & ... code: maybe an already processed* event removed an element that fired and we still didn't* processed, so we check if the event is still valid. */if (fe->mask & mask & AE_READABLE) {rfired = 1;fe->rfileProc(eventLoop,fd,fe->clientData,mask);}if (fe->mask & mask & AE_WRITABLE) {if (!rfired || fe->wfileProc != fe->rfileProc)fe->wfileProc(eventLoop,fd,fe->clientData,mask);}processed++;}}//上述核心网络IO事件完成后处理时间事件if (flags & AE_TIME_EVENTS)processed += processTimeEvents(eventLoop);return processed; /* return the number of processed file/time events */
}

最后我们就可以看到处理时间事件的核心代码段,其内部会从timeEventHead开始轮询就绪的时间事件,比对当前时间是否大于或者等于到期时间,如果是则执行当前时间事件,再判断这个事件是否是定时事件,如果是则更新下次执行时间,反之删除,最后累加本次处理的时间时间数:

static int processTimeEvents(aeEventLoop *eventLoop) {int processed = 0;aeTimeEvent *te;long long maxId;time_t now = time(NULL);//......if (now < eventLoop->lastTime) {//从时间事件头开始te = eventLoop->timeEventHead;while(te) {te->when_sec = 0;te = te->next;}}eventLoop->lastTime = now;te = eventLoop->timeEventHead;maxId = eventLoop->timeEventNextId-1;//循环处理到期的时间事件while(te) {long now_sec, now_ms;long long id;if (te->id > maxId) {te = te->next;continue;}aeGetTime(&now_sec, &now_ms);//如果现在的事件大于到达时间if (now_sec > te->when_sec ||(now_sec == te->when_sec && now_ms >= te->when_ms)){int retval;id = te->id;//调用时间时间函数处理该事件retval = te->timeProc(eventLoop, id, te->clientData);//更新处理数processed++;//.....if (retval != AE_NOMORE) {//如果事件类型不是AE_NOMORE则说明是定时事件更新周期,反之删除aeAddMillisecondsToNow(retval,&te->when_sec,&te->when_ms);} else {aeDeleteTimeEvent(eventLoop, id);}te = eventLoop->timeEventHead;} else {te = te->next;}}return processed;
}

redis对于时间事件实现上的优化

因为时间事件有些要求定期执行,所以redis为了保证时间执行的实时性,做了如下两个优化:

  1. 对于比较耗时的时间事件,例如AOF重写,通过fork子进程异步完成:
  2. 对于返回给客户端套接字的内容,如果长度超过预设的值,会主动让出线程执行权,避免时间时间饥饿。

在这里插入图片描述

对应的我们给出第一点时间时间对于aof重写的核心代码段,可以看到serverCron内部判断如果当前没有rdb和aof子进程,且需要进行aof重写则调用rewriteAppendOnlyFileBackground函数fork子进程进行aof重写:

int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {//....../* Start a scheduled AOF rewrite if this was requested by the user while* a BGSAVE was in progress. *///aof_rewrite_scheduled设置为1,且没有其他持久化子进程则进行aof重写,通过异步避免耗时if (server.rdb_child_pid == -1 && server.aof_child_pid == -1 &&server.aof_rewrite_scheduled){rewriteAppendOnlyFileBackground();}//......
}//fork子进程进行aof重写
int rewriteAppendOnlyFileBackground(void) {//......if ((childpid = fork()) == 0) {//fork子进程进行aof重写char tmpfile[256];/* Child */closeListeningSockets(0);redisSetProcTitle("redis-aof-rewrite");//生成一个tmp文件snprintf(tmpfile,256,"temp-rewriteaof-bg-%d.aof", (int) getpid());if (rewriteAppendOnlyFile(tmpfile) == REDIS_OK) {//重写aofsize_t private_dirty = zmalloc_get_private_dirty();//......exitFromChild(0);} else {exitFromChild(1);}} else {//......}return REDIS_OK; /* unreached */
}

而回复给客户端结果的处理器sendReplyToClient内部也有一段,判断如果写入数totwritten 大于REDIS_MAX_WRITE_PER_EVENT (宏定义为64M),则直接中止写入,break退出等到下一次循环处理,避免因为这个处理导致其他时间事件饥饿而导致事件执行延期:

void sendReplyToClient(aeEventLoop *el, int fd, void *privdata, int mask) {//......while(c->bufpos > 0 || listLength(c->reply)) {//......//对于文件事件数据写入超长会让出执行权让时间事件能够尽可能的执行server.stat_net_output_bytes += totwritten;if (totwritten > REDIS_MAX_WRITE_PER_EVENT &&(server.maxmemory == 0 ||zmalloc_used_memory() < server.maxmemory)) break;}//......
}

小结

以上便是笔者从源码角度对于redis时间事件设计与实现的全部分析,希望对你有帮助。

我是 sharkchiliCSDN Java 领域博客专家开源项目—JavaGuide contributor,我想写一些有意思的东西,希望对你有帮助,如果你想实时收到我写的硬核的文章也欢迎你关注我的公众号: 写代码的SharkChili
因为近期收到很多读者的私信,所以也专门创建了一个交流群,感兴趣的读者可以通过上方的公众号获取笔者的联系方式完成好友添加,点击备注 “加群” 即可和笔者和笔者的朋友们进行深入交流。

在这里插入图片描述

参考

《redis设计与实现》

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/web/43651.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

项目三层架构详情

三层架构 三层架构就是为了符合“高内聚&#xff0c;低耦合”思想&#xff0c;把各个功能模块划分为表示层&#xff08;UI&#xff09;、业务逻辑层&#xff08;BLL&#xff09;和数据访问层&#xff08;DAL&#xff09;三层架构&#xff0c;各层之间采用接口相互访问&#xf…

(正向)代理 vs. 反向代理

&#xff08;正向&#xff09;代理 vs. 反向代理 代理和反向代理都是针对用户而言的。 一、&#xff08;正向&#xff09;代理——代理客户端 1. 流程 代理会隐藏客户端的真实信息&#xff08;IP、端口&#xff09;&#xff0c;代替客户端在互联网上发起请求&#xff0c;并将…

09:C语言进阶篇一

C语言进阶篇一 数据类型1.1、内存占用与sizeof运算符1.2、有符号数和无符号数1.3、整形数和浮点型数存储方式1.4、数据类型转换1.4.1、隐式转换1.4.2、强制转换 数据类型 基本数据类型&#xff1a;char&#xff0c;short&#xff0c;int&#xff0c;long&#xff0c;float&…

什么是RLHF(基于人类反馈的强化学习)?

什么是RLHF&#xff08;基于人类反馈的强化学习&#xff09;&#xff1f; 基于人类反馈的强化学习&#xff08;Reinforcement Learning from Human Feedback, RLHF&#xff09;是一种结合强化学习和人类反馈的技术&#xff0c;用于训练智能体&#xff0c;使其行为更符合人类期…

哪些类型的工作需要六西格玛绿带培训?

一、六西格玛绿带是什么&#xff1f; 首先&#xff0c;让我们来了解一下六西格玛绿带。六西格玛绿带是六西格玛管理体系中的一个重要角色&#xff0c;他们通常负责在项目中执行六西格玛方法和工具&#xff0c;协助黑带完成复杂的项目任务。绿带需要掌握基本的六西格玛知识和技…

OpenJudge | 最高的分数

目录 描述输入输出样例输入样例输出思路方法一方法二 CodeCC 总时间限制: 1000ms 内存限制: 65536kB 描述 孙老师讲授的《计算概论》这门课期中考试刚刚结束&#xff0c;他想知道考试中取得的最高分数。因为人数比较多&#xff0c;他觉得这件事情交给计算机来做比较方便。你能…

萝卜快跑:未来出行的双刃剑

欢迎来到 破晓的历程的 博客 ⛺️不负时光&#xff0c;不负己✈️ 在这个日新月异的科技时代&#xff0c;无人驾驶技术正以前所未有的速度改变着我们的出行方式。萝卜快跑&#xff0c;作为自动驾驶出租车领域的佼佼者&#xff0c;其出现无疑为城市交通注入了新的活力&#xff…

如何在在system_real_robot.launch修改订阅的雷达

在 system_real_robot.launch 文件中修改订阅的雷达,以使用开源 SLAM 包(如 FastLIO 和 TARE)输出的优化后雷达话题。可以让你的系统使用这些 SLAM 包提供的高精度雷达数据。 假设你的 Launch 文件中包括这一行: xml <param name="registeredScanTopic" ty…

Kylin系列(六)查询优化:提升 Kylin 查询性能

目录 1. Kylin查询优化的基础知识 1.1 Kylin的架构概述 1.2 Cube的构建与存储 2. 索引设计与优化 2.1 选择适当的维度和度量 2.2 使用层级维度 2.3 使用字典编码 3. 查询改写与优化 3.1 选择合适的查询语法 3.2 避免不必要的计算 3.3 使用过滤条件 4. Cube设计优化…

政企单位光纤资源高效管理与优化策略

引言 随着信息技术的飞速发展&#xff0c;政企单位对于通信基础设施的管理要求日益提高。然而&#xff0c;传统的管理模式&#xff0c;如Excel表格记录和纸质审批流程&#xff0c;已难以满足当前复杂多变的业务需求。在此背景下&#xff0c;我们实施了光纤管理的数字化转型项目…

双栈实现一个队列

两个栈可实现将列表倒序&#xff1a;设有含三个元素的栈 A [1,2,3] 和空栈 B [] 。若循环执行 A 元素出栈并添加入栈 B &#xff0c;直到栈 A 为空&#xff0c;则 A [] , B [3,2,1] &#xff0c;即栈 B 元素为栈 A 元素倒序。 利用栈 B 删除队首元素&#xff1a;倒序后&am…

自定义异步线程服务

异步线程池配置&#xff1a; /*** 启动异步线程-并配置线程池*/ Configuration EnableAsync public class AsyncConfig {Bean(name "taskExecutor")public Executor taskExecutor() {ThreadPoolTaskExecutor executor new ThreadPoolTaskExecutor();executor.setC…

玩转springboot之SpringBoot单元测试

SpringBoot单元测试 spring单元测试 之前在spring项目中使用单元测试时是使用注解RunWith(SpringJUnit4ClassRunner.class)来进行的 RunWith(SpringJUnit4ClassRunner.class)// 通过自动织入从应用程序上下文向测试本身注入beanWebAppConfiguration // 指定web环境ContextConfi…

电商出海第一步,云手机或成重要因素

电商出海第一步并非易事&#xff0c;挑战和机遇并存&#xff0c;出海企业或个人或将借助云手机从而达成商业部署全球化的目的&#xff1b; 下面我们从网络稳定、数据安全、成本、以及多平台适配方面来看&#xff0c;究竟为什么说云手机会成为出海的重要因素&#xff1b; 首先…

新手前端系列入门-什么是前端开发

一、什么是前端 前端&#xff0c;也称为前端开发或客户端开发&#xff0c;一般是指在构建网站或Web应用程序时&#xff0c;与用户直接交互的部分。就是指那些我们在网页上能看到、能直接跟用户打交道的部分。 简单来说&#xff0c;就是你打开一个网站&#xff0c;能看到的所有…

西门子大手笔又买一家公司,2024年“两买”和“两卖”的背后……

导语 大家好&#xff0c;我是社长&#xff0c;老K。专注分享智能制造和智能仓储物流等内容。 新书《智能物流系统构成与技术实践》 更多的海量【智能制造】相关资料&#xff0c;请到智能制造online知识星球自行下载。 今年&#xff0c;这家全球工业巨头不仅精准出击&#xff0c…

第4章 引擎提供的着色器工具函数和数据结构

4.1 UnityShaderVariables.cginc文件中的着色器常量和函数 4.1.1 进行变换操作用的矩阵 1.判断USING DIRECTIONAL LIGTH宏是否定义并分析与立体渲染相关的宏 立体多例化渲染技术的核心思想是一次向渲染管道上提交两份待渲染的几何体数据&#xff0c;减少绘制调用&#xff08;d…

【信创国产化】Nacos 2.3.2连接达梦数据库

JeecgBoot 目前提供的nacos版本号 2.3.2已经支持与达梦数据库对接。 jeecg-boot/jeecg-server-cloud/jeecg-cloud-nacos项目默认加入了达梦驱动和yml配置。如果你是老代码&#xff0c;可以参考下面的步骤手工集成 项目地址&#xff1a;https://github.com/jeecgboot/JeecgBoot…

Anaconda 安装与基本使用总结

最近需要在服务器上安装和使用aconada&#xff0c;发现之前总是在网上找教程&#xff0c;每次都要找&#xff0c;很麻烦。这次就自己写一个安装笔记。以备日后使用。 1.服务器系统版本 ubuntu22.04 2. 软件安装 aconda软件的安装可以下面的教程&#xff08;实测有效&#xf…

斐波那契查找算法

斐波那契查找原理&#xff0c;仅仅改变了中间结点(mid)的位置&#xff0c;mid不再是中间或插值得到,而是位于黄金分割点附近&#xff0c;即midlowF(k-1)-1(F代表斐波那契数列) F[k]F[k-1]F[k-2],>(F[k]-1) (F[k-1]-1)(F[k-2]-1)1 说明:只要顺序表的长度为F[k]-1,则可以将该…