【Easylive】项目常见问题解答(自用&持续更新中…) 汇总版
这个定时任务系统主要包含两个核心功能:每日数据统计和临时文件清理。下面我将详细解析这两个定时任务的实现逻辑和技术要点:
@Component
@Slf4j
public class SysTask {@Resourceprivate StatisticsInfoService statisticsInfoService;@Resourceprivate AppConfig appConfig;@Scheduled(cron = "0 0 0 * * ?")public void statisticsData() {statisticsInfoService.statisticsData();}@Scheduled(cron = "0 */1 * * * ?")public void delTempFile() {String tempFolderName = appConfig.getProjectFolder() + Constants.FILE_FOLDER + Constants.FILE_FOLDER_TEMP;File folder = new File(tempFolderName);File[] listFile = folder.listFiles();if (listFile == null) {return;}String twodaysAgo = DateUtil.format(DateUtil.getDayAgo(2), DateTimePatternEnum.YYYYMMDD.getPattern()).toLowerCase();Integer dayInt = Integer.parseInt(twodaysAgo);for (File file : listFile) {Integer fileDate = Integer.parseInt(file.getName());if (fileDate <= dayInt) {try {FileUtils.deleteDirectory(file);} catch (IOException e) {log.info("删除临时文件失败", e);}}}}
}
一、statisticsData()
每日数据统计任务
执行时机:每天凌晨0点(0 0 0 * * ?
)
1. 数据统计流程
2. 核心代码解析
public void statisticsData() {// 1. 准备数据结构List<StatisticsInfo> statisticsInfoList = new ArrayList<>();final String statisticsDate = DateUtil.getBeforeDayDate(1); // 获取昨天的日期// 2. 播放量统计(Redis → DB)Map<String, Integer> videoPlayCountMap = redisComponent.getVideoPlayCount(statisticsDate);// 处理视频ID格式List<String> playVideoKeys = videoPlayCountMap.keySet().stream().map(item -> item.substring(item.lastIndexOf(":") + 1)).collect(Collectors.toList());// 3. 关联用户信息VideoInfoQuery query = new VideoInfoQuery();query.setVideoIdArray(playVideoKeys.toArray(new String[0]));List<VideoInfo> videoInfoList = videoInfoMapper.selectList(query);// 4. 按用户聚合播放量Map<String, Integer> videoCountMap = videoInfoList.stream().collect(Collectors.groupingBy(VideoInfo::getUserId,Collectors.summingInt(item -> videoPlayCountMap.getOrDefault(Constants.REDIS_KEY_VIDEO_PLAY_COUNT + statisticsDate + ":" + item.getVideoId(), 0))));// 5. 构建播放统计对象videoCountMap.forEach((userId, count) -> {StatisticsInfo info = new StatisticsInfo();info.setStatisticsDate(statisticsDate);info.setUserId(userId);info.setDataType(StatisticsTypeEnum.PLAY.getType());info.setStatisticsCount(count);statisticsInfoList.add(info);});// 6. 补充其他维度数据addFansData(statisticsDate, statisticsInfoList); // 粉丝统计addCommentData(statisticsDate, statisticsInfoList); // 评论统计addInteractionData(statisticsDate, statisticsInfoList); // 点赞/收藏/投币// 7. 批量入库statisticsInfoMapper.insertOrUpdateBatch(statisticsInfoList);
}
3. 关键技术点
-
Redis+DB混合统计:
• 播放量等高频数据先记录到Redis
• 定时任务从Redis获取昨日数据后持久化到DB -
多维度统计:
• 播放量:基于视频ID聚合后关联用户
• 粉丝量:直接查询DB关系数据
• 互动数据:统一处理点赞/收藏/投币等行为 -
批量操作优化:
• 使用insertOrUpdateBatch
实现批量upsert
• 减少数据库连接次数提升性能
二、delTempFile()
临时文件清理任务
执行时机:每分钟执行一次(0 */1 * * * ?
)
1. 文件清理逻辑
public void delTempFile() {// 1. 构建临时文件夹路径String tempFolder = appConfig.getProjectFolder() + Constants.FILE_FOLDER + Constants.FILE_FOLDER_TEMP;// 2. 获取两天前的日期(基准线)String twodaysAgo = DateUtil.format(DateUtil.getDayAgo(2), "yyyyMMdd");Integer thresholdDate = Integer.parseInt(twodaysAgo);// 3. 遍历临时文件夹File folder = new File(tempFolder);File[] files = folder.listFiles();if (files == null) return;for (File file : files) {try {// 4. 按日期命名规则清理旧文件Integer fileDate = Integer.parseInt(file.getName());if (fileDate <= thresholdDate) {FileUtils.deleteDirectory(file); // 递归删除}} catch (Exception e) {log.error("删除临时文件失败", e);}}
}
2. 设计要点
-
安全机制:
• 文件名强制使用日期格式(如20230815
)
• 只删除命名合规的文件夹 -
容错处理:
• 捕获IOException
防止单次失败影响后续操作
• 空文件夹自动跳过 -
性能考虑:
• 高频检查(每分钟)但低负载(仅处理过期文件)
• 使用FileUtils
工具类保证删除可靠性
三、架构设计亮点
-
解耦设计:
• 统计服务与业务逻辑分离
• 文件清理与业务模块隔离 -
数据一致性:
• 实时数据写入Redis保证性能
• 定时同步到DB保证持久化 -
扩展性:
• 新增统计维度只需添加对应方法
• 文件清理策略可配置化
四、潜在优化建议
-
统计任务优化:
// 可考虑分片统计(大用户量场景) @Scheduled(cron = "0 0 1 * * ?") public void statsUserShard1() {statisticsService.processByUserRange(0, 10000); }
-
文件清理增强:
// 添加文件大小监控 if (file.length() > MAX_TEMP_FILE_SIZE) {alertService.notifyOversizeFile(file); }
-
异常处理强化:
@Scheduled(...) public void safeStatistics() {try {statisticsData();} catch (Exception e) {log.error("统计任务失败", e);retryLater(); // 延迟重试机制} }
这套定时任务系统通过合理的职责划分和稳健的实现方式,有效解决了数据统计和资源清理这两类经典的后台任务需求。
@Override
public void statisticsData() {// 创建统计结果容器List<StatisticsInfo> statisticsInfoList = new ArrayList<>();// 获取统计日期(昨天)final String statisticsDate = DateUtil.getBeforeDayDate(1);// ========== 播放量统计 ==========// 从Redis获取昨日所有视频的播放量数据// 格式:Map<"video_play_count:20230815:video123", 播放次数>Map<String, Integer> videoPlayCountMap = redisComponent.getVideoPlayCount(statisticsDate);// 提取视频ID列表(去掉前缀)List<String> playVideoKeys = new ArrayList<>(videoPlayCountMap.keySet());playVideoKeys = playVideoKeys.stream().map(item -> item.substring(item.lastIndexOf(":") + 1)) // 截取最后一个:后的视频ID.collect(Collectors.toList());// 构建视频查询条件VideoInfoQuery videoInfoQuery = new VideoInfoQuery();videoInfoQuery.setVideoIdArray(playVideoKeys.toArray(new String[playVideoKeys.size()]));// 批量查询视频基本信息List<VideoInfo> videoInfoList = videoInfoMapper.selectList(videoInfoQuery);// 按用户ID分组统计总播放量Map<String, Integer> videoCountMap = videoInfoList.stream().collect(Collectors.groupingBy(VideoInfo::getUserId, // 按用户ID分组Collectors.summingInt(item -> {// 重组Redis key获取该视频播放量String redisKey = Constants.REDIS_KEY_VIDEO_PLAY_COUNT + statisticsDate + ":" + item.getVideoId();Integer count = videoPlayCountMap.get(redisKey);return count == null ? 0 : count; // 空值保护})));// 转换播放统计结果对象videoCountMap.forEach((userId, count) -> {StatisticsInfo statisticsInfo = new StatisticsInfo();statisticsInfo.setStatisticsDate(statisticsDate); // 统计日期statisticsInfo.setUserId(userId); // 用户IDstatisticsInfo.setDataType(StatisticsTypeEnum.PLAY.getType()); // 数据类型=播放statisticsInfo.setStatisticsCount(count); // 播放次数statisticsInfoList.add(statisticsInfo); // 加入结果集});// ========== 粉丝量统计 ==========// 从数据库查询昨日粉丝变化数据List<StatisticsInfo> fansDataList = this.statisticsInfoMapper.selectStatisticsFans(statisticsDate);// 设置统计维度和日期for (StatisticsInfo statisticsInfo : fansDataList) {statisticsInfo.setStatisticsDate(statisticsDate); // 统一日期格式statisticsInfo.setDataType(StatisticsTypeEnum.FANS.getType()); // 数据类型=粉丝}statisticsInfoList.addAll(fansDataList); // 合并结果// ========== 评论统计 ==========// 从数据库查询昨日评论数据List<StatisticsInfo> commentDataList = this.statisticsInfoMapper.selectStatisticsComment(statisticsDate);// 设置统计维度和日期for (StatisticsInfo statisticsInfo : commentDataList) {statisticsInfo.setStatisticsDate(statisticsDate); // 统一日期格式statisticsInfo.setDataType(StatisticsTypeEnum.COMMENT.getType()); // 数据类型=评论}statisticsInfoList.addAll(commentDataList); // 合并结果// ========== 互动行为统计 ==========// 查询点赞/收藏/投币数据(参数为行为类型数组)List<StatisticsInfo> statisticsInfoOthers = this.statisticsInfoMapper.selectStatisticsInfo(statisticsDate,new Integer[]{UserActionTypeEnum.VIDEO_LIKE.getType(), // 点赞UserActionTypeEnum.VIDEO_COIN.getType(), // 投币UserActionTypeEnum.VIDEO_COLLECT.getType() // 收藏});// 转换行为类型为统计类型for (StatisticsInfo statisticsInfo : statisticsInfoOthers) {statisticsInfo.setStatisticsDate(statisticsDate); // 统一日期格式// 行为类型转换if (UserActionTypeEnum.VIDEO_LIKE.getType().equals(statisticsInfo.getDataType())) {statisticsInfo.setDataType(StatisticsTypeEnum.LIKE.getType()); // 点赞} else if (UserActionTypeEnum.VIDEO_COLLECT.getType().equals(statisticsInfo.getDataType())) {statisticsInfo.setDataType(StatisticsTypeEnum.COLLECTION.getType()); // 收藏} else if (UserActionTypeEnum.VIDEO_COIN.getType().equals(statisticsInfo.getDataType())) {statisticsInfo.setDataType(StatisticsTypeEnum.COIN.getType()); // 投币}}statisticsInfoList.addAll(statisticsInfoOthers); // 合并结果// ========== 最终入库 ==========// 批量插入或更新统计结果this.statisticsInfoMapper.insertOrUpdateBatch(statisticsInfoList);
}
这段代码使用Java 8的Stream API和Collectors工具类,实现了按用户ID分组统计视频总播放量的功能。下面我将从多个维度进行详细解析:
一、代码结构分解
Map<String, Integer> videoCountMap = videoInfoList.stream().collect(Collectors.groupingBy(VideoInfo::getUserId, Collectors.summingInt(item -> {String redisKey = Constants.REDIS_KEY_VIDEO_PLAY_COUNT + statisticsDate + ":" + item.getVideoId();Integer count = videoPlayCountMap.get(redisKey);return count == null ? 0 : count;})));
二、逐层解析
1. 数据准备阶段
• 输入:videoInfoList
(视频信息列表,包含videoId
和userId
等字段)
• 目标:生成Map<用户ID, 该用户所有视频的总播放量>
2. Stream流水线
videoInfoList.stream()
将List转换为Stream,准备进行流式处理。
3. 核心收集器
Collectors.groupingBy(VideoInfo::getUserId, // 分组依据:用户IDCollectors.summingInt(...) // 聚合方式:求和
)
• groupingBy
:按用户ID分组,生成Map<String, List<VideoInfo>>
• summingInt
:对每组内的元素进行整数求和
4. 播放量计算逻辑
item -> {// 重组Redis key:video_play_count:20230815:video123String redisKey = Constants.REDIS_KEY_VIDEO_PLAY_COUNT + statisticsDate + ":" + item.getVideoId();// 从预加载的Map获取播放量Integer count = videoPlayCountMap.get(redisKey);// 空值保护(没有记录则视为0次播放)return count == null ? 0 : count;
}
三、关键技术点
1. 嵌套收集器
• 外层:groupingBy
按用户分组
• 内层:summingInt
对播放量求和
形成两级聚合操作。
2. 实时Key重组
Constants.REDIS_KEY_VIDEO_PLAY_COUNT + statisticsDate + ":" + item.getVideoId()
动态拼接Redis key,与之前从Redis加载数据时的key格式严格一致:
• 常量前缀:video_play_count:
• 日期分区:20230815
• 视频ID::video123
3. 空值防御
count == null ? 0 : count
处理可能存在的:
• Redis中无该视频记录
• 视频刚上传尚未有播放数据
四、内存数据流演示
假设原始数据:
videoInfoList = [{videoId: "v1", userId: "u1"}, {videoId: "v2", userId: "u1"},{videoId: "v3", userId: "u2"}
]videoPlayCountMap = {"video_play_count:20230815:v1": 100,"video_play_count:20230815:v2": 50,"video_play_count:20230815:v3": 200
}
执行过程:
- 流处理开始
- 遇到v1(用户u1)→ 计算播放量100 → u1分组累计100
- 遇到v2(用户u1)→ 计算播放量50 → u1分组累计150
- 遇到v3(用户u2)→ 计算播放量200 → u2分组累计200
最终结果:
{u1=150, u2=200}
五、设计优势
-
高效聚合
单次遍历完成分组+求和,时间复杂度O(n) -
内存友好
流式处理避免中间集合的创建 -
可维护性
清晰表达业务逻辑:
“按用户分组,求和每个用户所有视频的播放量” -
扩展性
如需修改统计逻辑(如求平均值),只需替换summingInt
为其他收集器
六、潜在优化方向
1. 并行处理(大数据量时)
videoInfoList.parallelStream()...
注意线程安全和顺序保证
2. 缓存Key构建
// 预先生成videoId到播放量的映射,避免重复拼接字符串
Map<String, Integer> videoIdToCount = ...;
3. 异常处理增强
try {Integer count = videoPlayCountMap.get(redisKey);return Optional.ofNullable(count).orElse(0);
} catch (Exception e) {log.warn("播放量统计异常 videoId:{}", item.getVideoId());return 0;
}
这段代码展示了如何优雅地结合Stream API和集合操作,实现复杂的数据聚合统计,是Java函数式编程的典型实践。