Redis key naming conventions
Recommended convention: business prefix + data name + unique id
For example, the set of users who liked a blog post: blog:like:${blogId}
Refreshing the token TTL (interceptor-based)
Use two interceptors to decouple login authentication from TTL refreshing
RefreshTokenInterceptor: intercepts all requests and only handles token renewal; requests without a token pass through
@Component
public class RefreshTokenInterceptor implements HandlerInterceptor {

    @Resource
    private RedisTemplate redisTemplate;

    @Resource
    private JwtUtils jwtUtils;

    @Override
    public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) {
        String token = request.getHeader("Authorization");
        if (StrUtil.isEmpty(token)) {
            return true; // no token: let it pass
        }
        Boolean hasToken = redisTemplate.hasKey(SystemConstants.LOGIN_USERINFO_PREFIX + token);
        DecodedJWT decodedJWT = jwtUtils.verifyToken(token);
        if (!BooleanUtil.isTrue(hasToken) || null == decodedJWT) {
            return true; // also pass when the cache misses or token verification fails
        }
        // parse the token to get the user data
        Map<String, Claim> claims = decodedJWT.getClaims();
        HashMap<String, Object> userDtoMap = new HashMap<>();
        userDtoMap.put("id", claims.get("id").asLong());
        userDtoMap.put("nickName", claims.get("nickName").asString());
        userDtoMap.put("icon", claims.get("icon").asString());
        UserDTO userDTO = BeanUtil.toBean(userDtoMap, UserDTO.class);
        UserHolder.saveUser(userDTO); // store the user info in ThreadLocal
        // renew the token TTL (30 minutes is an assumed value here)
        redisTemplate.expire(SystemConstants.LOGIN_USERINFO_PREFIX + token, 30, TimeUnit.MINUTES);
        return true;
    }

    /**
     * post handler: remove the user info
     */
    @Override
    public void afterCompletion(HttpServletRequest request, HttpServletResponse response, Object handler, Exception ex) throws Exception {
        UserHolder.removeUser();
    }
}
LoginInterceptor: intercepts endpoints that require authorization and only handles authentication
@Slf4j
@Component
public class LoginInterceptor implements HandlerInterceptor {

    private void respondUnauthorized(HttpServletResponse resp) throws IOException {
        ObjectMapper om = new ObjectMapper();
        resp.setContentType("application/json;charset=utf-8");
        resp.setStatus(HttpStatus.HTTP_UNAUTHORIZED);
        resp.getWriter().write(om.writeValueAsString(Result.fail("未授权")));
    }

    /**
     * pre handler: check whether the user is logged in before entering the controller
     */
    @Override
    public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception {
        if (null == UserHolder.getUser()) {
            // respond 401 and stop the chain
            respondUnauthorized(response);
            return false;
        }
        return true;
    }
}
RefreshTokenInterceptor must run first, so register the interceptors with Order; the smaller the order value, the earlier it runs:
WebMvcConfigure:
@Configuration
public class WebMvcConfigure implements WebMvcConfigurer {

    @Resource
    private RefreshTokenInterceptor refreshTokenInterceptor;

    @Resource
    private LoginInterceptor loginInterceptor;

    private static final List<String> EXCLUDE_PATHS = Arrays.asList(
            "/v2/api-docs", // api docs
            "/doc.html",
            "/webjars/**",
            "/v3/**",
            "/swagger-resources/**",
            "/user/code",
            "/user/login"
    );

    /**
     * register the interceptors
     */
    @Override
    public void addInterceptors(InterceptorRegistry registry) {
        // the smaller the order, the earlier the interceptor runs
        registry.addInterceptor(refreshTokenInterceptor).excludePathPatterns(EXCLUDE_PATHS).order(0);
        // interceptors apply to all paths by default, so only the exclusions need declaring
        registry.addInterceptor(loginInterceptor).excludePathPatterns(EXCLUDE_PATHS).order(1);
    }
}
Caching
Database/cache double-write consistency
When the database is updated, the cache holds stale data, so the database value must be written back to the cache; this process is called double-write.
Double-write consistency means:
Write side: acquire the lock, update the database, then delete the cache, release the lock (sketched below)
Read side: read the cache first; on a miss, query the database and backfill the cache; on a hit, return directly
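A minimal sketch of the write side, using the RedisUtils lock helper shown below; updateShop, shopMapper.update and SHOP_PREFIX are illustrative names, not code from this project:

public void updateShop(Shop shop) {
    String lockKey = "lock:shop:" + shop.getId();
    if (!redisUtils.tryLock(lockKey)) {
        throw new BusinessException("系统繁忙 请稍后重试");
    }
    try {
        shopMapper.update(shop);                                 // 1. update the database first
        stringRedisTemplate.delete(SHOP_PREFIX + shop.getId()); // 2. then delete the cache
    } finally {
        redisUtils.unlock(lockKey);                              // 3. release the lock
    }
}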
The lock helper class:
@Component
public class RedisUtils {

    @Resource
    private StringRedisTemplate stringRedisTemplate;

    /**
     * try to acquire the lock
     * if the SETNX command succeeds the lock is acquired, otherwise acquisition fails; the lock auto-releases after 10 seconds
     */
    public boolean tryLock(String key) {
        Boolean lock = stringRedisTemplate.opsForValue().setIfAbsent(key, "1", 10, TimeUnit.SECONDS);
        return BooleanUtil.isTrue(lock);
    }

    /**
     * release the lock
     * SETNX on the key only succeeds while the key is absent, so deleting the key releases the lock
     */
    public void unlock(String key) {
        stringRedisTemplate.delete(key);
    }
}
Cache penetration
What is cache penetration
Cache penetration is when the requested data exists neither in the cache nor in the database, so large numbers of requests go straight through to the database and can make it unavailable.
Solutions
- Cache an empty string as a fallback
- Bloom filter (sketched below)
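A minimal sketch of the Bloom-filter option using Guava (an assumed dependency, not part of this project); the expected size and error rate are arbitrary:

import com.google.common.hash.BloomFilter;
import com.google.common.hash.Funnels;
import java.util.List;

public class ShopBloomFilter {
    // expect up to 1,000,000 ids with a 1% false-positive rate
    private final BloomFilter<Long> shopIds = BloomFilter.create(Funnels.longFunnel(), 1_000_000, 0.01);

    /** load every existing id once, e.g. at startup */
    public void warmUp(List<Long> allIds) {
        allIds.forEach(shopIds::put);
    }

    /** false means the id is definitely absent, so the request never reaches the cache or the db */
    public boolean mightExist(Long id) {
        return shopIds.mightContain(id);
    }
}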
Cache avalanche
What is cache avalanche
Cache avalanche is when a large number of cache keys expire in the same window, or the Redis server goes down, so a flood of requests reaches the database and puts it under huge pressure.
Solutions
- Add a random offset to each key's TTL so expirations are staggered (sketched below)
- Multi-level caching
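A short sketch of the random-TTL option, slotting into the cache-write path above; the base TTL and offset range are arbitrary:

// a 30-minute base TTL plus a random 0-10 minute offset, so keys written in the same batch expire at different times
Duration ttl = Duration.ofMinutes(30 + ThreadLocalRandom.current().nextInt(11));
stringRedisTemplate.opsForValue().set(SHOP_PREFIX + id, value, ttl);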
Cache breakdown
What is cache breakdown
Cache breakdown, also called the hotspot-key problem, is when a key that is under heavy concurrent access and expensive to rebuild suddenly expires; countless requests then hit the database in an instant with massive impact.
Solutions
- Mutex lock
- Logical expiration
Mutex-lock implementation
private Shop getShopByMutexLock(Long id) {
    // search cache
    String shopJson = stringRedisTemplate.opsForValue().get(SHOP_PREFIX + id);
    if (StrUtil.isNotBlank(shopJson)) {
        log.info("走缓存了");
        return JSONUtil.toBean(shopJson, Shop.class);
    }
    // an empty string means "no data", return null as the result
    if (Objects.nonNull(shopJson)) {
        log.info("空字符串");
        return null;
    }
    // null means there is no cache at all
    String lockKey = "lock:shop:" + id;
    Shop shop;
    try {
        // try to hold the mutex lock
        boolean lock = redisUtils.tryLock(lockKey);
        if (!lock) {
            log.info("未拿到锁重试");
            // no mutex lock, retry later
            Thread.sleep(50);
            return findShopById(id);
        }
        // lock acquired: rebuild the cache
        // search db
        shop = shopMapper.findById(id);
        Thread.sleep(200); // simulate a longer rebuild time
        // if the db has data write it, otherwise write an empty string
        String value = Objects.nonNull(shop) ? JSONUtil.toJsonStr(shop) : "";
        stringRedisTemplate.opsForValue().set(SHOP_PREFIX + id, value, Duration.ofMinutes(30));
        log.info("finish cache: {}", value);
    } catch (Exception e) {
        throw new BusinessException(e.getMessage());
    } finally {
        redisUtils.unlock(lockKey); // release the mutex lock
    }
    return shop;
}
Logical-expiration implementation
public void saveShop2Redis(Long id, Long expireSeconds) {
    Shop shop = shopMapper.findById(id);
    RedisData redisData = new RedisData();
    redisData.setData(shop);
    redisData.setExpireTime(LocalDateTime.now().plusSeconds(expireSeconds));
    stringRedisTemplate.opsForValue().set(SHOP_PREFIX + id, JSONUtil.toJsonStr(redisData));
}

public Shop findShopByLogicalExpired(Long id) {
    String shopJson = stringRedisTemplate.opsForValue().get(SHOP_PREFIX + id);
    // directly return null if the cache is blank
    if (StrUtil.isBlank(shopJson)) {
        return null;
    }
    RedisData redisData = JSONUtil.toBean(shopJson, RedisData.class);
    LocalDateTime expireTime = redisData.getExpireTime();
    if (expireTime.isAfter(LocalDateTime.now())) {
        // still valid: return the data
        JSONObject data = (JSONObject) redisData.getData();
        return JSONUtil.toBean(data, Shop.class);
    }
    // data is expired: rebuild the cache asynchronously if the mutex lock is acquired
    String lockKey = "lock:shop:" + id;
    boolean lock = redisUtils.tryLock(lockKey);
    if (lock) {
        threadPoolExecutor.submit(() -> {
            try {
                saveShop2Redis(id, 20L);
            } catch (RuntimeException e) {
                throw new BusinessException(e.getMessage());
            } finally {
                redisUtils.unlock(lockKey); // release the lock
            }
        });
    }
    return null;
}
Coupon seckill
Generating globally unique IDs with Redis
Scenario analysis
In a clustered deployment each database is independent, so auto-increment IDs collide, and sequential IDs are far too predictable. Consider these scenarios:
- If IDs follow an obvious pattern, competitors can easily work out how many orders are placed per day; leaking sales volume is bad
- As the project grows, a single MySQL table should stay under roughly 5 million rows; beyond that the data must be split across databases and tables. After sharding, the shards still form one logical table, so IDs must stay unique across them
Properties of a globally unique ID
- Uniqueness
- High availability
- High performance
- Increasing order
- Security
Combining a Redis-incremented counter (icr) with a timestamp and date produces a 64-bit value that satisfies the security requirement:
Three components
1. Sign bit: 1 bit; the ID is positive, so it is always 0
2. Timestamp: 31 bits, in seconds; 2^31 seconds gives roughly 69 years of range
3. Serial number: 32 bits; a counter within the second, incremented by 1 each time, allowing 2^32 distinct IDs per second
IdWorker implementation
@Component
public class IdWorker implements Serializable {

    private static final long serialVersionUID = -3644487519624710927L;

    private static final int BIT_NUM = 32; // number of bits to shift

    private static final long TIMESTAMP = 1742465369L; // initial epoch seconds

    @Resource
    private RedisTemplate redisTemplate;

    public long nextUUID(String prefix) {
        LocalDateTime now = LocalDateTime.now();
        // current epoch seconds (the design above uses a 31-bit seconds timestamp)
        long nowSeconds = now.toEpochSecond(ZoneOffset.of("+8"));
        long timestamp = nowSeconds - TIMESTAMP; // seconds elapsed since the initial timestamp
        String dateStr = DateTimeFormatter.ofPattern("yyyy:MM:dd").format(now); // current formatted date string
        // combine the prefix and the current date string to key the per-day serial number
        long serialNo = redisTemplate.opsForValue().increment("icr:" + prefix + ":" + dateStr);
        // move the timestamp into the high bits and OR the serial number into the low bits to build a globally unique id
        return timestamp << BIT_NUM | serialNo;
    }
}
Using the business tag plus the date as the icr prefix also lets you read the daily order volume straight out of Redis.
💡 Tip: the bit trick in this code is left shift + bitwise OR. The left shift frees the low bits for the serial number, and the OR fills them in: after the shift the low bits are all 0, and OR yields 1 wherever the serial number has a 1 and 0 where both bits are 0, which effectively pastes the serial number into the low bits. The combined result is the timestamp in the high bits plus the serial number in the low bits.
The CountDownLatch class
Use case
CountDownLatch acts like a signal gun: it fires a signal. When multiple asynchronous tasks run in parallel across threads, it lets the main thread wait until every child-thread task has finished before continuing.
Two core methods
1. countDown: called when a thread's task finishes, marking one task as complete
2. await: blocks the current thread until countDown has been called by all the tasks
Example: measuring total task duration
@Test
void testIdWorker() throws InterruptedException {
    CountDownLatch latch = new CountDownLatch(300); // 300 tasks to complete
    Runnable task = () -> {
        for (int i = 0; i < 100; i++) {
            long id = idWorker.nextUUID("order");
            System.out.println("id = " + id);
        }
        latch.countDown(); // one task done: total count - 1
    };
    long begin = System.currentTimeMillis();
    for (int i = 0; i < 300; i++) {
        es.submit(task); // submit the task asynchronously (es is an ExecutorService)
    }
    latch.await(); // wait until every task has finished
    long end = System.currentTimeMillis();
    System.out.println("time = " + (end - begin));
}
Seckill core logic
Business flow
- Validate the coupon id: does the coupon exist
- Check whether the seckill window has started
- Check whether there is enough stock
- If all checks pass, generate the order ID and send an order message carrying userId, voucherId and orderId to the queue
- The consumer side starts a thread that polls and consumes the order messages:
    - Check by order id whether the order already exists, so a message is never processed twice
    - Check the stock to prevent overselling
    - Create the user's coupon order
    - Deduct the stock
    - On processing failure, handle the message via the pending list
    - On success, acknowledge the message with ACK
Seckill endpoint implementation
Producer side
- Generate the order ID
- Run the seckill checks in a Lua script to guarantee atomicity
- On success, return the order ID
- On failure, throw an error
Producing the message to the Stream queue:
/**
 * seckill asynchronously
 *
 * @param voucherId
 * @return
 */
@Override
public long seckillVoucherAsynchronously(long voucherId) {
    Long userId = UserHolder.getUser().getId();
    long orderId = idWorker.nextUUID("seckill:orders");
    // check the stock and existing orders atomically in Lua
    Long result = stringRedisTemplate.execute(REDIS_SCRIPT, Collections.emptyList(),
            String.valueOf(voucherId), userId.toString(), String.valueOf(orderId));
    // any non-zero result means the seckill failed
    if (!result.equals(0L)) {
        throw new BusinessException(errMsg.get(result.intValue()));
    }
    // otherwise return the orderId directly
    return orderId;
}
The seckill Lua script:
-- coupon seckill script
--[[ arguments passed in from execute:
  voucher id: keys the coupon's stock data
  user id:    records every buyer of the coupon, to stop the same user ordering twice
  order id:   used to produce the order message to the Stream queue for asynchronous ordering
]]
-- coupon id
local voucherId = ARGV[1]
-- user id
local userId = ARGV[2]
-- order id
local ordersId = ARGV[3]
-- seckill business prefix
local BIZ_PREFIX = 'seckill:voucher:'
-- coupon details key (stock, begin/end time)
local voucherDetailsKey = BIZ_PREFIX .. 'details:' .. voucherId
-- key of the set of users who ordered this coupon
local voucherOrdersKey = BIZ_PREFIX .. 'orders:' .. voucherId
-- the coupon does not exist
if (redis.call('hexists', voucherDetailsKey, 'stocks') == 0) then
    return 1
end
local time = redis.call('time') -- an array of length 2: [1] is the current seconds, [2] the microseconds within the second
local nowMills = time[1] * 1000 + math.floor(time[2] / 1000) -- convert to milliseconds
-- check the seckill window
if (nowMills < tonumber(redis.call('hget', voucherDetailsKey, 'beginTime')) or nowMills > tonumber(redis.call('hget', voucherDetailsKey, 'endTime'))) then
    return 2
end
-- oversell check: Redis stores everything as strings, so convert to numbers before comparing
if (tonumber(redis.call('hget', voucherDetailsKey, 'stocks')) < 1) then
    return 3
end
-- duplicate-order check
if (redis.call('sismember', voucherOrdersKey, userId) == 1) then
    return 4
end
-- stock and duplicate checks passed: deduct the stock and record the user
redis.call('hincrby', voucherDetailsKey, 'stocks', -1) -- hincrby adds the given value; -1 decrements
redis.call('sadd', voucherOrdersKey, userId)
-- produce the order message: XADD <stream> <id> field value [field value ...]; '*' lets Redis generate the message id; each field/value pair is one entry
redis.call('xadd', BIZ_PREFIX .. 'streams.orders', '*', 'userId', userId, 'voucherId', voucherId, 'orderId', ordersId)
return 0 -- 0 means the seckill succeeded
💡 Lua pitfall: inside Redis, Lua blocks most global objects, so a timestamp can only come from Redis's own TIME command.
Consumer side
- Start a thread that polls and processes the messages
- Verify the order and the stock to avoid duplicate consumption
- Create the order and deduct the stock
- On exceptions, process the messages left in the PendingList so every message gets consumed
The Stream's consumer group must be created in advance, otherwise the consumer-side read fails. Command to create a group that consumes from the tail of the stream:
XGROUP CREATE key groupname $ MKSTREAM
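The group can also be created from code at startup. A minimal sketch with Spring Data Redis; the catch assumes the group may already exist, in which case Redis answers BUSYGROUP and the call throws:

@PostConstruct
public void createConsumerGroup() {
    try {
        // roughly the XGROUP CREATE seckill:voucher:streams.orders g1 $ command
        stringRedisTemplate.opsForStream()
                .createGroup("seckill:voucher:streams.orders", ReadOffset.latest(), "g1");
    } catch (Exception e) {
        log.info("consumer group already exists: {}", e.getMessage());
    }
}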
Consuming the messages:
@Resource
private SeckillVoucherServiceImpl seckillVoucherService;

@Resource
private IdWorker idWorker;

@Resource
private StringRedisTemplate stringRedisTemplate;

@Resource
private RedissonClient redissonClient;

private static DefaultRedisScript<Long> REDIS_SCRIPT;

@Resource
private ThreadPoolExecutor threadPoolExecutor;

private static final Map<Integer, String> errMsg;

static {
    errMsg = new HashMap<>(4);
    errMsg.put(1, "优惠券不存在");
    errMsg.put(2, "秒杀尚未开始");
    errMsg.put(3, "库存不足");
    errMsg.put(4, "请勿重复下单");
}

// initialize the seckill script
static {
    REDIS_SCRIPT = new DefaultRedisScript<>();
    REDIS_SCRIPT.setLocation(new ClassPathResource("seckill.lua")); // load the script from the resources directory
    REDIS_SCRIPT.setResultType(Long.class); // declare the script's return value type
}

@Resource
private VoucherOrderMapper voucherOrderMapper;

@Resource
private TransactionTemplate transactionTemplate;

/**
 * create-orders job handler
 */
private class VoucherOrdersHandler implements Runnable {

    private static final String REDIS_QUEUE_NAME = "seckill:voucher:streams.orders";

    private void handlePendingList() {
        int retries = 3;
        while (retries-- > 0) { // loop read
            try {
                // equivalent to "XREADGROUP GROUP g1 c1 COUNT 1 STREAMS seckill:voucher:streams.orders 0"
                List<MapRecord<String, Object, Object>> mapRecords = stringRedisTemplate.opsForStream().read(
                        Consumer.from("g1", "c1"), // specify the group and consumer
                        StreamReadOptions.empty().count(1), // specify the number of messages
                        StreamOffset.create(REDIS_QUEUE_NAME, ReadOffset.from("0")) // read this consumer's pending history
                );
                if (Objects.isNull(mapRecords) || mapRecords.isEmpty()) {
                    // the pending queue is empty: end the process
                    break;
                }
                MapRecord<String, Object, Object> record = mapRecords.get(0); // one id -> entries map
                Map<Object, Object> entries = record.getValue(); // the entries map
                Map<String, String> fieldMappings = new HashMap<>();
                fieldMappings.put("orderId", "id");
                VoucherOrder voucherOrders = BeanUtil.fillBeanWithMap(entries, new VoucherOrder(),
                        CopyOptions.create().ignoreError().setFieldMapping(fieldMappings));
                long exists = voucherOrderMapper.exists(voucherOrders);
                if (exists > 0) {
                    log.info("订单{}已存在 不再处理", voucherOrders.getId());
                    // ACK directly to mark it handled
                    stringRedisTemplate.opsForStream().acknowledge(REDIS_QUEUE_NAME, "g1", record.getId());
                    continue;
                }
                boolean success = createVoucherOrdersAndDeductVoucherStocks(voucherOrders);
                if (success) {
                    stringRedisTemplate.opsForStream().acknowledge(REDIS_QUEUE_NAME, "g1", record.getId()); // acknowledge the success
                    return;
                }
            } catch (Exception e) {
                log.info("handle pending-list seckill voucher orders exception: ", e);
                try {
                    Thread.sleep(20); // re-read 20ms later
                } catch (InterruptedException ex) {
                    throw new RuntimeException(ex);
                }
            }
        }
        log.info("handle pending-list seckill voucher orders failure");
    }

    @Override
    public void run() {
        while (true) { // loop read
            try {
                // equivalent to "XREADGROUP GROUP g1 c1 COUNT 1 BLOCK 2000 STREAMS seckill:voucher:streams.orders >"
                List<MapRecord<String, Object, Object>> mapRecords = stringRedisTemplate.opsForStream().read(
                        Consumer.from("g1", "c1"), // specify the group and consumer
                        StreamReadOptions.empty().count(1).block(Duration.ofSeconds(2)), // message count and blocking timeout
                        StreamOffset.create(REDIS_QUEUE_NAME, ReadOffset.lastConsumed()) // read new messages from this stream
                );
                if (Objects.isNull(mapRecords) || mapRecords.isEmpty()) {
                    // the queue is empty: proceed to the next read
                    continue;
                }
                MapRecord<String, Object, Object> record = mapRecords.get(0); // one id -> entries map
                Map<Object, Object> entries = record.getValue(); // the entries map
                Map<String, String> fieldMappings = new HashMap<>();
                fieldMappings.put("orderId", "id");
                VoucherOrder voucherOrders = BeanUtil.fillBeanWithMap(entries, new VoucherOrder(),
                        CopyOptions.create().ignoreError().setFieldMapping(fieldMappings).setFieldValueEditor((k, v) -> v.toString()));
                boolean success = createVoucherOrdersAndDeductVoucherStocks(voucherOrders);
                if (success) {
                    stringRedisTemplate.opsForStream().acknowledge(REDIS_QUEUE_NAME, "g1", record.getId()); // acknowledge the success
                }
            } catch (Exception e) {
                log.info("handle seckill voucher orders exception: ", e);
                handlePendingList();
            }
        }
    }

    public boolean createVoucherOrdersAndDeductVoucherStocks(VoucherOrder voucherOrders) {
        try {
            Boolean executed = transactionTemplate.execute((transactionStatus) -> {
                long ordersInserted = voucherOrderMapper.insert(voucherOrders);
                long stocksUpdated = seckillVoucherService.deductStocks(voucherOrders);
                if ((ordersInserted < 1 || stocksUpdated < 1) && !transactionStatus.isCompleted()) {
                    transactionStatus.setRollbackOnly();
                    throw new BusinessException("订单创建异常");
                }
                return ordersInserted > 0 && stocksUpdated > 0;
            });
            return Boolean.TRUE.equals(executed);
        } catch (Exception e) {
            log.info("下单扣减库存异常: ", e);
            throw new BusinessException("订单创建异常");
        }
    }
}

@SneakyThrows
@PostConstruct
public void initJob() {
    threadPoolExecutor.submit(new VoucherOrdersHandler());
}
💡 Transactions: a method annotated for declarative transactions only gains transaction semantics when called through the proxy object, not via this-calls inside the same class; for fine-grained control, narrow the transaction scope with programmatic transactions via TransactionTemplate.
User likes
Business flow
- A user can like a post only once; liking it again cancels the like
- A post shows the first five users who liked it, earliest likers first
Implementation idea:
A sorted set covers the ordering: use the post id as the key (blog:like:${blogId}), the user id as the member, and the timestamp as the score.
Set operations
- Like: add the user id to the set and sync the post's like count
- Cancel: remove the user id from the set and sync the post's like count
- Like count: the size of the set
After each operation, send a message to flush the change back to the db asynchronously
Like table schema
CREATE TABLE `tb_user_blog_like` (
  `id` bigint NOT NULL AUTO_INCREMENT COMMENT '主键',
  `blog_id` bigint NOT NULL COMMENT '帖子id',
  `user_id` bigint NOT NULL COMMENT '用户id',
  `created_at` timestamp NOT NULL COMMENT '创建时间',
  `updated_at` timestamp NOT NULL COMMENT '更新时间',
  `status` tinyint(1) NOT NULL DEFAULT '1' COMMENT '1点赞0未点赞',
  PRIMARY KEY (`id`),
  KEY `idx_user_id_blog_id` (`user_id`,`blog_id`) USING BTREE
) ENGINE=InnoDB AUTO_INCREMENT=25 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci
Code implementation
Producer side
/**
 * like posts
 *
 * @param blogId
 */
@Override
public void like(Long blogId) {
    Long userId = UserHolder.getUser().getId();
    DefaultRedisScript<Object> redisScript = new DefaultRedisScript<>("" +
            "local blogId = ARGV[1];" +
            "local userId = ARGV[2];" +
            "local blogKey = KEYS[1] .. blogId;" +
            "local messageQueueKey = KEYS[2];" +
            "local score = redis.call('zscore', blogKey, userId);" +
            "if(score == false) then " +
            "    local time = redis.call('time');" +
            "    local nowMills = time[1] * 1000 + math.floor(time[2] / 1000);" +
            "    redis.call('zadd', blogKey, nowMills, userId);" +
            "else " +
            "    redis.call('zrem', blogKey, userId); " +
            "end; " +
            "redis.call('xadd', messageQueueKey, '*', 'userId', userId, 'blogId', blogId);" +
            "");
    stringRedisTemplate.execute(redisScript, Arrays.asList(BLOG_LIKED, BLOG_USER_STREAMS_LIKE),
            blogId.toString(), userId.toString());
}
Consumer side
@SneakyThrows
@PostConstruct
public void initJob() {
    threadPoolExecutor.submit(new BlogLikeJob());
}

private class BlogLikeJob implements Runnable {

    private boolean updateLike(Long blogId, Long userId) {
        RLock rLock = redissonClient.getLock("lock:blog:update:like:" + blogId);
        try {
            boolean tryLock = rLock.tryLock(2L, TimeUnit.SECONDS);
            if (tryLock) {
                UserBlogLike userBlogLike = new UserBlogLike();
                userBlogLike.setUserId(userId);
                userBlogLike.setBlogId(blogId);
                Integer status = userBlogLikeService.exist(userBlogLike);
                int delta = Objects.nonNull(status) && status == 1 ? -1 : 1;
                Boolean success = transactionTemplate.execute(transactionStatus -> {
                    long updated = blogMapper.updateLike(blogId, delta);
                    long toggleLike;
                    if (Objects.isNull(status)) {
                        toggleLike = userBlogLikeService.insert(userBlogLike);
                    } else {
                        userBlogLike.setStatus(status == 1 ? 0 : 1);
                        toggleLike = userBlogLikeService.update(userBlogLike);
                    }
                    if (updated < 1 || toggleLike < 1) {
                        if (!transactionStatus.isCompleted()) {
                            transactionStatus.setRollbackOnly();
                        }
                    }
                    return updated > 0 && toggleLike > 0;
                });
                return Boolean.TRUE.equals(success);
            }
            return false;
        } catch (Exception e) {
            throw new RuntimeException(e);
        } finally {
            if (rLock.isHeldByCurrentThread() && rLock.isLocked()) {
                rLock.unlock();
            }
        }
    }

    @Override
    public void run() {
        while (true) {
            try {
                List<MapRecord<String, Object, Object>> records = stringRedisTemplate.opsForStream().read(
                        Consumer.from("g1", "c1"),
                        StreamReadOptions.empty().count(1).block(Duration.ofSeconds(2)),
                        StreamOffset.create(BLOG_USER_STREAMS_LIKE, ReadOffset.lastConsumed())
                );
                if (Objects.isNull(records) || records.isEmpty()) {
                    continue;
                }
                MapRecord<String, Object, Object> record = records.get(0);
                Map<Object, Object> entries = record.getValue();
                Long blogId = Long.valueOf(entries.get("blogId").toString());
                Long userId = Long.valueOf(entries.get("userId").toString());
                log.info("消费点赞数据: blogId: {}, userId: {}", blogId, userId);
                boolean success = this.updateLike(blogId, userId);
                if (success) {
                    stringRedisTemplate.opsForStream().acknowledge(BLOG_USER_STREAMS_LIKE, "g1", record.getId());
                }
            } catch (Exception e) {
                log.info("handle update blog like exception: ", e);
                handlePendingList();
            }
        }
    }

    private void handlePendingList() {
        int retries = 5;
        while (retries-- > 0) { // loop read
            try {
                // equivalent to "XREADGROUP GROUP g1 c1 COUNT 1 STREAMS <stream> 0": read this consumer's pending history
                List<MapRecord<String, Object, Object>> mapRecords = stringRedisTemplate.opsForStream().read(
                        Consumer.from("g1", "c1"), // specify the group and consumer
                        StreamReadOptions.empty().count(1), // specify the number of messages
                        StreamOffset.create(BLOG_USER_STREAMS_LIKE, ReadOffset.from("0")) // read from the historical records
                );
                if (Objects.isNull(mapRecords) || mapRecords.isEmpty()) {
                    // the pending queue is empty: end the process
                    break;
                }
                MapRecord<String, Object, Object> record = mapRecords.get(0); // one id -> entries map
                Map<Object, Object> entries = record.getValue(); // the entries map
                Long blogId = Long.valueOf(entries.get("blogId").toString());
                Long userId = Long.valueOf(entries.get("userId").toString());
                log.info("消费点赞数据: blogId: {}, userId: {}", blogId, userId);
                boolean success = this.updateLike(blogId, userId);
                if (success) {
                    stringRedisTemplate.opsForStream().acknowledge(BLOG_USER_STREAMS_LIKE, "g1", record.getId()); // acknowledge the success
                }
            } catch (Exception e) {
                log.info("handle pending-list update blog like exception: ", e);
                try {
                    Thread.sleep(20); // re-read 20ms later
                } catch (InterruptedException ex) {
                    throw new RuntimeException(ex);
                }
            }
        }
        log.info("handle pending-list update blog like failure");
    }
}

public boolean isLike(String userId, String blogId) {
    return Optional.ofNullable(stringRedisTemplate.opsForZSet().score(BLOG_LIKED + blogId, userId))
            .orElse(Double.valueOf(0)) > 0;
}

public int getLiked(String blogId) {
    return stringRedisTemplate.opsForZSet().count(BLOG_LIKED + blogId, 0, System.currentTimeMillis()).intValue();
}
Top-5 likers
/**
 * query the top 5 users who liked the post
 *
 * @param id
 * @return
 */
@Override
public List<UserDTO> queryLikedUsers(Long id) {
    String key = BLOG_LIKED + id;
    Set<String> ids = stringRedisTemplate.opsForZSet().range(key, 0, 4);
    if (Objects.isNull(ids) || ids.isEmpty()) {
        return Collections.emptyList();
    }
    return userService.findByIds(ids.stream().collect(Collectors.toList()));
}
User follows
Business flow
- A user can follow and unfollow another user
- When a user publishes a post, all of their followers can see the new post
- The users someone follows appear in their follow list, most recently followed first
- On another user's profile page, the follows shared with the current user are shown
Since ordering matters again, a sorted set is used: the key is user:follow:${userId}, the members are the ids of all users that user follows, and the score is the timestamp. For the latest posts from followed users, each user gets a sorted-set inbox blog:followUserPost:${userId} holding the ids of published posts, again with the timestamp as the score.
Set operations
- Follow: add the followed user's id to the current user's set
- Unfollow: remove the target user's id from the current user's set
- Latest posts from followed users: the publish endpoint pushes the post id into each follower's inbox
- Common follows: the intersection of the two users' follow sets
Follows and unfollows send messages that flush back to the db asynchronously
Follow table schema
CREATE TABLE `tb_follow` (
  `id` bigint NOT NULL AUTO_INCREMENT COMMENT '主键',
  `user_id` bigint unsigned NOT NULL COMMENT '用户id',
  `follow_user_id` bigint unsigned NOT NULL COMMENT '关联的用户id',
  `status` tinyint(1) NOT NULL DEFAULT '1' COMMENT '1关注0未关注',
  `created_at` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
  `updated_at` timestamp NULL DEFAULT NULL,
  PRIMARY KEY (`id`) USING BTREE,
  KEY `idx_user_id_follow_user_id` (`user_id`,`follow_user_id`) USING BTREE
) ENGINE=InnoDB AUTO_INCREMENT=9 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci ROW_FORMAT=COMPACT
Code implementation
Producer side
@Override
public void updateFollow(Long followUserId, Integer status) {
    DefaultRedisScript<Object> redisScript = new DefaultRedisScript<>("" +
            "local userId = ARGV[1];" +
            "local userFollowKey = KEYS[1] .. userId;" +
            "local followMessageQueueKey = KEYS[2];" +
            "local followUserId = ARGV[2];" +
            "local status = ARGV[3];" +
            "local score = redis.call('zscore', userFollowKey, followUserId);" +
            "if(status == '1' and score == false) then " +
            "    local time = redis.call('time');" +
            "    local nowMills = time[1] * 1000 + math.floor(time[2] / 1000);" +
            "    redis.call('zadd', userFollowKey, nowMills, followUserId);" +
            "end; " +
            "if(status == '0' and score ~= false) then " +
            "    redis.call('zrem', userFollowKey, followUserId);" +
            "end;" +
            "redis.call('xadd', followMessageQueueKey, '*', 'userId', userId, 'followUserId', followUserId, 'status', status); " +
            "");
    stringRedisTemplate.execute(redisScript, Arrays.asList(USER_FOLLOWS, USER_STREAM_FOLLOW),
            UserHolder.getUser().getId().toString(), followUserId.toString(), status.toString());
}
@Override
public int isFollow(Long userId) {
    Double exists = stringRedisTemplate.opsForZSet()
            .score(USER_FOLLOWS + UserHolder.getUser().getId().toString(), userId.toString());
    return Optional.ofNullable(exists).orElse(Double.valueOf(0)) > 0 ? 1 : 0;
}
Common follows
@Override
public List<UserDTO> commonFollowFromCache(Long followedUserId) {
    String followUserKey = USER_FOLLOWS + followedUserId;
    Set<String> followIds = stringRedisTemplate.opsForZSet().reverseRange(followUserKey, 0, -1);
    if (Objects.isNull(followIds) || followIds.isEmpty()) {
        return Collections.emptyList();
    }
    String currentUserKey = USER_FOLLOWS + UserHolder.getUser().getId();
    Set<String> currentIds = stringRedisTemplate.opsForZSet().reverseRange(currentUserKey, 0, -1);
    List<String> interactionIds = interact(new ArrayList<>(followIds), new ArrayList<>(currentIds));
    return userService.findByIds(interactionIds);
}

public <T> List<T> interact(List<T> list, List<T> other) {
    List<T> largerList = list.size() >= other.size() ? list : other;
    List<T> smallerList = list.size() >= other.size() ? other : list;
    int i = 0;
    int index = 0;
    // walk the smaller list and compact the elements also present in the larger list to its front
    while (index < smallerList.size()) {
        T currentId = smallerList.get(index);
        if (largerList.contains(currentId)) {
            smallerList.set(i, currentId);
            i++;
        }
        index++;
    }
    return smallerList.subList(0, i);
}
Consumer side
@Resource
private ThreadPoolExecutor threadPoolExecutor;

@Resource
private RedissonClient redissonClient;

@SneakyThrows
@PostConstruct
public void initJob() {
    threadPoolExecutor.submit(new UserFollowJob());
}

private class UserFollowJob implements Runnable {

    @Override
    public void run() {
        while (true) {
            try {
                List<MapRecord<String, Object, Object>> records = stringRedisTemplate.opsForStream().read(
                        Consumer.from("g1", "c1"),
                        StreamReadOptions.empty().count(1).block(Duration.ofSeconds(2)),
                        StreamOffset.create(USER_STREAM_FOLLOW, ReadOffset.lastConsumed())
                );
                if (Objects.isNull(records) || records.isEmpty()) {
                    continue;
                }
                MapRecord<String, Object, Object> record = records.get(0);
                Map<Object, Object> entries = record.getValue();
                Long userId = Long.valueOf(entries.get("userId").toString());
                Long followUserId = Long.valueOf(entries.get("followUserId").toString());
                Integer status = Integer.valueOf(entries.get("status").toString());
                log.info("消费关注数据: userId: {}, followUserId: {}, status: {}", userId, followUserId, status);
                boolean success = this.updateFollow(userId, followUserId, status);
                if (success) {
                    stringRedisTemplate.opsForStream().acknowledge(USER_STREAM_FOLLOW, "g1", record.getId());
                }
            } catch (Exception e) {
                log.info("handle update user follow exception: ", e);
                handlePendingList();
            }
        }
    }

    private boolean updateFollow(Long userId, Long followUserId, Integer status) {
        RLock lock = redissonClient.getLock("lock:user:update:follow:" + userId);
        try {
            boolean tryLock = lock.tryLock(2L, TimeUnit.SECONDS);
            if (tryLock) {
                Follow newOrUpdatedFollow = new Follow();
                newOrUpdatedFollow.setUserId(userId);
                newOrUpdatedFollow.setFollowUserId(followUserId);
                long exists = followMapper.exist(newOrUpdatedFollow);
                newOrUpdatedFollow.setStatus(status);
                if (exists > 0) {
                    // the row exists: update its status
                    long updated = followMapper.updateFollow(newOrUpdatedFollow);
                    return updated > 0;
                } else {
                    // otherwise insert a new row
                    long inserted = followMapper.insertFollow(newOrUpdatedFollow);
                    return inserted > 0;
                }
            }
            return false;
        } catch (Exception e) {
            throw new RuntimeException(e);
        } finally {
            if (lock.isHeldByCurrentThread() && lock.isLocked()) {
                lock.unlock();
            }
        }
    }

    private void handlePendingList() {
        int retries = 5;
        while (retries-- > 0) {
            try {
                List<MapRecord<String, Object, Object>> records = stringRedisTemplate.opsForStream().read(
                        Consumer.from("g1", "c1"),
                        StreamReadOptions.empty().count(1).block(Duration.ofSeconds(2)),
                        StreamOffset.create(USER_STREAM_FOLLOW, ReadOffset.from("0"))
                );
                if (Objects.isNull(records) || records.isEmpty()) {
                    break;
                }
                MapRecord<String, Object, Object> record = records.get(0);
                Map<Object, Object> entries = record.getValue();
                Long userId = Long.valueOf(entries.get("userId").toString());
                Long followUserId = Long.valueOf(entries.get("followUserId").toString());
                Integer status = Integer.valueOf(entries.get("status").toString());
                log.info("消费关注数据: userId: {}, followUserId: {}, status: {}", userId, followUserId, status);
                boolean success = this.updateFollow(userId, followUserId, status);
                if (success) {
                    stringRedisTemplate.opsForStream().acknowledge(USER_STREAM_FOLLOW, "g1", record.getId());
                }
            } catch (Exception e) {
                log.info("handle pending-list update user follow exception: ", e);
                try {
                    Thread.sleep(20);
                } catch (InterruptedException ex) {
                    throw new RuntimeException(ex);
                }
            }
        }
        log.info("handle update user follow failure");
    }
}
Pushing new posts to followers
/**
 * publish a blog post and broadcast it to every subscriber's inbox
 *
 * @param blog
 * @return
 */
@Override
public long publish(Blog blog) {
    Long userId = UserHolder.getUser().getId();
    blog.setUserId(userId);
    long inserted = blogMapper.insertBlog(blog);
    if (inserted < 1) {
        throw new BusinessException("发布失败");
    }
    // push to the inbox of every subscriber of the current user
    // select user_id from tb_follow where follow_user_id = ? and status = 1
    List<Long> followerIds = followService.findFollowerById(userId);
    for (Long followerId : followerIds) {
        stringRedisTemplate.opsForZSet().add(SUBSCRIBE_BLOG + followerId, blog.getId().toString(), System.currentTimeMillis());
    }
    return blog.getId();
}
Scroll paging over the follower's inbox
@Override
public ScrollResult<Blog> pullSubscribeBlogs(Long userId, Integer offset, Long lastTimestamp) {
    ScrollResult<Blog> scrollResult = new ScrollResult<>();
    String key = SystemConstants.SUBSCRIBE_BLOG + userId;
    final int PAGE_SIZE = 5; // how many entries per page
    // fetch the subscribed posts in descending order, newest first
    Set<ZSetOperations.TypedTuple<String>> subscribeBlogs = stringRedisTemplate.opsForZSet()
            .reverseRangeByScoreWithScores(key, 0, lastTimestamp, offset, PAGE_SIZE);
    // nothing left means we have reached the end
    if (Objects.isNull(subscribeBlogs) || subscribeBlogs.isEmpty()) {
        scrollResult.setList(Collections.emptyList());
        return scrollResult;
    }
    long minTimestamp = 0; // track the smallest timestamp in this page
    // number of posts sharing the smallest timestamp, used to skip duplicates on the next query;
    // it defaults to 1 because the last row of this page is the first row of the next query,
    // so the next query must offset by at least 1 to drop that duplicate
    int cursor = 1;
    List<String> ids = new ArrayList<>(subscribeBlogs.size()); // pre-size to avoid resizing past the default capacity
    for (ZSetOperations.TypedTuple<String> subscribeBlog : subscribeBlogs) {
        String id = subscribeBlog.getValue();
        ids.add(id); // collect the ids for the detail lookup below
        // count how many entries share the current minimum timestamp
        long time = subscribeBlog.getScore().longValue();
        if (time == minTimestamp) {
            cursor += 1;
        } else {
            cursor = 1;
            minTimestamp = time; // update the minimum timestamp
        }
    }
    // cursor > 1 means there are ties on the minimum timestamp; add the previous offset so the next query skips them all
    int newOffset = cursor > 1 ? cursor + offset : cursor;
    List<Blog> blogs = blogMapper.findByIds(ids); // fetch the posts and their author info by ids
    for (Blog blog : blogs) { // attach whether the user liked each post and the like count
        String likeKey = BLOG_LIKED + blog.getId();
        Set<String> likedUserIds = stringRedisTemplate.opsForZSet().reverseRange(likeKey, 0, -1);
        blog.setIsLike(likedUserIds.contains(userId.toString()));
        blog.setLiked(likedUserIds.size());
    }
    scrollResult.setList(blogs);
    scrollResult.setOffset(newOffset);
    scrollResult.setMinTime(minTimestamp);
    return scrollResult;
}
💡 Tricky point: on every query, the last timestamp of the previous page overlaps with the current one, so the offset must be >= 1 to skip the overlapping row; any extra depends on how many entries share that timestamp, i.e. how many messages were sent at the same instant. For example, if a page ends with two posts that both carry the same score, the next query must pass offset 2 so both are skipped.
Nearby people
The GEO data structure
GEO is a Redis data structure that stores longitude/latitude coordinates; it can measure distances and find all members within a given range.
⚠️ Version note: the GEOSEARCH command used below was added in Redis 6.2, so make sure the server version matches.
Common commands
* GEOADD: add a geospatial entry: longitude, latitude and member
* GEODIST: compute and return the distance between two given members
* GEOHASH: convert a member's coordinates to a hash string and return it
* GEOPOS: return a member's coordinates
* GEORADIUS: given a center and radius, find all members inside the circle and return them sorted by distance from the center; deprecated since 6.2
* GEOSEARCH: search for members within a given range (circle or box) and return them sorted by distance from the given point; new in 6.2
* GEOSEARCHSTORE: same as GEOSEARCH, but stores the result into a specified key
"Nearby people" is implemented with Redis's GEO structure; the accuracy is rough, so instead of a deep dive, here is the code for the record.
Code implementation
First preload the shops' coordinates into Redis, keyed by shop type id
@Resource
private StringRedisTemplate stringRedisTemplate;

@Resource
private ShopMapper shopMapper;

@Test
public void insertShop() {
    List<Shop> all = shopMapper.findAll();
    Map<Long, List<Shop>> typeMap = all.stream().collect(Collectors.groupingBy(Shop::getTypeId));
    Set<Map.Entry<Long, List<Shop>>> entries = typeMap.entrySet();
    for (Map.Entry<Long, List<Shop>> entry : entries) {
        String key = "shop:geo:" + entry.getKey();
        // the shop id is the zset member; the geohash of the shop's coordinates becomes the score
        List<RedisGeoCommands.GeoLocation<String>> geoLocations = entry.getValue().stream()
                .map(shop -> new RedisGeoCommands.GeoLocation<>(shop.getId().toString(),
                        new Point(shop.getX().doubleValue(), shop.getY().doubleValue())))
                .collect(Collectors.toList());
        stringRedisTemplate.opsForGeo().add(key, geoLocations);
    }
}
Querying by coordinates
@Override
public List<Shop> queryShopByType(Integer typeId, Integer current, Double x, Double y) {
    if (Objects.isNull(x) || Objects.isNull(y)) {
        // no x/y passed: fall back to traditional paging
        PageHelper.startPage(current, MAX_PAGE_SIZE);
        Page<Shop> shops = shopMapper.findByType(typeId);
        return shops;
    }
    // x/y given: search with GEOSEARCH
    String key = "shop:geo:" + typeId;
    // compute the paging bounds
    long from = (current - 1) * MAX_PAGE_SIZE;
    long to = current * MAX_PAGE_SIZE;
    // find all shops within 500 meters of (x, y) and return the distance to each
    GeoResults<RedisGeoCommands.GeoLocation<String>> geoResults = stringRedisTemplate.opsForGeo().search(
            key,
            GeoReference.fromCoordinate(x, y),
            new Distance(500), // the default unit is meters
            RedisGeoCommands.GeoSearchCommandArgs.newGeoSearchArgs().includeDistance().limit(to) // include distance values
    );
    List<GeoResult<RedisGeoCommands.GeoLocation<String>>> content = geoResults.getContent();
    // fewer rows than the page start means there is no more data: return empty
    if (content.size() <= from) {
        return Collections.emptyList();
    }
    // collect the shop ids, look up the shop details, and attach the distances
    List<GeoResult<RedisGeoCommands.GeoLocation<String>>> list = content.stream().skip(from).collect(Collectors.toList());
    HashMap<Long, BigDecimal> distanceMap = new HashMap<>();
    for (GeoResult<RedisGeoCommands.GeoLocation<String>> geoLocationGeoResult : list) {
        String id = geoLocationGeoResult.getContent().getName(); // the stored shop id
        double distance = geoLocationGeoResult.getDistance().getValue(); // the distance
        distanceMap.put(Long.valueOf(id), new BigDecimal(distance).setScale(2, BigDecimal.ROUND_HALF_UP));
    }
    List<Shop> shops = shopMapper.findByIds(distanceMap.keySet());
    for (Shop shop : shops) {
        shop.setDistance(distanceMap.get(shop.getId()));
    }
    return shops;
}
User sign-in
The BitMap data structure
A bitmap (bit map) is a value made up of binary bits; it is backed by String, so its operation API also lives under opsForValue.
Common commands
* SETBIT: store a 0 or 1 at the given offset
* GETBIT: read the bit at the given offset
* BITCOUNT: count the bits set to 1 in the BitMap
* BITFIELD: operate on (read, set, increment) the bits at a given offset in the BitMap's bit array
* BITFIELD_RO: read-only variant; reads bits from the BitMap's bit array and returns them as a decimal number
* BITOP: combine several BitMaps with bitwise operations (AND, OR, XOR)
* BITPOS: find the position of the first 0 or 1 within a given range of the bit array
Use a bitmap to record each user's sign-ins for the month, setting a signed-in day's bit to 1
Code implementation
/**
 * @description sign in for the current user
 */
@Override
public void sign() {
    String key = getSignKey();
    int today = LocalDateTime.now().getDayOfMonth() - 1; // offsets start at 0, so subtract 1
    stringRedisTemplate.opsForValue().setBit(key, today, true);
}

@Override
public int signCount() {
    String key = getSignKey();
    int today = LocalDateTime.now().getDayOfMonth(); // how many bits to read; days start at 1
    // unsigned(today) reads that many bits (one per day so far); valueAt(0) starts from day 1 of the month
    List<Long> results = stringRedisTemplate.opsForValue().bitField(key,
            BitFieldSubCommands.create().get(BitFieldSubCommands.BitFieldType.unsigned(today)).valueAt(0));
    // return 0 when nothing is stored
    if (Objects.isNull(results) || results.isEmpty()) {
        return 0;
    }
    // only one subcommand was sent, so take the first result
    Long result = results.get(0); // a decimal number covering the month's bits
    if (Objects.isNull(result) || result == 0) {
        return 0;
    }
    int continuousCount = 0;
    while (true) {
        if ((result & 1) == 0) { // a 0 bit means a missed day
            break;
        }
        continuousCount++;
        result >>>= 1; // move on to the next bit
    }
    return continuousCount;
}
💡 Binary arithmetic: to count consecutive sign-in days, unsigned-right-shift the value one bit at a time and test whether each bit is 1, relying on the AND property that a bit is 1 only when both operands are 1.
Site UV statistics
The HyperLogLog data structure
- HyperLogLog is a string-backed set-like structure that is extremely memory-friendly: its standard error is about 0.81%, acceptable in most cases, and no matter how much data goes in, it never uses more than about 12 KB
- HyperLogLog never stores duplicate values, so it deduplicates by nature, which makes it a natural fit for UV statistics
Common commands
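* PFADD: add elements to the HyperLogLog
* PFCOUNT: return the approximate number of distinct elements
* PFMERGE: merge several HyperLogLogs into one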
Memory footprint with a million records
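A sketch of the kind of test this refers to, assuming the same StringRedisTemplate setup as above: add a million generated user ids in batches, then compare INFO memory before and after (the exact numbers depend on the environment):

@Test
public void testHyperLogLog() {
    String key = "uv:test";
    String[] users = new String[1000];
    int index = 0;
    for (int i = 0; i < 1_000_000; i++) {
        users[index++] = "user_" + i;
        if (index == 1000) { // flush a batch of 1000 ids to cut down on round trips
            index = 0;
            stringRedisTemplate.opsForHyperLogLog().add(key, users);
        }
    }
    // approximate distinct count; expect roughly 1,000,000 with ~0.81% error
    System.out.println(stringRedisTemplate.opsForHyperLogLog().size(key));
}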
Finished! Time to celebrate 🌸🌸🌸😄