【黑马头条】-day05延迟队列文章发布审核-Redis-zSet实现延迟队列-Feign远程调用


文章目录

  • 昨日回顾
  • 今日内容
  • 1 延迟任务
    • 1.1 概述
    • 1.2 技术对比
      • 1.2.1 DelayQueue
      • 1.2.2 RabbitMQ
      • 1.2.3 Redis实现
      • 1.2.4 总结
  • 2 redis实现延迟任务
    • 2.0 实现思路
    • 2.1 思考
    • 2.2 初步配置实现
      • 2.2.1 导入heima-leadnews-schedule模块
      • 2.2.2 在Nacos注册配置管理leadnews-schedule
      • 2.2.3 导入表结构
      • 2.2.4 根据表结构导入实体类及其mapper
      • 2.2.5 表结构中的乐观锁
        • 2.2.5.1 在启动类中加入乐观锁的拦截器
      • 2.2.6 安装redis
      • 2.2.7 在项目中集成redis
        • 2.2.7.1 导入redis依赖
        • 2.2.7.2 为redis添加连接配置
        • 2.2.7.3 拷贝工具类CacheService
        • 2.2.7.4 将CacheService注册到spring自动配置
        • 2.2.7.5 测试List
        • 2.2.7.6 测试Zset
    • 2.3 添加任务
      • 2.3.1 导入task类
      • 2.3.2 创建TaskService
      • 2.3.3 测试
    • 2.4 取消任务
      • 2.4.1 Service
      • 2.4.2 测试
    • 2.5 拉取任务
      • 2.5.1 Service
      • 2.5.2 测试
    • 2.6 定时刷新
      • 2.6.1 如何获取zset中所有的key?
      • 2.6.2 数据如何同步?
      • 2.6.3 Redis管道
      • 2.6.4 zSet和List数据同步实现
      • 2.6.5 开启定时任务
      • 2.6.6 分布式下的Schedule
      • 2.6.7 Redis分布式锁
      • 2.6.8 数据库和Redis的同步
    • 2.7 延迟队列对外接口
      • 2.7.1 IScheduleClinet接口
      • 2.7.2 在微服务中实现类
    • 2.8 发布文章集成延迟队列
      • 2.8.1 添加askTypeEnum类枚举类
      • 2.8.2 Task的参数序列化
      • 2.8.3 实现文章发布集成接口及实现类
      • 2.8.4 修改文章发布逻辑
      • 2.8.5 启动测试
    • 2.9 消费任务审核文章
      • 2.9.1 综合测试


昨日回顾

外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传

今日内容

在这里插入图片描述

1 延迟任务

1.1 概述

在这里插入图片描述

1.2 技术对比

1.2.1 DelayQueue

在这里插入图片描述

1.2.2 RabbitMQ

在这里插入图片描述

1.2.3 Redis实现

在这里插入图片描述

1.2.4 总结

在这里插入图片描述

2 redis实现延迟任务

2.0 实现思路

在这里插入图片描述

2.1 思考

在这里插入图片描述

在这里插入图片描述

在这里插入图片描述

2.2 初步配置实现

在这里插入图片描述

2.2.1 导入heima-leadnews-schedule模块

在这里插入图片描述

在这里插入图片描述

2.2.2 在Nacos注册配置管理leadnews-schedule

spring:redis:host: 192.168.204.129password: leadnewsport: 6379datasource:driver-class-name: com.mysql.jdbc.Driverurl: jdbc:mysql://localhost:3306/leadnews_schedule?useUnicode=true&characterEncoding=UTF-8&serverTimezone=UTC&useSSL=falseusername: rootpassword: 123sjbsjb# 设置Mapper接口所对应的XML文件位置,如果你在Mapper接口中有自定义方法,需要进行该配置
mybatis-plus:mapper-locations: classpath*:mapper/*.xml# 设置别名包扫描路径,通过该属性可以给包中的类注册别名type-aliases-package: com.heima.model.schedule.pojosminio:accessKey: miniosecretKey: minio123bucket: leadnewsendpoint: http://192.168.204.129:9000readPath: http://192.168.204.129:9000

在这里插入图片描述

2.2.3 导入表结构

在这里插入图片描述

在这里插入图片描述

2.2.4 根据表结构导入实体类及其mapper

导入heima-leadnews-model模块下com.heima.model.schedule下导入两个Taskinfo和TaskinfoLogs实体类

@Data
@TableName("taskinfo")
public class Taskinfo implements Serializable {private static final long serialVersionUID = 1L;/*** 任务id*/@TableId(type = IdType.ID_WORKER)private Long taskId;/*** 执行时间*/@TableField("execute_time")private Date executeTime;/*** 参数*/@TableField("parameters")private byte[] parameters;/*** 优先级*/@TableField("priority")private Integer priority;/*** 任务类型*/@TableField("task_type")private Integer taskType;}
@Data
@TableName("taskinfo_logs")
public class TaskinfoLogs implements Serializable {private static final long serialVersionUID = 1L;/*** 任务id*/@TableId(type = IdType.ID_WORKER)private Long taskId;/*** 执行时间*/@TableField("execute_time")private Date executeTime;/*** 参数*/@TableField("parameters")private byte[] parameters;/*** 优先级*/@TableField("priority")private Integer priority;/*** 任务类型*/@TableField("task_type")private Integer taskType;/*** 版本号,用乐观锁*/@Versionprivate Integer version;/*** 状态 0=int 1=EXECUTED 2=CANCELLED*/@TableField("status")private Integer status;}

对应mapper

@Mapper
public interface TaskinfoLogsMapper extends BaseMapper<TaskinfoLogs> {}
@Mapper
public interface TaskinfoMapper extends BaseMapper<Taskinfo> {public List<Taskinfo> queryFutureTime(@Param("taskType")int taskType, @Param("priority")int priority, @Param("future")Date future);
}

TaskinfoMapper对应的mybatis的xml

<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="com.heima.schedule.mapper.TaskinfoMapper"><select id="queryFutureTime" resultType="com.heima.model.schedule.pojos.Taskinfo">select *from taskinfowhere task_type = #{taskType}and priority = #{priority}and execute_time <![CDATA[<]]> #{future,javaType=java.util.Date}</select></mapper>

2.2.5 表结构中的乐观锁

@Version
private Integer version;

在这里插入图片描述

在这里插入图片描述

2.2.5.1 在启动类中加入乐观锁的拦截器

在heima-leadnews-schedule模块下的启动类中加入乐观锁的拦截器

@SpringBootApplication
@MapperScan("com.heima.schedule.mapper")
public class ScheduleApplication {public static void main(String[] args) {SpringApplication.run(ScheduleApplication.class,args);}@Beanpublic MybatisPlusInterceptor optimisticLockerInterceptor(){MybatisPlusInterceptor interceptor = new MybatisPlusInterceptor();interceptor.addInnerInterceptor(new OptimisticLockerInnerInterceptor());return interceptor;}
}

2.2.6 安装redis

移除已有redis

docker rm redis

创建新的redis容器

docker run -d --name redis --restart=always -p 6379:6379 redis --requirepass "leadnews"

密码leadnews

在这里插入图片描述

2.2.7 在项目中集成redis

2.2.7.1 导入redis依赖

在heima-leadnews-common模块中添加redis依赖

<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<!-- redis依赖commons-pool 这个依赖一定要添加 -->
<dependency><groupId>org.apache.commons</groupId><artifactId>commons-pool2</artifactId>
</dependency>
2.2.7.2 为redis添加连接配置

在nacos中的leadnews-schedule配置中心为redis添加配置

spring:redis:host: 192.168.204.129password: leadnewsport: 6379

在这里插入图片描述

2.2.7.3 拷贝工具类CacheService

拷贝工具类CacheService到heima-leadnews-common的com.heima.common.redis下

在这里插入图片描述

2.2.7.4 将CacheService注册到spring自动配置

在这里插入图片描述

2.2.7.5 测试List

在heima-leadnews-schedule中创建RedisTest测试类

在这里插入图片描述

