redis实现延迟任务(二)

实现思路

        我们实现文章地定时发布主要是利用zset地score属性。我们可以在score里存入要发布地时间戳地值,然后在定时刷新任务方法里,通过获取本地时间与score里的时间进行对比,因为本地时间是在不断变大的,如果大于等于的话那么就将他放到立即执行的list列表任务里。

        除了定时任务还有立即发布的,立即发布的话就不需要存储score了,我们就存在list属性里面就可以了。

redis里list与set

这里为什么要存储在两个不同的数据类型里面呢,可以分析一下:

list:是有序的,并且可以允许key的重复,插入和删除数据快但是查询一般般,list为双向链表,当需要立即发布的时候,数据量特别大的时候,list的效率要高于zset,普遍情况下也都是立即发布。

set:是无序的,value是不可以重复的,并且查找很快。

       

数据存储

由于redis最好不要存放大量的数据,因为redis是基于缓存实现的,虽然最后也会有持久化aof等,但是大量的数据还是存储在数据库中比较好,同时为了防止Zset里面的数据量太大了发生阻塞,所以我们只往zset放未来五分钟内要实现的任务。其他的先放在数据库里,当然这就涉及到了数据同步的问题。

数据同步

我们需要将符合时间条件的数据从数据库当中读取到redis里,又需要将符合条件即将执行的数据从redis里的set集合里读取到list集合里面。

下面给出具体的实现代码

代码实现

model

task类:

这里的参数是需要放置将来的任务对象的,序列化成byte就好了。也可以序列化成json,这个是根据数据协议来的。用fastjson序列化也是可以的。

package com.neu.schedule.model.po;import lombok.Data;import java.io.Serializable;/*** @BelongsProject: llyz-neu-project* @BelongsPackage: com.neu.schedule.model.po* @Author: zzm* @CreateTime: 2024-01-04  21:29* @Description: TODO* @Version: 1.0*/
@Data
public class Task implements Serializable {/*** 任务id*/private Long taskId;/*** 类型*/private Integer taskType;/*** 优先级*/private Integer priority;/*** 执行id long类型时间戳*/private long executeTime;/*** task参数*/private byte[] parameters;}

taskinfo:

package com.neu.schedule.model.po;import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableField;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import lombok.Data;import java.io.Serializable;
import java.util.Date;/*** <p>** </p>** @author zzm*/
@Data
@TableName("taskinfo")
public class Taskinfo implements Serializable {private static final long serialVersionUID = 1L;/*** 任务id*/@TableId(type = IdType.ID_WORKER)private Long taskId;/*** 执行时间*/@TableField("execute_time")private Date executeTime;/*** 参数*/@TableField("parameters")private byte[] parameters;/*** 优先级*/@TableField("priority")private Integer priority;/*** 任务类型*/@TableField("task_type")private Integer taskType;}

taskInfoLog:用来记录task任务运行结果的

