文章目录
- 延迟任务精准发布文章
- 1)文章定时发布
- 2)延迟任务概述
- 2.1)什么是延迟任务
- 2.2)技术对比
- 2.2.1)DelayQueue
- 2.2.2)RabbitMQ实现延迟任务
- 2.2.3)redis实现
- 3)redis实现延迟任务
- 4)延迟任务服务实现
- 4.1)搭建heima-leadnews-schedule模块
- 4.2)数据库准备
- 4.3)安装redis
- 4.4)项目集成redis
- 4.5)添加任务
- 4.6)取消任务
- 4.7)消费任务
- 4.8)未来数据定时刷新
- 4.8.1)reids key值匹配
- 4.8.2)reids管道
- 4.8.3)未来数据定时刷新-功能完成
- 4.9)分布式锁解决集群下的方法抢占执行
- 4.9.1)问题描述
- 4.9.2)分布式锁
- 4.9.3)redis分布式锁
- 4.9.4)在工具类CacheService中添加方法
- 4.10)数据库同步到redis
- 5)延迟队列解决精准时间发布文章
- 5.1)延迟队列服务提供对外接口
- 5.2)发布文章集成添加延迟队列接口
- 5.3)消费任务进行审核文章
延迟任务精准发布文章
1)文章定时发布
2)延迟任务概述
2.1)什么是延迟任务
- 定时任务:有固定周期的,有明确的触发时间
- 延迟队列:没有固定的开始时间,它常常是由一个事件触发的,而在这个事件触发之后的一段时间内触发另一个事件,任务可以立即执行,也可以延迟
应用场景:
场景一:订单下单之后30分钟后,如果用户没有付钱,则系统自动取消订单;如果期间下单成功,任务取消
场景二:接口对接出现网络问题,1分钟后重试,如果失败,2分钟重试,直到出现阈值终止
2.2)技术对比
2.2.1)DelayQueue
JDK自带DelayQueue 是一个支持延时获取元素的阻塞队列, 内部采用优先队列 PriorityQueue 存储元素,同时元素必须实现 Delayed 接口;在创建元素时可以指定多久才可以从队列中获取当前元素,只有在延迟期满时才能从队列中提取元素
DelayQueue属于排序队列,它的特殊之处在于队列的元素必须实现Delayed接口,该接口需要实现compareTo和getDelay方法
getDelay方法:获取元素在队列中的剩余时间,只有当剩余时间为0时元素才可以出队列。
compareTo方法:用于排序,确定元素出队列的顺序。
实现:
1:在测试包jdk下创建延迟任务元素对象DelayedTask,实现compareTo和getDelay方法,
2:在main方法中创建DelayQueue并向延迟队列中添加三个延迟任务,
3:循环的从延迟队列中拉取任务
DelayQueue实现完成之后思考一个问题:
使用线程池或者原生DelayQueue程序挂掉之后,任务都是放在内存,需要考虑未处理消息的丢失带来的影响,如何保证数据不丢失,需要持久化(磁盘)
2.2.2)RabbitMQ实现延迟任务
- TTL:Time To Live (消息存活时间)
- 死信队列:Dead Letter Exchange(死信交换机),当消息成为Dead message后,可以重新发送另一个交换机(死信交换机)
Queue有过期时间,到时间后会将消息转发出去,如第一个Queue的消息到期后自动发送到DLX
2.2.3)redis实现
zset数据类型的去重有序(分数排序)特点进行延迟。例如:时间戳作为score进行排序
例如:
生产者添加到4个任务到延迟队列中,时间毫秒值分别为97、98、99、100。当前时间的毫秒值为90消费者端进行监听,如果当前时间的毫秒值匹配到了延迟队列中的毫秒值就立即消费
本项目就采用redis实现!!
3)redis实现延迟任务
实现思路
- 执行时间<=当前时间 需要立即执行 进入list队列 lpush 配合 rpop
- 执行时间>当前时间 延迟执行 手动再配置一进入zset
2.1 还手动设置了一个预设(延迟)时间,比如5分钟内要执行得任务,才允许加入zset队列中- 只有list队列才是消费队列,只会去list队列找任务来消费,所以每隔一段时间需要定时刷新zset队列,把到期的任务放到list中去
问题思路
1.为什么任务需要存储在数据库中?
延迟任务是一个通用的服务,任何需要延迟得任务都可以调用该服务,需要考虑数据持久化的问题,存储数据库中是一种数据安全的考虑。
2.为什么redis中使用两种数据类型,list和zset?
效率问题,算法的时间复杂度
redis的list是一个双向链表,数据量大时,相对于zset,list的插入删除查找效率要高得多得多
list(当前消费队列):存放立即要执行的任务
zset(未来数据队列):存放未来要执行的任务
3.在添加zset数据的时候,为什么不需要预加载?
任务模块是一个通用的模块,项目中任何需要延迟队列的地方,都可以调用这个接口,要考虑到数据量的问题,如果数据量特别大,为了防止阻塞,只需要把未来几分钟要执行的数据存入缓存即可。
4)延迟任务服务实现
4.1)搭建heima-leadnews-schedule模块
leadnews-schedule是一个通用的服务,单独创建模块来管理任何类型的延迟任务
资料阿里云盘: https://www.alipan.com/s/5XZbRnvTYc5
①:导入资料文件夹下的heima-leadnews-schedule模块到heima-leadnews-service下,如下图所示:
heima-leadnews-service的pom.xml内导入:(记得刷新maven)
②:添加bootstrap.yml
注意server-addr的ip换成自己的
③:在nacos中添加对应配置,并添加数据库及mybatis-plus的配置
4.2)数据库准备
导入资料中leadnews_schedule数据库
taskinfo 任务表
MySQL中,BLOB是一个二进制大型对象,是一个可以存储大量数据的容器;LongBlob 最大存储 4G (上面parameters就这个类型)
实体类
heima-leadnews-model模块下新建包: cn.whu.model.schedule.pojos
taskinfo_logs 任务日志表
实体类
heima-leadnews-model模块的 cn.whu.model.schedule.pojos包下
悲观锁:每次修改之前都将该行数据上锁,直到我修改结束才解锁
乐观锁:每次修改过程中不上锁,但是修改前记录数据原始值副本,修改那一刻判断是否一致,一致才允许修改(当然这里是比较的版本号) 两种方法都有人用,但是乐观锁可能效率会更高一点
@Version修饰的字段,每次修改MP应该会自动帮你自增
乐观锁支持: ScheduleApplication.java启动类里加
4.3)安装redis
①拉取镜像
提供的虚拟机镜像已经下载了redis镜像,
docker images
可查看。
此步可略过,直接执行第二步即可
② 创建容器
指定密码:leadnews
③链接测试
打开资料中的Redis Desktop Manager,输入host、port、password链接测试
能链接成功,即可
4.4)项目集成redis
① 在项目导入redis相关依赖,已经完成
其实在heima-leadnews-common模块下早就已经引入了redis依赖,所以之前本地不开redis,项目都启动不了
② 在heima-leadnews-schedule中集成redis,添加以下nacos配置,链接上redis
谁要用redis,就谁配置呗
③ 拷贝资料文件夹下的类:CacheService到heima-leadnews-common模块下,并添加自动配置
工具类加了@Component注解,其他微服务导入后不一定直到要扫描这个包,这里手动配置一下
就是将StringRedisTemplate封装成了工具类
1415行的一个工具类,非常不容易了
④:测试
heima-leadnews-schedule模块的test/java下面新建cn.whu.schedule.test.RedisTest
4.5)添加任务
①:拷贝mybatis-plus生成的文件,mapper
②:创建task类,用于接收添加任务的参数
heima-leadnews-model模块下的cn.whu.model.schedule.dtos.Task
③:创建TaskService
heima-leadnews-schedule模块下
实现:
ScheduleConstants常量类
heima-leadnews-common模块的cn.whu.common.constants包下
④:测试
- addTask方法测试
taskinfo表
taskinfo_logs表:
redis: topic_前缀,表示当前就要执行的任务
4.6)取消任务
在TaskService中添加方法
实现
- 测试
4.7)消费任务
在TaskService中添加方法
实现
- 测试
先addTask一个,再poll
1)addTask后
2)poll后
4.8)未来数据定时刷新
定时刷新zset到list中
4.8.1)reids key值匹配
要判断数据是否到期,首先得获取zset中所有的key,然后遍历才能得到,那么问题来了:如何获取redis中zset的所有的key呢
方案1:keys 模糊匹配
keys的模糊匹配功能很方便也很强大,但是在生产环境需要慎用!开发中使用keys的模糊匹配却发现redis的CPU使用率极高,所以公司的redis生产环境将keys命令禁用了!redis是单线程,会被堵塞
方案2:scan ★
SCAN 命令是一个基于游标的迭代器,SCAN命令每次被调用之后, 都会向用户返回一个新的游标, 用户在下次迭代时需要使用这个新游标作为SCAN命令的游标参数, 以此来延续之前的迭代过程。
代码案例:
先执行这个新建一些任务
再执行下面查询keys
4.8.2)reids管道
普通redis客户端和服务器交互模式
1.上面的方式就是一条条地查,然后一条条地写redis,就是每个命令单独执行,可以,但是数据量大时效率会非常低,需要经常与redis建立连接。 (客户端每执行一条命令肯定是要与服务端建立一次连接的)
2.特点:每执行一条命令,服务端都返回一次结果
3.为了解决效率问题,redis提供了管道请求模型
Pipeline请求模型
管道模式下,会将发送的命令存放到管道,待所有命令执行完毕,服务端再统一返回一次结果。效率大大增加了!!
官方测试结果数据对比
测试案例对比:
同样10000条数据,管道只需642ms,而普通的命令方式却需要4864ms,管道快了7.6倍
4.8.3)未来数据定时刷新-功能完成
在TaskService中添加方法
在引导类中添加开启任务调度注解:@EnableScheduling
- 测试
先确保redis中有future数据 (没有就用上面的测试类添加)
然后启动ScheduleApplication即可
4.9)分布式锁解决集群下的方法抢占执行
4.9.1)问题描述
启动两台heima-leadnews-schedule服务,每台服务都会去执行refresh定时任务方法
- 测试,同一个微服务启动两次
参数名最好换成别的,不然容易导致循环引用问题:
可以看到同一时刻完全相同的操作
4.9.2)分布式锁
分布式锁:控制分布式系统有序的去对共享资源进行操作,通过互斥来保证数据的一致性。
解决方案:
4.9.3)redis分布式锁
sexnx (SET if Not eXists) 命令在指定的 key 不存在时,为 key 设置指定的值。
这种加锁的思路是,如果 key 不存在则为 key 设置 value,如果 key 已存在则 SETNX 命令不做任何操作
- 客户端A请求服务器设置key的值,如果设置成功就表示加锁成功
- 客户端B也去请求服务器设置key的值,如果返回失败,那么就代表加锁失败
- 客户端A执行代码完成,删除锁
- 客户端B在等待一段时间后再去请求设置key的值,设置成功
- 客户端B执行代码完成,删除锁
4.9.4)在工具类CacheService中添加方法
heima-leadnews-common模块的cn.whu.common.redis.CacheService
修改未来数据定时刷新的方法,如下:
重启ScheduleApplication的两个实例
(schedule设置的是每分钟执行一次,setNx之前是多个微服务一起执行,现在是严格交叉执行了,每分钟内只有一个实例抢占到锁,执行refresh,这种事儿也确实同一时刻执行一次就够了嘛)
- 小结:
4.10)数据库同步到redis
定时同步模块。DB–》redis
- 测试:
先执行上面的addTasks测试方法,往db里面新增一些任务(会自动同步到redis,需要手动删了)
执行之前可以先清空一下db和redis
taskInfo表:
redis:
重启schdule微服务,一个即可(server.port可能得改成serverPort)
微服务初始化时就执行了这个方法:
topic是因为这个:
5)延迟队列解决精准时间发布文章
5.1)延迟队列服务提供对外接口
提供远程的feign接口,在heima-leadnews-feign-api编写类如下:
在heima-leadnews-schedule微服务下提供对应的实现
test中已经测试过,这里就不再测试接口了,远程接口的提供到这就完成了
5.2)发布文章集成添加延迟队列接口
再创建WmNewsTaskService
heima-leadnews-wemedia模块的cn.whu.wemedia.service包下
实现:
枚举类:
heima-leadnews-model模块下
序列化工具对比
- JdkSerialize:java内置的序列化能将实现了Serilazable接口的对象进行序列化和反序列化, ObjectOutputStream的writeObject()方法可序列化对象生成字节数组
- Protostuff:google开源的protostuff采用更为紧凑的二进制数组,表现更加优异,然后使用protostuff的编译工具生成pojo类
拷贝资料中的两个类到heima-leadnews-utils下
拷贝到:heima-leadnews-utils的cn.whu.utils.common包下面
Protostuff需要引导依赖:heima-leadnews-utils下的pom.xml
比较:
修改发布文章代码:
把之前的异步调用修改为调用延迟任务
heima-leadnews-wemedia模块的cn.whu.wemedia.service.impl.WmNewsServiceImpl#submitNews方法
- 测试1:
启动ScheduleApplication、WemediaApplication、WemediaGatewayApplication
先清空schedule库的两个表,和redis,然后新增新闻
http://localhost:8802/
提交完新闻后,会feign远程调用到schedule的微服务的addTask方法(全放入db,最近的一部分放到redis缓存)
于是添加完毕查看db:
和redis
任务虽然没有审核,但都存在db或redis中了,不着急,后面可以慢慢来写审核代码,任务丢不掉的
- 测试2:
http://localhost:8802/ 再发布两篇文章,1)一个未来6分钟,2)一个未来3分钟,而不是此刻
1)会只有db中新增记录,redis不会
2)会db中新增记录,redis新增future记录
topic是当前就可以消费的任务,future是未来5分钟之内待要消费的任务。
1)
2)
到时间后再看redis,future会刷新到topic中
5.3)消费任务进行审核文章
现在审核文章的任务都已经缓存到db或redis中啦,下面就得扫描redis消费这些任务,进行文章审核啦
heima-leadnews-wemedia模块下
WmNewsTaskService中添加方法
实现
在WemediaApplication自媒体的引导类中添加开启任务调度注解@EnableScheduling
- 测试
启动ScheduleApplication、WemediaApplication、WemediaGatewayApplication、ArticleApplication
http://localhost:8802/
1)发布文章,定时选择此刻,刷新自动审核,显示已上架
2)发布文章,定时修改为1分钟以后,刷新不自动上架,1分钟后才会上架
对于2)可以查看日志: