厂里教务之延迟任务精准发布文章

延迟任务精准发布文章

延迟任务概述

什么是延迟任务
  • 定时任务:有固定周期的,有明确的触发时间

  • 延迟队列:没有固定的开始时间,它常常是由一个事件触发的,而在这个事件触发之后的一段时间内触发另一个事件,任务可以立即执行,也可以延迟

应用场景:

场景一:订单下单之后30分钟后,如果用户没有付钱,则系统自动取消订单;如果期间下单成功,任务取消

场景二:接口对接出现网络问题,1分钟后重试,如果失败,2分钟重试,直到出现阈值终止

技术对比
DelayQueue

JDK自带DelayQueue 是一个支持延时获取元素的阻塞队列, 内部采用优先队列 PriorityQueue 存储元素,同时元素必须实现 Delayed 接口;在创建元素时可以指定多久才可以从队列中获取当前元素,只有在延迟期满时才能从队列中提取元素

DelayQueue属于排序队列,它的特殊之处在于队列的元素必须实现Delayed接口,该接口需要实现compareTo和getDelay方法

getDelay方法:获取元素在队列中的剩余时间,只有当剩余时间为0时元素才可以出队列。

compareTo方法:用于排序,确定元素出队列的顺序。

实现:

1:在测试包jdk下创建延迟任务元素对象DelayedTask,实现compareTo和getDelay方法,

2:在main方法中创建DelayQueue并向延迟队列中添加三个延迟任务,

3:循环的从延迟队列中拉取任务

public class DelayedTask  implements Delayed{// 任务的执行时间private int executeTime = 0;public DelayedTask(int delay){Calendar calendar = Calendar.getInstance();calendar.add(Calendar.SECOND,delay);this.executeTime = (int)(calendar.getTimeInMillis() /1000 );}
​/*** 元素在队列中的剩余时间* @param unit* @return*/@Overridepublic long getDelay(TimeUnit unit) {Calendar calendar = Calendar.getInstance();return executeTime - (calendar.getTimeInMillis()/1000);}
​/*** 元素排序* @param o* @return*/@Overridepublic int compareTo(Delayed o) {long val = this.getDelay(TimeUnit.NANOSECONDS) - o.getDelay(TimeUnit.NANOSECONDS);return val == 0 ? 0 : ( val < 0 ? -1: 1 );}
​
​public static void main(String[] args) {DelayQueue<DelayedTask> queue = new DelayQueue<DelayedTask>();queue.add(new DelayedTask(5));queue.add(new DelayedTask(10));queue.add(new DelayedTask(15));
​System.out.println(System.currentTimeMillis()/1000+" start consume ");while(queue.size() != 0){DelayedTask delayedTask = queue.poll();if(delayedTask !=null ){System.out.println(System.currentTimeMillis()/1000+" cosume task");}//每隔一秒消费一次try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}}     }
}

DelayQueue实现完成之后思考一个问题:

使用线程池或者原生DelayQueue程序挂掉之后,任务都是放在内存,需要考虑未处理消息的丢失带来的影响,如何保证数据不丢失,需要持久化(磁盘)

RabbitMQ实现延迟任务
  • TTL:Time To Live (消息存活时间)

  • 死信队列:Dead Letter Exchange(死信交换机),当消息成为Dead message后,可以重新发送另一个交换机(死信交换机)

redis实现

zset数据类型的去重有序(分数排序)特点进行延迟。例如:时间戳作为score进行排序

redis实现延迟任务

实现思路

问题思路

1.为什么任务需要存储在数据库中?

延迟任务是一个通用的服务,任何需要延迟得任务都可以调用该服务,需要考虑数据持久化的问题,存储数据库中是一种数据安全的考虑。

2.为什么redis中使用两种数据类型,list和zset?

效率问题,算法的时间复杂度

3.在添加zset数据的时候,为什么不需要预加载?

任务模块是一个通用的模块,项目中任何需要延迟队列的地方,都可以调用这个接口,要考虑到数据量的问题,如果数据量特别大,为了防止阻塞,只需要把未来几分钟要执行的数据存入缓存即可。

延迟任务服务实现

搭建changli-lnformation-schedule模块

Information-schedule是一个通用的服务,单独创建模块来管理任何类型的延迟任务

数据库准备

导入资料中leadnews_schedule数据库

taskinfo 任务表

实体类

package com.kjz.model.schedule.pojos;
​
import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableField;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import lombok.Data;
​
import java.io.Serializable;
import java.util.Date;
​
/*** <p>* * </p>** @author kjz*/
@Data
@TableName("taskinfo")
public class Taskinfo implements Serializable {
​private static final long serialVersionUID = 1L;
​/*** 任务id*/@TableId(type = IdType.ID_WORKER)private Long taskId;
​/*** 执行时间*/@TableField("execute_time")private Date executeTime;
​/*** 参数*/@TableField("parameters")private byte[] parameters;
​/*** 优先级*/@TableField("priority")private Integer priority;
​/*** 任务类型*/@TableField("task_type")private Integer taskType;
​
​
}

taskinfo_logs 任务日志表

实体类

package com.heima.model.schedule.pojos;
​
import com.baomidou.mybatisplus.annotation.*;
import lombok.Data;
​
import java.io.Serializable;
import java.util.Date;
​
/*** <p>* * </p>** @author itheima*/
@Data
@TableName("taskinfo_logs")
public class TaskinfoLogs implements Serializable {
​private static final long serialVersionUID = 1L;
​/*** 任务id*/@TableId(type = IdType.ID_WORKER)private Long taskId;
​/*** 执行时间*/@TableField("execute_time")private Date executeTime;
​/*** 参数*/@TableField("parameters")private byte[] parameters;
​/*** 优先级*/@TableField("priority")private Integer priority;
​/*** 任务类型*/@TableField("task_type")private Integer taskType;
​/*** 版本号,用乐观锁*/@Versionprivate Integer version;
​/*** 状态 0=int 1=EXECUTED 2=CANCELLED*/@TableField("status")private Integer status;
​
​
}

乐观锁支持:

/*** mybatis-plus乐观锁支持* @return*/
@Bean
public MybatisPlusInterceptor optimisticLockerInterceptor(){MybatisPlusInterceptor interceptor = new MybatisPlusInterceptor();interceptor.addInnerInterceptor(new OptimisticLockerInnerInterceptor());return interceptor;
}

安装redis

①拉取镜像

docker pull redis

② 创建容器

docker run -d --name redis --restart=always -p 6379:6379 redis --requirepass "1234"

③链接测试

能链接成功,即可

项目集成redis

① 在项目导入redis相关依赖,已经完成

<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<!-- redis依赖commons-pool 这个依赖一定要添加 -->
<dependency><groupId>org.apache.commons</groupId><artifactId>commons-pool2</artifactId>
</dependency>

② 在changli-lnformation-schedule中集成redis,添加以下nacos配置,链接上redis

spring:redis:host: 192.168.200.130password: leadnewsport: 6379

③ 工具类CacheService到heima-leadnews-common模块下,并添加自动配置

④:测试

package com.kjz.schedule.test;
​
​
import com.kjz.common.redis.CacheService;
import com.kjz.schedule.ScheduleApplication;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
​
import java.util.Set;
​
​
@SpringBootTest(classes = ScheduleApplication.class)
@RunWith(SpringRunner.class)
public class RedisTest {
​@Autowiredprivate CacheService cacheService;
​@Testpublic void testList(){
​//在list的左边添加元素
//        cacheService.lLeftPush("list_001","hello,redis");
​//在list的右边获取元素,并删除String list_001 = cacheService.lRightPop("list_001");System.out.println(list_001);}
​@Testpublic void testZset(){//添加数据到zset中  分值/*cacheService.zAdd("zset_key_001","hello zset 001",1000);cacheService.zAdd("zset_key_001","hello zset 002",8888);cacheService.zAdd("zset_key_001","hello zset 003",7777);cacheService.zAdd("zset_key_001","hello zset 004",999999);*/
​//按照分值获取数据Set<String> zset_key_001 = cacheService.zRangeByScore("zset_key_001", 0, 8888);System.out.println(zset_key_001);
​
​}
}

添加任务

①:拷贝mybatis-plus生成的文件,mapper

②:创建task类,用于接收添加任务的参数

package com.kjz.model.schedule.dtos;
​
import lombok.Data;
​
import java.io.Serializable;
​
@Data
public class Task implements Serializable {
​/*** 任务id*/private Long taskId;/*** 类型*/private Integer taskType;
​/*** 优先级*/private Integer priority;
​/*** 执行id*/private long executeTime;
​/*** task参数*/private byte[] parameters;}

③:创建TaskService

package com.kjz.schedule.service;
​
import com.kjz.model.schedule.dtos.Task;
​
/*** 对外访问接口*/
public interface TaskService {
​/*** 添加任务* @param task   任务对象* @return       任务id*/public long addTask(Task task) ;
​
}

实现:

package com.kjz.schedule.service.impl;
​
import com.alibaba.fastjson.JSON;
import com.kjz.common.constants.ScheduleConstants;
import com.kjz.common.redis.CacheService;
import com.kjz.model.schedule.dtos.Task;
import com.kjz.model.schedule.pojos.Taskinfo;
import com.kjz.model.schedule.pojos.TaskinfoLogs;
import com.kjz.schedule.mapper.TaskinfoLogsMapper;
import com.kjz.schedule.mapper.TaskinfoMapper;
import com.kjz.schedule.service.TaskService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.BeanUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
​
import java.util.Calendar;
import java.util.Date;
​
@Service
@Transactional
@Slf4j
public class TaskServiceImpl implements TaskService {/*** 添加延迟任务** @param task* @return*/@Overridepublic long addTask(Task task) {//1.添加任务到数据库中
​boolean success = addTaskToDb(task);
​if (success) {//2.添加任务到redisaddTaskToCache(task);}
​
​return task.getTaskId();}
​@Autowiredprivate CacheService cacheService;
​/*** 把任务添加到redis中** @param task*/private void addTaskToCache(Task task) {
​String key = task.getTaskType() + "_" + task.getPriority();
​//获取5分钟之后的时间  毫秒值Calendar calendar = Calendar.getInstance();calendar.add(Calendar.MINUTE, 5);long nextScheduleTime = calendar.getTimeInMillis();
​//2.1 如果任务的执行时间小于等于当前时间,存入listif (task.getExecuteTime() <= System.currentTimeMillis()) {cacheService.lLeftPush(ScheduleConstants.TOPIC + key, JSON.toJSONString(task));} else if (task.getExecuteTime() <= nextScheduleTime) {//2.2 如果任务的执行时间大于当前时间 && 小于等于预设时间(未来5分钟) 存入zset中cacheService.zAdd(ScheduleConstants.FUTURE + key, JSON.toJSONString(task), task.getExecuteTime());}
​
​}
​@Autowiredprivate TaskinfoMapper taskinfoMapper;
​@Autowiredprivate TaskinfoLogsMapper taskinfoLogsMapper;
​/*** 添加任务到数据库中** @param task* @return*/private boolean addTaskToDb(Task task) {
​boolean flag = false;
​try {//保存任务表Taskinfo taskinfo = new Taskinfo();BeanUtils.copyProperties(task, taskinfo);taskinfo.setExecuteTime(new Date(task.getExecuteTime()));taskinfoMapper.insert(taskinfo);
​//设置taskIDtask.setTaskId(taskinfo.getTaskId());
​//保存任务日志数据TaskinfoLogs taskinfoLogs = new TaskinfoLogs();BeanUtils.copyProperties(taskinfo, taskinfoLogs);taskinfoLogs.setVersion(1);taskinfoLogs.setStatus(ScheduleConstants.SCHEDULED);taskinfoLogsMapper.insert(taskinfoLogs);
​flag = true;} catch (Exception e) {e.printStackTrace();}
​return flag;}
}

ScheduleConstants常量类

package com.kjz.common.constants;
​
public class ScheduleConstants {
​//task状态public static final int SCHEDULED=0;   //初始化状态
​public static final int EXECUTED=1;       //已执行状态
​public static final int CANCELLED=2;   //已取消状态
​public static String FUTURE="future_";   //未来数据key前缀
​public static String TOPIC="topic_";     //当前数据key前缀
}

④:测试

取消任务

在TaskService中添加方法

/*** 取消任务* @param taskId        任务id* @return              取消结果*/
public boolean cancelTask(long taskId);

实现

/*** 取消任务* @param taskId* @return*/
@Override
public boolean cancelTask(long taskId) {
​boolean flag = false;
​//删除任务,更新日志Task task = updateDb(taskId,ScheduleConstants.EXECUTED);
​//删除redis的数据if(task != null){removeTaskFromCache(task);flag = true;}
​
​
​return false;
}
​
/*** 删除redis中的任务数据* @param task*/
private void removeTaskFromCache(Task task) {
​String key = task.getTaskType()+"_"+task.getPriority();
​if(task.getExecuteTime()<=System.currentTimeMillis()){cacheService.lRemove(ScheduleConstants.TOPIC+key,0,JSON.toJSONString(task));}else {cacheService.zRemove(ScheduleConstants.FUTURE+key, JSON.toJSONString(task));}
}
​
/*** 删除任务,更新任务日志状态* @param taskId* @param status* @return*/
private Task updateDb(long taskId, int status) {Task task = null;try {//删除任务taskinfoMapper.deleteById(taskId);
​TaskinfoLogs taskinfoLogs = taskinfoLogsMapper.selectById(taskId);taskinfoLogs.setStatus(status);taskinfoLogsMapper.updateById(taskinfoLogs);
​task = new Task();BeanUtils.copyProperties(taskinfoLogs,task);task.setExecuteTime(taskinfoLogs.getExecuteTime().getTime());}catch (Exception e){log.error("task cancel exception taskid={}",taskId);}
​return task;
​
}

测试

消费任务

在TaskService中添加方法

/*** 按照类型和优先级来拉取任务* @param type* @param priority* @return*/
public Task poll(int type,int priority);

实现

/*** 按照类型和优先级拉取任务* @return*/
@Override
public Task poll(int type,int priority) {Task task = null;try {String key = type+"_"+priority;String task_json = cacheService.lRightPop(ScheduleConstants.TOPIC + key);if(StringUtils.isNotBlank(task_json)){task = JSON.parseObject(task_json, Task.class);//更新数据库信息updateDb(task.getTaskId(),ScheduleConstants.EXECUTED);}}catch (Exception e){e.printStackTrace();log.error("poll task exception");}
​return task;
}
未来数据定时刷新
reids key值匹配

方案1:keys 模糊匹配

keys的模糊匹配功能很方便也很强大,但是在生产环境需要慎用!开发中使用keys的模糊匹配却发现redis的CPU使用率极高,所以公司的redis生产环境将keys命令禁用了!redis是单线程,会被堵塞

方案2:scan

SCAN 命令是一个基于游标的迭代器,SCAN命令每次被调用之后, 都会向用户返回一个新的游标, 用户在下次迭代时需要使用这个新游标作为SCAN命令的游标参数, 以此来延续之前的迭代过程。

代码案例:

@Test
public void testKeys(){Set<String> keys = cacheService.keys("future_*");System.out.println(keys);
​Set<String> scan = cacheService.scan("future_*");System.out.println(scan);
}
reids管道

普通redis客户端和服务器交互模式

Pipeline请求模型

官方测试结果数据对比

测试案例对比:

//耗时6151
@Test
public  void testPiple1(){long start =System.currentTimeMillis();for (int i = 0; i <10000 ; i++) {Task task = new Task();task.setTaskType(1001);task.setPriority(1);task.setExecuteTime(new Date().getTime());cacheService.lLeftPush("1001_1", JSON.toJSONString(task));}System.out.println("耗时"+(System.currentTimeMillis()- start));
}
​
​
@Test
public void testPiple2(){long start  = System.currentTimeMillis();//使用管道技术List<Object> objectList = cacheService.getstringRedisTemplate().executePipelined(new RedisCallback<Object>() {@Nullable@Overridepublic Object doInRedis(RedisConnection redisConnection) throws DataAccessException {for (int i = 0; i <10000 ; i++) {Task task = new Task();task.setTaskType(1001);task.setPriority(1);task.setExecuteTime(new Date().getTime());redisConnection.lPush("1001_1".getBytes(), JSON.toJSONString(task).getBytes());}return null;}});System.out.println("使用管道技术执行10000次自增操作共耗时:"+(System.currentTimeMillis()-start)+"毫秒");
}

4.8.3)未来数据定时刷新-功能完成

在TaskService中添加方法

@Scheduled(cron = "0 */1 * * * ?")
public void refresh() {System.out.println(System.currentTimeMillis() / 1000 + "执行了定时任务");
​// 获取所有未来数据集合的key值Set<String> futureKeys = cacheService.scan(ScheduleConstants.FUTURE + "*");// future_*for (String futureKey : futureKeys) { // future_250_250
​String topicKey = ScheduleConstants.TOPIC + futureKey.split(ScheduleConstants.FUTURE)[1];//获取该组key下当前需要消费的任务数据Set<String> tasks = cacheService.zRangeByScore(futureKey, 0, System.currentTimeMillis());if (!tasks.isEmpty()) {//将这些任务数据添加到消费者队列中cacheService.refreshWithPipeline(futureKey, topicKey, tasks);System.out.println("成功的将" + futureKey + "下的当前需要执行的任务数据刷新到" + topicKey + "下");}}
}

在引导类中添加开启任务调度注解:@EnableScheduling

分布式锁解决集群下的方法抢占执行
问题描述

启动两台changli-lnformation-schedule服务,每台服务都会去执行refresh定时任务方法

分布式锁

分布式锁:控制分布式系统有序的去对共享资源进行操作,通过互斥来保证数据的一致性。

解决方案:

redis分布式锁

sexnx (SET if Not eXists) 命令在指定的 key 不存在时,为 key 设置指定的值。

这种加锁的思路是,如果 key 不存在则为 key 设置 value,如果 key 已存在则 SETNX 命令不做任何操作

  • 客户端A请求服务器设置key的值,如果设置成功就表示加锁成功

  • 客户端B也去请求服务器设置key的值,如果返回失败,那么就代表加锁失败

  • 客户端A执行代码完成,删除锁

  • 客户端B在等待一段时间后再去请求设置key的值,设置成功

  • 客户端B执行代码完成,删除锁

在工具类CacheService中添加方法
/*** 加锁** @param name* @param expire* @return*/
public String tryLock(String name, long expire) {name = name + "_lock";String token = UUID.randomUUID().toString();RedisConnectionFactory factory = stringRedisTemplate.getConnectionFactory();RedisConnection conn = factory.getConnection();try {
​//参考redis命令://set key value [EX seconds] [PX milliseconds] [NX|XX]Boolean result = conn.set(name.getBytes(),token.getBytes(),Expiration.from(expire, TimeUnit.MILLISECONDS),RedisStringCommands.SetOption.SET_IF_ABSENT //NX);if (result != null && result)return token;} finally {RedisConnectionUtils.releaseConnection(conn, factory,false);}return null;
}

修改未来数据定时刷新的方法,如下:

/*** 未来数据定时刷新*/
@Scheduled(cron = "0 */1 * * * ?")
public void refresh(){
​String token = cacheService.tryLock("FUTURE_TASK_SYNC", 1000 * 30);if(StringUtils.isNotBlank(token)){log.info("未来数据定时刷新---定时任务");
​//获取所有未来数据的集合keySet<String> futureKeys = cacheService.scan(ScheduleConstants.FUTURE + "*");for (String futureKey : futureKeys) {//future_100_50
​//获取当前数据的key  topicString topicKey = ScheduleConstants.TOPIC+futureKey.split(ScheduleConstants.FUTURE)[1];
​//按照key和分值查询符合条件的数据Set<String> tasks = cacheService.zRangeByScore(futureKey, 0, System.currentTimeMillis());
​//同步数据if(!tasks.isEmpty()){cacheService.refreshWithPipeline(futureKey,topicKey,tasks);log.info("成功的将"+futureKey+"刷新到了"+topicKey);}}}
}
数据库同步到redis

@Scheduled(cron = "0 */5 * * * ?")
@PostConstruct
public void reloadData() {clearCache();log.info("数据库数据同步到缓存");Calendar calendar = Calendar.getInstance();calendar.add(Calendar.MINUTE, 5);
​//查看小于未来5分钟的所有任务List<Taskinfo> allTasks = taskinfoMapper.selectList(Wrappers.<Taskinfo>lambdaQuery().lt(Taskinfo::getExecuteTime,calendar.getTime()));if(allTasks != null && allTasks.size() > 0){for (Taskinfo taskinfo : allTasks) {Task task = new Task();BeanUtils.copyProperties(taskinfo,task);task.setExecuteTime(taskinfo.getExecuteTime().getTime());addTaskToCache(task);}}
}
​
private void clearCache(){// 删除缓存中未来数据集合和当前消费者队列的所有keySet<String> futurekeys = cacheService.scan(ScheduleConstants.FUTURE + "*");// future_Set<String> topickeys = cacheService.scan(ScheduleConstants.TOPIC + "*");// topic_cacheService.delete(futurekeys);cacheService.delete(topickeys);
}

延迟队列解决精准时间发布文章

延迟队列服务提供对外接口

提供远程的feign接口,在heima-leadnews-feign-api编写类如下:

package com.heima.apis.schedule;
​
import com.heima.model.common.dtos.ResponseResult;
import com.heima.model.schedule.dtos.Task;
import org.springframework.cloud.openfeign.FeignClient;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
​
@FeignClient("leadnews-schedule")
public interface IScheduleClient {
​/*** 添加任务* @param task   任务对象* @return       任务id*/@PostMapping("/api/v1/task/add")public ResponseResult  addTask(@RequestBody Task task);
​/*** 取消任务* @param taskId        任务id* @return              取消结果*/@GetMapping("/api/v1/task/cancel/{taskId}")public ResponseResult cancelTask(@PathVariable("taskId") long taskId);
​/*** 按照类型和优先级来拉取任务* @param type* @param priority* @return*/@GetMapping("/api/v1/task/poll/{type}/{priority}")public ResponseResult poll(@PathVariable("type") int type,@PathVariable("priority")  int priority);
}

在changli-Information-schedule微服务下提供对应的实现

package com.heima.schedule.feign;
​
import com.heima.apis.schedule.IScheduleClient;
import com.heima.model.common.dtos.ResponseResult;
import com.heima.model.schedule.dtos.Task;
import com.heima.schedule.service.TaskService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;
​
​
@RestController
public class ScheduleClient  implements IScheduleClient {
​@Autowiredprivate TaskService taskService;
​/*** 添加任务* @param task 任务对象* @return 任务id*/@PostMapping("/api/v1/task/add")@Overridepublic ResponseResult addTask(@RequestBody Task task) {return ResponseResult.okResult(taskService.addTask(task));}
​/*** 取消任务* @param taskId 任务id* @return 取消结果*/@GetMapping("/api/v1/task/cancel/{taskId}")@Overridepublic ResponseResult cancelTask(@PathVariable("taskId") long taskId) {return ResponseResult.okResult(taskService.cancelTask(taskId));}
​/*** 按照类型和优先级来拉取任务* @param type* @param priority* @return*/@GetMapping("/api/v1/task/poll/{type}/{priority}")@Overridepublic ResponseResult poll(@PathVariable("type") int type, @PathVariable("priority") int priority) {return ResponseResult.okResult(taskService.poll(type,priority));}
}
发布文章集成添加延迟队列接口

在创建WmNewsTaskService

package com.heima.wemedia.service;
​
import com.heima.model.wemedia.pojos.WmNews;
​
​
public interface WmNewsTaskService {
​/*** 添加任务到延迟队列中* @param id  文章的id* @param publishTime  发布的时间  可以做为任务的执行时间*/public void addNewsToTask(Integer id, Date publishTime);
​
​
}

实现:

package com.heima.wemedia.service.impl;
​
import com.heima.apis.schedule.IScheduleClient;
import com.heima.model.common.enums.TaskTypeEnum;
import com.heima.model.schedule.dtos.Task;
import com.heima.model.wemedia.pojos.WmNews;
import com.heima.utils.common.ProtostuffUtil;
import com.heima.wemedia.service.WmNewsTaskService;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;
​
​
@Service
@Slf4j
public class WmNewsTaskServiceImpl  implements WmNewsTaskService {
​
​@Autowiredprivate IScheduleClient scheduleClient;
​/*** 添加任务到延迟队列中* @param id          文章的id* @param publishTime 发布的时间  可以做为任务的执行时间*/@Override@Asyncpublic void addNewsToTask(Integer id, Date publishTime) {
​log.info("添加任务到延迟服务中----begin");
​Task task = new Task();task.setExecuteTime(publishTime.getTime());task.setTaskType(TaskTypeEnum.NEWS_SCAN_TIME.getTaskType());task.setPriority(TaskTypeEnum.NEWS_SCAN_TIME.getPriority());WmNews wmNews = new WmNews();wmNews.setId(id);task.setParameters(ProtostuffUtil.serialize(wmNews));
​scheduleClient.addTask(task);
​log.info("添加任务到延迟服务中----end");
​}}

枚举类:

package com.heima.model.common.enums;
​
import lombok.AllArgsConstructor;
import lombok.Getter;
​
@Getter
@AllArgsConstructor
public enum TaskTypeEnum {
​NEWS_SCAN_TIME(1001, 1,"文章定时审核"),REMOTEERROR(1002, 2,"第三方接口调用失败,重试");private final int taskType; //对应具体业务private final int priority; //业务不同级别private final String desc; //描述信息
}

序列化工具对比

  • JdkSerialize:java内置的序列化能将实现了Serilazable接口的对象进行序列化和反序列化, ObjectOutputStream的writeObject()方法可序列化对象生成字节数组

  • Protostuff:google开源的protostuff采用更为紧凑的二进制数组,表现更加优异,然后使用protostuff的编译工具生成pojo类

拷贝资料中的两个类到heima-leadnews-utils下

Protostuff需要引导依赖:

<dependency><groupId>io.protostuff</groupId><artifactId>protostuff-core</artifactId><version>1.6.0</version>
</dependency>
​
<dependency><groupId>io.protostuff</groupId><artifactId>protostuff-runtime</artifactId><version>1.6.0</version>
</dependency>

修改发布文章代码:

把之前的异步调用修改为调用延迟任务

@Autowired
private WmNewsTaskService wmNewsTaskService;/*** 发布修改文章或保存为草稿* @param dto* @return*/
@Override
public ResponseResult submitNews(WmNewsDto dto) {
​//0.条件判断if(dto == null || dto.getContent() == null){return ResponseResult.errorResult(AppHttpCodeEnum.PARAM_INVALID);}
​//1.保存或修改文章
​WmNews wmNews = new WmNews();//属性拷贝 属性名词和类型相同才能拷贝BeanUtils.copyProperties(dto,wmNews);//封面图片  list---> stringif(dto.getImages() != null && dto.getImages().size() > 0){//[1dddfsd.jpg,sdlfjldk.jpg]-->   1dddfsd.jpg,sdlfjldk.jpgString imageStr = StringUtils.join(dto.getImages(), ",");wmNews.setImages(imageStr);}//如果当前封面类型为自动 -1if(dto.getType().equals(WemediaConstants.WM_NEWS_TYPE_AUTO)){wmNews.setType(null);}
​saveOrUpdateWmNews(wmNews);
​//2.判断是否为草稿  如果为草稿结束当前方法if(dto.getStatus().equals(WmNews.Status.NORMAL.getCode())){return ResponseResult.okResult(AppHttpCodeEnum.SUCCESS);}
​//3.不是草稿,保存文章内容图片与素材的关系//获取到文章内容中的图片信息List<String> materials =  ectractUrlInfo(dto.getContent());saveRelativeInfoForContent(materials,wmNews.getId());
​//4.不是草稿,保存文章封面图片与素材的关系,如果当前布局是自动,需要匹配封面图片saveRelativeInfoForCover(dto,wmNews,materials);
​//审核文章//        wmNewsAutoScanService.autoScanWmNews(wmNews.getId());wmNewsTaskService.addNewsToTask(wmNews.getId(),wmNews.getPublishTime());
​return ResponseResult.okResult(AppHttpCodeEnum.SUCCESS);
​
}

消费任务进行审核文章

WmNewsTaskService中添加方法

/*** 消费延迟队列数据*/
public void scanNewsByTask();

实现

@Autowired
private WmNewsAutoScanServiceImpl wmNewsAutoScanService;
​
/*** 消费延迟队列数据*/
@Scheduled(fixedRate = 1000)
@Override
@SneakyThrows
public void scanNewsByTask() {
​log.info("文章审核---消费任务执行---begin---");
​ResponseResult responseResult = scheduleClient.poll(TaskTypeEnum.NEWS_SCAN_TIME.getTaskType(), TaskTypeEnum.NEWS_SCAN_TIME.getPriority());if(responseResult.getCode().equals(200) && responseResult.getData() != null){String json_str = JSON.toJSONString(responseResult.getData());Task task = JSON.parseObject(json_str, Task.class);byte[] parameters = task.getParameters();WmNews wmNews = ProtostuffUtil.deserialize(parameters, WmNews.class);System.out.println(wmNews.getId()+"-----------");wmNewsAutoScanService.autoScanWmNews(wmNews.getId());}log.info("文章审核---消费任务执行---end---");
}

在WemediaApplication自媒体的引导类中添加开启任务调度注解@EnableScheduling

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/diannao/29006.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

部署远程控制台访问服务Rttys,第三部分服务端(安装Rttys)

安装服务端Rttys之前可选先在客户端安装rtty。服务端采用GO语言实现&#xff0c;前端界面采用vue实现。 CMAKE的安装和客户端RTTY的安装请参考前两篇文章&#xff1a; Linux远程连接程序工具选型 Webssh与Rtty 部署远程控制台访问服务Rttys&#xff0c;第一部分客户端&#…

下饺子模式一触即发,爆款的诞生仿佛“开盲盒”?

千呼万唤始出来&#xff0c;国产首款3A游戏大作《黑神话&#xff1a;悟空》即将发售。 早在2020年的8月20日当天&#xff0c;《黑神话&#xff1a;悟空》就发布了13分钟的实机演示。仅两天&#xff0c;B站播放量超过1700万&#xff0c;微博话题阅读量超过2.4亿。 从立项开始算…

自动化产线设备联网,协同打造5G智慧工厂

1、需求背景 随着信息技术、物联网、人工智能等领域的飞速发展&#xff0c;智慧工厂成为制造业升级和转型的关键方向。在智慧工厂中&#xff0c;产线设备之间的实时通信和协同操作可以提高整个生产流程的自动化水平。 提升生产效率 通过稳定的网络连接&#xff0c;保证设备之…

RT-Thread简介及启动流程分析

阅读引言&#xff1a; 最近在学习RT-Thread的内部机制&#xff0c;觉得这个启动流程和一些底层原理还是挺重要的&#xff0c; 所以写下此文。 目录 1&#xff0c; RT-Thread简介 2&#xff0c;RT-Thread任务的几种状态 3&#xff0c; 学习资源推荐 4&#xff0c; 启动流程分…

MTANet: 多任务注意力网络,用于自动医学图像分割和分类| 文献速递-深度学习结合医疗影像疾病诊断与病灶分割

Title 题目 MTANet: Multi-Task Attention Network for Automatic Medical Image Segmentation and Classification MTANet: 多任务注意力网络&#xff0c;用于自动医学图像分割和分类 01 文献速递介绍 医学图像分割和分类是当前临床实践中的两个关键步骤&#xff0c;其准…

Springboot3+自动装配

导言&#xff1a;这里主要讲述springboot3以后spring.factories功能失效&#xff0c;带来的解决办法。 之前有一次希望用springboot模块拿到工具模块的配置configuration的时候&#xff0c;想通过之前的spring.factories来实现自动装配&#xff0c;但是发现一直拿不到配置&…

数据仓库与数据挖掘(期末复习)

数据仓库与数据挖掘&#xff08;期末复习&#xff09; ETL的含义Extract 、 Transformation、Load。 ODS的全称Operational Data Store。 DW全称 Data Warehourse DM全称是Data Mart 数据仓库数据抽取时所用到技术是增量、全量、定时、调度 STAGE层作用是提供业务系统数据…

全国各区县地区生产总值数据(GDP及人均生产总值),精度超高 区县级数据

数据名称: 全国各区县地区生产总值数据 数据格式: shpexcel 数据几何类型: 面 数据精度&#xff1a;区县 数据坐标系: WGS84 数据来源&#xff1a;网络公开数据 数据可视化.

稳了?L3规模化落地在即,激光雷达公司成首批赢家

作者 | 芦苇 编辑 | 德新 在中国&#xff0c;距L3级自动驾驶的规模化落地&#xff0c;又近了一步。 随着国内试点政策刷新&#xff0c;越来越多的车企在部分市域获得了自动驾驶测试牌照&#xff0c;能上路测试的L3级自动驾驶车辆正在快速增加。 其中一个重要节点是&#xf…

C语言最终文章-二叉树

文章目录 前言二叉树的性质二叉树的存储方式顺序存储堆及其应用TopK问题堆排序 链式存储二叉树的练习1.二叉树查找值为x的节点2.判断是否为完全二叉树LC226.翻转二叉树[LC572. 另一棵树的子树](https://leetcode.cn/problems/subtree-of-another-tree/description/)两道选择题 …

目标检测:IOU

IOU&#xff08;Intersection over Union&#xff09;交并比&#xff1a; 它计算的是“预测的边框”和“真实的边框”的交叠率&#xff0c;即它们的交集和并集的比值。这个比值用于衡量预测边框与真实边框的重叠程度&#xff0c;从而评估目标检测的准确性。 在目标检测任务中…

嵌入式操作系统_2.嵌入式操作系统的一般架构

1.嵌入式操作系统的概念 嵌入式操作系统通常由硬件驱动程序、调式代理、操作系统内核、文件系统和可配置组件等功能组成&#xff0c;并为应用软件提供标准的API&#xff08;Application Programming Interface&#xff09;接口服务。 2.一般嵌入式操作系统的体系结构 从嵌入…

深度神经网络——什么是NLP(自然语言处理)?

自然语言处理&#xff08;NLP&#xff09; 是对使计算机能够处理、分析、解释和推理人类语言的技术和工具的研究和应用。 NLP 是一个跨学科领域&#xff0c;它结合了语言学和计算机科学等领域已建立的技术。 这些技术与人工智能结合使用来创建聊天机器人和数字助理&#xff0c;…

海成蜘蛛池广州官网下载

baidu搜索&#xff1a;如何联系八爪鱼SEO? baidu搜索&#xff1a;如何联系八爪鱼SEO? baidu搜索&#xff1a;如何联系八爪鱼SEO? 当我们给自己的泛目录设置仅蜘蛛抓取生成缓存的时候,我们需要模拟蜘蛛抓取测试我们的设置是否成功。绝大部分时候我们都使用网页蜘蛛模拟抓取测…

2024.618到底买什么数码值得?带你一起来看看!

在618期间&#xff0c;这些新品可能会有特别的优惠活动&#xff0c;包括但不限于折扣、满减、赠品等。因此&#xff0c;如果你正在寻找一款适合自己的数码产品&#xff0c;不妨关注各大电商平台的618促销活动&#xff0c;把握机会&#xff0c;以优惠的价格购买到心仪的产品。 …

文件操作(1)(C语言版)

前言&#xff1a; 为什么要学习文件操作&#xff1a; 1、如果大家写过一些代码&#xff0c;当运行结束的时候&#xff0c;这些运行结果将不复存在&#xff0c;除非&#xff0c;再次运行时这些结果才能展现在屏幕上面&#xff0c;就比如之前写过的通讯录。 现实中的通讯录可以保…

【数据结构初阶】--- 堆

文章目录 一、什么是堆&#xff1f;树二叉树完全二叉树堆的分类堆的实现方法 二、堆的操作堆的定义初始化插入数据&#xff08;包含向上调整详细讲解&#xff09;向上调整删除堆顶元素&#xff08;包含向下调整详细讲解&#xff09;向下调整返回堆顶元素判断堆是否为空销毁 三、…

一个开源的快速准确地将 PDF 转换为 markdown工具

大家好&#xff0c;今天给大家分享的是一个开源的快速准确地将 PDF 转换为 markdown工具。 Marker是一款功能强大的PDF转换工具&#xff0c;它能够将PDF文件快速、准确地转换为Markdown格式。这款工具特别适合处理书籍和科学论文&#xff0c;支持所有语言的转换&#xff0c;并…

2024年最佳插电式混合动力电动汽车

对电动汽车充满好奇和环保意识的司机们还没有准备好跨入纯电动汽车&#xff0c;他们可以找到一个折衷方案&#xff0c;即插电式混合动力车。 在过去的16年里&#xff0c;我一直在把握汽车行业的脉搏。试驾数百辆汽车、电动汽车、插电式混合动力车&#xff0c;跟踪汽车行业的新闻…

DAY04 HTMLCSS

文章目录 一 表单(1) 数字控件(2) 颜色控件(3) 日期控件(4) 月份控件(5) 星期控件(6) 搜索控件(7) 范围控件 二 浮动框架三 结构化标签四 CSS1 CSS概述2 CSS的编写位置1. inline style 行内样式2. inner style 内部样式3. outer style 外部样式4. 小结 3 CSS选择器1. 通用选择器…