目录
P1 Redis企业实战课程介绍
P2 短信登录 导入黑马点评项目
P3 短信登录 基于session实现短信登录的流程
P4 短信登录 实现发送短信验证码功能
P5 短信登录 实现短信验证码登录和注册功能
P6 短信登录 实现登录校验拦截器
P7 短信登录 隐藏用户敏感信息
P8 短信登录 session共享的问题分析
P9 短信登录 Redis代替session的业务流程
P10 短信登录 基于Redis实现短信登录
P11 短信登录 解决状态登录刷新问题
P12 什么是缓存
P13 添加商户缓存
P14 缓存练习题分析
P15 缓存更新策略
P16 实现商铺缓存与数据库的双写一致
P17 缓存穿透的解决思路
P18 编码解决商铺查询的缓存穿透问题
P19 缓存雪崩问题及解决思路
P20 缓存击穿问题及解决方案
P21 利用互斥锁解决缓存击穿问题
P22 利用逻辑过期解决缓存击穿问题
P23 封装Redis工具类
P24 缓存总结
P25 优惠券秒杀 全局唯一ID
P26 优惠券秒杀 Redis实现全局唯一id
P27 优惠券秒杀 添加优惠券
P28 优惠券秒杀 实现秒杀下单
P29 优惠券秒杀 库存超卖问题分析
P30 优惠券秒杀 乐观锁解决超卖
P31 优惠券秒杀 实现一人一单功能
P32 优惠券秒杀 集群下的线程并发安全问题
P33 分布式锁 基本原理和不同实现方式对比
P34 分布式锁 Redis的分布式锁实现思路
P35 分布式锁 实现Redis分布式锁版本1
P36 分布式锁 Redis分布式锁误删问题
P37 分布式锁 解决Redis分布式锁误删问题
P38 分布式锁 分布式锁的原子性问题
P39 分布式锁 Lua脚本解决多条命令原子性问题
P40 分布式锁 Java调用lua脚本改造分布式锁
P41 分布式锁 Redisson功能介绍
P42 分布式锁 Redisson快速入门
P43 分布式锁 Redisson的可重入锁原理
P44 分布式锁 Redisson的锁重试和WatchDog机制
P45 分布式锁 Redissson的multiLock原理
P46 秒杀优化 异步秒杀思路
P47 秒杀优化 基于Redis完成秒杀资格判断
P48 秒杀优化 基于阻塞队列实现秒杀异步下单
P49 Redis消息队列 认识消息队列
P50 Redis消息队列 基于List实现消息队列
P51 Redis消息队列 PubSub实现消息队列
P52 Redis消息队列 Stream的单消费模式
P53 Redis消息队列 Stream的消费组模式
P54 Redis消息队列 基于Stream消息队列实现异步秒杀
P55 达人探店 发布探店笔记
P56 达人探店 查看探店笔记
P57 达人探店 点赞功能
P58 达人探店 点赞排行榜
P59 好友关注 关注和取关
P60 好友关注 共同关注
P61 好友关注 Feed流实现方案分析
P62 好友关注 推送到粉丝收件箱
P63 好友关注 滚动分页查询收件箱的思路
P64 好友关注 实现滚动分页查询
P65 附近商铺 GEO数据结构的基本用法
P66 附近商铺 导入店铺数据到GEO
P67 附近商铺 实现附近商户功能
P68 用户签到 BitMap功能演示
P69 用户签到 实现签到功能
P70 用户签到 统计连续签到
P71 UV统计 HyperLogLog的用法
P72 UV统计 测试百万数据的统计
P1 Redis企业实战课程介绍
P2 短信登录 导入黑马点评项目
首先在数据库连接下新建一个数据库hmdp,然后右键hmdp下的表,选择运行SQL文件,然后指定运行文件hmdp.sql即可(建议MySQL的版本在5.7及以上):
下面这个hm-dianping文件是项目源码。在IDEA中打开。
记得要修改数据库连接和Redis连接的密码:
运行程序后尝试访问:localhost:8081/shop-type/list 进行简单测试:
将nginx文件复制到一个没有中文路径的目录,然后点击nginx.exe运行:
在nginx所在目录打开CMD窗口,输入命令:start nginx.exe
访问:localhost:8080,选择用手机模式看,可以看到具体的页面:
P3 短信登录 基于session实现短信登录的流程
点击发送验证码可以看到验证码发送成功:
P4 短信登录 实现发送短信验证码功能
controller/UserController中写入如下代码:
@PostMapping("code")
public Result sendCode(@RequestParam("phone") String phone, HttpSession session) {//发送短信验证码并保存验证码return userService.sendCode(phone,session);
}
public interface IUserService extends IService<User> {
Result sendCode(String phone, HttpSession session);
}
@Service
public class UserServiceImpl extends ServiceImpl<UserMapper, User> implements IUserService {@Overridepublic Result sendCode(String phone, HttpSession session) {//校验手机号if(RegexUtils.isPhoneInvalid(phone)){//不符合return Result.fail("手机号格式错误");}//生成验证码String code = RandomUtil.randomNumbers(6);//保存验证码到sessionsession.setAttribute("code",code);//发送验证码log.debug("发送短信验证码成功,验证码:"+code);return Result.ok();}
}
P5 短信登录 实现短信验证码登录和注册功能
service/impl/UserServiceImpl的UserServiceImpl中写入如下代码:
@Overridepublic Result login(LoginFormDTO loginForm, HttpSession session) {String phone = loginForm.getPhone();//校验手机if(RegexUtils.isPhoneInvalid(phone)){return Result.fail("手机号格式错误");}//校验验证码Object cacheCode = session.getAttribute("code");String code = loginForm.getCode();if(cacheCode==null || !cacheCode.toString().equals(code)){//不一致,报错return Result.fail("验证码错误");}//一致根据手机号查用户User user = query().eq("phone", phone).one();//判断用户是否存在if(user==null){//不存在,创建用户并保存user = createUserWithPhone(loginForm.getPhone());}//保存用户信息到sessionsession.setAttribute("user",user);return null;}private User createUserWithPhone(String phone){//1.创建用户User user = new User();user.setPhone(phone);user.setNickName(USER_NICK_NAME_PREFIX+RandomUtil.randomString(10));//2。保存用户save(user);return user;}
前端点击发送验证码,后端直接把验证码摘抄后输入:
勾选协议然后确定登录,出现如下代码:
然后看到数据库后台记录已更新:
P6 短信登录 实现登录校验拦截器
preHandle前置拦截:
postHandle后置拦截:
afterCompletion视图渲染之后返回给用户之前:
在utils下面编写一个LoginInterceptor类,实现preHandle和afterCompletion这两个方法(这里User和UserDto的问题,我推荐的是统一使用UserDto,采用BeanUtils里的copy方法即可):
public class LoginInterceptor implements HandlerInterceptor {@Overridepublic boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception {//获取sessionHttpSession session = request.getSession();//获取用户User user = (User) session.getAttribute("user");//判断用户是否存在if(user==null){response.setStatus(401);return false;}UserDTO userDTO = new UserDTO();BeanUtils.copyProperties(user,userDTO);//存在,保存用户信息的ThreadLocalUserHolder.saveUser(userDTO);//放行return true;}@Overridepublic void afterCompletion(HttpServletRequest request, HttpServletResponse response, Object handler, Exception ex) throws Exception {//移除用户UserHolder.removeUser();}
}
在config下面创建一个MvcConfig类:
通过addInterceptors方法来添加拦截器,registry是拦截器的注册器。
用.excludePathPatterns来排除不需要拦截的路径。在这里code、login、bloghot、shop、shopType、upload和voucher等都不需要拦截。
@Configuration
public class MvcConfig implements WebMvcConfigurer {@Overridepublic void addInterceptors(InterceptorRegistry registry){registry.addInterceptor(new LoginInterceptor()).excludePathPatterns("/user/code","/user/login","/upload/**","/blog/hot","/shop/**","/shop-type/**","/voucher/**");}
}
输入手机号码点击获取验证码,写入返回后端的验证码,勾选协议之后,登录会直接返回首页,此时看我的个人主页没问题:
P7 短信登录 隐藏用户敏感信息
在P6已将User转为UserDTO返回给前端。
P8 短信登录 session共享的问题分析
多台Tomcat并不共享session存储空间,当请求切换不同Tomcat服务器时会导致数据丢失的问题。
session的替代方案应该满足:1.数据共享。2.内存存储。3.key、value结构。
P9 短信登录 Redis代替session的业务流程
想要保存用户的登录信息有2种方法:1.用String类型。2.用Hash类型。
String类型是以JSON字符串格式来保存,比较简单直观,但是占用内存比较多(因为有name和age这类的json格式):
Hash结构可以将对象中的每个字段独立存储,可以针对单个字段做CRUD,并且内存占用更少:
以随机的token作为key来存储用户的数据,token是用一个随机的字符串。
P10 短信登录 基于Redis实现短信登录
在UserServiceImpl中写入如下代码(调用StringRedisTemplate中的set方法进行数据插入,最好在key的前面加入业务前缀以示区分,形成区分):
@Resource
private StringRedisTemplate stringRedisTemplate;
在sendCode这个方法里将保存验证码的代码替换为下面:
//保存验证码到redis
stringRedisTemplate.opsForValue().set(LOGIN_CODE_KEY+phone,code,LOGIN_CODE_TTL, TimeUnit.MINUTES);
在login这个方法里进行如下2处修改:
首先是校验验证码:
//校验验证码
String cacheCode = stringRedisTemplate.opsForValue().get(LOGIN_CODE_KEY + phone);
然后是添加把用户信息添加到Redis的逻辑:
//7.保存用户信息到redis----------------
//7.1 随机生成Token作为登录令牌
String token = UUID.randomUUID().toString(true);
//7.2 将User对象转为Hash存储
UserDTO userDTO = BeanUtil.copyProperties(user, UserDTO.class);
Map<String, Object> userMap = BeanUtil.beanToMap(userDTO);
//7.3 存储
stringRedisTemplate.opsForHash().putAll("login:token:"+token,userMap);
//7.4设置token有效期
String tokenKey = LOGIN_USER_KEY+token;
stringRedisTemplate.expire(tokenKey,LOGIN_USER_TTL,TimeUnit.MINUTES);
return Result.ok(token);
在MvcConfig类上有@Configuration注解,说明是由Spring来负责依赖注入。
在MvcConfig类中要编写如下的代码:
@Configuration
public class MvcConfig implements WebMvcConfigurer {
@Resource
private StringRedisTemplate stringRedisTemplate;
@Override
public void addInterceptors(InterceptorRegistry registry){
registry.addInterceptor(new LoginInterceptor(stringRedisTemplate))
.excludePathPatterns(
"/user/code",
"/user/login",
"/upload/**",
"/blog/hot",
"/shop/**",
"/shop-type/**",
"/voucher/**"
);
}
}
在utils下的LoginInterceptor中写入如下代码:
public class LoginInterceptor implements HandlerInterceptor {
@Resource
private StringRedisTemplate stringRedisTemplate;
public LoginInterceptor(StringRedisTemplate stringRedisTemplate){
this.stringRedisTemplate = stringRedisTemplate;
}
@Override
public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception {
//TODO;1.获取请求头中的token
String token = request.getHeader("authorization");
if(StrUtil.isBlank(token)){
//不存在,拦截,返回401状态码
response.setStatus(401);
return false;
}
//TODO:2.基于TOKEN获取redis的用户
Map<Object, Object> userMap = stringRedisTemplate.opsForHash().entries(LOGIN_USER_KEY + token);
//判断用户是否存在
if(userMap.isEmpty()){
//不存在,拦截,返回401状态码
response.setStatus(401);
return false;
}
//TODO:3.将查询到的Hash数据转化为UserDTO对象
UserDTO userDTO = BeanUtil.fillBeanWithMap(userMap, new UserDTO(), false);
//TODO:4.存在,保存用户信息的ThreadLocal
UserHolder.saveUser(userDTO);
//TODO:5.刷新token有效期
stringRedisTemplate.expire(LOGIN_USER_KEY + token,RedisConstants.LOGIN_USER_TTL, TimeUnit.MINUTES);
//放行
return true;
}
@Override
public void afterCompletion(HttpServletRequest request, HttpServletResponse response, Object handler, Exception ex) throws Exception {
//移除用户
UserHolder.removeUser();
}
}
测试:首先把Redis和数据库都启动。 原始的项目的Redis的服务器ID需要更改为自己的。点击发送验证码,redis中有记录,没问题:
但点击登录的时候会报一个无法将Long转String的错误。因为用的是stringRedisTemplate要求所有的字段都是string类型的。
需要对UserServiceImpl中如下的位置进行修改:
Map<String, Object> userMap = BeanUtil.beanToMap(userDTO,new HashMap<>(),
CopyOptions.create()
.setIgnoreNullValue(true)
.setFieldValueEditor((fieldName,fieldValue)->fieldValue.toString()));
效果如下:
P11 短信登录 解决状态登录刷新问题
现在只有在用户访问拦截器拦截的页面才会刷新页面,假如用户访问的是不需要拦截的页面则不会导致页面的刷新。
现在的解决思路是:新增一个拦截器,拦截一切路径。
复制LoginInterceptor变成一份新的RefreshTokenInterceptor,把下面几处地方改为return true即可:
LoginInterceptor的代码变成如下:
public class LoginInterceptor implements HandlerInterceptor {
@Override
public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception {
//1.判断是否需要拦截(ThreadLocal中是否有用户)
if(UserHolder.getUser()==null){
//没有,需要拦截,设置状态码
response.setStatus(401);
//拦截
return false;
}
//放行
return true;
}
@Override
public void afterCompletion(HttpServletRequest request, HttpServletResponse response, Object handler, Exception ex) throws Exception {
//移除用户
UserHolder.removeUser();
}
}
现在还需要在MvcConfig里面对拦截器进行更新配置,需要(用order)调整拦截器的执行顺序:
@Configuration
public class MvcConfig implements WebMvcConfigurer {
@Resource
private StringRedisTemplate stringRedisTemplate;
@Override
public void addInterceptors(InterceptorRegistry registry){
registry.addInterceptor(new LoginInterceptor())
.excludePathPatterns(
"/user/code",
"/user/login",
"/upload/**",
"/blog/hot",
"/shop/**",
"/shop-type/**",
"/voucher/**"
).order(1);
registry.addInterceptor(new RefreshTokenInterceptor(stringRedisTemplate))
.addPathPatterns("/**").order(0);
}
}
P12 什么是缓存
缓存就是数据交换的缓冲区,是存储数据的临时地方,一般读写性能较高。
缓存作用:降低后端负载;提高读写的效率,降低响应时间。
缓存成本:数据一致性成本(数据库里的数据如果发生变化,容易与缓存中的数据形成不一致)。代码维护成本高(搭建集群)。运营成本高。
P13 添加商户缓存
在ShopController类的queryShopById方法中:
@GetMapping("/{id}")
public Result queryShopById(@PathVariable("id") Long id) {
return Result.ok(shopService.queryById(id));
}
@Service
public class ShopServiceImpl extends ServiceImpl<ShopMapper, Shop> implements IShopService {
@Resource
private StringRedisTemplate stringRedisTemplate;
@Override
public Object queryById(Long id) {
String key = CACHE_SHOP_KEY + id;
//1.从Redis查询缓存
String shopJson = stringRedisTemplate.opsForValue().get(key);
//2.判断是否存在
if(StrUtil.isNotBlank(shopJson)){
//3.存在,直接返回
Shop shop = JSONUtil.toBean(shopJson, Shop.class);
return Result.ok(shop);
}
//4.不存在,根据id查询数据库
Shop shop = getById(id);
//5.不存在,返回错误
if(shop==null){
return Result.fail("店铺不存在!");
}
//6.存在,写入Redis
stringRedisTemplate.opsForValue().set(key, JSONUtil.toJsonStr(shop));
return Result.ok(shop);
}
}
核心是通过调用hutool工具包中的JSONUtil类来实现对象转JSON(方法:toJsonStr(对象))和JSON转对象(方法:toBean(json,Bean的类型))。
P14 缓存练习题分析
TODO:对分类进行缓存。
P15 缓存更新策略
主动更新:编写业务逻辑,在修改数据库的同时,更新缓存。
适用于高一致性的需求:主动更新,以超时剔除作为兜底方案。
主动更新策略:
1.由缓存的调用者,在更新数据库的同时更新缓存。(一般情况下使用该种方案)
2.缓存与数据库聚合为一个服务,由服务来维护一致性。调用者调用该服务,无需关心缓存的一致性问题。
3.调用者只操作缓存,由其它线程异步的将缓存数据持久化到数据库,保证最终一致。
对1进行分析:
1.选择删除缓存还是更新缓存?如果是更新缓存:每次更新数据库都会更新缓存,无效的写操作比较多。删除缓存:更新数据库时让缓存失效,查询时再更新缓存。
2.如何保证缓存与数据库的操作的同时成功或失败?
单体系统:将缓存与数据库操作放在一个事务。
分布式系统:利用TCC等分布式事务方案。
3.先操作缓存还是先操作数据库?
先删缓存,再操作(写)数据库:
先操作(写)数据库,再删除缓存(出现的概率比较低)
要求线程1来查询的时候缓存恰好失效了->在写入缓存的时候突然来了线程2,对数据库的数据进行了修改->此时线程1写回缓存的是旧数据。
P16 实现商铺缓存与数据库的双写一致
给查询商铺的缓存添加超时剔除和主动更新的策略。
修改ShopController中的业务逻辑,满足下面要求:
1.根据id查询商铺时,如果缓存未命中,则查询数据库,将数据库结果写入缓存,并设置超时时间。
2.根据id修改店铺时,先修改数据库,再删除缓存。
首先修改ShopServiceImpl的redis过期时间:
stringRedisTemplate.opsForValue().set(key, JSONUtil.toJsonStr(shop),CACHE_SHOP_TTL, TimeUnit.MINUTES);
@PutMapping
public Result updateShop(@RequestBody Shop shop) {
// 写入数据库
return Result.ok(shopService.update(shop));
}
@Override
public Object update(Shop shop) {
Long id = shop.getId();
if(id == null){
return Result.fail("商铺id不存在");
}
updateById(shop);
stringRedisTemplate.delete(CACHE_SHOP_KEY + id);
return Result.ok();
}
首先删除缓存中的数据,然后看SQL语句是否执行,是否加上了TTL过期时间。
在PostMan中访问http://localhost:8081/shop,然后修改101茶餐厅为102茶餐厅:
注意要发送的是PUT请求,请求的内容如下:
{
"area": "大关",
"openHours": "10:00-22:00",
"sold": 4215,
"address": "金华路锦昌文华苑29号",
"comments": 3035,
"avgPrice": 80,
"score": 37,
"name": "102茶餐厅",
"typeId": 1,
"id": 1
}
然后去数据库看是否名称更新为102茶餐厅,然后看缓存中的数据是否被删除,用户刷新页面看到102茶餐厅,缓存中会有最新的数据。
P17 缓存穿透的解决思路
缓存穿透指的是客户端请求的数据在缓存中和数据库中都不存在,使得缓存永远不会生效,请求都会打到数据库。
2种解决方法:
1.缓存空对象。优点:实现简单,维护方便。缺点:额外的内存消耗。可能造成短期的不一致(可以设置TTL)。
2.布隆过滤。在客户端和Redis之间加个布隆过滤器(存在不一定存在,不存在一定不存在,有5%的错误率)。
优点:内存占用较少,没有多余key。缺点:实现复杂,存在误判可能。
P18 编码解决商铺查询的缓存穿透问题
下图是原始的:
下面是更改后的:
在ShopServiceImpl类里对queryById方法进行修改:
@Override
public Object queryById(Long id) {
String key = CACHE_SHOP_KEY + id;
//1.从Redis查询缓存
String shopJson = stringRedisTemplate.opsForValue().get(key);
//2.判断是否存在
if(StrUtil.isNotBlank(shopJson)){
//3.存在,直接返回
Shop shop = JSONUtil.toBean(shopJson, Shop.class);
return Result.ok(shop);
}
//上面是有值的情况,下面是无值的2种情况:A:空字符串。B:null。
if(shopJson != null){
return Result.fail("店铺信息不存在!");
}
//4.不存在,根据id查询数据库
Shop shop = getById(id);
//5.不存在,返回错误
if(shop==null){
stringRedisTemplate.opsForValue().set(key,"",CACHE_NULL_TTL,TimeUnit.MINUTES);
return Result.fail("店铺不存在!");
}
//6.存在,写入Redis
stringRedisTemplate.opsForValue().set(key, JSONUtil.toJsonStr(shop),CACHE_SHOP_TTL, TimeUnit.MINUTES);
return Result.ok(shop);
}
测试:
localhost:8080/api/shop/1此时是命中数据。
localhost:8080/api/shop/0此时未命中数据。打开缓存可以看到缓存的是空,并且TTL是200秒。
总结缓存穿透:用户请求的数据在缓存中和数据库中都不存在,不断发起请求,会给数据库造成巨大压力。
缓存穿透:缓存null值和布隆过滤器。还可以增强id的复杂度,避免被猜测id规律。做好数据的基础格式校验。加强用户权限校验。做好热点参数的限流。
P19 缓存雪崩问题及解决思路
缓存雪崩:是指在同一时段大量的缓存key同时失效或者Redis服务宕机,导致大量请求打到数据库,带来巨大的压力。
解决方案:
1.(解决大量缓存key同时失效)给不同Key的TTL添加随机值。
2.(解决Redis宕机)利用Redis集群提高服务的可用性。
3.给缓存业务添加降级限流策略。
4.给业务添加多级缓存(浏览器可以有缓存,nginx可以有缓存,redis可以有缓存,数据库可以有缓存)。
P20 缓存击穿问题及解决方案
缓存击穿问题:也叫热点key问题,就是一个被高并发访问并且缓存重建业务较复杂的key突然消失了,无数的请求访问在瞬间给数据库带来巨大的冲击。
解决方案:
1.互斥锁。由获取互斥锁成功的线程来查询数据库重建缓存数据。缺点:未获得互斥锁的线程需要等待,性能略差。
2.逻辑过期。设置一个逻辑时间字段,查询缓存的时候检查逻辑时间看是否已过期。如果某个线程获取到互斥锁就开启新线程,由新线程查询数据库重建缓存数据。
其它线程在获取互斥锁失败后不会等待,而是直接返回过期的数据。只有当缓存重建完毕之后释放锁,新线程才会读到最新的数据。
互斥锁优点:
互斥锁没有额外的内存消耗:因为逻辑过期需要维护一个逻辑过期的字段,有额外内存消耗。
互斥锁可以保证强一致性,所有线程拿到的是最新数据。实现也很简单。
互斥锁缺点:
线程需要等待,性能受到影响。可能会有死锁的风险。
逻辑过期优点:
线程无需等待,性能较好。
逻辑过期缺点:
不保证一致性。有额外内存消耗。实现复杂。
P21 利用互斥锁解决缓存击穿问题
在ShopServiceImpl类中定义一个tryLock方法(在Redis中的setnx相当于setIfAbsent方法。)
public boolean tryLock(String key){
Boolean flag = stringRedisTemplate.opsForValue().setIfAbsent(key, "1", 10, TimeUnit.SECONDS);
return BooleanUtil.isTrue(flag);
}
public void unLock(String key){
stringRedisTemplate.delete(key);
}
public Shop queryWithPassThrough(Long id){
String key = CACHE_SHOP_KEY + id;
//1.从Redis查询缓存
String shopJson = stringRedisTemplate.opsForValue().get(key);
//2.判断是否存在
if(StrUtil.isNotBlank(shopJson)){
//3.存在,直接返回
Shop shop = JSONUtil.toBean(shopJson, Shop.class);
return shop;
}
//上面是有值的情况,下面是无值的2种情况:A:空字符串。B:null。
if(shopJson != null){
return null;
}
//4.不存在,根据id查询数据库
Shop shop = getById(id);
//5.不存在,返回错误
if(shop==null){
stringRedisTemplate.opsForValue().set(key,"",CACHE_NULL_TTL,TimeUnit.MINUTES);
return null;
}
//6.存在,写入Redis
stringRedisTemplate.opsForValue().set(key, JSONUtil.toJsonStr(shop),CACHE_SHOP_TTL, TimeUnit.MINUTES);
return shop;
}
public Shop queryWithMutex(Long id){
String key = CACHE_SHOP_KEY + id;
//1.从Redis查询缓存
String shopJson = stringRedisTemplate.opsForValue().get(key);
//2.判断是否存在
if(StrUtil.isNotBlank(shopJson)){
//3.存在,直接返回
Shop shop = JSONUtil.toBean(shopJson, Shop.class);
return shop;
}
//上面是有值的情况,下面是无值的2种情况:A:空字符串。B:null。
if(shopJson != null){
return null;
}
//4.实现缓存重建
//4.1 获取互斥锁
String lockKey = LOCK_SHOP_KEY+id;
Shop shop = null;
try {
boolean isLock = tryLock(lockKey);
//4.2 判断是否获取成功
if(!isLock){
//4.3 失败,则休眠并重试
Thread.sleep(50);
return queryWithMutex(id);
}
//4.4 获取互斥锁成功,根据id查询数据库
shop = getById(id);
//模拟重建的延时
Thread.sleep(200);
//5.数据库查询失败,返回错误
if(shop==null){
stringRedisTemplate.opsForValue().set(key,"",CACHE_NULL_TTL,TimeUnit.MINUTES);
return null;
}
//6.存在,写入Redis
stringRedisTemplate.opsForValue().set(key, JSONUtil.toJsonStr(shop),CACHE_SHOP_TTL, TimeUnit.MINUTES);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}finally {
//7.释放互斥锁
unLock(lockKey);
}
//8.返回
return shop;
}
public Object queryById(Long id) {
//缓存穿透
//Shop shop = queryWithPassThrough(id);
//互斥锁解决缓存击穿
Shop shop = queryWithMutex(id);
return Result.ok(shop);
}
测试:
定义1000个线程,Ramp-Up时间为5。
请求地址:localhost:8081/shop/1。
设置完毕后点击绿色箭头运行,此时会提示是否保存测试文件,选择不保存(我测试选择保存会报错)。
可以在结果树这里看请求是否发送成功:
先删掉缓存,然后点击绿色箭头发送并发请求,可以发现所有线程请求成功,控制台对数据库的查询只有1次(没有出现多个线程争抢查询数据库的情况),测试成功。
P22 利用逻辑过期解决缓存击穿问题
如何添加逻辑过期字段?答:可以在utils包下定义RedisData类(可以让Shop继承RedisData类),也可以在RedisData中设置一个Shop类的data属性:
@Data
public class RedisData {
private LocalDateTime expireTime;
private Object data;
}
public void saveShop2Redis(Long id,Long expireSeconds){
//1.查询店铺数据
Shop shop = getById(id);
//2.封装逻辑过期时间
RedisData redisData = new RedisData();
redisData.setData(shop);
redisData.setExpireTime(LocalDateTime.now().plusSeconds(expireSeconds));
//3.写入Redis
stringRedisTemplate.opsForValue().set(CACHE_SHOP_KEY+id,JSONUtil.toJsonStr(redisData));
}
单元测试,在test包下的HmDianPingApplicationTests中创建testSaveShop类写入测试代码(这里要注意的是输入alt+insert之后选择Test Method要选择Junit 5来进行测试方法的编写):
@SpringBootTest
class HmDianPingApplicationTests {
@Resource
private ShopServiceImpl shopService;
@Test
void testSaveShop() {
shopService.saveShop2Redis(1L,10L);
}
}
可以看到redis中确实存入了数据:
在ShopServiceImpl中复制一份缓存穿透的代码,更改名称为queryWithLogicalExpire:
private static final ExecutorService CACHE_REBUILD_EXECUTOR = Executors.newFixedThreadPool(10); //自动生成线程池
public Shop queryWithLogicalExpire(Long id){
String key = CACHE_SHOP_KEY + id;
//1.从Redis查询缓存
String shopJson = stringRedisTemplate.opsForValue().get(key);
//2.判断是否存在
if(StrUtil.isBlank(shopJson)){
//3.不存在,返回空
return null;
}
//4.命中,需要先把json反序列化为对象
RedisData redisData = JSONUtil.toBean(shopJson, RedisData.class);
JSONObject data = (JSONObject) redisData.getData();
Shop shop = JSONUtil.toBean(data, Shop.class);
//5.判断是否过期
//5.1 未过期直接返回店铺信息
LocalDateTime expireTime = redisData.getExpireTime();
if(expireTime.isAfter(LocalDateTime.now())){
return shop;
}
//5.2 已过期重建缓存
//6.缓存重建
//6.1.获取互斥锁
String lockKey = LOCK_SHOP_KEY + id;
boolean isLock = tryLock(lockKey);
//6.2.判断是否获取互斥锁成功
if(isLock){
//6.3.成功,开启独立线程,实现缓存重建
CACHE_REBUILD_EXECUTOR.submit(()->{
try {
saveShop2Redis(id,20L); //实际中应该设置为30分钟
} catch (Exception e) {
throw new RuntimeException(e);
} finally {
unLock(lockKey);
}
});
}
//6.4.失败,返回过期的商铺信息
return shop;
}
测试:
先到数据库把102茶餐厅改为103茶餐厅(因为Redis之前插入了一条缓存为102茶餐厅,并且已经过期,此时数据库与缓存不一致),新的HTTP请求会将逻辑过期的数据删除,然后更新缓存。
线程数设置为100,Ramp-up时间设置为1
在查看结果树里面到中间某个HTTP请求会完成重建,响应数据会改变。
1.安全性问题:在高并发情况下是否会有很多线程来做重建。
2.一致性问题:在重建完成之前得到的是否是旧的数据。
P23 封装Redis工具类
在utils包下创建CacheClient类,先写入如下基础的代码:
@Slf4j
@Component
public class CacheClient {
private final StringRedisTemplate stringRedisTemplate;
public CacheClient(StringRedisTemplate stringRedisTemplate) {
this.stringRedisTemplate = stringRedisTemplate;
}
public void set(String key, Object value, Long time, TimeUnit unit){
stringRedisTemplate.opsForValue().set(key,JSONUtil.toJsonStr(value),time,unit);
}
public void setWithLogicalExpire(String key, Object value,Long expire,TimeUnit unit){
//设置逻辑过期
RedisData redisData = new RedisData();
redisData.setExpireTime(LocalDateTime.now().plusSeconds(unit.toSeconds(expire)));
redisData.setData(value);
stringRedisTemplate.opsForValue().set(key,JSONUtil.toJsonStr(redisData));
}
}
在CacheClient类中编写缓存穿透的共性方法queryWithPassThrough:
public <R,ID> R queryWithPassThrough(String keyPrefix, ID id, Class<R> type,
Function<ID,R> dbFallBack,Long time,TimeUnit unit){
String key = keyPrefix + id;
//1.从Redis查询缓存
String shopJson = stringRedisTemplate.opsForValue().get(key);
//2.判断是否存在
if(StrUtil.isNotBlank(shopJson)){
//3.存在,直接返回
return JSONUtil.toBean(shopJson, type);
}
//上面是有值的情况,下面是无值的2种情况:A:空字符串。B:null。
if(shopJson != null){
return null;
}
//4.不存在,根据id查询数据库
R r = dbFallBack.apply(id);
//5.不存在,返回错误
if(r==null){
stringRedisTemplate.opsForValue().set(key,"",CACHE_NULL_TTL,TimeUnit.MINUTES);
return null;
}
//6.存在,写入Redis
this.set(key,r,time,unit);
return r;
}
编写完queryWithPassThrough之后可以到ShopServiceImpl中直接调用新的方法(记得引入CacheClient类):
@Resource
private CacheClient cacheClient;
@Override
public Object queryById(Long id) {
//调用工具类解决缓存击穿
Shop shop = cacheClient.queryWithPassThrough(CACHE_SHOP_KEY, id, Shop.class, this::getById, CACHE_SHOP_TTL, TimeUnit.MINUTES);
if(shop==null){
return Result.fail("店铺不存在!");
}
return Result.ok(shop);
}
进行测试:成功会对不存在的店铺空值进行缓存。
接下来拷贝queryWithLogicalExpire的代码到CacheClient类中进行改写:
private static final ExecutorService CACHE_REBUILD_EXECUTOR = Executors.newFixedThreadPool(10);
public <R,ID> R queryWithLogicalExpire(String keyPrefix,ID id,Class<R> type,Function<ID,R> dbFallBack,Long time,TimeUnit unit){
String key = keyPrefix + id;
//1.从Redis查询缓存
String shopJson = stringRedisTemplate.opsForValue().get(key);
//2.判断是否存在
if(StrUtil.isBlank(shopJson)){
//3.不存在,返回空
return null;
}
//4.命中,需要先把json反序列化为对象
RedisData redisData = JSONUtil.toBean(shopJson, RedisData.class);
JSONObject data = (JSONObject) redisData.getData();
R r = JSONUtil.toBean(data, type);
//5.判断是否过期
//5.1 未过期直接返回店铺信息
LocalDateTime expireTime = redisData.getExpireTime();
if(expireTime.isAfter(LocalDateTime.now())){
return r;
}
//5.2 已过期重建缓存
//6.缓存重建
//6.1.获取互斥锁
String lockKey = LOCK_SHOP_KEY + id;
boolean isLock = tryLock(lockKey);
//6.2.判断是否获取互斥锁成功
if(isLock){
//6.3.成功,开启独立线程,实现缓存重建
CACHE_REBUILD_EXECUTOR.submit(()->{
try {
//查询数据库
R r1 = dbFallBack.apply(id);
//写入redis
this.setWithLogicalExpire(key,r1,time,unit);
} catch (Exception e) {
throw new RuntimeException(e);
} finally {
unLock(lockKey);
}
});
}
//6.4.失败,返回过期的商铺信息
return r;
}
public boolean tryLock(String key){
Boolean flag = stringRedisTemplate.opsForValue().setIfAbsent(key, "1", 10, TimeUnit.SECONDS);
return BooleanUtil.isTrue(flag);
}
public void unLock(String key){
stringRedisTemplate.delete(key);
}
改写test下的HmDianPingApplicationTests类:
@SpringBootTest
class HmDianPingApplicationTests {
@Resource
private CacheClient cacheClient;
@Resource
private ShopServiceImpl shopService;
@Test
void testSaveShop() throws InterruptedException {
Shop shop = shopService.getById(1L);
cacheClient.setWithLogicalExpire(CACHE_SHOP_KEY+1L,shop,10L,TimeUnit.SECONDS);
}
}
测试:首先运行HmDianPingApplicationTests类里的测试方法,10秒后逻辑过期,此时运行后台程序,修改数据库1号商铺的name字段,此时访问:localhost:8080/api/shop/1 会出现效果第1次访问为缓存旧值,然后发现缓存过期开始重建,第2次访问开始就是新值。数据库也只有1次重建。
P24 缓存总结
P25 优惠券秒杀 全局唯一ID
每个店铺都可以发布优惠券,当用户抢购时,就会生成订单并保存到tb_voucher_order这张表中,而订单表如果使用数据库自增ID会存在一些问题。
1.id的规律性太明显。
2.受单表数据量的限制(分表之后每张表都自增长,id会出现重复)。
全局ID生成器:是一种在分布式系统下用来生成全局唯一ID的工具。
要求全局唯一ID生成器满足如下几点:1.唯一性。2.高可用。3.高性能。4.递增性。5.安全性。
为了增加ID的安全性,我们可以不直接使用Redis自增的数值,而是拼接一些其它信息。
符号位永远为0代表整数。
31位的时间戳是以秒为单位,定义了一个起始时间,用当前时间减起始时间,预估可以使用69年。
32位的是序列号是Redis自增的值,支持每秒产生2^32个不同ID。
P26 优惠券秒杀 Redis实现全局唯一id
在utils包下定义一个RedisWorker类,是一个基于Redis的ID生成器。
如果只使用一个key来自增记录有一个坏处,最终key的自增数量会突破容量的上限,假如自增超过32位彼时便无法再存储新的数据,解决的方案是采用拼接日期。
@Component
public class RedisIdWorker {
private static final long BEGIN_TIMESTAMP = 1640995200L;
//序列号的位数
private static final int COUNT_BITS=32;
private StringRedisTemplate stringRedisTemplate;
public RedisIdWorker(StringRedisTemplate stringRedisTemplate) {
this.stringRedisTemplate = stringRedisTemplate;
}
public long nextId(String keyPrefix){
//1.生成时间戳
LocalDateTime now = LocalDateTime.now();
long timeStamp = now.toEpochSecond(ZoneOffset.UTC) - BEGIN_TIMESTAMP;
//2.生成序列号
//2.1获取当前日期,精确到天
String date = now.format(DateTimeFormatter.ofPattern("yyyy:MM:dd"));
//2.2自增长
long count = stringRedisTemplate.opsForValue().increment("icr:" + keyPrefix + ":" + date);
//3.拼接并返回
return timeStamp << COUNT_BITS | count;
}
}
在HmDianPingApplicationTests中写入如下的测试代码:
@Resource
private ShopServiceImpl shopService;
@Resource
private RedisIdWorker redisIdWorker;
private ExecutorService es = Executors.newFixedThreadPool(500);
@Test
void testIdWorker() throws InterruptedException {
CountDownLatch latch = new CountDownLatch(300);
Runnable task = ()->{
for(int i=0;i<100;i++){
long id = redisIdWorker.nextId("order");
System.out.println("id="+id);
}
latch.countDown();
};
long begin = System.currentTimeMillis();
for(int i=0;i<300;i++){
es.submit(task);
}
latch.await();
long end = System.currentTimeMillis();
System.out.println("Result Time = " + (end-begin));
}
运行之后可以看到以十进制输出的所有编号:
可以在Redis中看到自增长的结果,1次是30000:
大概2秒可以生成3万条,速度还是可以的。
全局唯一ID生成策略:
1.UUID利用JDK自带的工具类即可生成,生成的是16进制的字符串,无单调递增的特性。
2.Redis自增(每天一个key,方便统计订单量。时间戳+计数器的格式。)
3.snowflake雪花算法(不依赖于Redis,性能更好,对于时钟依赖)
4.数据库自增
P27 优惠券秒杀 添加优惠券
每个店铺都可以发放优惠券,分为平价券和特价券。平价券可以任意抢购,特价券需要秒杀抢购。
tb_voucher:优惠券基本信息,优惠金额,使用规则等。
tb_seckill_voucher:优惠券的库存,开始抢购时间,结束抢购时间,只有特价优惠券才需要填写这些信息。
请求的信息如下可自行复制(注意beginTime和endTime需要修改):
{
"shopId":1,
"title":"100元代金券",
"subTitle":"周一至周五均可使用",
"rules":"全场通用\\n无需预约\\n可无限叠加\\不兑现、不找零\\n仅限堂食",
"payValue":8000,
"actualValue":10000,
"type":1,
"stock":100,
"beginTime":"2024-04-10T10:09:17",
"endTime":"2024-04-11T12:09:04"
}
注意要在请求头中带Authorization参数否则会报401(登录后进入“我的”页面,看网络包有Authorization的值):
以如下格式发送请求:
首先在tb_voucher表中可以看到新增的优惠券:
在tb_seckill_voucher表中也可以看到秒杀优惠券的具体信息:
在前端也能看到新增的100元代金券,注意优惠券的时间一定要进行更改,如果不在开始和结束时间区间内优惠券会处于下架状态是看不到的。
P28 优惠券秒杀 实现秒杀下单
首先要判断秒杀是否开始或结束,所以要先查询优惠券的信息,如果尚未开始或者已经结束无法下单。
要判断库存是否充足,如果不足则无法下单。
@RestController
@RequestMapping("/voucher-order")
public class VoucherOrderController {
@Resource
private IVoucherService voucherService;
@PostMapping("seckill/{id}")
public Result seckillVoucher(@PathVariable("id") Long voucherId) {
return voucherService.seckillVoucher(voucherId);
}
}
@Service
@Transactional
public class VoucherOrderServiceImpl extends ServiceImpl<VoucherOrderMapper, VoucherOrder> implements IVoucherOrderService {
@Resource
private ISeckillVoucherService seckillVoucherService;
@Resource
private RedisIdWorker redisIdWorker;
@Override
public Result seckillVoucher(Long voucherId) {
//1.查询优惠券信息
SeckillVoucher voucher = seckillVoucherService.getById(voucherId);
//2.判断秒杀是否开始
//2.1秒杀尚未开始返回异常
if(voucher.getBeginTime().isAfter(LocalDateTime.now())){
return Result.fail("秒杀尚未开始");
}
//2.2秒杀已结束返回异常
if(voucher.getEndTime().isBefore(LocalDateTime.now())){
return Result.fail("秒杀已经结束");
}
//3.判断库存是否充足
if(voucher.getStock()<1){
//3.1库存不足返回异常
return Result.fail("库存不足!");
}
//3.2库存充足扣减库存
boolean success = seckillVoucherService.update()
.setSql("stock = stock - 1")
.eq("voucher_id", voucherId).update();
if(!success){
return Result.fail("库存不足!");
}
//4.创建订单,返回订单id
VoucherOrder voucherOrder = new VoucherOrder();
long orderId = redisIdWorker.nextId("order");//订单id
voucherOrder.setId(orderId);
Long userId = UserHolder.getUser().getId();//用户id
voucherOrder.setUserId(userId);
voucherOrder.setVoucherId(voucherId);//代金券id
save(voucherOrder);
return Result.ok(orderId);
}
}
测试:点击限时抢购之后会提示抢购成功。
P29 优惠券秒杀 库存超卖问题分析
Jmeter的配置如下:
注意Authorization要事先登录获取:
下面是结果:
发现tb_seckill_voucher中库存为-9,在tb_voucher_order中插入了109条数据,说明出现了超卖的问题。
正常逻辑:
非正常逻辑:
超卖问题是典型的多线程安全问题,针对这一问题的常见解决方案是加锁。
悲观锁:认为线程安全问题一定会发送,因此在操作数据之前要先获取锁,确保线程串行执行。像Synchronized、Lock都属于悲观锁。
乐观锁:认为线程安全问题不一定会发生,因此不加锁,只是在更新数据时去判断有没有其它线程对数据做了修改。
如果没有修改则认为是安全的,自己才更新数据。
如果已经被其它线程修改说明发生了安全问题,此时可以重试或异常。‘
乐观锁关键是判断之前查询得到的数据是否被修改过,常见的方法有2种:
1.版本号法:
2.CAS法(版本号法的简化版):查询的时候把库存查出来,更新的时候判断库存和之前查到的库存是否一致,如果一致则更新数据。
P30 优惠券秒杀 乐观锁解决超卖
只需加上下面这段代码即可:.eq("stock",voucher.getStock()) 。用于比较当前数据库的库存值和之前查询到的库存值是否相同,只有相同时才可以执行set语句。
//3.2库存充足扣减库存
boolean success = seckillVoucherService.update()
.setSql("stock = stock - 1") //相当于set条件 set stock = stock - 1
.eq("voucher_id", voucherId) //相当于where条件 where id = ? and stock = ?
.eq("stock",voucher.getStock()).update();
但现在出现了异常值偏高的问题,正常的请求大约只占10%。
原理是因为:假如一次有30个线程涌入,查询到库存值为100,只有1个线程能把值改为99,其它29个线程比对库存值99发现和自己查询到的库存值100不同,所以都认为数据已经被修改过,所以都失败了。
乐观锁的问题,成功率太低。
现在只需要保证stock>0即可,只要存量大于0就可以任意扣减。
boolean success = seckillVoucherService.update()
.setSql("stock = stock - 1") //相当于set条件 set stock = stock - 1
.eq("voucher_id", voucherId) //相当于where条件 where id = ? and stock = ?
.gt("stock",0).update();
乐观锁缺陷:
需要大量对数据库进行访问,容易导致数据库的崩溃。
总结:
P31 优惠券秒杀 实现一人一单功能
修改秒杀业务,要求同一个优惠券,一个用户只能下一单。
首先不建议把锁加在方法上,因为任何一个用户来了都要加这把锁,而且是同一把锁,方法之间变成串行执行,性能很差。
因此可以把锁加在用户id上,只有当id相同时才会对锁形成竞争关系。但是因为toString的内部是new了一个String字符串,每调一次toString都是生成一个全新的字符串对象,锁对象会变。
所以可以调用intern()方法,intern()方法会优先去字符串常量池里查找与目标字符串值相同的引用返回(只要字符串一样能保证返回的结果一样)。
但是因为事务是在函数执行结束之后由Spring进行提交,如果把锁加在createVoucherOrder内部其实有点小——因为如果解锁之后,其它线程可以进入,而此时事务尚未提交,仍然会导致安全性问题。
因此最终方案是把synchronized加在createVoucherOrder的方法外部,锁住的是用户id。
关于代理对象事务的问题:通常情况下,当一个使用了@Transactional注解的方法被调用时,Spring会从上下文中获取一个代理对象来管理事务。
但是如果加@Transactional方法是被同一个类中的另一个方法调用时,Spring不会使用代理对象,而是直接调用该方法,导致事务注解失效。
为避免这种情况,可以使用AopContext.currentProxy方法获取当前的代理对象,然后通过代理对象调用被@Transactional注解修饰的方法,确保事务生效。
在VoucherOrderServiceImpl中写入如下代码(注意:ctrl+alt+m可以把含有return的代码段进行提取):
@Service
public class VoucherOrderServiceImpl extends ServiceImpl<VoucherOrderMapper, VoucherOrder> implements IVoucherOrderService {
@Resource
private ISeckillVoucherService seckillVoucherService;
@Resource
private RedisIdWorker redisIdWorker;
@Override
public Result seckillVoucher(Long voucherId) {
//1.查询优惠券信息
SeckillVoucher voucher = seckillVoucherService.getById(voucherId);
//2.判断秒杀是否开始
//2.1秒杀尚未开始返回异常
if(voucher.getBeginTime().isAfter(LocalDateTime.now())){
return Result.fail("秒杀尚未开始");
}
//2.2秒杀已结束返回异常
if(voucher.getEndTime().isBefore(LocalDateTime.now())){
return Result.fail("秒杀已经结束");
}
voucher = seckillVoucherService.getById(voucherId);
//3.判断库存是否充足
if(voucher.getStock()<1){
//3.1库存不足返回异常
return Result.fail("库存不足!");
}
Long userId = UserHolder.getUser().getId();
synchronized (userId.toString().intern()){
//获取代理对象
IVoucherOrderService proxy = (IVoucherOrderService) AopContext.currentProxy();
return proxy.createVoucherOrder(voucherId);
}
}
@Transactional
public Result createVoucherOrder(Long voucherId) {
//6.一人一单
Long userId = UserHolder.getUser().getId();
//6.1查询订单
int count = query().eq("user_id", userId).eq("voucher_id", voucherId).count();
//6.2判断是否存在
if(count>0){
//用户已经购买过了
return Result.fail("用户已经购买过一次!");
}
//3.2库存充足扣减库存
boolean success = seckillVoucherService.update()
.setSql("stock = stock - 1") //相当于set条件 set stock = stock - 1
.eq("voucher_id", voucherId) //相当于where条件 where id = ? and stock = ?
.gt("stock",0).update();
if(!success){
return Result.fail("库存不足!");
}
//4.创建订单,返回订单id
VoucherOrder voucherOrder = new VoucherOrder();
long orderId = redisIdWorker.nextId("order");//订单id
voucherOrder.setId(orderId);
voucherOrder.setUserId(userId);
voucherOrder.setVoucherId(voucherId);//代金券id
save(voucherOrder);
return Result.ok(orderId);
}
}
在IVoucherOrderService接口中加入下面这个方法:
Result createVoucherOrder(Long voucherId);
<dependency>
<groupId>org.aspectj</groupId>
<artifactId>aspectjweaver</artifactId>
</dependency>
在启动类HmDianPingApplication上加如下注解:
@EnableAspectJAutoProxy(exposeProxy = true)
测试: 成功实现一名用户只能领取一张优惠券。
P32 优惠券秒杀 集群下的线程并发安全问题
本P主要是为了验证在集群下synchronized并不能保证线程的并发安全。
如下图可以设置项目启动的端口号,确保启动的项目之间端口号不同:
在nginx.conf中放开8082的这个配置:
向下面这个页面发送请求:
http://localhost:8080/api/voucher/list/1
可以看到请求会分别被8082和8081接收,是轮询的效果:
首先到tb_voucher_order把之前的订单删除,到tb_seckill_voucher中把stock重新改回100。
准备2个相同的秒杀请求:要注意请求的地址是:http://localhost:8080/api/voucher-order/seckill/13
我这里直接用Jemeter来进行测试,模拟高并发场景:
下面是效果:可以看到并发请求能够同时进入集群的每台结点。
正常情况:
在集群模式下,每一个节点都是一个全新的JVM,每个JVM都有自己的锁。锁监视器只能在当前JVM的范围内,监视线程实现互斥。
现在就要实现让多个JVM使用的是同一把锁。跨JVM、跨进程的锁。
P33 分布式锁 基本原理和不同实现方式对比
synchronized只能保证单个JVM内部的多个线程之间的互斥,而没法让集群下多个JVM进程间的线程互斥。
要让多个JVM进程能看到同一个锁监视器,而且同一时间只有一个线程能拿到锁监视器。
所以必须使用分布式锁,分布式锁:满足分布式系统或集群模式下多进程可见并且互斥的锁。
分布式锁要满足:多进程可见+互斥+高可用+高性能+安全性。
分布式锁可以通过MySQL或Redis或Zookeeper来实现。
MySQL:
1.互斥:是利用mysql本身的互斥锁机制。在执行写操作的时候,MySQL会自动分配一个互斥的锁。
2.可用性:好。3.性能:受限于MySQL性能。
4.安全性:事务机制,如果断开连接,会自动释放锁。
Redis:
1.互斥:利用setnx这样的互斥命令。往Redis里set数据只有不存在时才能set成功。
2.可用性:好,Redis支持主从和集群。3.性能:好。
4.安全性:如果没有执行删除key的操作,key不会自动释放。但可以利用锁的超时机制,到期自动释放。
Zookeeper:
1.利用节点的唯一性(节点不重复)和有序性(节点递增)实现互斥。利用有序性:id最小的节点获取锁成功;释放锁只需要删除id最小的节点。
2.可用性:好。3.性能:比Redis差,一般,强调强一致性,主从间同步需要时间。
4.安全性:好。因为是临时节点,断开连接会自动释放。
P34 分布式锁 Redis的分布式锁实现思路
假如获取锁后宕机,锁无法释放——>可以添加超时过期时间。
为了防止锁在SETEX和EXPIRE之间过期,可以直接用一条命令(原子操作)来实现设置过期时间(EX)和只有lock不存在时才能设置(NX)。
采用非阻塞式获取锁,如果成功返回true,失败返回false。
P35 分布式锁 实现Redis分布式锁版本1
在utils下面创建一个ILock接口:
public interface ILock {
//尝试获取锁
boolean tryLock(long timeoutSec);
//释放锁
void unlock();
}
public class SimpleRedisLock implements ILock {
private String name;
private StringRedisTemplate stringRedisTemplate;
public SimpleRedisLock(String name, StringRedisTemplate stringRedisTemplate) {
this.name = name;
this.stringRedisTemplate = stringRedisTemplate;
}
private static final String KEY_PREFIX = "lock:";
@Override
public boolean tryLock(long timeoutSec) {
//获取线程标示
long threadId = Thread.currentThread().getId();
Boolean success = stringRedisTemplate.opsForValue().setIfAbsent(KEY_PREFIX+name,threadId+"",timeoutSec, TimeUnit.SECONDS);
return Boolean.TRUE.equals(success);
}
@Override
public void unlock() {
//释放锁
stringRedisTemplate.delete(KEY_PREFIX+name);
}
}
@Resource
private RedisIdWorker redisIdWorker;
@Resource
private StringRedisTemplate stringRedisTemplate;
@Override
public Result seckillVoucher(Long voucherId) {
//1.查询优惠券信息
SeckillVoucher voucher = seckillVoucherService.getById(voucherId);
//2.判断秒杀是否开始
//2.1秒杀尚未开始返回异常
if(voucher.getBeginTime().isAfter(LocalDateTime.now())){
return Result.fail("秒杀尚未开始");
}
//2.2秒杀已结束返回异常
if(voucher.getEndTime().isBefore(LocalDateTime.now())){
return Result.fail("秒杀已经结束");
}
voucher = seckillVoucherService.getById(voucherId);
//3.判断库存是否充足
if(voucher.getStock()<1){
//3.1库存不足返回异常
return Result.fail("库存不足!");
}
Long userId = UserHolder.getUser().getId();
SimpleRedisLock lock = new SimpleRedisLock("order:"+userId,stringRedisTemplate);
boolean isLock = lock.tryLock(1200);
//判断是否获取锁成功
if(!isLock) {
return Result.fail("不允许重复下单");
}
try {
//获取代理对象
IVoucherOrderService proxy = (IVoucherOrderService) AopContext.currentProxy();
return proxy.createVoucherOrder(voucherId);
}finally {
lock.unlock();
}
}
经测试多台节点相同用户只能获取同一张优惠券成功:
P36 分布式锁 Redis分布式锁误删问题
假如某个线程(线程A)获取到锁之后,出现了业务阻塞,导致阻塞时间超过了锁自动释放的时间,锁因超时自动释放。此时其它线程(线程B)过来拿到了锁,开始执行业务。但线程A此时业务执行完毕,释放了锁,但释放的是线程B的锁。此时线程C过来看锁已被释放,趁虚而入拿到锁,此时线程B和线程C是并行执行。
要解决这个问题:线程在删除锁之前要先看锁是否是自己加的(获取锁的标示并判断是否一致)。
P37 分布式锁 解决Redis分布式锁误删问题
1.在获取锁时存入线程标示(可以用UUID表示)。
2.在释放锁时先获取锁中的线程标示,判断是否与当前线程标示一致(如果一致释放锁,如果不一致则不释放锁)。
首先要修改SimpleRedisLock里面的如下代码,主要是调用hutool工具包生成UUID(每次线程调用都会生成一个唯一的UUID),让Redis的前缀变成UUID+线程ID:
private static final String ID_PREFIX = UUID.fastUUID().toString(true)+"-";
@Override
public boolean tryLock(long timeoutSec) {
//获取线程标示
String threadId = ID_PREFIX + Thread.currentThread().getId();
Boolean success = stringRedisTemplate.opsForValue().setIfAbsent(KEY_PREFIX+name,threadId,timeoutSec, TimeUnit.SECONDS);
return Boolean.TRUE.equals(success);
}
@Override
public void unlock() {
//获取线程标示
String threadId = ID_PREFIX + Thread.currentThread().getId();
//获取锁中的标示
String id = stringRedisTemplate.opsForValue().get(KEY_PREFIX + name);
if(threadId.equals(id)){
//释放锁
stringRedisTemplate.delete(KEY_PREFIX+name);
}
}
P38 分布式锁 分布式锁的原子性问题
现在假设出现了其它问题,比如线程1在判断完锁标示是否一致之后出现了阻塞(比如JVM垃圾回收FULL GC导致阻塞了过长时间),此时锁超时了,线程2趁虚而入获取了锁,此时线程1直接释放了线程2的锁,此时线程3趁虚而入继续给Redis加锁,此时会出现线程2和线程3并行执行。
根本的原因是:获取锁标示和释放锁的操作不是原子性的,现在要解决的问题就是将这两个操作变成原子性的。
P39 分布式锁 Lua脚本解决多条命令原子性问题
Redis提供Lua脚本功能,在一个脚本中编写多条Redis命令,确保多条命令执行时的原子性。
Lua是一种编程语言,它的基本语法可以参考网站:https://www.runoob.com/lua/lua-tutorial.html
执行脚本的方法:
执行一个写死的set命令:
在Lua语言里,数组的第一个元素下标是1。
P40 分布式锁 Java调用lua脚本改造分布式锁
繁琐版的Lua脚本内容如下:
-- 锁的key
local key = KEYS[1]
-- 当前线程标示
local threadId = ARGV[1]
--获取锁中的线程标示
local id = redis.call('get',key)
--比较线程标示与锁中的标示是否一致
if(id == threadId) then
--释放锁 del key
return redis.call('del',key)
end
return 0
简化版的Lua脚本内容如下:
--比较线程标示与锁中的标示是否一致
if(redis.call('get',KEYS[1]) == ARGV[1]) then
--释放锁 del key
return redis.call('del',KEYS[1])
end
return 0
在resources下创建unlock.lua,会提示下载一个plugins点击install,然后只需要下载一个EmmyLua即可,实测如果下载了多个Lua相关的插件会产生冲突,最终导致IDEA打不开,这真是血泪的教训!
在SimpleRedisLock中写入如下的代码,因为我们希望的是在一开始就将Lua的脚本加载好,而不是等到要调用释放锁的时候再去加载Lua脚本,所以采用静态变量和静态代码块,这些部分在类初始化的时候就会被加载:
private static final DefaultRedisScript<Long> UNLOCK_SCRIPT;
static {
UNLOCK_SCRIPT = new DefaultRedisScript<>();
UNLOCK_SCRIPT.setLocation(new ClassPathResource("unlock.lua"));
UNLOCK_SCRIPT.setResultType(Long.class);
}
@Override
public void unlock() {
stringRedisTemplate.execute(UNLOCK_SCRIPT,
Collections.singletonList(KEY_PREFIX+name),
ID_PREFIX + Thread.currentThread().getId());
}
在程序1和程序2的下面这个位置打上断点:
在测试API中测试访问如下的URL:
http://localhost:8080/api/voucher-order/seckill/14
分别测试秒杀优惠券1和2:
在Redis中能看到程序1获取锁成功,然后直接把lock锁删掉,模拟超时释放的情况:
然后让程序2往下走一步,可以看到程序2获取到了锁
然后可以直接放行程序1,会看到结果是程序2加的锁没有被删除。
最后放行程序2,会看到程序2加的锁被删除。
总结:
基于Redis的分布式锁的实现思路:
1.利用set nx ex获取锁,并设置过期时间,保存线程标示。
2.释放锁时先判断线程标示是否与自己一致,一致则删除锁。
特性:
1.利用set nx满足互斥性。
2.利用set nx保障故障时锁依然能够释放,避免死锁,提高安全性。
3.利用Redis集群保障高可用和高并发的特性。
P41 分布式锁 Redisson功能介绍
目前基于setnx实现的分布式锁存在以下几个问题:
1.不可重入:同一线程无法多次获取同一把锁。
2.不可重试:获取锁只尝试一次就返回false,没有重试机制。
3.超时释放:锁超时释放虽然可以避免死锁,但如果是业务执行耗时较长,也会导致锁释放存在安全隐患。
4.主从一致性:如果Redis提供了主从集群,主从同步存在延迟,当主节点宕机时,如果从节点还未同步主节点中的锁数据,则会出现锁信息的不一致。
Redisson是一个在Redis的基础上实现的Java驻内存数据网格。它不仅提供了一系列的分布式的Java常用对象,还提供了许多分布式服务,其中包含了各种分布式锁的实现。
P42 分布式锁 Redisson快速入门
<!--redisson-->
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson</artifactId>
<version>3.13.6</version>
</dependency>
@Configuration
public class RedissonConfig{
@Bean
public RedissonClient redissonClient(){
//配置
Config config = new Config();
config.useSingleServer().setAddress("redis://127.0.0.1:6379").setPassword("");
//创建RedissonClient对象
return Redisson.create(config);
}
}
第3步,引入RedissonClient,调用getLock获取锁对象,然后用tryLock获取锁。
第4步,启动服务
发送下面的请求:
第4步,启动服务
发送下面的请求:
在执行释放锁的语句前,可以看到Redis中有锁的记录:
用jmeter来测试,可以发现没有出现并发安全问题:
P43 分布式锁 Redisson的可重入锁原理
ReentrantLock可重入锁的原理:获取锁的时候在判断这个锁已经被占有的情况下,会检查占有锁的是否是当前线程,如果是当前线程,也会获取锁成功。会有一个计数器记录重入的次数。
会通过下面的结构来记录某个线程重入了几次锁。
每释放一次锁采用的策略是把重入次数减1。
加锁和释放锁是成对出现的,因此当方法执行到最外层结束时,重入的次数一定会减为0。
1.是否存在锁
2.存在锁,判断是否是自己的。
是,锁计数+1。
不是,获取锁失败。
3.不存在锁
获取锁,添加线程标示。
Redisson底层可重入锁加锁的逻辑:
Redisson底层可重入锁解锁的逻辑:
P44 分布式锁 Redisson的锁重试和WatchDog机制
下面是对含有waitTime(等待时间)的tryLock的跟踪:
看门狗超时时间是30秒
subscribeFuture.await等待的是释放锁的通知,如果future在指定时间内获得,返回true,等待的是time的时间,time是锁的剩余最大等待时间。
如果超时返回false,然后会进到cancel里,调用unsubscribe方法,取消订阅。
不是无休止的忙等机制,而是只有当锁释放后获得通知后才进行加锁尝试,在没收到通知前是被阻塞状态。
下面是定时更新锁的有效期的逻辑:
相当于设置了一个定时任务每隔10秒重置一次有效期。
定时任务的结束是在解锁的逻辑当中:
获取锁机制:
1.判断ttl是否为null
1.1 为null,获取锁成功(涉及自动更新锁过期时间),判断leaseTime是否为-1
1.1.1 为-1自动开启看门狗机制,定时更新锁的过期时间
看门狗默认30秒,每隔10秒会更新一次过期时间。
1.1.2 不为-1返回true
1.2 不为null,获取锁失败(涉及获取锁的失败重试),判断剩余等待时间是否大于0
1.2.1 大于0,订阅并等待释放锁的信号
在受到释放信号后会判断是否超时,如未超时继续尝试获取锁
1.2.2 不大于0,获取锁失败
释放锁机制:
1.尝试释放锁,判断是否成功
1.1 释放成功。
发送锁释放的消息(与获取锁的失败重试关联)
取消看门狗机制(与自动更新锁过期时间关联)
1.2 释放失败。返回异常。
Redisson是如何解决可重入问题、获取锁的失败重试、锁超时释放问题的?
可重入问题:利用哈希表记录线程id和重入次数。
获取锁的失败重试:利用消息订阅和信号量方式实现获取锁失败时的等待、唤醒和锁的重试获取。
锁超时释放:利用看门狗机制,每隔一段时间,重置超时时间。
P45 分布式锁 Redissson的multiLock原理
主节点负责写,从节点负责读,主节点和从节点间需要同步,会存在延迟。
如果主节点宕机,会从从节点中选拔一个新的节点作为主节点。如果主从同步尚未完成,会出现锁失效的问题。
现在在所有主节点中都存放一份锁,要求一个线程必须从所有主节点中获取锁,才算真正获取锁。
假如此时有一个主节点宕机,恰好主从同步没有完成,此时有其它线程趁虚而入获取到了新主节点的锁,但因为没能获取其它主节点的锁,因此也是获取锁失败的。
这种锁叫作联锁。
P46 秒杀优化 异步秒杀思路
秒杀业务流程:
1.扣减优惠券的库存(不能超卖,判断库存是否充足)
2.将用户抢购的优惠券信息写入订单,完成订单的创建
3.一个用户对一个优惠券只能下一单
为了获取1000名用户的token,我爆肝1h写了下面的生成代码。
生成效果如下:共计1008名用户,给每位用户生成了专属的token:
Jmeter中线程数设为1000:
在HTTP信息头管理器中进行如下设置:
在CSV数据文件设置中进行如下设置:
下面是测试结果:
最小值和最大值是响应时间的最小值和最大值。平均值是平均响应时间。
优惠券被抢完,没有超领和少领的情况发生:
查询优惠券、查询订单、减库存、创建订单都需要与数据库交互,导致效率低下。特别是减库存和创建订单都是对数据库的写操作,耗时较久。
异步开启一个独立的线程去完成Tomcat的操作。
库存:KEY用string类型,VALUE用数值类型。
一人一单:KEY用string类型,VALUE用set集合类型。
因为这段代码比较长要用Lua脚本来编写:
首先要执行Lua脚本,然后判断返回结果是否为0,如返回0代表成功下单优惠券,将优惠券id、用户id和订单id放入阻塞队列,直接返回订单id给用户。
如果想提高写入数据库的性能,可以多开线程,由单个线程的写,变成多个线程批量的写。
P47 秒杀优化 基于Redis完成秒杀资格判断
在VoucherServiceImpl的addSeckillVoucher方法的末尾添加下面这段代码把秒杀的库存保存到Redis中:
//保存秒杀的库存到Redis
stringRedisTemplate.opsForValue().set(RedisConstants.SECKILL_STOCK_KEY +voucher.getId(),voucher.getStock().toString());
发送请求,新增一份优惠券:
可以看到在Redis中记录了优惠券的记录:
在redis中可以用sadd来往set集合中添加键值,可以用sismember来查询集合中是否有某个元素。
Lua脚本编写如下:
--1.参数列表
--1.1.优惠券id
local voucherId = ARGV[1]
--1.2.用户id
local userId = ARGV[2]
--2.数据key
--2.1.库存key
local stockKey = 'seckill:stock:' .. voucherId
--2.2.订单key
local orderKey = 'seckill:order:' .. voucherId
--3.脚本业务
--3.1.判断库存是否充足 get stockKey
if(tonumber(redis.call('get',stockKey)) <= 0) then
--3.1.2.库存不足,返回1
return 1
end
--3.2.判断用户是否下单 SISMEMBER orderKey userId
if(redis.call('sismember',orderKey,userId)==1) then
--3.2.1.存在,说明是重复下单,返回2
return 2
end
--3.3.扣库存 incrby stockKey -1
redis.call('incrby',stockKey,-1)
--3.4.下单 sadd orderKey userId
redis.call('sadd',orderKey,userId)
return 0
在VoucherOrderServiceImpl类中写入如下代码:
Lua脚本的加载:
private static final DefaultRedisScript<Long> SECKILL_SCRIPT;
static{
SECKILL_SCRIPT = new DefaultRedisScript<>();
SECKILL_SCRIPT.setLocation(new ClassPathResource("seckill.lua"));
SECKILL_SCRIPT.setResultType(Long.class);
}
Long result = stringRedisTemplate.execute( //调用execute方法,返回值
SECKILL_SCRIPT, //加载的模板对象
Collections.emptyList(), //键参数
voucherId.toString(), //值参数1
UserHolder.getUser().getId().toString() //值参数2
);
@Override
public Result seckillVoucher(Long voucherId) {
//1.执行Lua脚本
Long result = stringRedisTemplate.execute(
SECKILL_SCRIPT,
Collections.emptyList(),
voucherId.toString(),
UserHolder.getUser().getId().toString()
);
int r = result.intValue();
if(r != 0){ //2.判断结果是否为0,不为0,代表没有购买资格
return Result.fail(r==1 ? "库存不足":"不能重复下单");
}
//2.2.为0,有购买资格,把下单信息保存到阻塞队列
long orderId = redisIdWorker.nextId("order");
// TODO 保存阻塞队列
//3.返回订单id
return Result.ok(orderId);
}
可见这种优化对系统的性能提升非常大!
P48 秒杀优化 基于阻塞队列实现秒杀异步下单
阻塞队列:尝试从队列获取元素,如果没有元素会被阻塞,直到队列中有元素才会被唤醒,获取元素。
只要类一启动,用户随时都有可能来抢购,因此VoucherOrderHandler这个类的初始化必须在类初始化后执行。
在VoucherOrderServiceImpl类中,首先要新增一个orderTasks阻塞队列,然后设置一个线程池和run方法。
在run方法中调用阻塞队列的take方法,orderTasks.take方法是一个阻塞方法,如果队列中有元素会获取,如果队列中无元素则阻塞等待。
这里相当于是开启了一个全新的线程来执行获取队列中订单信息和异步创建订单的任务:
private BlockingQueue<VoucherOrder> orderTasks = new ArrayBlockingQueue<>(1024*1024);
private static ExecutorService seckill_order_executor = Executors.newSingleThreadExecutor();
@PostConstruct
private void init(){
seckill_order_executor.submit(new VoucherOrderHandler());
}
private class VoucherOrderHandler implements Runnable{
@SneakyThrows
@Override
public void run() {
while(true){
try {
//1.获取队列中的订单信息
VoucherOrder voucherOrder = orderTasks.take();
//2.创建订单
handleVoucherOrder(voucherOrder);
} catch (InterruptedException e) {
log.debug("处理订单异常",e);
}
}
}
}
public IVoucherOrderService proxy ;
private void handleVoucherOrder(VoucherOrder voucherOrder) {
//1.获取用户
Long userId = voucherOrder.getUserId();
//2.创建锁对象
RLock lock = redissonClient.getLock("lock:order:"+userId);
//3.获取锁
boolean isLock = lock.tryLock();
//4.判断是否获取锁成功
if(!isLock) {
log.error("不允许重复下单");
return;
}
try {
//获取代理对象
proxy.createVoucherOrder(voucherOrder);
}finally {
lock.unlock();
}
}
@Transactional
public void createVoucherOrder(VoucherOrder voucherOrder) {
//6.一人一单
Long userId = voucherOrder.getUserId();
//6.1查询订单
int count = query().eq("user_id", userId).eq("voucher_id", voucherOrder.getVoucherId()).count();
//6.2判断是否存在
if(count>0){
//用户已经购买过了
log.error("用户已经购买过一次!");
return;
}
//3.2库存充足扣减库存
boolean success = seckillVoucherService.update()
.setSql("stock = stock - 1") //相当于set条件 set stock = stock - 1
.eq("voucher_id", voucherOrder.getVoucherId()) //相当于where条件 where id = ? and stock = ?
.gt("stock",0).update();
if(!success){
log.error("库存不足!");
return;
}
long orderId = redisIdWorker.nextId("order");//订单id
voucherOrder.setId(orderId);
voucherOrder.setUserId(userId);
voucherOrder.setVoucherId(voucherOrder.getVoucherId());//代金券id
save(voucherOrder);
}
public Result seckillVoucher(Long voucherId) {
//1.执行Lua脚本
Long result = stringRedisTemplate.execute(
SECKILL_SCRIPT,
Collections.emptyList(),
voucherId.toString(),
UserHolder.getUser().getId().toString()
);
int r = result.intValue();
if(r != 0){ //2.判断结果是否为0,不为0,代表没有购买资格
return Result.fail(r==1 ? "库存不足":"不能重复下单");
}
//2.2.为0,有购买资格,把下单信息保存到阻塞队列
long orderId = redisIdWorker.nextId("order");
//封装
VoucherOrder voucherOrder = new VoucherOrder();
voucherOrder.setId(orderId);//订单id
voucherOrder.setUserId(UserHolder.getUser().getId());//用户id
voucherOrder.setVoucherId(voucherId);//代金券id
//保存阻塞队列
orderTasks.add(voucherOrder);
//获取代理对象
proxy = (IVoucherOrderService) AopContext.currentProxy();
//3.返回订单id
return Result.ok(orderId);
}
秒杀业务的优化思路:
1.先利用Redis完成库存量、一人一单的判断,完成抢单业务。
2.将下单业务放入阻塞队列,利用独立线程异步下单。
基于阻塞队列的异步秒杀存在哪些问题:
1.内存限制问题。使用的是jdk提供的阻塞队列,使用的是JVM的内存,在一开始写死了队列空间的大小,如果在高并发的情况下,队列很快会被占满,如果不对队列的空间加以限制,很容易造成内存的溢出。
2.数据安全问题。缺乏持久化机制,是基于内存来保存信息,如果服务突然宕机,内存中保存的信息都会丢失。如果任务被取出,但由于突然发生事故异常,导致任务没有被消费,任务丢失,会造成数据不一致问题。
P49 Redis消息队列 认识消息队列
1.消息队列是在JVM外部的独立服务,不受JVM内存的限制。
2.消息队列不仅负责数据存储,还要保证数据安全。消息队列在消费者接收到消息后要进行消息确认
Redis提供了3种不同的方式来实现消息队列:
1.list结构:基于List结构模拟消息队列。
2.PubSub(发布订阅):基本的点对点消息模型。
3.Stream:比较完善的功能强大的消息队列模型。
P50 Redis消息队列 基于List实现消息队列
Redis的list数据结构是一个双向链表,容易模拟出队列效果。
队列的入口和出口不在一边,可以利用:LPUSH结合RPOP,RPUSH结合LPOP来实现。
如果队列中没有消息时RPOP或LPOP的操作会返回null,不会像JVM的阻塞队列那样阻塞并等待消息,因此这里应该用BRPOP或BLPOP来实现阻塞效果。
P51 Redis消息队列 PubSub实现消息队列
PubSub(Publish Subscribe 发布订阅):Redis2.0版本引入的消息传递模型,消费者可以订阅一个或多个channel,生产者向对应channel发送消息后,所有订阅者都能收到相关消息。
SUBSCRIBE 频道名称 : 订阅一个或多个频道。
PUBLISH channel msg : 向一个频道发送消息。
PSUBSCRIBE pattern : 订阅与pattern格式匹配的所有频道。
P52 Redis消息队列 Stream的单消费模式
要注意,如果想使用Stream消息队列必须把Redis的版本上升到5.0之后。
需要注意的是key和*|ID中间那俩参数是可选参数,一个是用来判断是否自动创建队列,一个是用来设置队列最大消息数量。
P53 Redis消息队列 Stream的消费组模式
消费者组(Consumer Group):将多个消费者划分到一个组中,监听同一个队列。消费者之间是竞争关系。
1.消息分流:队列中的消息会分流给组内不同消费者,而不是重复消费,从而加快消息处理的速度。
2.消息标示:消费者组会维护一个标示(类似于标签,记录读到哪里了),记录最后一个被处理的消息,哪怕消费者宕机重启,还会从标示之后读取消息,确保每一个消息都会被消费。
3.消息确认(解决消息丢失问题):消费者获取消息后,消息处于pending状态,并存入一个pending-list。当处理完成后需要通过XACK来确认消息,标记消息为已处理,才会从pending-list移除。
P54 Redis消息队列 基于Stream消息队列实现异步秒杀
直接通过控制台创建一个stream.orders队列:
直接在Lua脚本中编写代码(主要增加一个局部变量,):
--1.参数列表
--1.1.优惠券id
local voucherId = ARGV[1]
--1.2.用户id
local userId = ARGV[2]
--1.3.订单id
local orderId = ARGV[3]
--2.数据key
--2.1.库存key
local stockKey = 'seckill:stock:' .. voucherId
--2.2.订单key
local orderKey = 'seckill:order:' .. voucherId
--3.脚本业务
--3.1.判断库存是否充足 get stockKey
if(tonumber(redis.call('get',stockKey)) <= 0) then
--3.1.2.库存不足,返回1
return 1
end
--3.2.判断用户是否下单 SISMEMBER orderKey userId
if(redis.call('sismember',orderKey,userId)==1) then
--3.2.1.存在,说明是重复下单,返回2
return 2
end
--3.3.扣库存 incrby stockKey -1
redis.call('incrby',stockKey,-1)
--3.4.下单 sadd orderKey userId
redis.call('sadd',orderKey,userId)
--3.5.发送消息到队列中 XADD stream.orders * k1 v1 k2 v2
redis.call('xadd','stream.orders','*','userId',userId,'voucherId',voucherId,'orderId',orderId)
return 0
public Result seckillVoucher(Long voucherId) {
//获取订单id
long orderId = redisIdWorker.nextId("order");
//1.执行Lua脚本(判断用户是否有购买资格,消息发出)
Long result = stringRedisTemplate.execute(
SECKILL_SCRIPT,
Collections.emptyList(),
voucherId.toString(),
UserHolder.getUser().getId().toString(),
String.valueOf(orderId)
);
int r = result.intValue();
if(r != 0){ //2.判断结果是否为0,不为0,代表没有购买资格
return Result.fail(r==1 ? "库存不足":"不能重复下单");
}
//获取代理对象
proxy = (IVoucherOrderService) AopContext.currentProxy();
//3.返回订单id
return Result.ok(orderId);
}
private static ExecutorService seckill_order_executor = Executors.newSingleThreadExecutor();
@PostConstruct
private void init(){
seckill_order_executor.submit(new VoucherOrderHandler());
}
private class VoucherOrderHandler implements Runnable{
String queueName = "stream.order";
@SneakyThrows
@Override
public void run() {
while(true){
try {
//1.获取消息队列中的订单信息 XREADGROUP GROUP g1 c1 COUNT 1 BLOCK 2000 STREAMS streams.order >
List<MapRecord<String, Object, Object>> list = stringRedisTemplate.opsForStream().read(
Consumer.from("g1", "c1"),
StreamReadOptions.empty().count(1).block(Duration.ofSeconds(2)),
StreamOffset.create(queueName, ReadOffset.lastConsumed())
);
//2.判断消息获取是否成功
if(list==null || list.isEmpty()){
//2.1.获取失败,没有消息,继续下一次循环
continue;
}
//3.解析消息中的订单信息
MapRecord<String, Object, Object> record = list.get(0);
//4.获取成功,可以下单
Map<Object, Object> values = record.getValue();
//3.创建订单
VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(values, new VoucherOrder(), true);
createVoucherOrder(voucherOrder);
//4.ACK确认
stringRedisTemplate.opsForStream().acknowledge(queueName,"g1",record.getId());
} catch (Exception e) {
log.debug("处理订单异常",e);
handlePendingList();
}
}
}
private void handlePendingList() {
while(true){
try {
//1.获取Pending-List中的订单信息 XREADGROUP GROUP g1 c1 COUNT 1 BLOCK 2000 STREAMS streams.order 0
List<MapRecord<String, Object, Object>> list = stringRedisTemplate.opsForStream().read(
Consumer.from("g1", "c1"),
StreamReadOptions.empty().count(1),
StreamOffset.create(queueName, ReadOffset.from("0"))
);
//2.判断消息获取是否成功
if(list==null || list.isEmpty()){
//2.1.获取失败,说明Pending-list里没有异常消息,结束循环
break;
}
//3.解析消息中的订单信息
MapRecord<String, Object, Object> record = list.get(0);
//4.获取成功,可以下单
Map<Object, Object> values = record.getValue();
//3.创建订单
VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(values, new VoucherOrder(), true);
//4.ACK确认
stringRedisTemplate.opsForStream().acknowledge(queueName,"g1",record.getId());
} catch (Exception e) {
log.debug("处理Pending-list异常",e);
try {
Thread.sleep(20);
} catch (InterruptedException ex) {
throw new RuntimeException(ex);
}
}
}
}
P55 达人探店 发布探店笔记
上传图片:
在我的和首页都可以看到新发布的博文:
P56 达人探店 查看探店笔记
探店笔记要包含笔记的内容和博主的相关信息。所以选择在Blog表中添加如下2个字段,这两个字段需要后续我们手动维护(赋值)。
@GetMapping("/{id}")
public Result queryBlogById(@PathVariable("id") Long id){
Blog blog = blogService.getById(id);
if(blog==null){
return Result.fail("笔记不存在");
}
User user = userService.getById(id);
blog.setIcon(user.getIcon());
blog.setName(user.getNickName());
return Result.ok(blog);
}
P57 达人探店 点赞功能
现在的点赞逻辑是,一个人可以对同一篇笔记点赞无数次。
需求:
1.同一个用户只能点赞一次,如果再次点击则取消点赞。
2.如果当前用户已经点赞,则点赞按钮高亮显示(前端已实现,判断字段Blog类的isLike属性)
分析:
1.给Blog类中添加一个isLike字段,标示是否被当前用户点赞。
2.修改点赞功能,利用Redis的Set集合判断是否点赞过,未点赞则点赞数+1,已点赞则点赞数-1。
3.修改分页查询Blog业务和根据id查询Blog的业务,判断当前用户是否点赞过,赋值给isLike字段。
@PutMapping("/like/{id}")
public Result likeBlog(@PathVariable("id") Long id) {
return blogService.likeBlog(id);
}
private final StringRedisTemplate stringRedisTemplate;
public BlogServiceImpl(StringRedisTemplate stringRedisTemplate) {
this.stringRedisTemplate = stringRedisTemplate;
}
@Override
public Result likeBlog(Long id) {
//1.获取登录用户
UserDTO user = UserHolder.getUser();
Long userId = user.getId();
//2.判断当前用户是否已经点赞过
String key = "blog:liked:" +id;
Boolean isMember = stringRedisTemplate.opsForSet().isMember(key, userId.toString());
if(BooleanUtil.isFalse(isMember)){
//3.未点赞,可以点赞
//3.1.数据库点赞数+1
boolean isSuccess = update().setSql("liked=liked+1").eq("id", id).update();
//3.2.保存用户到Redis
if(isSuccess){
stringRedisTemplate.opsForSet().add(key,userId.toString());
}
}else{
//4.已点赞,取消点赞
//4.1.数据库点赞数-1
boolean isSuccess = update().setSql("liked=liked-1").eq("id", id).update();
//4.2.把用户从Redis的set集合移除
stringRedisTemplate.opsForSet().remove(key,userId.toString());
}
return Result.ok();
}
public Boolean isBlogLiked(Blog blog) {
Long userId = null;
try {
//1.获取登录用户
userId = UserHolder.getUser().getId();
} catch (Exception e) {
log.debug("用户未登录!");
return false;
}
//2.判断当前用户是否已经点赞过
String key = "blog:liked:" +blog.getId();
Boolean isMember = stringRedisTemplate.opsForSet().isMember(key, userId.toString());
try {
blog.setIsLike(BooleanUtil.isTrue(isMember));
} catch (Exception e) {
log.debug("点赞信息为空!");
return false;
}
return isMember;
}
P58 达人探店 点赞排行榜
@GetMapping("/likes/{id}")
public Result queryBlogLikes(@PathVariable("id") Long id) {
return blogService.queryBlogLikes(id);
}
@Override
public Result queryBlogLikes(Long id) {
String key = RedisConstants.BLOG_LIKED_KEY +id;
//1.查询top5的点赞用户 zrange key 0 4
Set<String> top5 = stringRedisTemplate.opsForZSet().range(key, 0, 4);
if(top5==null || top5.isEmpty()){
return Result.ok(Collections.emptyList());
}
//2.解析出其中的用户id
List<Long> ids = top5.stream().map(Long::valueOf).collect(Collectors.toList());
//3.根据用户id查询用户
List<UserDTO> userDTOS = userService.listByIds(ids)
.stream()
.map(user -> BeanUtil.copyProperties(user, UserDTO.class))
.collect(Collectors.toList());
//4.返回
return Result.ok(userDTOS);
}
@Override
public Result queryBlogLikes(Long id) {
String key = RedisConstants.BLOG_LIKED_KEY +id;
//1.查询top5的点赞用户 zrange key 0 4
Set<String> top5 = stringRedisTemplate.opsForZSet().range(key, 0, 4);
if(top5==null || top5.isEmpty()){
return Result.ok(Collections.emptyList());
}
//2.解析出其中的用户id
List<Long> ids = top5.stream().map(Long::valueOf).collect(Collectors.toList());
//3.根据用户id查询用户
String idStr = StrUtil.join(",", ids);
List<UserDTO> userDTOS = userService.query()
.in("id",ids)
.last("ORDER BY FIELD(id,"+idStr+")").list()
.stream()
.map(user -> BeanUtil.copyProperties(user, UserDTO.class))
.collect(Collectors.toList());
//4.返回
return Result.ok(userDTOS);
}
P59 好友关注 关注和取关
@RestController
@RequestMapping("/follow")
public class FollowController {
@Autowired
IFollowService followService;
@PutMapping("/{id}/{isFollow}")
public Result follow(@PathVariable("id") Long followUserId, @PathVariable("isFollow") Boolean isFollow) {
return followService.follow(followUserId,isFollow);
}
@GetMapping("/or/not/{id}")
public Result isFollow(@PathVariable("id") Long followUserId) {
return followService.isFollow(followUserId);
}
}
@Service
public class FollowServiceImpl extends ServiceImpl<FollowMapper, Follow> implements IFollowService {
@Override
public Result follow(Long followUserId, Boolean isFollow) {
Long userId = UserHolder.getUser().getId();
//1.判断是关注还是取关
if(isFollow){
//2.关注,新增数据
Follow follow = new Follow();
follow.setUserId(userId);
follow.setFollowUserId(followUserId);
save(follow);
}else{
//3.取关,删除记录
remove(new QueryWrapper<Follow>()
.eq("user_id", userId).eq("follow_user_id", followUserId));
}
return Result.ok();
}
@Override
public Result isFollow(Long followUserId) {
Long userId = UserHolder.getUser().getId();
//1.查询是否关注
Integer count = query().eq("user_id", userId).eq("follow_user_id", followUserId).count();
//2.判断是否关注
return Result.ok(count>0);
}
}
P60 好友关注 共同关注
@Resource
private final StringRedisTemplate stringRedisTemplate;
@Resource
private IUserService userService;
public FollowServiceImpl(StringRedisTemplate stringRedisTemplate) {
this.stringRedisTemplate = stringRedisTemplate;
}
@Override
public Result follow(Long followUserId, Boolean isFollow) {
Long userId = UserHolder.getUser().getId();
String key = "follows:" + userId;
//1.判断是关注还是取关
if(isFollow){
//2.关注,新增数据
Follow follow = new Follow();
follow.setUserId(userId);
follow.setFollowUserId(followUserId);
boolean isSuccess = save(follow);
if(isSuccess){
//把关注用户的id放入redis的set集合
stringRedisTemplate.opsForSet().add(key,followUserId.toString());
}
}else{
//3.取关,删除记录
boolean isSuccess = remove(new QueryWrapper<Follow>()
.eq("user_id", userId).eq("follow_user_id", followUserId));
if(isSuccess){
//把关注用户的id从Redis移除
stringRedisTemplate.opsForSet().remove(key,followUserId.toString());
}
}
return Result.ok();
}
@GetMapping("/common/{id}")
public Result followCommons(@PathVariable("id") Long id){
return followService.followCommons(id);
}
@Override
public Result followCommons(Long id) {
//求的是目标用户和当前用户关注的交集
//1.获取key
Long userId = UserHolder.getUser().getId();
String key1 = "follows:"+userId;
String key2 = "follows:"+id;
//2.求交集
Set<String> intersect = stringRedisTemplate.opsForSet().intersect(key1, key2);
if(intersect==null||intersect.isEmpty()){
return Result.ok(Collections.emptyList());
}
//3.解析id集合
List<Long> ids = intersect.stream().map(Long::valueOf).collect(Collectors.toList());
//4.查询用户
List<UserDTO> users = userService.listByIds(ids)
.stream()
.map(user -> BeanUtil.copyProperties(user, UserDTO.class))
.collect(Collectors.toList());
return Result.ok(users);
}
P61 好友关注 Feed流实现方案分析
P62 好友关注 推送到粉丝收件箱
@PostMapping
public Result saveBlog(@RequestBody Blog blog) {
// 获取登录用户
UserDTO user = UserHolder.getUser();
blog.setUserId(user.getId());
// 保存探店博文
blogService.saveBlog(blog);
// 返回id
return Result.ok(blog.getId());
}
@Override
public Result saveBlog(Blog blog) {
//1.获取登录用户
UserDTO user = UserHolder.getUser();
blog.setUserId(user.getId());
//2.保存探店笔记
boolean isSuccess = save(blog);
if(!isSuccess){
return Result.fail("新增笔记失败!");
}
//3.查询笔记作业的所有粉丝
//select * from tb_follow where follow_user_id = ?
List<Follow> follows = followService.query().eq("follow_user_id", user.getId()).list();
//4.推送笔记id给粉丝
for(Follow follow : follows){
//4.1.获取粉丝id
Long userId = follow.getUserId();
//4.2.推送到粉丝收件箱是sortedSet
String key = "feed::"+userId;
stringRedisTemplate.opsForZSet().add(key,blog.getId().toString(),System.currentTimeMillis());
}
//返回id
return Result.ok(blog.getId());
}
P63 好友关注 滚动分页查询收件箱的思路
P64 好友关注 实现滚动分页查询
@GetMapping("/of/follow")
public Result queryBlogOfFollow(@RequestParam("lastId") Long max,@RequestParam(value = "offset",defaultValue = "0") Integer offset) {
return blogService.queryBlogOfFollow(max,offset);
}
@Override
public Result queryBlogOfFollow(Long max, Integer offset) {
//1.获取当前用户
Long userId = UserHolder.getUser().getId();
//2.查询收件箱 ZREVRANGEBYSCORE key Max Min LIMIT offset count
String key = FEED_KEY+userId;
Set<ZSetOperations.TypedTuple<String>> typedTuples = stringRedisTemplate.opsForZSet()
.reverseRangeByScoreWithScores(key, 0, max, offset, 3);
//2.1.非空判断
if(typedTuples==null || typedTuples.isEmpty()){
return Result.ok();
}
//3.解析数据:blogId、minTime(时间戳)、offset
List<Long> ids = new ArrayList<>(typedTuples.size());
long minTime = 0;
int os = 1;
for(ZSetOperations.TypedTuple<String> tuple:typedTuples){
//4.1.获取id
String idStr = tuple.getValue();
ids.add(Long.valueOf(idStr));
//4.2.获取分数
long time = tuple.getScore().longValue();
if(time == minTime){
os++;
}else{
minTime = time;
os=1;
}
}
//4.根据id查询blog
String idStr = StrUtil.join(",",ids);
List<Blog> blogs = query().in("id",ids).last("ORDER BY FIELD(id,"+idStr+")").list();
for (Blog blog : blogs) {
isBlogLiked(blog);
User user = userService.getById(blog.getUserId());
blog.setName(user.getNickName());
blog.setIcon(user.getIcon());
}
//5.封装并返回
ScrollResult r = new ScrollResult();
r.setList(blogs);
r.setOffset(os);
r.setMinTime(minTime);
return Result.ok(r);
}
P65 附近商铺 GEO数据结构的基本用法
P66 附近商铺 导入店铺数据到GEO
@Resource
StringRedisTemplate stringRedisTemplate;
@Test
void loadShopData(){
//1.查询店铺信息
List<Shop> list = shopService.list();
//2.把店铺分组,按照typeId分组,typeId一致的放到一个集合
Map<Long,List<Shop>> map = list.stream().collect(Collectors.groupingBy(Shop::getTypeId));
//3.分批完成写入Redis
for (Map.Entry<Long, List<Shop>> entry : map.entrySet()) {
//3.1.获取类型id
Long typeid = entry.getKey();
String key = "shop:geo:"+typeid;
//3.2.获取同类型的店铺的集合
List<Shop> value = entry.getValue();
//3.3.写入redis GEOADD key 经度 纬度 member
List<RedisGeoCommands.GeoLocation<String>> locations = new ArrayList<>(value.size());
for(Shop shop : value){
// stringRedisTemplate.opsForGeo().add(key,new Point(shop.getX(),shop.getY()),shop.getId().toString());
locations.add(new RedisGeoCommands.GeoLocation<>(
shop.getId().toString(),
new Point(shop.getX(),shop.getY())
));
}
stringRedisTemplate.opsForGeo().add(key,locations);
}
}
P67 附近商铺 实现附近商户功能
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
<exclusions>
<exclusion>
<groupId>spring-data-redis</groupId>
<artifactId>org.springframework.data</artifactId>
</exclusion>
<exclusion>
<groupId>lettuce-core</groupId>
<artifactId>io.lettuce</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.springframework.data</groupId>
<artifactId>spring-data-redis</artifactId>
<version>2.6.2</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>io.lettuce</groupId>
<artifactId>lettuce-core</artifactId>
<version>6.1.6.RELEASE</version>
<scope>compile</scope>
</dependency>
</dependencies>
@GetMapping("/of/type")
public Result queryShopByType(
@RequestParam("typeId") Integer typeId,
@RequestParam(value = "current", defaultValue = "1") Integer current,
@RequestParam(value="x",required=false) Double x,
@RequestParam(value="y",required=false) Double y
) {
return shopService.queryShopByType(typeId,current,x,y);
}
@Override
public Result queryShopByType(Integer typeId, Integer current, Double x, Double y) {
//1.判断是否需要根据坐标查询
if(x==null || y==null){
//不需要查询坐标,按数据库查
Page<Shop> page = query()
.eq("type_id",typeId)
.page(new Page<>(current, SystemConstants.DEFAULT_PAGE_SIZE));
return Result.ok(page.getRecords());
}
//2.计算分页参数
int from = (current - 1)*SystemConstants.DEFAULT_PAGE_SIZE;
int end = current*SystemConstants.DEFAULT_PAGE_SIZE;
//3.查询redis,按照距离排序、分页。结果:shopId,distance
String key = SHOP_GEO_KEY+typeId;
GeoResults<RedisGeoCommands.GeoLocation<String>> results = stringRedisTemplate.opsForGeo()
.search(
key,
GeoReference.fromCoordinate(x, y),
new Distance(5000),
RedisGeoCommands.GeoSearchCommandArgs.newGeoSearchArgs().includeDistance().limit(end)
);
//4.解析出id
if(results==null){
return Result.ok(Collections.emptyList());
}
List<GeoResult<RedisGeoCommands.GeoLocation<String>>> list = results.getContent();
//4.1.截取from-end的部分
List<Long> ids = new ArrayList<>(list.size());
Map<String,Distance> distanceMap = new HashMap<>(list.size());
if(list.size()<=from){
return Result.ok(Collections.emptyList());
}
list.stream().skip(from).forEach(result->{ //跳过可能把所有数据跳过了
//4.2.获取店铺id
String shopIdStr = result.getContent().getName();
ids.add(Long.valueOf(shopIdStr));
//4.3.获取距离
Distance distance = result.getDistance();
distanceMap.put(shopIdStr,distance);
});
//5.根据id查询shop
String idStr = StrUtil.join(",", ids);
List<Shop> shops = query().in("id", ids).last("ORDER BY FIELD ( id," + idStr + ")").list();
for(Shop shop : shops){
shop.setDistance(distanceMap.get(shop.getId().toString()).getValue());
}
//6、返回
return Result.ok(shops);
}
P68 用户签到 BitMap功能演示
P69 用户签到 实现签到功能
@PostMapping("/sign")
public Result sign(){
return userService.sign();
}
@Override
public Result sign() {
//1.获取当前登录用户
Long userId = UserHolder.getUser().getId();
//2.获取日期
LocalDateTime now = LocalDateTime.now();
//3.拼接key
String keySuffix = now.format(DateTimeFormatter.ofPattern("yyyyMM"));
String key = USER_SIGN_KEY + userId + keySuffix;
//4.获取今天是本月第几天
int dayOfMonth = now.getDayOfMonth();
//5.写入Redis SETBIT key offset 1
stringRedisTemplate.opsForValue().setBit(key,dayOfMonth-1,true);
return Result.ok();
}
P70 用户签到 统计连续签到
@GetMapping("/sign/count")
public Result signCount(){
return userService.signCount();
}
@Override
public Result signCount() {
//1.获取当前登录用户
Long userId = UserHolder.getUser().getId();
//2.获取日期
LocalDateTime now = LocalDateTime.now();
//3.拼接key
String keySuffix = now.format(DateTimeFormatter.ofPattern("yyyyMM"));
String key = USER_SIGN_KEY + userId + keySuffix;
//4.获取今天是本月第几天
int dayOfMonth = now.getDayOfMonth();
//5.获取本月截止今天为止的所有的签到记录,返回的是一个十进制的数字 BITFIELD sign:5:202203 GET u14 0
List<Long> result = stringRedisTemplate.opsForValue().bitField(
key, BitFieldSubCommands.create()
.get(BitFieldSubCommands.BitFieldType.unsigned(dayOfMonth)).valueAt(0)
);
if(result==null || result.isEmpty()){
//没有任何签到结果
return Result.ok(0);
}
Long num = result.get(0);
if(num==null || num==0){
return Result.ok(0);
}
//6.循环遍历
int count=0;
while(true){
//6.1.让这个数字与1做与运算,得到数字的最后一个bit位
if((num&1)==0){//6.2.判断这个bit位是否为0
//6.3.如果为0,说明未签到结束
break;
}else{
//6.4.如果不为0,说明已签到,计数器+1
count++;
}
//6.5.把数字右移一位,抛弃最后一个bit位,继续下一个bit位
num >>>= 1;
}
return Result.ok(count);
}
P71 UV统计 HyperLogLog的用法
P72 UV统计 测试百万数据的统计
@Test
void testHyperLogLog(){
String[] values = new String[1000];
int j=0;
for(int i=0;i<1000000;i++){
j=j%1000;
values[j] = "user_"+i;
if(j == 999){
stringRedisTemplate.opsForHyperLogLog().add("hl3",values);
}
j++;
}
//统计数量
Long res = stringRedisTemplate.opsForHyperLogLog().size("hl3");
System.out.println("hl3"+res);
}