@SpringBootTest(classes = ScheduleApplication.class)
@RunWith(SpringRunner.class)
public class RedisTest {@Autowiredprivate CacheService cacheService;@Testpublic void testList() {//在List的左边添加元素cacheService.lLeftPush("list_001", "hello,redis");}
}

在这里插入图片描述

@SpringBootTest(classes = ScheduleApplication.class)
@RunWith(SpringRunner.class)
public class RedisTest {@Autowiredprivate CacheService cacheService;@Testpublic void testList() {//在List的左边添加元素//cacheService.lLeftPush("list_001", "hello,redis");//在List的右边获取元素并删除String value = cacheService.lRightPop("list_001");System.out.println(value);}
}

在这里插入图片描述

查看redis发现已经没有数据了

在这里插入图片描述

2.2.7.6 测试Zset
@Test
public void testZset() {//添加元素到Zset中,按照分值cacheService.zAdd("zset_key_001", "hello zset 001", 1000);cacheService.zAdd("zset_key_002", "hello zset 002", 8888);cacheService.zAdd("zset_key_003", "hello zset 003", 7777);cacheService.zAdd("zset_key_004", "hello zset 004", 99999);//按照分值获取元素
}

在这里插入图片描述

获取前三条数据

@Test
public void testZset() {/*//添加元素到Zset中,按照分值cacheService.zAdd("zset_key_001", "hello zset 001", 1000);cacheService.zAdd("zset_key_001", "hello zset 002", 8888);cacheService.zAdd("zset_key_001", "hello zset 003", 7777);cacheService.zAdd("zset_key_001", "hello zset 004", 99999);*///按照分值获取元素Set<String> zset_key_001 = cacheService.zRangeByScore("zset_key_001", 0, 8888);System.out.println(zset_key_001);
}

在这里插入图片描述

2.3 添加任务

在这里插入图片描述

2.3.1 导入task类

@Data
public class Task implements Serializable {/*** 任务id*/private Long taskId;/*** 类型*/private Integer taskType;/*** 优先级*/private Integer priority;/*** 执行id*/private long executeTime;/*** task参数*/private byte[] parameters;}

2.3.2 创建TaskService

在heima-leadnews-schedule模块下创建com.heima.schedule.service.TaskService接口及实现

public interface TaskService {/*** 添加延迟任务* @param task* @return*/long addTask(Task task);
}

实现包含

1.添加任务到数据库中

2.添加任务到redis中

2.1 如果任务的执行时间小于当前时间,直接执行任务

2.2 如果任务的执行时间大于当前时间&&小于等于预设时间,添加到延迟队列中

@Service
@Slf4j
public class TaskServiceImpl implements TaskService {/*** 添加延迟任务* @param task* @return*/@Overridepublic long addTask(Task task) {//1.添加任务到数据库中boolean success= addTaskToDB(task);//2.添加任务到redis中if(success){addTaskToRedis(task);}return task.getTaskId();}@Autowiredprivate CacheService cacheService;/*** 添加任务到redis中* @param task*/private void addTaskToRedis(Task task) {String key = task.getTaskType()+"_"+task.getPriority();//获取预设时间,5分钟后Calendar calendar = Calendar.getInstance();calendar.add(Calendar.MINUTE,5);long nextSchedule = calendar.getTimeInMillis();//2.1 如果任务的执行时间小于当前时间,直接执行任务if(task.getExecuteTime()<=System.currentTimeMillis()){cacheService.lLeftPush(ScheduleConstants.TOPIC+key, JSON.toJSONString(task));}//2.2 如果任务的执行时间大于当前时间&&小于等于预设时间,添加到延迟队列中else if(task.getExecuteTime()>System.currentTimeMillis()&&task.getExecuteTime()<=nextSchedule){cacheService.zAdd(ScheduleConstants.FUTURE+key,JSON.toJSONString(task),task.getExecuteTime());}}@Autowiredprivate TaskinfoMapper taskinfoMapper;@Autowiredprivate TaskinfoLogsMapper taskinfoLogsMapper;/*** 添加任务到数据库中* @param task* @return*/private boolean addTaskToDB(Task task) {boolean flag = false;try {//1.保存任务表Taskinfo taskinfo = new Taskinfo();BeanUtils.copyProperties(task,taskinfo);taskinfo.setExecuteTime(new Date(task.getExecuteTime()));taskinfoMapper.insert(taskinfo);//设置Task的idtask.setTaskId(taskinfo.getTaskId());//2.保存任务日志表TaskinfoLogs taskinfoLogs = new TaskinfoLogs();BeanUtils.copyProperties(taskinfo,taskinfoLogs);taskinfoLogs.setVersion(1);taskinfoLogs.setStatus(ScheduleConstants.SCHEDULED);taskinfoLogsMapper.insert(taskinfoLogs);flag = true;} catch (Exception e) {log.error("添加任务到数据库失败",e);e.printStackTrace();}return flag;}
}

还有个常量类放入heima-leadnews-common模块下的com.heima.common.constant包下

package com.heima.common.constants;public class ScheduleConstants {//task状态public static final int SCHEDULED=0;   //初始化状态public static final int EXECUTED=1;       //已执行状态public static final int CANCELLED=2;   //已取消状态public static String FUTURE="future_";   //未来数据key前缀public static String TOPIC="topic_";     //当前数据key前缀
}

2.3.3 测试

public class TaskServiceImpl implements TaskService点击TaskService,CONTROL+SHIFT+T创建测试

在这里插入图片描述

@SpringBootTest(classes = ScheduleApplication.class)
@RunWith(SpringRunner.class)
@Slf4j
class TaskServiceImplTest {@Autowiredprivate TaskService taskService;@Testvoid addTask() {Task task = new Task();task.setTaskType(100);task.setPriority(50);task.setExecuteTime(new Date().getTime()+2000);task.setParameters("task test".getBytes());long taskId = taskService.addTask(task);log.info("taskId:{}", taskId);}
}

在这里插入图片描述

显示如此

2.4 取消任务

在这里插入图片描述

2.4.1 Service

boolean deleteTask(Long taskId);
/*** 删除任务* @param taskId* @return*/
@Override
public boolean deleteTask(Long taskId) {boolean flag = false;//1.删除数据库中的任务int success = taskinfoMapper.deleteById(taskId);if(success==0){return flag;}try {//2.更新日志状态TaskinfoLogs taskinfoLogs = taskinfoLogsMapper.selectById(taskId);taskinfoLogs.setStatus(ScheduleConstants.CANCELLED);taskinfoLogsMapper.updateById(taskinfoLogs);//3.删除redis中的任务Task task = new Task();BeanUtils.copyProperties(taskinfoLogs,task);task.setExecuteTime(taskinfoLogs.getExecuteTime().getTime());String key = task.getTaskType()+"_"+task.getPriority();if(task.getExecuteTime()<=System.currentTimeMillis()) {cacheService.lRemove(ScheduleConstants.TOPIC + key, 0, JSON.toJSONString(task));}else{cacheService.zRemove(ScheduleConstants.FUTURE + key, JSON.toJSONString(task));}flag = true;} catch (Exception e) {log.error("删除任务失败",e);e.printStackTrace();}return flag;
}

2.4.2 测试

@Test
void deleteTask() {boolean flag = taskService.deleteTask(1773909243989106689);log.info("flag:{}", flag);
}

在这里插入图片描述

2.5 拉取任务

在这里插入图片描述

2.5.1 Service

Task poll(int type, int priority);
/*** 按照类型和优先级拉取任务* @return*/
@Override
public Task poll(int type,int priority) {Task task = null;try {String key = type+"_"+priority;String task_json = cacheService.lRightPop(ScheduleConstants.TOPIC + key);if(StringUtils.isNotBlank(task_json)){task = JSON.parseObject(task_json, Task.class);//更新数据库信息updateDb(task.getTaskId(),ScheduleConstants.EXECUTED);}}catch (Exception e){e.printStackTrace();log.error("poll task exception");}return task;
}

2.5.2 测试

@Test
void testPoll() {Task task = taskService.poll(100, 50);log.info("task:{}", task);
}

拉取成功

在这里插入图片描述

2.6 定时刷新

在这里插入图片描述

在这里插入图片描述

2.6.1 如何获取zset中所有的key?

在这里插入图片描述

在这里插入图片描述

@Test
public void testKeys() {Set<String> keys = cacheService.keys(ScheduleConstants.FUTURE + "*");System.out.println("方式一:");System.out.println(keys);Set<String> scan = cacheService.scan(ScheduleConstants.FUTURE + "*");System.out.println("方式二:");System.out.println(scan);
}

2.6.2 数据如何同步?

在这里插入图片描述

2.6.3 Redis管道

在这里插入图片描述

在这里插入图片描述

//耗时6151
@Test
public  void testPiple1(){long start =System.currentTimeMillis();for (int i = 0; i <10000 ; i++) {Task task = new Task();task.setTaskType(1001);task.setPriority(1);task.setExecuteTime(new Date().getTime());cacheService.lLeftPush("1001_1", JSON.toJSONString(task));}System.out.println("耗时"+(System.currentTimeMillis()- start));
}@Test
public void testPiple2(){long start  = System.currentTimeMillis();//使用管道技术List<Object> objectList = cacheService.getstringRedisTemplate().executePipelined(new RedisCallback<Object>() {@Nullable@Overridepublic Object doInRedis(RedisConnection redisConnection) throws DataAccessException {for (int i = 0; i <10000 ; i++) {Task task = new Task();task.setTaskType(1001);task.setPriority(1);task.setExecuteTime(new Date().getTime());redisConnection.lPush("1001_1".getBytes(), JSON.toJSONString(task).getBytes());}return null;}});System.out.println("使用管道技术执行10000次自增操作共耗时:"+(System.currentTimeMillis()-start)+"毫秒");
}

使用管道技术执行10000次自增操作共耗时:2481毫秒

2.6.4 zSet和List数据同步实现

Cron表达式 @Scheduled(cron="0 */1 * * * ?")

在TaskService中添加方法

public void refresh()
/*** 定时刷新队列,每分钟刷新*/
@Override
@Scheduled(cron = "0 */1 * * * ?")
public void refresh() {log.info(System.currentTimeMillis() / 1000 + "执行了定时任务");// 获取所有未来数据集合的key值Set<String> futureKeys = cacheService.scan(ScheduleConstants.FUTURE + "*");// future_*for (String futureKey : futureKeys) { // future_250_250String topicKey = ScheduleConstants.TOPIC + futureKey.split(ScheduleConstants.FUTURE)[1];//获取该组key下当前需要消费的任务数据Set<String> tasks = cacheService.zRangeByScore(futureKey, 0, System.currentTimeMillis());if (!tasks.isEmpty()) {//将这些任务数据添加到消费者队列中cacheService.refreshWithPipeline(futureKey, topicKey, tasks);log.info("成功的将" + futureKey + "下的当前需要执行的任务数据刷新到" + topicKey + "下");}}
}

新增测试方法

public void addTaskNew() {for (int i = 0; i < 5; i++) {Task task = new Task();task.setTaskType(100 + i);task.setPriority(50);task.setParameters("task test".getBytes());task.setExecuteTime(new Date().getTime() + 500 * i);long taskId = taskService.addTask(task);}
} 

2.6.5 开启定时任务

在启动类中添加@EnableScheduling

@SpringBootApplication
@MapperScan("com.heima.schedule.mapper")
@EnableScheduling
public class ScheduleApplication {public static void main(String[] args) {SpringApplication.run(ScheduleApplication.class,args);}@Beanpublic MybatisPlusInterceptor optimisticLockerInterceptor(){MybatisPlusInterceptor interceptor = new MybatisPlusInterceptor();interceptor.addInnerInterceptor(new OptimisticLockerInnerInterceptor());return interceptor;}
}

启动ScheduleApplication

未来任务已经刷新

在这里插入图片描述

2.6.6 分布式下的Schedule

再启动一个ScheduleApplication端口为51702

在这里插入图片描述

在这里插入图片描述

在这里插入图片描述

2.6.7 Redis分布式锁

在这里插入图片描述

在heima-leadnews-common的工具类com.heima.common.redis.CacheService中添加方法

/*** 加锁** @param name* @param expire* @return*/
public String tryLock(String name, long expire) {name = name + "_lock";String token = UUID.randomUUID().toString();RedisConnectionFactory factory = stringRedisTemplate.getConnectionFactory();RedisConnection conn = factory.getConnection();try {//参考redis命令://set key value [EX seconds] [PX milliseconds] [NX|XX]Boolean result = conn.set(name.getBytes(),token.getBytes(),Expiration.from(expire, TimeUnit.MILLISECONDS),RedisStringCommands.SetOption.SET_IF_ABSENT //NX);if (result != null && result)return token;} finally {RedisConnectionUtils.releaseConnection(conn, factory,false);}return null;
}

在定时刷新前加上锁操作

@Override
@Scheduled(cron = "0 */1 * * * ?")
public void refresh() {String token = cacheService.tryLock("FUTURE_TASK_SYNC", 1000 * 30);if (StringUtils.isBlank(token)) {log.info(System.currentTimeMillis() / 1000 + "执行了定时任务");// 获取所有未来数据集合的key值Set<String> futureKeys = cacheService.scan(ScheduleConstants.FUTURE + "*");// future_*for (String futureKey : futureKeys) { // future_250_250String topicKey = ScheduleConstants.TOPIC + futureKey.split(ScheduleConstants.FUTURE)[1];//获取该组key下当前需要消费的任务数据Set<String> tasks = cacheService.zRangeByScore(futureKey, 0, System.currentTimeMillis());if (!tasks.isEmpty()) {//将这些任务数据添加到消费者队列中cacheService.refreshWithPipeline(futureKey, topicKey, tasks);log.info("成功的将" + futureKey + "下的当前需要执行的任务数据刷新到" + topicKey + "下");}}}
}

在这里插入图片描述

在这里插入图片描述

2.6.8 数据库和Redis的同步

在这里插入图片描述

在com.heima.schedule.service.impl.TaskServiceImpl中添加新的reloadData方法,数据库任务定时同步到redis中

@PostConstruct是开机自动同步

@Scheduled(cron = "0 */5 * * * ?")
@PostConstruct
public void reloadData() {clearCache();log.info("数据库数据同步到缓存");Calendar calendar = Calendar.getInstance();calendar.add(Calendar.MINUTE, 5);//查看小于未来5分钟的所有任务List<Taskinfo> allTasks = taskinfoMapper.selectList(Wrappers.<Taskinfo>lambdaQuery().lt(Taskinfo::getExecuteTime,calendar.getTime()));if(allTasks != null && allTasks.size() > 0){for (Taskinfo taskinfo : allTasks) {Task task = new Task();BeanUtils.copyProperties(taskinfo,task);task.setExecuteTime(taskinfo.getExecuteTime().getTime());addTaskToRedis(task);}}
}private void clearCache(){// 删除缓存中未来数据集合和当前消费者队列的所有keySet<String> futurekeys = cacheService.scan(ScheduleConstants.FUTURE + "*");// future_Set<String> topickeys = cacheService.scan(ScheduleConstants.TOPIC + "*");// topic_cacheService.delete(futurekeys);cacheService.delete(topickeys);
}

在这里插入图片描述

删除redis中数据,重新启动服务

在这里插入图片描述

同步成功

2.7 延迟队列对外接口

在这里插入图片描述

2.7.1 IScheduleClinet接口

对外通过Fegin进行接口调用

在heima-leadnews-feign-api模块下创建com.heima.apis.schedule包

再创建接口IScheduleClinet接口,将com.heima.schedule.service.TaskService接口的东西复制过来

@FeignClient(value = "leadnews-schedule")
public interface IScheduleClient {/*** 添加延迟任务* @param task* @return*/@PostMapping("/api/v1/task/add")public ResponseResult addTask(@RequestBody Task task);/*** 删除任务* @param taskId* @return*/@GetMapping("/api/v1/task/{taskId}")public ResponseResult cancelTask(@PathVariable("taskId") long taskId);/*** 按照类型和优先级拉取* @param type* @param priority* @return*/@GetMapping("/api/v1/{type}/{priority}")public ResponseResult poll(@PathVariable("type")int type, @PathVariable("priority")int priority);
}

2.7.2 在微服务中实现类

在heima-leadnews-schedule模块下创建com.heima.schedule.feign.ScheduleClient实现类(充当Controller)

@RestController
public class ScheduleClient implements IScheduleClient {@Autowiredprivate TaskService taskService;/*** 添加延迟任务* @param task* @return*/@PostMapping("/api/v1/task/add")@Overridepublic ResponseResult addTask(@RequestBody Task task){return ResponseResult.okResult(taskService.addTask(task));}/*** 删除任务* @param taskId* @return*/@GetMapping("/api/v1/task/{taskId}")@Overridepublic ResponseResult cancelTask(@PathVariable("taskId") long taskId){return ResponseResult.okResult(taskService.cancelTask(taskId));}/*** 按照类型和优先级拉取* @param type* @param priority* @return*/@GetMapping("/api/v1/task/{type}/{priority}")@Overridepublic ResponseResult poll(@PathVariable("type")int type, @PathVariable("priority")int priority){return ResponseResult.okResult(taskService.poll(type, priority));}
}

2.8 发布文章集成延迟队列

在这里插入图片描述

2.8.1 添加askTypeEnum类枚举类

定义枚举类com.heima.model.common.enums.TaskTypeEnum类

@Getter
@AllArgsConstructor
public enum TaskTypeEnum {NEWS_SCAN_TIME(1001, 1,"文章定时审核"),REMOTEERROR(1002, 2,"第三方接口调用失败,重试");private final int taskType; //对应具体业务private final int priority; //业务不同级别private final String desc; //描述信息
}

2.8.2 Task的参数序列化

Task的参数是一个二进制数据,所以需要序列化

引入序列化工具

在这里插入图片描述

导入两个工具类

在这里插入图片描述

导入依赖

<dependency><groupId>io.protostuff</groupId><artifactId>protostuff-core</artifactId><version>1.6.0</version>
</dependency><dependency><groupId>io.protostuff</groupId><artifactId>protostuff-runtime</artifactId><version>1.6.0</version>
</dependency>

2.8.3 实现文章发布集成接口及实现类

添加com.heima.wemedia.service.WmNewsTaskService接口

public interface WmNewsTaskService {/*** 添加文章自动发布任务* @param id 文章id* @param publishTime 发布时间*/public void addNewsToTask(Integer id, Date publishTime);
}

实现类com.heima.wemedia.service.impl.WmNewsTaskServiceImpl

@Service
@Slf4j
public class WmNewsTaskServiceImpl implements WmNewsTaskService {@Autowiredprivate IScheduleClient scheduleClient;@Overridepublic void addNewsToTask(Integer id, Date publishTime) {log.info("添加文章自动发布任务,文章id:{},发布时间:{}",id,publishTime);Task task = new Task();task.setExecuteTime(publishTime.getTime());task.setTaskType(TaskTypeEnum.NEWS_SCAN_TIME.getTaskType());task.setPriority(TaskTypeEnum.NEWS_SCAN_TIME.getPriority());WmNews wmNews = new WmNews();wmNews.setId(id);task.setParameters(ProtostuffUtil.serialize(wmNews));scheduleClient.addTask(task);log.info("添加文章自动发布任务成功");}
}

2.8.4 修改文章发布逻辑

修改com.heima.wemedia.service.impl.WmNewsServiceImpl逻辑

第五步审核时,把任务先放到队列中,放在队列中再通过拉取任务进行审核

@Autowired
private WmNewAutoScanService wmNewAutoScanService;
@Autowired
private WmNewsTaskService wmNewsTaskService;
@Override
public ResponseResult submitNews(WmNewsDto wmNewsDto) {// 0.参数检查if(wmNewsDto == null||wmNewsDto.getContent()==null){return ResponseResult.errorResult(AppHttpCodeEnum.PARAM_INVALID);}//1. 保存或修改文章WmNews wmNews = new WmNews();BeanUtils.copyProperties(wmNewsDto,wmNews);//1.1 封面if(wmNewsDto.getImages()!=null&& wmNewsDto.getImages().size()>0){String imageStr = StringUtils.join(wmNewsDto.getImages(), ",");wmNews.setImages(imageStr);}//1.2 如果封面为自动-1,则需要手动设置封面规则if(wmNewsDto.getType().equals(WemediaConstants.WM_NEWS_TYPE_AUTO)){wmNews.setType(null);}saveOrUpdateWmNews(wmNews);//2.判断是否为草稿,如果为草稿结束当前方法if(wmNews.getStatus().equals(WmNews.Status.NORMAL.getCode())){return ResponseResult.okResult(AppHttpCodeEnum.SUCCESS);}//3.不是草稿,保存文章内容与图片素材的关系//3.1 获取文章内容的图片素材List<String> imageList=extractUrlInfo(wmNewsDto.getContent());saveRelativeInfoForContent(imageList,wmNews.getId());//4.不是草稿,保存文章封面图片与图片素材的关系saveRelativeInfoForCover(wmNewsDto,wmNews,imageList);//5.审核文章//wmNewAutoScanService.autoScanMediaNews(wmNews.getId());//将文章id和发布时间添加到任务中wmNewsTaskService.addNewsToTask(wmNews.getId(),wmNewsDto.getPublishTime());return ResponseResult.okResult(AppHttpCodeEnum.SUCCESS);}

2.8.5 启动测试

在这里插入图片描述

在这里插入图片描述

2.9 消费任务审核文章

修改com.heima.wemedia.service.impl.WmNewsServiceImpl逻辑

@Autowired
private WmNewAutoScanService wmNewAutoScanService;
/*** 消费任务,审核文章*/
@Override
@Async
@Scheduled(fixedRate = 1000)
public void scanNewsByTask() {log.info("开始执行文章自动审核任务");ResponseResult responseResult = scheduleClient.poll(TaskTypeEnum.NEWS_SCAN_TIME.getTaskType(), TaskTypeEnum.NEWS_SCAN_TIME.getPriority());if(responseResult.getCode().equals(200)&&responseResult.getData()!=null){log.info("task:{}",responseResult.getData());String jsonTask = JSON.toJSONString(responseResult.getData());Task task = JSON.parseObject(jsonTask, Task.class);//逆序列化任务参数拿到idWmNews wmNews = ProtostuffUtil.deserialize(task.getParameters(), WmNews.class);wmNewAutoScanService.autoScanMediaNews(wmNews.getId());}
}

这个方法并不会被调用,只需要按照一定频率拉取任务

因此添加@Scheduled(fixedRate = 1000)1s中拉取一次

在这里插入图片描述

同时需要在WediaAppilcation启动类添加@EnableScheduling

@SpringBootApplication
@EnableDiscoveryClient
@MapperScan("com.heima.wemedia.mapper")
@EnableFeignClients(basePackages = "com.heima.apis")
@EnableAsync
@EnableScheduling
public class WemediaApplication {public static void main(String[] args) {SpringApplication.run(WemediaApplication.class,args);}@Beanpublic MybatisPlusInterceptor mybatisPlusInterceptor() {MybatisPlusInterceptor interceptor = new MybatisPlusInterceptor();interceptor.addInnerInterceptor(new PaginationInnerInterceptor(DbType.MYSQL));return interceptor;}@Beanpublic MessageConverter messageConverter(){return new Jackson2JsonMessageConverter();}@Beanpublic RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory, MessageConverter messageConverter) {RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);rabbitTemplate.setMessageConverter(messageConverter);return rabbitTemplate;}
}

2.9.1 综合测试

发布一个即时任务

在这里插入图片描述

发布一个延迟任务

在这里插入图片描述

在这里插入图片描述
在这里插入图片描述

查看控制台
在这里插入图片描述

25分即将被消费

在这里插入图片描述

状态为1表示消费成功!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/798531.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

MySQL事务以及并发访问隔离级别

MySQL事务以及并发问题 事务1.什么是事务2.MySQL如何开启事务3.事务提交方式4.事务原理5.事务的四大特性&#xff08;ACID&#xff09; 事务并发问题1.并发引起的三个问题2.事务隔离级别 事务 在 MySQL 中&#xff0c;事务支持是在引擎层实现的。MySQL 是一个支持多引擎的系统&…

Java开发测试(第一篇):Java测试框架JUnit5

目录 1.基本介绍 2.maven中安装JUnit5 3.使用 4.JUnit5命名规则 5.JUnit5常用注解 6.JUnit5断言 7.JUnit5多个类之间的继承关系 8.JUnit5参数化 &#xff08;1&#xff09;使用场景&#xff1a; &#xff08;2&#xff09;使用前需在pom.xml文件中导入依赖 &#xff…

AcWing 1388. 游戏(每日一题)

原题链接&#xff1a;1388. 游戏 - AcWing题库 玩家一和玩家二共同玩一个小游戏。 给定一个包含 N 个正整数的序列。 由玩家一开始&#xff0c;双方交替行动。 每次行动可以在数列的两端之中任选一个数字将其取走&#xff0c;并给自己增加相应数字的分数。&#xff08;双方…

TCP三次握手,四次挥手

TCP为什么四次挥手&#xff1f;而不是三次&#xff1f; 正常流程&#xff1a;服务接收到 客户端的 FIN请求后&#xff0c;会发送一个ACK响应&#xff0c;等待系统资源释放后&#xff0c;再发送FIN 请求给客户端&#xff0c;客户端再发送一个ACK响应。 若为三次&#xff1a;就是…

unsigned和int相加减的错误

先看代码 #include<cstring> #include<vector> using namespace std; int main() {string s "12345678";int n s.length();cout << "n-10" << n - 10 << endl;cout << "s.length() - 10" << s.len…

第九届蓝桥杯大赛个人赛省赛(软件类)真题C 语言 A 组-航班时间

#include<iostream> using namespace std;int getTime(){int h1, h2, m1, m2, s1, s2, d 0;//d一定初始化为0&#xff0c;以正确处理不跨天的情况 scanf("%d:%d:%d %d:%d:%d (%d)", &h1, &m1, &s1, &h2, &m2, &s2, &d);return d …

基于Arduino nano配置银燕电调

1 目的 配置电调&#xff0c;设置电机转动方向&#xff0c;使得CW电机朝顺时针方向转动&#xff0c;CCW电机朝逆时针转动。 2 步骤 硬件 Arduino nano板子及USB线变阻器银燕电调EMAX Bullet 20A朗宇电机 2205 2300KV格氏电池3S杜邦线若干接线端子 软件 BLHeliSuite 注意…

数据库表设计18条黄金规则

前言 对于后端开发同学来说&#xff0c;访问数据库&#xff0c;是代码中必不可少的一个环节。 系统中收集到用户的核心数据&#xff0c;为了安全性&#xff0c;我们一般会存储到数据库&#xff0c;比如&#xff1a;mysql&#xff0c;oracle等。 后端开发的日常工作&#xff…

算法-数论-蓝桥杯

算法-数论 1、最大公约数 def gcd(a,b):if b 0:return areturn gcd(b, a%b) # a和b的最大公约数等于b与a mod b 的最大公约数def gcd(a,b):while b ! 0:cur aa bb cur%bpassreturn a欧几里得算法 a可以表示成a kb r&#xff08;a&#xff0c;b&#xff0c;k&#xff0c…

2024最新软件测试八股文,能不能拿心仪Offer就看你背得怎样了

&#x1f525; 交流讨论&#xff1a;欢迎加入我们一起学习&#xff01; &#x1f525; 资源分享&#xff1a;耗时200小时精选的「软件测试」资料包 &#x1f525; 教程推荐&#xff1a;火遍全网的《软件测试》教程 &#x1f4e2;欢迎点赞 &#x1f44d; 收藏 ⭐留言 &#x1…

Linux - mac 装 mutipass 获取 ubuntu

mutipass &#xff1a;https://multipass.run/docs/mac-tutorial mutipass list mutipass launch --name myname mutipass shell myname 获取 root权限&#xff1a; sudo su

生成式人工智能与 LangChain(预览)(上)

原文&#xff1a;Generative AI with LangChain 译者&#xff1a;飞龙 协议&#xff1a;CC BY-NC-SA 4.0 一、生成模型是什么&#xff1f; 人工智能&#xff08;AI&#xff09;取得了重大进展&#xff0c;影响着企业、社会和个人。在过去的十年左右&#xff0c;深度学习已经发…

副业选择攻略:如何找到最适合自己的那一个?

大家好&#xff0c;我是木薯。今天有个新人伙伴来咨询客服&#xff1a;新手适不适合在水牛社上做副业&#xff1f;什么样的副业适合自己&#xff1f; 这种问题其实对我们来说已经见得太多太多了&#xff0c;归其原因是因为自己对副业没有一个清晰的自我认知&#xff0c;从而感觉…

基于LEAP模型的能源环境发展、碳排放建模预测及不确定性分析

在国家“3060”碳达峰碳中和的政策背景下&#xff0c;如何寻求经济-能源-环境的平衡有效发展是国家、省份、城市及园区等不同级别经济体的重要课题。根据国家政策、当地能源结构、能源技术发展水平以及相关碳排放指标制定合理有效的低碳能源发展规划需要以科学准确的能源环境发…

Spring——框架介绍

每一个Java技术中都会存在一个“核心对象”&#xff0c;这个核心对象来完成主要任务为了得到核心对象&#xff0c;需要创建若干个辅助对象&#xff0c;从而导致开发步骤增加JDBC中 JDBC 核心对象——PreparedStatement 通过DriverManager得到数据库厂商提供的Driver对象DriverM…

京东云轻量云主机8核16G配置租用价格1198元1年、4688元三年

京东云轻量云主机8核16G服务器租用优惠价格1198元1年、4688元三年&#xff0c;配置为8C16G-270G SSD系统盘-5M带宽-500G月流量&#xff0c;华北-北京地域。京东云8核16G服务器活动页面 yunfuwuqiba.com/go/jd 活动链接打开如下图&#xff1a; 京东云8核16G服务器优惠价格 京东云…

MySQL数据库基础--存储引擎

MySQL体系结构 连接层 最上层是一些客户端和链接服务&#xff0c;主要完成一些类似于连接处理&#xff0c;授权认证&#xff0c;及相关的安全方案&#xff0c;服务器也会为安全接入的每个客户端验证他所具有的操作权限。 服务层 第二层架构主要完成大多数的核心服务功…

【C++入门】缺省参数、函数重载与引用

&#x1f49e;&#x1f49e; 前言 hello hello~ &#xff0c;这里是大耳朵土土垚~&#x1f496;&#x1f496; &#xff0c;欢迎大家点赞&#x1f973;&#x1f973;关注&#x1f4a5;&#x1f4a5;收藏&#x1f339;&#x1f339;&#x1f339; &#x1f4a5;个人主页&#x…

就业班 第二阶段(python) 2401--4.3 day2 python2

七、标准数据类型 1、为什么编程语言中要有类型 类型有以下几个重要角色&#xff1a; 对机器而言&#xff0c;类型描述了内存中的电荷是怎么解释的。 对编译器或者解释器而言&#xff0c;类型可以协助确保上面那些电荷、字节在程序的运行中始终如一地被理解。 对程序员而言…

C++算法 —— 递归

一、汉诺塔问题 1.链接 面试题 08.06. 汉诺塔问题 - 力扣&#xff08;LeetCode&#xff09; 2.描述 3.思路 4.参考代码 class Solution { public:void hanota(vector<int>& A, vector<int>& B, vector<int>& C) {dfs(A.size(),A,B,C);}void…