package com.neu.schedule.model.po;import com.baomidou.mybatisplus.annotation.*;
import lombok.Data;import java.io.Serializable;
import java.util.Date;/*** <p>** </p>** @author zzm*/
@Data
@TableName("taskinfo_logs")
public class TaskinfoLogs implements Serializable {private static final long serialVersionUID = 1L;/*** 任务id*/@TableId(type = IdType.ID_WORKER)private Long taskId;/*** 执行时间*/@TableField("execute_time")private Date executeTime;/*** 参数*/@TableField("parameters")private byte[] parameters;/*** 优先级*/@TableField("priority")private Integer priority;/*** 任务类型*/@TableField("task_type")private Integer taskType;/*** 版本号,用乐观锁*/@Version//乐观锁,mybatis支持的private Integer version;/*** 状态 0=int 1=EXECUTED 2=CANCELLED*/@TableField("status")private Integer status;}

乐观锁支持:

/*** mybatis-plus乐观锁支持* @return*/
@Bean
public MybatisPlusInterceptor optimisticLockerInterceptor(){MybatisPlusInterceptor interceptor = new MybatisPlusInterceptor();interceptor.addInnerInterceptor(new OptimisticLockerInnerInterceptor());return interceptor;
}

这个肯定不能任务重复执行,其实乐观锁的思想多采用cas,当有多个实例对同一个任务执行的时候,看谁先将version字段设置成功了,谁就可以执行该任务,它比悲观锁的效率高。

service

这里就不一一解释每个方法的内容了,只说点重要代码和大概思路。

任务队列,添加任务,删除任务和实现任务。

添加任务:首先将task添加到数据库,然后看看任务的执行时间,如果超过五分钟,就不放入redis里,等着后期同步,如果是小于五分钟大于当前时间就放到set里,如果小于等于当前时间就放到redis的list里。

删除任务:从数据库里面删除,再去redis里删除,有就删除。

拉取任务:poll。由于最后都是从list结合里执行数据,所以只从list里面弹就可以了。

执行任务:这个方法是需要定时刷新的,用的是schedule注解,这个需要在启动类上加注解:

@EnableScheduling//开启定时任务

所有的任务执行都需要在taskinfolog里面记录status的执行结果,结果的状态大家可以自行定义。

package com.neu.schedule.service.impl;import com.alibaba.fastjson.JSON;
import com.baomidou.mybatisplus.core.toolkit.Wrappers;
import com.neu.base.constants.ScheduleConstants;
import com.neu.base.redis.CacheService;
import com.neu.schedule.mapper.TaskinfoLogsMapper;
import com.neu.schedule.mapper.TaskinfoMapper;
import com.neu.schedule.model.po.Task;
import com.neu.schedule.model.po.Taskinfo;
import com.neu.schedule.model.po.TaskinfoLogs;
import com.neu.schedule.service.TaskService;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang.StringUtils;
import org.springframework.beans.BeanUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;import javax.annotation.PostConstruct;
import java.util.Calendar;
import java.util.Date;
import java.util.List;
import java.util.Set;/*** @BelongsProject: llyz-neu-project* @BelongsPackage: com.neu.schedule.service.impl* @Author: zzm* @CreateTime: 2024-01-04  21:30* @Description: TODO* @Version: 1.0*/
@Service
@Transactional
@Slf4j
public class TaskServiceImpl implements TaskService {/*** 添加延迟任务** @param task* @return*/@Overridepublic long addTask(Task task) {//1.添加任务到数据库中boolean success = addTaskToDb(task);if (success) {//2.添加任务到redisaddTaskToCache(task);}return task.getTaskId();}@Autowiredprivate CacheService cacheService;/*** 把任务添加到redis中** @param task*/private void addTaskToCache(Task task) {String key = task.getTaskType() + "_" + task.getPriority();//获取5分钟之后的时间  毫秒值Calendar calendar = Calendar.getInstance();calendar.add(Calendar.MINUTE, 5);long nextScheduleTime = calendar.getTimeInMillis();//2.1 如果任务的执行时间小于等于当前时间,存入list中,list的key是topic+type+priority,可以重复if (task.getExecuteTime() <= System.currentTimeMillis()) {cacheService.lLeftPush(ScheduleConstants.TOPIC + key, JSON.toJSONString(task));} else if (task.getExecuteTime() <= nextScheduleTime) {//2.2 如果任务的执行时间大于当前时间 && 小于等于预设时间(未来5分钟) 存入zset中cacheService.zAdd(ScheduleConstants.FUTURE + key, JSON.toJSONString(task), task.getExecuteTime());}}@Autowiredprivate TaskinfoMapper taskinfoMapper;@Autowiredprivate TaskinfoLogsMapper taskinfoLogsMapper;/*** 添加任务到数据库中** @param task* @return*/private boolean addTaskToDb(Task task) {boolean flag = false;try {//保存任务表Taskinfo taskinfo = new Taskinfo();BeanUtils.copyProperties(task, taskinfo);//task里的执行时间是不一样的,是毫秒值,需要转换成Date类型taskinfo.setExecuteTime(new Date(task.getExecuteTime()));System.out.println(taskinfo.getTaskId());taskinfoMapper.insert(taskinfo);//在插入数据库之后,用的是数据库自动生成的雪花idSystem.out.println(taskinfo.getTaskId());//设置taskIDtask.setTaskId(taskinfo.getTaskId());//保存任务日志数据TaskinfoLogs taskinfoLogs = new TaskinfoLogs();BeanUtils.copyProperties(taskinfo, taskinfoLogs);//乐观锁初始版本号,默认为1taskinfoLogs.setVersion(1);taskinfoLogs.setStatus(ScheduleConstants.SCHEDULED);taskinfoLogsMapper.insert(taskinfoLogs);flag = true;} catch (Exception e) {e.printStackTrace();}return flag;}/*** 取消任务* @param taskId* @return*/@Overridepublic boolean cancelTask(long taskId) {boolean flag = false;//删除任务,更新日志Task task = updateDb(taskId,ScheduleConstants.CANCELLED);//删除redis的数据if(task != null){removeTaskFromCache(task);flag = true;}return false;}/*** 删除redis中的任务数据* @param task*/private void removeTaskFromCache(Task task) {String key = task.getTaskType()+"_"+task.getPriority();if(task.getExecuteTime()<=System.currentTimeMillis()){cacheService.lRemove(ScheduleConstants.TOPIC+key,0,JSON.toJSONString(task));}else {cacheService.zRemove(ScheduleConstants.FUTURE+key, JSON.toJSONString(task));}}/*** 删除任务,更新任务日志状态* @param taskId* @param status* @return*/private Task updateDb(long taskId, int status) {Task task = null;try {//删除任务taskinfoMapper.deleteById(taskId);TaskinfoLogs taskinfoLogs = taskinfoLogsMapper.selectById(taskId);taskinfoLogs.setStatus(status);taskinfoLogsMapper.updateById(taskinfoLogs);task = new Task();BeanUtils.copyProperties(taskinfoLogs,task);task.setExecuteTime(taskinfoLogs.getExecuteTime().getTime());}catch (Exception e){log.error("task cancel exception taskid={}",taskId);}return task;}/*** 按照类型和优先级拉取任务* @return*/@Overridepublic Task poll(int type,int priority) {Task task = null;try {String key = type+"_"+priority;String task_json = cacheService.lRightPop(ScheduleConstants.TOPIC + key);if(StringUtils.isNotBlank(task_json)){task = JSON.parseObject(task_json, Task.class);//更新数据库信息updateDb(task.getTaskId(),ScheduleConstants.EXECUTED);}}catch (Exception e){e.printStackTrace();log.error("poll task exception");}return task;}//未来数据定时刷新@Scheduled(cron = "0 */1 * * * ?")//一分钟调用一次public void refresh() {//setnx实现分布式锁String token = cacheService.tryLock("FUTURE_TASK_SYNC", 1000 * 30);if(StringUtils.isNotBlank(token)){log.info("未来数据定时刷新");System.out.println(System.currentTimeMillis() / 1000 + "执行了定时任务");// 获取所有未来数据集合的key值Set<String> futureKeys = cacheService.scan(ScheduleConstants.FUTURE + "*");// future_*for (String futureKey : futureKeys) { // future_250_250String topicKey = ScheduleConstants.TOPIC + futureKey.split(ScheduleConstants.FUTURE)[1];//获取该组key下当前需要消费的任务数据//参数:0:为从0开始查 0~当前时间的毫秒值Set<String> tasks = cacheService.zRangeByScore(futureKey, 0, System.currentTimeMillis());if (!tasks.isEmpty()) {//将这些任务数据添加到消费者队列中cacheService.refreshWithPipeline(futureKey, topicKey, tasks);System.out.println("成功的将" + futureKey + "下的当前需要执行的任务数据刷新到" + topicKey + "下");log.info("成功的将" + futureKey + "下的当前需要执行的任务数据刷新到" + topicKey + "下");}}}}//数据库任务定时同步到redis中,每五分钟执行一次@Scheduled(cron = "0 */5 * * * ?")@PostConstruct//初始化方法,微服务启动了,就会做同步操作public void reloadData() {//清理缓存中的数据 list,zsetclearCache();log.info("数据库数据同步到缓存");//获取五分钟之后的时间,毫秒值Calendar calendar = Calendar.getInstance();calendar.add(Calendar.MINUTE, 5);//查看小于未来5分钟的所有任务List<Taskinfo> allTasks = taskinfoMapper.selectList(Wrappers.<Taskinfo>lambdaQuery().lt(Taskinfo::getExecuteTime,calendar.getTime()));if(allTasks != null && allTasks.size() > 0){for (Taskinfo taskinfo : allTasks) {Task task = new Task();BeanUtils.copyProperties(taskinfo,task);task.setExecuteTime(taskinfo.getExecuteTime().getTime());addTaskToCache(task);}}log.info("数据库任务同步到redis");}private void clearCache(){// 删除缓存中未来数据集合和当前消费者队列的所有keySet<String> futurekeys = cacheService.scan(ScheduleConstants.FUTURE + "*");// future_Set<String> topickeys = cacheService.scan(ScheduleConstants.TOPIC + "*");// topic_cacheService.delete(futurekeys);cacheService.delete(topickeys);}}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/620584.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

领域驱动设计应用之WebAPI

领域驱动设计应用之WebAPI 此篇文章主要讲述领域驱动设计在WebApi中的应用&#xff0c;以及设计方式&#xff0c;这种设计的原理以及有点。 文章目录 领域驱动设计应用之WebAPI前言一、相对于传统设计模式的有点二、WebAPI对接中的使用案例业务拆分父类设计HttpResponse(返回)…

2024PMP考试新考纲-【过程领域】近期典型真题和很详细解析(5)

今天华研荟继续为您分享【过程Process领域】的新考纲下的真题&#xff0c;进一步帮助大家体会和理解新考纲下PMP的考试特点和如何应用知识来解题&#xff0c;并且举一反三&#xff0c;在两个多月的时间内&#xff0c;一次性、高等级通过2024年PMP考试。 2024年PMP考试新考纲-【…

JavaScript基础04

1 - 数组 1.1 数组的概念 数组可以把一组相关的数据一起存放&#xff0c;并提供方便的访问(获取&#xff09;方式。 数组是指一组数据的集合&#xff0c;其中的每个数据被称作元素&#xff0c;在数组中可以存放任意类型的元素。数组是一种将一组数据存储在单个变量名下的优雅…

HBase 复制、备份、迁移

行业分享 HBase金融大数据乾坤大挪移 https://www.jianshu.com/p/cb4a645dd66a HBase跨机房迁移技术分享总结 https://www.jianshu.com/p/defc787b2704 dbaplus181期&#xff1a;腾讯金融HBase跨机房迁移实战 https://m.qlchat.com/topic/details?topicId2000003847589595 ht…

神经网络的三个特征,和卷积和最大池化有什么联系

神经网络的三个特征是层次结构、权重共享和非线性激活函数。 层次结构&#xff1a;神经网络由多个层组成&#xff0c;包括输入层、隐藏层和输出层。这种层次结构使得神经网络能够逐层提取数据的特征&#xff0c;并且通过调整每一层的权重来学习数据的表征。 权重共享&#xff…

HarmonyOS的应用类型(FA vs Stage)

HarmonyOS目前提供两种应用模型 FA(Feature Ability)模型: HarmonyOS API 7开始支持的模型,已经不再主推。 Stage模型: HarmonyOS API 9开始新增的模型,是目前主推且会长期演进的模型。在该模型中,由于提供了AbilityStage、WindowStage等类作为应用组件和Window窗口的…

(五)Python中第三方常用库(webbrower、pyautogui、smtplib、xlwt、xlrd、openpyxl等)

文章目录 一、库的安装方法二、pyautogui库&#xff08;模拟键盘按键、鼠标操作和GUI交互&#xff09;三、webbrower库&#xff08;操作浏览器&#xff09;四、smtplib库&#xff08;模拟发送邮件&#xff09;五、xlwt库&#xff08;操作Excel写入数据&#xff09;六、xlrd库&a…

C++学习笔记——友元、嵌套类、异常

目录 一、友元 一个使用友元的示例代码 输出结果 二、嵌套类 一个使用嵌套类的示例代码 输出结果 三、异常 一个使用异常处理的示例代码 输出结果 四、结论 五、使用它们的注意事项 上一篇文章链接&#xff1a; C中的继承和模板是非常强大和灵活的特性&#xff0c;它…

【HuggingFace Transformer库学习笔记】基础组件学习:Datasets

基础组件——Datasets datasets基本使用 导入包 from datasets import *加载数据 datasets load_dataset("madao33/new-title-chinese") datasetsDatasetDict({train: Dataset({features: [title, content],num_rows: 5850})validation: Dataset({features: [titl…

【图形学】探秘图形学奥秘:DDA与Bresenham算法的解密与实战

​&#x1f308;个人主页&#xff1a;Sarapines Programmer&#x1f525; 系列专栏&#xff1a;《图形学 | 图像解码》⏰诗赋清音&#xff1a;云生高巅梦远游&#xff0c; 星光点缀碧海愁。 山川深邃情难晤&#xff0c; 剑气凌云志自修。 ​ 目录 &#x1f30c;1. 初识模式识别…

AUTOSAR从入门到精通-Autosar 中断机制(六)

目录 知识储备 安全机制的程序流监控 看门狗在autosar 架构中位置 看门狗在autosar中简单流程

VMware workstation安装debian-12.1.0虚拟机并配置网络

VMware workstation安装debian-12.1.0虚拟机并配置网络 Debian 是一个完全自由的操作系统&#xff01;Debian 有一个由普罗大众组成的社区&#xff01;该文档适用于在VMware workstation平台安装debian-12.1.0虚拟机。 1.安装准备 1.1安装平台 Windows 11 1.2软件信息 软…

Nodejs软件安装​

Nodejs软件安装​ 一、简介 Node.js 是一个基于 Chrome V8 引擎的 JavaScript 运行环境。 官网&#xff1a;http://nodejs.cn/api/ 我们关注于 node.js 的 npm 功能&#xff0c;NPM 是随同 NodeJS 一起安装的包管理工具&#xff0c;JavaScript-NPM&#xff0c;Java-Maven&…

Redis命令 - Strings命令组常用命令

1、Set命令 SET key value [EX seconds] [PX milliseconds] [NX|XX]1.1 参数说明&#xff1a; EX seconds: 设置key的过期时间&#xff0c;单位时秒PX milliseconds: 设置key的过期时间&#xff0c;单位时毫秒NX: 只有key不存在的时候&#xff0c;才会设置key的值XX: 只有key…

花了三天的时间做了一个多功能 AI 助手

嗨&#xff01;我是团子&#xff0c;大家新年快乐呀~ 前几天看到一些好朋友在朋友圈晒自己的年度总结&#xff0c;立新年 Flag&#xff0c;看到大家一年满满的收获&#xff0c;再看看自己&#xff0c;不由得想再看看人家&#xff0c;然后再看看自己&#xff0c;然后再看看人家…

Vue学习计划-Vue3--核心语法(九)slot插槽

【slot】 插槽&#xff1a;简单说就是父组件内部使用了子组件&#xff0c;但是子组件内部某些结构需要使用者自行定义&#xff0c;此时就需要用到插槽实现 默认插槽 默认插槽的name是default 父组件 <template><div><Child><h3>默认插槽</h3>&…

MAVROS的进一步理解

一、Mavros简介 顾名思义&#xff0c; mavros就是mavlinkros。mavros是PX4官方提供的一个运行于ros下收发mavlink消息的工具&#xff0c;利用mavros可以发送mavlink消息给飞控(可以控制飞机)&#xff0c;并且可以从飞控中接受数据(例如&#xff1a;飞控的位置速度 IMU数据等等…

阿里云服务器ECS介绍_高性能云服务器_为了无法计算的价值

阿里云高性能云服务器60%单实例最大性能提升&#xff0c;35Gbps内网带宽&#xff0c;网络增强&通用型云服务器、本地SSD型云服务器、大数据型云服务器、GPU异构型云服务器&#xff0c;阿里云百科aliyunbaike.com分享阿里云高性能云服务器&#xff1a; 阿里云高性能云服务器…

大模型实战营Day4 作业

基础作业&#xff1a; 构建数据集&#xff0c;使用 XTuner 微调 InternLM-Chat-7B 模型, 让模型学习到它是你的智能小助手&#xff0c;效果如下图所示&#xff0c;本作业训练出来的模型的输出需要将不要葱姜蒜大佬替换成自己名字或昵称&#xff01; 微调前&#xff08;回答比较…