文章目录
- 昨日回顾
- 今日内容
- 1 延迟任务
- 1.1 概述
- 1.2 技术对比
- 1.2.1 DelayQueue
- 1.2.2 RabbitMQ
- 1.2.3 Redis实现
- 1.2.4 总结
- 2 redis实现延迟任务
- 2.0 实现思路
- 2.1 思考
- 2.2 初步配置实现
- 2.2.1 导入heima-leadnews-schedule模块
- 2.2.2 在Nacos注册配置管理leadnews-schedule
- 2.2.3 导入表结构
- 2.2.4 根据表结构导入实体类及其mapper
- 2.2.5 表结构中的乐观锁
- 2.2.5.1 在启动类中加入乐观锁的拦截器
- 2.2.6 安装redis
- 2.2.7 在项目中集成redis
- 2.2.7.1 导入redis依赖
- 2.2.7.2 为redis添加连接配置
- 2.2.7.3 拷贝工具类CacheService
- 2.2.7.4 将CacheService注册到spring自动配置
- 2.2.7.5 测试List
- 2.2.7.6 测试Zset
- 2.3 添加任务
- 2.3.1 导入task类
- 2.3.2 创建TaskService
- 2.3.3 测试
- 2.4 取消任务
- 2.4.1 Service
- 2.4.2 测试
- 2.5 拉取任务
- 2.5.1 Service
- 2.5.2 测试
- 2.6 定时刷新
- 2.6.1 如何获取zset中所有的key?
- 2.6.2 数据如何同步?
- 2.6.3 Redis管道
- 2.6.4 zSet和List数据同步实现
- 2.6.5 开启定时任务
- 2.6.6 分布式下的Schedule
- 2.6.7 Redis分布式锁
- 2.6.8 数据库和Redis的同步
- 2.7 延迟队列对外接口
- 2.7.1 IScheduleClinet接口
- 2.7.2 在微服务中实现类
- 2.8 发布文章集成延迟队列
- 2.8.1 添加askTypeEnum类枚举类
- 2.8.2 Task的参数序列化
- 2.8.3 实现文章发布集成接口及实现类
- 2.8.4 修改文章发布逻辑
- 2.8.5 启动测试
- 2.9 消费任务审核文章
- 2.9.1 综合测试
昨日回顾
今日内容
1 延迟任务
1.1 概述
1.2 技术对比
1.2.1 DelayQueue
1.2.2 RabbitMQ
1.2.3 Redis实现
1.2.4 总结
2 redis实现延迟任务
2.0 实现思路
2.1 思考
2.2 初步配置实现
2.2.1 导入heima-leadnews-schedule模块
2.2.2 在Nacos注册配置管理leadnews-schedule
spring:redis:host: 192.168.204.129password: leadnewsport: 6379datasource:driver-class-name: com.mysql.jdbc.Driverurl: jdbc:mysql://localhost:3306/leadnews_schedule?useUnicode=true&characterEncoding=UTF-8&serverTimezone=UTC&useSSL=falseusername: rootpassword: 123sjbsjb# 设置Mapper接口所对应的XML文件位置,如果你在Mapper接口中有自定义方法,需要进行该配置
mybatis-plus:mapper-locations: classpath*:mapper/*.xml# 设置别名包扫描路径,通过该属性可以给包中的类注册别名type-aliases-package: com.heima.model.schedule.pojosminio:accessKey: miniosecretKey: minio123bucket: leadnewsendpoint: http://192.168.204.129:9000readPath: http://192.168.204.129:9000
2.2.3 导入表结构
2.2.4 根据表结构导入实体类及其mapper
导入heima-leadnews-model模块下com.heima.model.schedule下导入两个Taskinfo和TaskinfoLogs实体类
@Data
@TableName("taskinfo")
public class Taskinfo implements Serializable {private static final long serialVersionUID = 1L;/*** 任务id*/@TableId(type = IdType.ID_WORKER)private Long taskId;/*** 执行时间*/@TableField("execute_time")private Date executeTime;/*** 参数*/@TableField("parameters")private byte[] parameters;/*** 优先级*/@TableField("priority")private Integer priority;/*** 任务类型*/@TableField("task_type")private Integer taskType;}
@Data
@TableName("taskinfo_logs")
public class TaskinfoLogs implements Serializable {private static final long serialVersionUID = 1L;/*** 任务id*/@TableId(type = IdType.ID_WORKER)private Long taskId;/*** 执行时间*/@TableField("execute_time")private Date executeTime;/*** 参数*/@TableField("parameters")private byte[] parameters;/*** 优先级*/@TableField("priority")private Integer priority;/*** 任务类型*/@TableField("task_type")private Integer taskType;/*** 版本号,用乐观锁*/@Versionprivate Integer version;/*** 状态 0=int 1=EXECUTED 2=CANCELLED*/@TableField("status")private Integer status;}
对应mapper
@Mapper
public interface TaskinfoLogsMapper extends BaseMapper<TaskinfoLogs> {}
@Mapper
public interface TaskinfoMapper extends BaseMapper<Taskinfo> {public List<Taskinfo> queryFutureTime(@Param("taskType")int taskType, @Param("priority")int priority, @Param("future")Date future);
}
TaskinfoMapper对应的mybatis的xml
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="com.heima.schedule.mapper.TaskinfoMapper"><select id="queryFutureTime" resultType="com.heima.model.schedule.pojos.Taskinfo">select *from taskinfowhere task_type = #{taskType}and priority = #{priority}and execute_time <![CDATA[<]]> #{future,javaType=java.util.Date}</select></mapper>
2.2.5 表结构中的乐观锁
@Version
private Integer version;
2.2.5.1 在启动类中加入乐观锁的拦截器
在heima-leadnews-schedule模块下的启动类中加入乐观锁的拦截器
@SpringBootApplication
@MapperScan("com.heima.schedule.mapper")
public class ScheduleApplication {public static void main(String[] args) {SpringApplication.run(ScheduleApplication.class,args);}@Beanpublic MybatisPlusInterceptor optimisticLockerInterceptor(){MybatisPlusInterceptor interceptor = new MybatisPlusInterceptor();interceptor.addInnerInterceptor(new OptimisticLockerInnerInterceptor());return interceptor;}
}
2.2.6 安装redis
移除已有redis
docker rm redis
创建新的redis容器
docker run -d --name redis --restart=always -p 6379:6379 redis --requirepass "leadnews"
密码leadnews
2.2.7 在项目中集成redis
2.2.7.1 导入redis依赖
在heima-leadnews-common模块中添加redis依赖
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<!-- redis依赖commons-pool 这个依赖一定要添加 -->
<dependency><groupId>org.apache.commons</groupId><artifactId>commons-pool2</artifactId>
</dependency>
2.2.7.2 为redis添加连接配置
在nacos中的leadnews-schedule配置中心为redis添加配置
spring:redis:host: 192.168.204.129password: leadnewsport: 6379
2.2.7.3 拷贝工具类CacheService
拷贝工具类CacheService到heima-leadnews-common的com.heima.common.redis下
2.2.7.4 将CacheService注册到spring自动配置
2.2.7.5 测试List
在heima-leadnews-schedule中创建RedisTest测试类
@SpringBootTest(classes = ScheduleApplication.class)
@RunWith(SpringRunner.class)
public class RedisTest {@Autowiredprivate CacheService cacheService;@Testpublic void testList() {//在List的左边添加元素cacheService.lLeftPush("list_001", "hello,redis");}
}
@SpringBootTest(classes = ScheduleApplication.class)
@RunWith(SpringRunner.class)
public class RedisTest {@Autowiredprivate CacheService cacheService;@Testpublic void testList() {//在List的左边添加元素//cacheService.lLeftPush("list_001", "hello,redis");//在List的右边获取元素并删除String value = cacheService.lRightPop("list_001");System.out.println(value);}
}
查看redis发现已经没有数据了
2.2.7.6 测试Zset
@Test
public void testZset() {//添加元素到Zset中,按照分值cacheService.zAdd("zset_key_001", "hello zset 001", 1000);cacheService.zAdd("zset_key_002", "hello zset 002", 8888);cacheService.zAdd("zset_key_003", "hello zset 003", 7777);cacheService.zAdd("zset_key_004", "hello zset 004", 99999);//按照分值获取元素
}
获取前三条数据
@Test
public void testZset() {/*//添加元素到Zset中,按照分值cacheService.zAdd("zset_key_001", "hello zset 001", 1000);cacheService.zAdd("zset_key_001", "hello zset 002", 8888);cacheService.zAdd("zset_key_001", "hello zset 003", 7777);cacheService.zAdd("zset_key_001", "hello zset 004", 99999);*///按照分值获取元素Set<String> zset_key_001 = cacheService.zRangeByScore("zset_key_001", 0, 8888);System.out.println(zset_key_001);
}
2.3 添加任务
2.3.1 导入task类
@Data
public class Task implements Serializable {/*** 任务id*/private Long taskId;/*** 类型*/private Integer taskType;/*** 优先级*/private Integer priority;/*** 执行id*/private long executeTime;/*** task参数*/private byte[] parameters;}
2.3.2 创建TaskService
在heima-leadnews-schedule模块下创建com.heima.schedule.service.TaskService接口及实现
public interface TaskService {/*** 添加延迟任务* @param task* @return*/long addTask(Task task);
}
实现包含
1.添加任务到数据库中
2.添加任务到redis中
2.1 如果任务的执行时间小于当前时间,直接执行任务
2.2 如果任务的执行时间大于当前时间&&小于等于预设时间,添加到延迟队列中
@Service
@Slf4j
public class TaskServiceImpl implements TaskService {/*** 添加延迟任务* @param task* @return*/@Overridepublic long addTask(Task task) {//1.添加任务到数据库中boolean success= addTaskToDB(task);//2.添加任务到redis中if(success){addTaskToRedis(task);}return task.getTaskId();}@Autowiredprivate CacheService cacheService;/*** 添加任务到redis中* @param task*/private void addTaskToRedis(Task task) {String key = task.getTaskType()+"_"+task.getPriority();//获取预设时间,5分钟后Calendar calendar = Calendar.getInstance();calendar.add(Calendar.MINUTE,5);long nextSchedule = calendar.getTimeInMillis();//2.1 如果任务的执行时间小于当前时间,直接执行任务if(task.getExecuteTime()<=System.currentTimeMillis()){cacheService.lLeftPush(ScheduleConstants.TOPIC+key, JSON.toJSONString(task));}//2.2 如果任务的执行时间大于当前时间&&小于等于预设时间,添加到延迟队列中else if(task.getExecuteTime()>System.currentTimeMillis()&&task.getExecuteTime()<=nextSchedule){cacheService.zAdd(ScheduleConstants.FUTURE+key,JSON.toJSONString(task),task.getExecuteTime());}}@Autowiredprivate TaskinfoMapper taskinfoMapper;@Autowiredprivate TaskinfoLogsMapper taskinfoLogsMapper;/*** 添加任务到数据库中* @param task* @return*/private boolean addTaskToDB(Task task) {boolean flag = false;try {//1.保存任务表Taskinfo taskinfo = new Taskinfo();BeanUtils.copyProperties(task,taskinfo);taskinfo.setExecuteTime(new Date(task.getExecuteTime()));taskinfoMapper.insert(taskinfo);//设置Task的idtask.setTaskId(taskinfo.getTaskId());//2.保存任务日志表TaskinfoLogs taskinfoLogs = new TaskinfoLogs();BeanUtils.copyProperties(taskinfo,taskinfoLogs);taskinfoLogs.setVersion(1);taskinfoLogs.setStatus(ScheduleConstants.SCHEDULED);taskinfoLogsMapper.insert(taskinfoLogs);flag = true;} catch (Exception e) {log.error("添加任务到数据库失败",e);e.printStackTrace();}return flag;}
}
还有个常量类放入heima-leadnews-common模块下的com.heima.common.constant包下
package com.heima.common.constants;public class ScheduleConstants {//task状态public static final int SCHEDULED=0; //初始化状态public static final int EXECUTED=1; //已执行状态public static final int CANCELLED=2; //已取消状态public static String FUTURE="future_"; //未来数据key前缀public static String TOPIC="topic_"; //当前数据key前缀
}
2.3.3 测试
public class TaskServiceImpl implements TaskService
点击TaskService,CONTROL+SHIFT+T创建测试
@SpringBootTest(classes = ScheduleApplication.class)
@RunWith(SpringRunner.class)
@Slf4j
class TaskServiceImplTest {@Autowiredprivate TaskService taskService;@Testvoid addTask() {Task task = new Task();task.setTaskType(100);task.setPriority(50);task.setExecuteTime(new Date().getTime()+2000);task.setParameters("task test".getBytes());long taskId = taskService.addTask(task);log.info("taskId:{}", taskId);}
}
显示如此
2.4 取消任务
2.4.1 Service
boolean deleteTask(Long taskId);
/*** 删除任务* @param taskId* @return*/
@Override
public boolean deleteTask(Long taskId) {boolean flag = false;//1.删除数据库中的任务int success = taskinfoMapper.deleteById(taskId);if(success==0){return flag;}try {//2.更新日志状态TaskinfoLogs taskinfoLogs = taskinfoLogsMapper.selectById(taskId);taskinfoLogs.setStatus(ScheduleConstants.CANCELLED);taskinfoLogsMapper.updateById(taskinfoLogs);//3.删除redis中的任务Task task = new Task();BeanUtils.copyProperties(taskinfoLogs,task);task.setExecuteTime(taskinfoLogs.getExecuteTime().getTime());String key = task.getTaskType()+"_"+task.getPriority();if(task.getExecuteTime()<=System.currentTimeMillis()) {cacheService.lRemove(ScheduleConstants.TOPIC + key, 0, JSON.toJSONString(task));}else{cacheService.zRemove(ScheduleConstants.FUTURE + key, JSON.toJSONString(task));}flag = true;} catch (Exception e) {log.error("删除任务失败",e);e.printStackTrace();}return flag;
}
2.4.2 测试
@Test
void deleteTask() {boolean flag = taskService.deleteTask(1773909243989106689);log.info("flag:{}", flag);
}
2.5 拉取任务
2.5.1 Service
Task poll(int type, int priority);
/*** 按照类型和优先级拉取任务* @return*/
@Override
public Task poll(int type,int priority) {Task task = null;try {String key = type+"_"+priority;String task_json = cacheService.lRightPop(ScheduleConstants.TOPIC + key);if(StringUtils.isNotBlank(task_json)){task = JSON.parseObject(task_json, Task.class);//更新数据库信息updateDb(task.getTaskId(),ScheduleConstants.EXECUTED);}}catch (Exception e){e.printStackTrace();log.error("poll task exception");}return task;
}
2.5.2 测试
@Test
void testPoll() {Task task = taskService.poll(100, 50);log.info("task:{}", task);
}
拉取成功
2.6 定时刷新
2.6.1 如何获取zset中所有的key?
@Test
public void testKeys() {Set<String> keys = cacheService.keys(ScheduleConstants.FUTURE + "*");System.out.println("方式一:");System.out.println(keys);Set<String> scan = cacheService.scan(ScheduleConstants.FUTURE + "*");System.out.println("方式二:");System.out.println(scan);
}
2.6.2 数据如何同步?
2.6.3 Redis管道
//耗时6151
@Test
public void testPiple1(){long start =System.currentTimeMillis();for (int i = 0; i <10000 ; i++) {Task task = new Task();task.setTaskType(1001);task.setPriority(1);task.setExecuteTime(new Date().getTime());cacheService.lLeftPush("1001_1", JSON.toJSONString(task));}System.out.println("耗时"+(System.currentTimeMillis()- start));
}@Test
public void testPiple2(){long start = System.currentTimeMillis();//使用管道技术List<Object> objectList = cacheService.getstringRedisTemplate().executePipelined(new RedisCallback<Object>() {@Nullable@Overridepublic Object doInRedis(RedisConnection redisConnection) throws DataAccessException {for (int i = 0; i <10000 ; i++) {Task task = new Task();task.setTaskType(1001);task.setPriority(1);task.setExecuteTime(new Date().getTime());redisConnection.lPush("1001_1".getBytes(), JSON.toJSONString(task).getBytes());}return null;}});System.out.println("使用管道技术执行10000次自增操作共耗时:"+(System.currentTimeMillis()-start)+"毫秒");
}
使用管道技术执行10000次自增操作共耗时:2481毫秒
2.6.4 zSet和List数据同步实现
Cron表达式 @Scheduled(cron="0 */1 * * * ?")
在TaskService中添加方法
public void refresh()
/*** 定时刷新队列,每分钟刷新*/
@Override
@Scheduled(cron = "0 */1 * * * ?")
public void refresh() {log.info(System.currentTimeMillis() / 1000 + "执行了定时任务");// 获取所有未来数据集合的key值Set<String> futureKeys = cacheService.scan(ScheduleConstants.FUTURE + "*");// future_*for (String futureKey : futureKeys) { // future_250_250String topicKey = ScheduleConstants.TOPIC + futureKey.split(ScheduleConstants.FUTURE)[1];//获取该组key下当前需要消费的任务数据Set<String> tasks = cacheService.zRangeByScore(futureKey, 0, System.currentTimeMillis());if (!tasks.isEmpty()) {//将这些任务数据添加到消费者队列中cacheService.refreshWithPipeline(futureKey, topicKey, tasks);log.info("成功的将" + futureKey + "下的当前需要执行的任务数据刷新到" + topicKey + "下");}}
}
新增测试方法
public void addTaskNew() {for (int i = 0; i < 5; i++) {Task task = new Task();task.setTaskType(100 + i);task.setPriority(50);task.setParameters("task test".getBytes());task.setExecuteTime(new Date().getTime() + 500 * i);long taskId = taskService.addTask(task);}
}
2.6.5 开启定时任务
在启动类中添加@EnableScheduling
@SpringBootApplication
@MapperScan("com.heima.schedule.mapper")
@EnableScheduling
public class ScheduleApplication {public static void main(String[] args) {SpringApplication.run(ScheduleApplication.class,args);}@Beanpublic MybatisPlusInterceptor optimisticLockerInterceptor(){MybatisPlusInterceptor interceptor = new MybatisPlusInterceptor();interceptor.addInnerInterceptor(new OptimisticLockerInnerInterceptor());return interceptor;}
}
启动ScheduleApplication
未来任务已经刷新
2.6.6 分布式下的Schedule
再启动一个ScheduleApplication端口为51702
2.6.7 Redis分布式锁
在heima-leadnews-common的工具类com.heima.common.redis.CacheService中添加方法
/*** 加锁** @param name* @param expire* @return*/
public String tryLock(String name, long expire) {name = name + "_lock";String token = UUID.randomUUID().toString();RedisConnectionFactory factory = stringRedisTemplate.getConnectionFactory();RedisConnection conn = factory.getConnection();try {//参考redis命令://set key value [EX seconds] [PX milliseconds] [NX|XX]Boolean result = conn.set(name.getBytes(),token.getBytes(),Expiration.from(expire, TimeUnit.MILLISECONDS),RedisStringCommands.SetOption.SET_IF_ABSENT //NX);if (result != null && result)return token;} finally {RedisConnectionUtils.releaseConnection(conn, factory,false);}return null;
}
在定时刷新前加上锁操作
@Override
@Scheduled(cron = "0 */1 * * * ?")
public void refresh() {String token = cacheService.tryLock("FUTURE_TASK_SYNC", 1000 * 30);if (StringUtils.isBlank(token)) {log.info(System.currentTimeMillis() / 1000 + "执行了定时任务");// 获取所有未来数据集合的key值Set<String> futureKeys = cacheService.scan(ScheduleConstants.FUTURE + "*");// future_*for (String futureKey : futureKeys) { // future_250_250String topicKey = ScheduleConstants.TOPIC + futureKey.split(ScheduleConstants.FUTURE)[1];//获取该组key下当前需要消费的任务数据Set<String> tasks = cacheService.zRangeByScore(futureKey, 0, System.currentTimeMillis());if (!tasks.isEmpty()) {//将这些任务数据添加到消费者队列中cacheService.refreshWithPipeline(futureKey, topicKey, tasks);log.info("成功的将" + futureKey + "下的当前需要执行的任务数据刷新到" + topicKey + "下");}}}
}
2.6.8 数据库和Redis的同步
在com.heima.schedule.service.impl.TaskServiceImpl中添加新的reloadData方法,数据库任务定时同步到redis中
@PostConstruct
是开机自动同步
@Scheduled(cron = "0 */5 * * * ?")
@PostConstruct
public void reloadData() {clearCache();log.info("数据库数据同步到缓存");Calendar calendar = Calendar.getInstance();calendar.add(Calendar.MINUTE, 5);//查看小于未来5分钟的所有任务List<Taskinfo> allTasks = taskinfoMapper.selectList(Wrappers.<Taskinfo>lambdaQuery().lt(Taskinfo::getExecuteTime,calendar.getTime()));if(allTasks != null && allTasks.size() > 0){for (Taskinfo taskinfo : allTasks) {Task task = new Task();BeanUtils.copyProperties(taskinfo,task);task.setExecuteTime(taskinfo.getExecuteTime().getTime());addTaskToRedis(task);}}
}private void clearCache(){// 删除缓存中未来数据集合和当前消费者队列的所有keySet<String> futurekeys = cacheService.scan(ScheduleConstants.FUTURE + "*");// future_Set<String> topickeys = cacheService.scan(ScheduleConstants.TOPIC + "*");// topic_cacheService.delete(futurekeys);cacheService.delete(topickeys);
}
删除redis中数据,重新启动服务
同步成功
2.7 延迟队列对外接口
2.7.1 IScheduleClinet接口
对外通过Fegin进行接口调用
在heima-leadnews-feign-api模块下创建com.heima.apis.schedule包
再创建接口IScheduleClinet接口,将com.heima.schedule.service.TaskService接口的东西复制过来
@FeignClient(value = "leadnews-schedule")
public interface IScheduleClient {/*** 添加延迟任务* @param task* @return*/@PostMapping("/api/v1/task/add")public ResponseResult addTask(@RequestBody Task task);/*** 删除任务* @param taskId* @return*/@GetMapping("/api/v1/task/{taskId}")public ResponseResult cancelTask(@PathVariable("taskId") long taskId);/*** 按照类型和优先级拉取* @param type* @param priority* @return*/@GetMapping("/api/v1/{type}/{priority}")public ResponseResult poll(@PathVariable("type")int type, @PathVariable("priority")int priority);
}
2.7.2 在微服务中实现类
在heima-leadnews-schedule模块下创建com.heima.schedule.feign.ScheduleClient实现类(充当Controller)
@RestController
public class ScheduleClient implements IScheduleClient {@Autowiredprivate TaskService taskService;/*** 添加延迟任务* @param task* @return*/@PostMapping("/api/v1/task/add")@Overridepublic ResponseResult addTask(@RequestBody Task task){return ResponseResult.okResult(taskService.addTask(task));}/*** 删除任务* @param taskId* @return*/@GetMapping("/api/v1/task/{taskId}")@Overridepublic ResponseResult cancelTask(@PathVariable("taskId") long taskId){return ResponseResult.okResult(taskService.cancelTask(taskId));}/*** 按照类型和优先级拉取* @param type* @param priority* @return*/@GetMapping("/api/v1/task/{type}/{priority}")@Overridepublic ResponseResult poll(@PathVariable("type")int type, @PathVariable("priority")int priority){return ResponseResult.okResult(taskService.poll(type, priority));}
}
2.8 发布文章集成延迟队列
2.8.1 添加askTypeEnum类枚举类
定义枚举类com.heima.model.common.enums.TaskTypeEnum类
@Getter
@AllArgsConstructor
public enum TaskTypeEnum {NEWS_SCAN_TIME(1001, 1,"文章定时审核"),REMOTEERROR(1002, 2,"第三方接口调用失败,重试");private final int taskType; //对应具体业务private final int priority; //业务不同级别private final String desc; //描述信息
}
2.8.2 Task的参数序列化
Task的参数是一个二进制数据,所以需要序列化
引入序列化工具
导入两个工具类
导入依赖
<dependency><groupId>io.protostuff</groupId><artifactId>protostuff-core</artifactId><version>1.6.0</version>
</dependency><dependency><groupId>io.protostuff</groupId><artifactId>protostuff-runtime</artifactId><version>1.6.0</version>
</dependency>
2.8.3 实现文章发布集成接口及实现类
添加com.heima.wemedia.service.WmNewsTaskService接口
public interface WmNewsTaskService {/*** 添加文章自动发布任务* @param id 文章id* @param publishTime 发布时间*/public void addNewsToTask(Integer id, Date publishTime);
}
实现类com.heima.wemedia.service.impl.WmNewsTaskServiceImpl
@Service
@Slf4j
public class WmNewsTaskServiceImpl implements WmNewsTaskService {@Autowiredprivate IScheduleClient scheduleClient;@Overridepublic void addNewsToTask(Integer id, Date publishTime) {log.info("添加文章自动发布任务,文章id:{},发布时间:{}",id,publishTime);Task task = new Task();task.setExecuteTime(publishTime.getTime());task.setTaskType(TaskTypeEnum.NEWS_SCAN_TIME.getTaskType());task.setPriority(TaskTypeEnum.NEWS_SCAN_TIME.getPriority());WmNews wmNews = new WmNews();wmNews.setId(id);task.setParameters(ProtostuffUtil.serialize(wmNews));scheduleClient.addTask(task);log.info("添加文章自动发布任务成功");}
}
2.8.4 修改文章发布逻辑
修改com.heima.wemedia.service.impl.WmNewsServiceImpl逻辑
第五步审核时,把任务先放到队列中,放在队列中再通过拉取任务进行审核
@Autowired
private WmNewAutoScanService wmNewAutoScanService;
@Autowired
private WmNewsTaskService wmNewsTaskService;
@Override
public ResponseResult submitNews(WmNewsDto wmNewsDto) {// 0.参数检查if(wmNewsDto == null||wmNewsDto.getContent()==null){return ResponseResult.errorResult(AppHttpCodeEnum.PARAM_INVALID);}//1. 保存或修改文章WmNews wmNews = new WmNews();BeanUtils.copyProperties(wmNewsDto,wmNews);//1.1 封面if(wmNewsDto.getImages()!=null&& wmNewsDto.getImages().size()>0){String imageStr = StringUtils.join(wmNewsDto.getImages(), ",");wmNews.setImages(imageStr);}//1.2 如果封面为自动-1,则需要手动设置封面规则if(wmNewsDto.getType().equals(WemediaConstants.WM_NEWS_TYPE_AUTO)){wmNews.setType(null);}saveOrUpdateWmNews(wmNews);//2.判断是否为草稿,如果为草稿结束当前方法if(wmNews.getStatus().equals(WmNews.Status.NORMAL.getCode())){return ResponseResult.okResult(AppHttpCodeEnum.SUCCESS);}//3.不是草稿,保存文章内容与图片素材的关系//3.1 获取文章内容的图片素材List<String> imageList=extractUrlInfo(wmNewsDto.getContent());saveRelativeInfoForContent(imageList,wmNews.getId());//4.不是草稿,保存文章封面图片与图片素材的关系saveRelativeInfoForCover(wmNewsDto,wmNews,imageList);//5.审核文章//wmNewAutoScanService.autoScanMediaNews(wmNews.getId());//将文章id和发布时间添加到任务中wmNewsTaskService.addNewsToTask(wmNews.getId(),wmNewsDto.getPublishTime());return ResponseResult.okResult(AppHttpCodeEnum.SUCCESS);}
2.8.5 启动测试
2.9 消费任务审核文章
修改com.heima.wemedia.service.impl.WmNewsServiceImpl逻辑
@Autowired
private WmNewAutoScanService wmNewAutoScanService;
/*** 消费任务,审核文章*/
@Override
@Async
@Scheduled(fixedRate = 1000)
public void scanNewsByTask() {log.info("开始执行文章自动审核任务");ResponseResult responseResult = scheduleClient.poll(TaskTypeEnum.NEWS_SCAN_TIME.getTaskType(), TaskTypeEnum.NEWS_SCAN_TIME.getPriority());if(responseResult.getCode().equals(200)&&responseResult.getData()!=null){log.info("task:{}",responseResult.getData());String jsonTask = JSON.toJSONString(responseResult.getData());Task task = JSON.parseObject(jsonTask, Task.class);//逆序列化任务参数拿到idWmNews wmNews = ProtostuffUtil.deserialize(task.getParameters(), WmNews.class);wmNewAutoScanService.autoScanMediaNews(wmNews.getId());}
}
这个方法并不会被调用,只需要按照一定频率拉取任务
因此添加@Scheduled(fixedRate = 1000)
1s中拉取一次
同时需要在WediaAppilcation启动类添加@EnableScheduling
@SpringBootApplication
@EnableDiscoveryClient
@MapperScan("com.heima.wemedia.mapper")
@EnableFeignClients(basePackages = "com.heima.apis")
@EnableAsync
@EnableScheduling
public class WemediaApplication {public static void main(String[] args) {SpringApplication.run(WemediaApplication.class,args);}@Beanpublic MybatisPlusInterceptor mybatisPlusInterceptor() {MybatisPlusInterceptor interceptor = new MybatisPlusInterceptor();interceptor.addInnerInterceptor(new PaginationInnerInterceptor(DbType.MYSQL));return interceptor;}@Beanpublic MessageConverter messageConverter(){return new Jackson2JsonMessageConverter();}@Beanpublic RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory, MessageConverter messageConverter) {RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);rabbitTemplate.setMessageConverter(messageConverter);return rabbitTemplate;}
}
2.9.1 综合测试
发布一个即时任务
发布一个延迟任务
查看控制台
25分即将被消费
状态为1表示消费成功!