文章目录
- 梳理前面的实现:
- Feign
- 点赞改进
- day07-积分系统
- bitmap相关命令
- 签到
- 增加签到记录
- 计算本月已连续签到的天数
- 查询签到记录
- 积分
- 表设计
- 签到-->发送RabbitMQ消息,保存积分
- 对应的消费者:**消费消息 用于保存积分**
- 增加积分
- 查询个人今日积分情况
- 排行榜
- 分页查询当前赛季积分和排名列表-redis
- 查询当前用户的当前赛季积分和排名
- 查询积分榜
- 榜单持久化
- MybatisPlus实现动态表名
- 定时任务持久化榜单到DB:
- 分页查询指定历史赛季积分和排名
- 海量数据存储策略
- 历史榜单的存储策略
- XXL-Job分布式任务
- XXL-JOB任务分片
- ~~xxl-job子任务--> 任务链~~
項目也用了Ribbon
梳理前面的实现:
Feign
Feign的拦截器 header里的key和网关是一样的
template.header(JwtConstants.USER_HEADER, userId.toString());
package com.tianji.authsdk.resource.interceptors;import com.tianji.auth.common.constants.JwtConstants;
import com.tianji.common.utils.UserContext;
import feign.RequestInterceptor;
import feign.RequestTemplate;// 服务之间的Feign调用没有用户信息,所以设置Feign拦截器获取用户信息,加到请求头,后面feign调用才有
public class FeignRelayUserInterceptor implements RequestInterceptor {@Overridepublic void apply(RequestTemplate template) {Long userId = UserContext.getUser();if (userId == null) {return;}// 将用户信息加到请求头里template.header(JwtConstants.USER_HEADER, userId.toString());}
}
点赞改进
day07-积分系统
- 积分:用户在天机学堂网站的各种交互行为都可以产生积分,积分值与行为类型有关
- 学霸天梯榜:按照每个学员的总积分排序得到的排行榜,称为学霸天梯榜。排名前三的有奖励。天梯榜每个自然月为一个赛季,月初清零。
积分获取规则
- 签到规则
连续7天奖励10分 连续14天 奖励20 连续28天奖励40分, 每月签到进度当月第一天重置 - 学习规则
每学习一小节,积分+10,每天获得上限50分 - 交互规则(有效交互数据参与积分规则,无效数据会被删除)
- 写评价 积分+10
- 写问答 积分+5 每日获得上限为20分
- 写笔记 积分+3 每次被采集+2 每日获得上限为20分
bitmap相关命令
读取BitMap中的数据:
BITFIELD key GET encoding offset
- GET:代表查询
- encoding:返回结果的编码方式,BitMap中是二进制保存,而返回结果会转为10进制,但需要一个转换规则,也就是这里的编码方式
- u:无符号整数,例如 u2,代表读2个bit位,转为无符号整数返回
- i:又符号整数,例如 i2,代表读2个bit位,转为有符号整数返回
- offset:从第几个bit位开始读取,例如0:代表从第一个bit位开始
例如,我想查询从第1天到第3天的签到记录,可以这样:
BITFIELD key GET u3 0
可以看到,返回的结果是7. 为什么是7呢?
签到记录是 11100111,从0开始,取3个bit位,刚好是111,转无符号整数,刚好是7
Redis最基础的数据类型只有5种:String、List、Set、SortedSet、Hash,其它特殊数据结构大多都是基于以上5这种数据类型。
BitMap也不例外,它是基于String结构的。因为Redis的String类型底层是SDS,也会存在一个字节数组用来保存数据。而Redis就提供了几个按位操作这个数组中数据的命令,实现了BitMap效果。
由于String类型的最大空间是512MB,也就是2的31次幂个bit,因此可以保存的数据量级是十分恐怖的。
签到
增加签到记录
签到的key前缀,后面需要拼接用户id和年月,如sign:uid:111:202403
public static final String SIGN_RECORD_KEY_PREFIX = “sign:uid:”
public SignResultVO addSignRecords() {Long userId = UserContext.getUser();LocalDate now = LocalDate.now();int dayOfMonth = now.getDayOfMonth(); // 获取当前日是这个月的多少号DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyyMM");String yearMonth = formatter.format(now); // 获取年月,如202403String key = RedisConstants.SIGN_RECORD_KEY_PREFIX + userId.toString() + ":" + yearMonth;// 利用bitset,将签到记录保存到redis的bitmap结构中,需要校验是否已签到int offset = dayOfMonth - 1; // 偏移量,因为下标从0开始所以减1// 校验是否已签到Boolean exists = redisTemplate.opsForValue().setBit(key, offset, true);if (exists) {throw new BadRequestException("不能重复签到");}// 计算连续签到的天数,以此计算是否有连续签到7天的额外奖励积分int signDays = countSignDays(key, dayOfMonth);// 计算连续签到的奖励积分,规则是连续签到7天加10积分,连续签到14天加20积分,连续签到28天加40积分int rewardPoints = 0;switch (signDays) {case 7:rewardPoints = 10;break;case 14:rewardPoints = 20;break;case 28:rewardPoints = 40;break;}// 发送RabbitMQ消息,保存积分rabbitMqHelper.send(MqConstants.Exchange.LEARNING_EXCHANGE,MqConstants.Key.SIGN_IN,// 积分数量 = 签到积分1+额外签到奖励积分SignInMessage.of(userId, rewardPoints + 1));// 封装结果并返回SignResultVO resultVO = SignResultVO.builder().signDays(signDays).signPoints(1) // 签到分数,默认为1,不需要重复赋值.rewardPoints(rewardPoints).build();return resultVO;
}
计算本月已连续签到的天数
计算本月已连续签到的天数: return 本月已连续签到的天数
bitfield key get u本月当前天数 0
: 获取从0到本月当前天数(后面的不要)- 与1进行&运算,得到数据num的最后一位 ; num右移一位
private int countSignDays(String key, int dayOfMonth) {// 获取本月直到当前天的签到数据,结果为十进制,返回结果为list,在第0个元素// 相当于 bitfield key get u本月当前天数 0List<Long> signList = redisTemplate.opsForValue().bitField(key,BitFieldSubCommands.create().get(BitFieldSubCommands.BitFieldType.unsigned(dayOfMonth)).valueAt(0));// 判空if (CollUtil.isEmpty(signList)) {return 0;}int signDays = 0; // 计数器,统计连续签到天数Long num = signList.get(0);log.debug("num {}", num);// num转二进制,累加统计连续签到天数// 通过与1进行&运算,获取当前签到数据最后一位while ((num & 1) == 1) {signDays++;// 注:>>和>>>分别表示带符号右移和无符号右移,这里去的是无符号数,所以用>>>num = num >>> 1; // 右移1位,更新num,}return signDays;
}
统计的是 当前天签到,到当前天为止连续签到的天数(并不一定是最大连续签到的天数)
查询签到记录
积分
表设计
签到记录:用Bitmap
积分记录:
CREATE TABLE IF NOT EXISTS `points_record` (`id` bigint NOT NULL AUTO_INCREMENT COMMENT '积分记录表id',`user_id` bigint NOT NULL COMMENT '用户id',`type` tinyint NOT NULL COMMENT '积分方式:1-课程学习,2-每日签到,3-课程问答, 4-课程笔记,5-课程评价',`points` tinyint NOT NULL COMMENT '积分值',`create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',PRIMARY KEY (`id`) USING BTREE,KEY `idx_user_id` (`user_id`,`type`) USING BTREE,KEY `idx_create_time` (`create_time`) USING BTREE
) ENGINE=InnoDB AUTO_INCREMENT=41 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci ROW_FORMAT=DYNAMIC COMMENT='学习积分记录,每个月底清零';
排行耪:
CREATE TABLE IF NOT EXISTS `points_board` (`id` bigint NOT NULL COMMENT '榜单id',`user_id` bigint NOT NULL COMMENT '学生id',`points` int NOT NULL COMMENT '积分值',`rank` tinyint NOT NULL COMMENT '名次,只记录赛季前100',`season` smallint NOT NULL COMMENT '赛季,例如 1,就是第一赛季,2-就是第二赛季',PRIMARY KEY (`id`) USING BTREE,UNIQUE KEY `idx_season_user` (`season`,`user_id`) USING BTREE
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci ROW_FORMAT=DYNAMIC COMMENT='学霸天梯榜';
tinyint
: -128~127
签到–>发送RabbitMQ消息,保存积分
// 签到--发送RabbitMQ消息,保存积分rabbitMqHelper.send(MqConstants.Exchange.LEARNING_EXCHANGE,MqConstants.Key.SIGN_IN,// 积分数量 = 签到积分1+额外签到奖励积分SignInMessage.of(userId, rewardPoints + 1));
对应的消费者:消费消息 用于保存积分
public class LearningPointListener {private final IPointsRecordService pointsRecordService;/**** 接收签到增加的积分的消息,并增加积分* @param message 接受的参数类型为SignInMessage*/@RabbitListener(bindings = @QueueBinding(value = @Queue(value = "signs.points.queue", durable = "true"),exchange = @Exchange(value = MqConstants.Exchange.LEARNING_EXCHANGE, type = ExchangeTypes.TOPIC),key = MqConstants.Key.SIGN_IN))public void signInListener(SignInMessage message) {log.debug("LearningPointListener接收签到消息,用户{},积分数量{}", message.getUserId(),message.getPoints());if (message.getUserId() == null|| message.getPoints() == null) {// 这里是接受MQ消息,中断即可,若抛异常,则会开启重试return;}// 保存积分pointsRecordService.addPointRecord(message, PointsRecordType.SIGN);}
增加积分
@Data
@NoArgsConstructor
@AllArgsConstructor(staticName = "of")
public class SignInMessage {private Long userId;// 积分数量private Integer points;
}
// incrementScore,如果key存在,则给对应的value(用户id)的score加上增量detal(新增积分数量),没有则新创建并赋值
redisTemplate.opsForZSet().incrementScore(key, userId.toString(), savePoints)
public void addPointRecord(SignInMessage message, PointsRecordType recordType) {// 判断积分是否有上限,recordType.getMaxPoints()是否大于0boolean hasMaxPoints = recordType.getMaxPoints() > 0;Long userId = message.getUserId(); // 获取当前登录用户信息// 如果有上限制,查询该用户今日该积分类型已获得的积分数量LocalDateTime now = LocalDateTime.now();int currentPoints = 0;if (hasMaxPoints) {LocalDateTime dayStartTime = DateUtils.getDayStartTime(now); // 当前开始时间LocalDateTime dayEndTime = DateUtils.getDayEndTime(now); // 当天结束时间QueryWrapper<PointsRecord> wrapper = new QueryWrapper<>();wrapper.select("sum(points) as currentPoints").eq("user_id", userId) .eq("type", recordType) .between("create_time", dayStartTime, dayEndTime); Map<String, Object> map = this.getMap(wrapper);if (map != null) {BigDecimal bigDecimal = (BigDecimal) map.get("currentPoints");currentPoints = bigDecimal.intValue();}// 判断积分是否超过上限if (currentPoints >= recordType.getMaxPoints()) {return;}}// 计算实际应保存的积分数量int savePoints = hasMaxPoints ? Math.min(recordType.getMaxPoints() - currentPoints, message.getPoints()) : message.getPoints();// 保存积分明细PointsRecord pointsRecord = PointsRecord.builder().userId(userId).type(recordType).points(savePoints).build();this.save(pointsRecord);// 累加并保存该用户总积分到redis的zset,用于生成当前赛季的排行榜DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyyMM");String key = RedisConstants.POINTS_BOARD_KEY_PREFIX + formatter.format(now);// incrementScore,如果key存在,则给对应的value(用户id)的score加上增量detal(新增积分数量),没有则新创建并赋值redisTemplate.opsForZSet().incrementScore(key, userId.toString(), savePoints);
}
查询个人今日积分情况
@ApiModel(value="PointsBoard对象", description="学霸天梯榜")
public class PointsBoard implements Serializable {private static final long serialVersionUID = 1L;@ApiModelProperty(value = "榜单id")@TableId(value = "id", type = IdType.ASSIGN_ID)private Long id;@ApiModelProperty(value = "学生id")@TableField("user_id")private Long userId;@ApiModelProperty(value = "积分值")@TableField("points")private Integer points;@ApiModelProperty(value = "名次,只记录赛季前100")@TableField("rank")private Integer rank;@ApiModelProperty(value = "赛季,例如 1,就是第一赛季,2-就是第二赛季")@TableField("season")private Integer season;}
排行榜
实时排行榜–>Redis
历史排行榜–>DB
// 查询当前用户排名,根据query.season区分当前赛季(redis)和历史赛季(db)
PointsBoardVO boardVO = isCurrentBoard ? queryMyCurrentBoardRank(key) : queryMyHistoryBoardRank(season);
// 分类查询赛季列表,根据query.season区分当前赛季(redis)和历史赛季(db)
List<PointsBoard> boards = isCurrentBoard ? queryCurrentBoardRankList(query, key) : queryHistoryBoardRankList(season, query);
要想形成排行榜,我们在查询数据库时,需要先对用户分组,再对积分求和,最终按照积分和排序,Sql语句是这样:
SELECT user_id, SUM(points) FROM points_record GROUP BY user_id ORDER BY SUM(points)
但是效率不高 每次都要分组
zset实现:
积分榜单汇总信息的VO:
public class PointsBoardVO {@ApiModelProperty("我的榜单排名")private Integer rank;@ApiModelProperty("我的积分值")private Integer points;@ApiModelProperty("前100名上榜人信息")private List<PointsBoardItemVO> boardList;
}
每个赛季结束定时任务持久化到DB。Redis存当月当赛季的实时排行榜
保存积分addPointRecord里的:key是“”+赛季年月
// 累加并保存该用户总积分到redis的zset,用于生成当前赛季的排行榜
DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyyMM");
String key = RedisConstants.POINTS_BOARD_KEY_PREFIX + formatter.format(now);
// incrementScore,如果key存在,则给对应的value(用户id)的score加上增量detal(新增积分数量),没有则新创建并赋值
redisTemplate.opsForZSet().incrementScore(key, userId.toString(), savePoints);
分页查询当前赛季积分和排名列表-redis
Set<ZSetOperations.TypedTuple<String>> typedTuples = redisTemplate.opsForZSet().reverseRangeWithScores(key, start, end);
int randIndex = start + 1; // 排名计数器
List<PointsBoard> boards = new ArrayList<>();for (ZSetOperations.TypedTuple<String> typedTuple : typedTuples) {Double score = typedTuple.getScore(); // 积分数String value = typedTuple.getValue(); // 用户idif (StrUtil.isBlank(value) || score == null) { // 判空防止空指针continue;}PointsBoard board = PointsBoard.builder().rank(randIndex++).points(score.intValue()).userId(Long.parseLong(value)).build();boards.add(board);
}
ZSetOperations.TypedTuple<String> 是针对 Redis 有序集合(ZSet) 的元素及其分数的封装。这个 TypedTuple<String> 实际上代表了一个带有值和分数的 有序集合元素。
在 Redis 中,有序集合是基于 分数(score) 对 元素(value) 排序的。这个 TypedTuple 中有两个部分:
值:这是存储在有序集合中的元素本身(可以是任何类型的值,例如 String、Integer 等)。
分数:每个元素有一个关联的分数(score),它是一个 double 类型,用来排序有序集合中的元素。
查询当前用户的当前赛季积分和排名
// 获取当前赛季积分
Double score = redisTemplate.opsForZSet().score(key, userId.toString());
// 获取当前赛季排名,zset默认是按score升序排名,rank升序排名,reverseRank即按照score降序排名
Long rank = redisTemplate.opsForZSet().reverseRank(key, userId.toString()) + 1;
查询积分榜
要从DB中查,因为Redis只有当月的。同时持久化到DB还没没做
积分赛季表 服务实现类
public class PointsBoardSeasonServiceImpl extends ServiceImpl<PointsBoardSeasonMapper, PointsBoardSeason> implements IPointsBoardSeasonService {private final PointsBoardSeasonMapper pointsBoardSeasonMapper;/*** 查询历史赛季列表*/@Overridepublic List<PointsBoardSeason> getHistorySeasonList() {List<PointsBoardSeason> seasonList = this.lambdaQuery().list();if (CollUtil.isEmpty(seasonList)) { // 判空throw new BizIllegalException("查询历史赛季列表失败");}return seasonList;}/*** 创建上赛季表** @param tableName 上赛季表名*/@Overridepublic void createPointsBoardTableOfLastSeason(String tableName) {pointsBoardSeasonMapper.createPointsBoardTableOfLastSeason(tableName);}
}
<insert id="createPointsBoardTableOfLastSeason" parameterType="java.lang.String">CREATE TABLE `${tableName}`(`id` BIGINT NOT NULL AUTO_INCREMENT COMMENT '榜单id',`user_id` BIGINT NOT NULL COMMENT '学生id',`points` INT NOT NULL COMMENT '积分值',PRIMARY KEY (`id`) USING BTREE,INDEX `idx_user_id` (`user_id`) USING BTREE)COMMENT ='学霸天梯榜分表'COLLATE = 'utf8mb4_0900_ai_ci'ENGINE = InnoDBROW_FORMAT = DYNAMIC
</insert>
${tableName}传的是public static final String POINTS_BOARD_TABLE_PREFIX = “points_board_”+赛季ID,下面是积分赛季表
榜单持久化
MybatisPlus实现动态表名
动态表名是:"point_board" + 赛季ID
如何获取:1. ThreadLocal存储动态表名
按照赛季ID分表,保存的表是没有rank字段的
public class TableInfoContext {private static final ThreadLocal<String> TL = new ThreadLocal<>();public static void setInfo(String info) {TL.set(info);}public static String getInfo() {return TL.get();}public static void remove() {TL.remove();}
}
- 声明动态表名拦截器,使用拦截器插件
package com.tianji.learning.config;import com.baomidou.mybatisplus.extension.plugins.handler.TableNameHandler;
import com.baomidou.mybatisplus.extension.plugins.inner.DynamicTableNameInnerInterceptor;
import com.tianji.learning.util.TableInfoContext;@Configuration//配置类
public class MybatisConfiguration {// 声明动态表名拦截器,使用拦截器插件@Beanpublic DynamicTableNameInnerInterceptor dynamicTableNameInnerInterceptor() {// 准备一个Map,用于存储TableNameHandlerMap<String, TableNameHandler> map = new HashMap<>(1);// 存入一个TableNameHandler,用来替换points_board表名称// 替换方式,就是从TableInfoContext中读取保存好的动态表名,判空是提高代码健壮性map.put("points_board", (sql, tableName) -> TableInfoContext.getInfo() == null ? tableName : TableInfoContext.getInfo());map.put("points_record", (sql, tableName) -> TableInfoContext.getInfo() == null ? tableName : TableInfoContext.getInfo());return new DynamicTableNameInnerInterceptor(map);}
}
在执行SQL语句时,根据TableInfoContext中存储的动态表名信息,决定是否替换原始表名。如果TableInfoContext中有动态表名,则使用动态表名;如果没有,则使用原始表名
通过@Bean注解将dynamicTableNameInnerInterceptor方法返回的DynamicTableNameInnerInterceptor实例注册为一个Spring Bean。这个拦截器会被MyBatis自动识别并加入到拦截器链中
4. 用的时候set get设置
定时任务持久化榜单到DB:
/*** 持久化上赛季排行榜数据到DB* XxlJob注解内容要和任务名称一致* 使用XxlJob实现任务分片*/@XxlJob("savePointsBoard2DB")public void savePointsBoard2DB() {log.debug("开始持久化上赛季排行榜数据到DB...");// 查询上赛季信息PointsBoardSeason boardSeason = getLastPointsBoardSeason();// 拼接表名并存入ThreadLocalString tableName = LearningConstants.POINTS_BOARD_TABLE_PREFIX + boardSeason.getId();TableInfoContext.setInfo(tableName);//存入ThreadLocal// 分页从redis查询上赛季榜单数据LocalDate time = LocalDate.now().minusMonths(1);DateTimeFormatter yearMonth = DateTimeFormatter.ofPattern("yyyyMM");String key = RedisConstants.POINTS_BOARD_KEY_PREFIX + time.format(yearMonth);// xxl-job分片广播int sharedIndex = XxlJobHelper.getShardIndex(); // 当前分片索引int shardTotal = XxlJobHelper.getShardTotal(); // 总分片数// 构建分页参数PointsBoardQuery pageQuery = new PointsBoardQuery();pageQuery.setPageNo(sharedIndex + 1); // 页码pageQuery.setPageSize(1000); // 页面记录数while (true) {List<PointsBoard> boards = pointsBoardService.queryCurrentBoardRankList(pageQuery, key);if (CollUtil.isEmpty(boards)) { // 结束循环break;}// 翻页,跳过N个页,N就是分片数量pageQuery.setPageNo(pageQuery.getPageNo() + shardTotal); // 页码+total,跳过N页// 字段处理,rank赋值给id并清空for (PointsBoard board : boards){board.setId(board.getRank().longValue()); // 历史赛季排行榜中id存了rank排名board.setRank(null); // 清空rank}// 持久化到db相应的赛季表中,批量新增pointsBoardService.saveBatch(boards);}// 清空ThreadLocal中的数据TableInfoContext.remove();log.debug("完成持久化上赛季排行榜数据到DB...");}
public class PointsBoardPersistentHandler {private final IPointsBoardSeasonService pointsBoardSeasonService;private final IPointsBoardService pointsBoardService;//注入都要写接口注入,不能拿impl实现类private final StringRedisTemplate redisTemplate;/*** 定时任务创建上赛季榜单表*/// @Scheduled(cron = "0 0 3 1 * ?") // 一月一个赛季,每月1号的凌晨3点// @Scheduled(cron = "0 43 15 27 3 ?") // 一月一个赛季,每月1号的凌晨3点@XxlJob("createPointsBoardTableJob")private void createPointsBoardTableOfLastSeason() {log.debug("创建上赛季榜单表开始执行....");PointsBoardSeason boardSeason = getLastPointsBoardSeason();// 创建上赛季榜单表,表名示例:points_board_7, 7为赛季idpointsBoardSeasonService.createPointsBoardTableOfLastSeason(LearningConstants.POINTS_BOARD_TABLE_PREFIX + boardSeason.getId());log.debug("创建上赛季榜单表成功 ....");}/*** 查询上赛季信息*/private PointsBoardSeason getLastPointsBoardSeason() {// 获取上个月的当前时间点LocalDate time = LocalDate.now().minusMonths(1);// 从赛季表查询对应赛季信息PointsBoardSeason boardSeason = pointsBoardSeasonService.lambdaQuery().le(PointsBoardSeason::getBeginTime, time).ge(PointsBoardSeason::getEndTime, time).one();return boardSeason == null ? new PointsBoardSeason() : boardSeason;}/*** 清除redis的历史榜单*/@XxlJob("clearPointsBoardFromRedis")public void clearPointsBoardFromRedis(){// 拼接keyLocalDate time = LocalDate.now().minusMonths(1);DateTimeFormatter yearMonth = DateTimeFormatter.ofPattern("yyyyMM");String key = RedisConstants.POINTS_BOARD_KEY_PREFIX + time.format(yearMonth);// 删除键redisTemplate.unlink(key);}
}
分页查询指定历史赛季积分和排名
private List<PointsBoard> queryHistoryBoardRankList(Long seasonId, PointsBoardQuery query) {String tableName = LearningConstants.POINTS_BOARD_TABLE_PREFIX + seasonId;TableInfoContext.setInfo(tableName); // 设置动态表名// 根据排名降序分页查询Page<PointsBoard> boardPage = this.lambdaQuery().select(PointsBoard::getId, PointsBoard::getPoints, PointsBoard::getUserId) // 设置查询字段,id,user_id和points.page(query.toMpPage("id", false));if (CollUtil.isEmpty(boardPage.getRecords())) {throw new BizIllegalException("查询历史赛季排行榜信息异常");}List<PointsBoard> boardList = boardPage.getRecords().stream().map(board -> {// 排名rank暂存在了id字段,board.setRank(board.getId() == null ? 0 : board.getId().intValue());board.setId(null);return board;}).collect(Collectors.toList());TableInfoContext.remove(); // 移除动态表名return boardList;}
假如有数百万用户,这就意味着每个赛季榜单都有数百万数据。随着时间推移,历史赛季越来越多,如果全部保存到一张表中,数据量会非常恐怖!
该怎么办呢?
海量数据存储策略
对于数据库的海量数据存储,方案有很多,常见的有:
- 分区(Partition)是一种数据存储方案,可以解决单表数据较多的问题
按照某种规则,把表数据对应的ibd文件拆分成多个文件来存储。从物理上来看,一张表的数据被拆到多个表文件存储了;从逻辑上来看,他们对外表现是一张表。但逻辑上还是一张表。增删改查的方式不会有什么变化,只不过底层MySQL底层的处理上会有变更。例如检索时可以只检索某个文件,而不是全部。
3. 分库
4.集群
历史榜单的存储策略
天机学堂项目是一个教育类项目,用户规模并不会很高,一般在十多万到百万级别。因此最终的数据规模也并不会非常庞大。
综合之前的分析,结合天机学堂的项目情况,我们可以对榜单数据做分表,但是暂时不需要做分库和集群。
由于我们要解决的是数据过多问题,因此分表的方式选择水平分表。具体来说,就是按照赛季拆分,每一个赛季是一个独立的表
XXL-Job分布式任务
目前,我们的定时任务都是基于SpringTask来实现的。但是SpringTask存在一些问题:
- 当微服务多实例部署时,定时任务会被执行多次。而事实上我们只需要这个任务被执行一次即可。
- 我们除了要定时创建表,还要定时持久化Redis数据到数据库,我们希望这多个定时任务能够按照顺序依次执行,SpringTask无法控制任务顺序
不仅仅是SpringTask,其它单机使用的定时任务工具,都无法实现像这种任务执行者的调度、任务执行顺序的编排、任务监控等功能。这些功能必须要用到分布式任务调度组件。
单机使用的定时任务在多实例部署的时候,每个启动的服务实例都会有自己的任务触发器,这样就会导致各个实例各自运行,无法统一控制。
把任务触发器提取到各个服务实例之外,去做统一的触发、统一的调度。
事实上,大多数的分布式任务调度组件都是这样做的:
XXL-JOB的运行原理和架构如图:
@ConfigurationProperties(prefix = "tj.xxl-job")
public class XxlJobProperties {private String accessToken;private Admin admin;private Executor executor;@Datapublic static class Admin {private String address;}@Datapublic static class Executor {private String appName;private String address;private String ip;private Integer port;private String logPath;private Integer logRetentionDays;}
}
@Configuration
@ConditionalOnClass(XxlJobSpringExecutor.class)
@EnableConfigurationProperties(XxlJobProperties.class)
public class XxlJobConfig {@Beanpublic XxlJobSpringExecutor xxlJobExecutor(XxlJobProperties prop) {log.info(">>>>>>>>>>> xxl-job config init.");XxlJobSpringExecutor xxlJobSpringExecutor = new XxlJobSpringExecutor();XxlJobProperties.Admin admin = prop.getAdmin();if (admin != null && StringUtils.isNotEmpty(admin.getAddress())) {xxlJobSpringExecutor.setAdminAddresses(admin.getAddress());}XxlJobProperties.Executor executor = prop.getExecutor();if (executor != null) {if (executor.getAppName() != null)xxlJobSpringExecutor.setAppname(executor.getAppName());if (executor.getIp() != null)xxlJobSpringExecutor.setIp(executor.getIp());if (executor.getPort() != null)xxlJobSpringExecutor.setPort(executor.getPort());if (executor.getLogPath() != null)xxlJobSpringExecutor.setLogPath(executor.getLogPath());if (executor.getLogRetentionDays() != null)xxlJobSpringExecutor.setLogRetentionDays(executor.getLogRetentionDays());}if (prop.getAccessToken() != null)xxlJobSpringExecutor.setAccessToken(prop.getAccessToken());log.info(">>>>>>>>>>> xxl-job config end.");return xxlJobSpringExecutor;}
}
appname: 和服务名称一致
handler那个和注解里的一致
路由策略:就是指如果有多个任务执行器,该由谁执行?
路由策略说明:
ROUND(轮询):在线的执行器按照轮询策略选择一个执行【轮着执行】
SHARDING_BROADCAST(分片广播):广播触发对应集群中所有执行器执行一次任务,同时系统自动传递分片参数;可根据分片参数开发分片任务
XXL-JOB任务分片
通过while死循环,不停的查询数据,直到把所有数据都持久化为止。这样如果数据量达到数百万,交给一个任务执行器来处理会耗费非常多时间。
因此,将来肯定会将学习服务多实例部署,这样就会有多个执行器并行执行。但是,如果交给多个任务执行器,大家执行相同代码,都从第1页逐页处理数据,又会出现重复处理的情况。
怎么办?
这就要用到任务分片的方案了。
怎样才能确保任务不重复呢?我们可以参考扑克牌发牌的原理:
- 逐一给每个人发牌
- 发完一圈后,再回头给第一个人发
- 重复上述动作,直到牌发完为止
要想知道每一个执行器执行哪些页数据,只要弄清楚两个关键参数即可: - 起始页码:pageNo
- 下一页的跨度:step
而这两个参数是有规律的:
- 起始页码:执行器编号是多少,起始页码就是多少
- 页跨度:执行器有几个,跨度就是多少。也就是说你要跳过别人读取过的页码
int sharedIndex = XxlJobHelper.getShardIndex(); // 起始页码:当前分片索引
int shardTotal = XxlJobHelper.getShardTotal(); // 跨度:总分片数
// 构建分页参数
PointsBoardQuery pageQuery = new PointsBoardQuery();
pageQuery.setPageNo(sharedIndex + 1); // 页码(从1开始)
pageQuery.setPageSize(1000); // 页面记录数while (true) {List<PointsBoard> boards = pointsBoardService.queryCurrentBoardRankList(pageQuery, key);if (CollUtil.isEmpty(boards)) { // 结束循环break;}// 翻页,跳过N个页,N就是分片数量pageQuery.setPageNo(pageQuery.getPageNo() + shardTotal); // 页码+total,跳过N页// 字段处理,rank赋值给id并清空for (PointsBoard board : boards){board.setId(board.getRank().longValue()); // 历史赛季排行榜中id存了rank排名board.setRank(null); // 清空rank}// 持久化到db相应的赛季表中,批量新增pointsBoardService.saveBatch(boards);
}
这里是 每个实例:端口不一样 实现的
分片广播:实现横向加机器 解决 如一亿个用户计算余额宝的余额太耗时的问题
面试 场景:用户特别大的情况,榜单特别大,
Redis del和unlink的区别
xxl-job子任务–> 任务链
和分片一起用,非常可能出问题【分片 其中一个实例执行完就会执行子任务,可能清除掉还未持久化的redis数据】所以还是别用了。每个Job设置执行时间,谁先谁后就好了
要想让任务A、B依次执行,其实就是配置任务B作为任务A的子任务。因此,我们按照下面方式配置:
- 创建历史榜单表(10)的子任务是持久化榜单数据任务(12)
- 持久化榜单数据任务(12)的子任务是清理Redis中的历史榜单(13)
也就是说:10的子任务是12, 12的子任务是13