模拟秒杀
- 源码地址
- 前期准备
- 创建数据库表
- 导入数据
- dependencies
- pom
- Controller
- TSeckillProduct
- TseckillProductService
- TseckillProductServiceImpl
- TseckillProductMapper
- TseckillProductMapper.xml
- 使用JMeter压力测试
- 开始测试
- 超卖现象
- 原因
- 解决办法
- 更改数据库库存500
- 进行JMeter压力测试
- 还有一种办法 在扣库存的时候加锁再去查询一下库存的数量
- sql语句恢复原来的
- 分布式锁
- 整合Redisson分布式锁
- 添加依赖
- 添加配置文件
- 添加redisson.yml
- 更改扣库存代码TseckillProductServiceImpl.updateStockCount
- 更改updateStockCount
- 吞吐量
- RedLock
- 配置类
- 案例代码
- 异步Servlet
- 传统Servlet请求示意图
- 异步Servlet请求示意图
- 测试异步Servlet
- 库存预热
- increment()
- putIfAbsent 初始化库存
- 代码初始化库存和redis递减秒杀
- 初始化库存 Controller
- TseckillProductServiceImpl
- 设置初始化库存
- 压力测试访问秒杀 /sec/preheat
- 结果 发现库存没有多扣
- 同步吞吐量
- 使用异步线程池的方式进行秒杀
- 代码
- Controller
- TseckillProductServiceImpl
- 异步吞吐量
- 使用redis的increment 方法实现递增递减的时候还需要加锁吗
- 吞吐量分析
- 优化线程池配置
- 异步处理吞吐量成功提高
源码地址
https://gitee.com/Lin-seven/seckill
前期准备
创建数据库表
CREATE TABLE `t_seckill_product` (`id` bigint NOT NULL AUTO_INCREMENT,`product_id` bigint DEFAULT NULL,`seckill_price` decimal(10,2) DEFAULT NULL,`intergral` decimal(10,0) DEFAULT NULL,`stock_count` int DEFAULT NULL,`start_date` date DEFAULT NULL,`time` int DEFAULT NULL,PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=22 DEFAULT CHARSET=utf8mb3;
导入数据
INSERT INTO `shop-seckill`.`t_seckill_product`(`id`, `product_id`, `seckill_price`, `intergral`, `stock_count`, `start_date`, `time`) VALUES (2, 23, 3699.00, 36990, 10, '2024-07-15', 10);
dependencies
<dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId></dependency><dependency><groupId>com.baomidou</groupId><artifactId>mybatis-plus-boot-starter</artifactId><version>3.4.3.1</version></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId></dependency><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.2.79</version></dependency></dependencies>
pom
# 应用服务 WEB 访问端口
server:port: 8899
spring:datasource:driver-class-name: com.mysql.cj.jdbc.Driverurl: jdbc:mysql://localhost:3306/shop-seckill?serverTimezone=GMT%2B8username: rootpassword: 123456application:name: shop-seckill#开启日志
mybatis-plus:configuration:log-impl: org.apache.ibatis.logging.stdout.StdOutImplmapper-locations: classpath*:/**/mapper/**/*.xml
Controller
package com.lx.seckill.controller;import com.lx.seckill.pojo.TSeckillProduct;
import com.lx.seckill.service.TseckillProductService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;import java.util.List;
import java.util.UUID;/*** TODO 添加描述** @author wangLJ* @date 2024/7/18 11:33*/
@RestController
@RequestMapping("/sec")
public class SeckillContoller {public Integer count =0;@Autowiredprivate TseckillProductService service;@GetMapping("/list")public List<TSeckillProduct> getList(){List<TSeckillProduct> list = service.list();System.out.println(count);return list;}@GetMapping("/byid")public TSeckillProduct byid(){TSeckillProduct list = service.byid();return list;}@GetMapping("/kill")public String kill() {TSeckillProduct list = service.byid();if (list.getStockCount()>0){//扣减库存service.updateStockCount();}return "ok";}}
TSeckillProduct
@Data
//@TableName(value = "eric_user")
public class TSeckillProduct {@TableId(value = "id", type = IdType.AUTO)private Integer id;/*** 商品ID*/private Integer productId;// 价格private Integer seckillPrice;//价格private Integer intergral;//数量private Integer stockCount;//场次private Integer time;}
TseckillProductService
public interface TseckillProductService {List<TSeckillProduct> list();TSeckillProduct byid();void updateStockCount();
}
TseckillProductServiceImpl
@Service
public class TseckillProductServiceImpl extends ServiceImpl<TseckillProductMapper, TSeckillProduct> implements TseckillProductService{@AutowiredTseckillProductMapper mapper;@Overridepublic List<TSeckillProduct> list() {return super.list();}@Overridepublic TSeckillProduct byid() {TSeckillProduct tSeckillProduct = mapper.selectById(2);return tSeckillProduct;}@Overridepublic void updateStockCount() {//扣减库存mapper.updateStockCount( );}}
TseckillProductMapper
@Mapper
public interface TseckillProductMapper extends BaseMapper<TSeckillProduct> {void updateStockCount();
}
TseckillProductMapper.xml
<?xml version="1.0" encoding="UTF-8" ?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd" ><mapper namespace="com.lx.seckill.mapper.TseckillProductMapper"><update id="updateStockCount">UPDATE t_seckill_product set stock_count=stock_count-1 where id =2</update>
</mapper>
使用JMeter压力测试
安装教程https://blog.csdn.net/m0_37583655/article/details/126507267-靖节先生
线程组
安装好之后添加查看结果树和聚合报告
开始测试
填好对应的后端地址 点击开始
超卖现象
发现库存都成负数了。这是为什么呢?
原因
原因在于,有很多线程同时执行了service.byid();方法
拿到的库存,保存在自己的栈帧中。如果有一百个线程拿到的库存是大于0的,就有一百个线程去执行扣库存的方法
解决办法
使用synchronized
在去查询库存的时候加锁
@Overridepublic synchronized TSeckillProduct byid() {TSeckillProduct tSeckillProduct = mapper.selectById(2);return tSeckillProduct;}
更改数据库库存500
进行JMeter压力测试
通过测试发现,库存总是等于-1
这是因为
**读取与更新之间的窗口期:**当一个线程读取库存后,其他线程可能已经完成了库存的扣减操作,但当前线程由于持有库存值的本地副本,仍可能认为库存充足而继续执行扣减操作。
数据库事务隔离级别:即使使用了@Transactional注解,如果数据库的事务隔离级别设置不当(例如,默认的READ_COMMITTED),在读取库存和更新库存之间,其他事务可能已经改变了库存值,导致当前事务基于过期数据进行操作。
为了防止库存值变为负数,您可以采取以下几种策略之一:
**使用乐观锁:**在数据库表中增加一个版本号字段,每次更新库存时也更新版本号,并在更新语句中加入版本号的检查,以确保数据的一致性。
UPDATE t_seckill_product SET stock_count = stock_count - 1 WHERE id = 2 AND stock_count > 0
**使用悲观锁:**在查询库存时即锁定库存记录,直到事务结束。这可以通过SQL语句中的FOR UPDATE关键字实现。
原子操作:使用数据库提供的原子操作,如MySQL的INNODB存储引擎支持的原子自减操作,直接在数据库层面完成库存的扣减和检查。
UPDATE t_seckill_product SET stock_count = stock_count - 1 WHERE id = 2 FOR UPDATE
还有一种办法 在扣库存的时候加锁再去查询一下库存的数量
去除TSeckillProduct方法的synchronized 在updateStockCount方法加上synchronized
@Overridepublic TSeckillProduct byid() {TSeckillProduct tSeckillProduct = mapper.selectById(2);return tSeckillProduct;}@Transactional@Overridepublic synchronized void updateStockCount() {TSeckillProduct tSeckillProduct = mapper.selectById(2);if (tSeckillProduct.getStockCount()>0){//扣减库存mapper.updateStockCount( );}}
sql语句恢复原来的
<update id="updateStockCount">UPDATE t_seckill_product set stock_count=stock_count-1 where id =2</update>
分布式锁
如果是集群架构,以上使用synchronized 就不适用了。因为synchronized 锁住的只是当前JVM中的this对象,集群状态下,还是有可能发生并发关系,同时执行到updateStockCount,当获取的库存都会存入自己的栈帧,所以还会导致库存超卖的情况
整合Redisson分布式锁
添加依赖
<!-- redisson分布式锁--><dependency><groupId>org.redisson</groupId><artifactId>redisson-spring-boot-starter</artifactId><version>3.13.6</version></dependency>
添加配置文件
# redis配置
spring:redis:database: 0host: 127.0.0.1
# password: redis@passport: 6379redisson:file: classpath:redisson.yml
添加redisson.yml
application.yml与 redisson.yml在同级,目录结构如下:
# 单节点配置
singleServerConfig:# 数据库编号database: 0# 节点地址address: redis://127.0.0.1:6379# 密码
# password: redis@pass
更改扣库存代码TseckillProductServiceImpl.updateStockCount
注入RedissonClient
@ResourceRedissonClient redissonClient;
更改updateStockCount
@Transactional@Overridepublic void updateStockCount() {/*** 获取一把锁,只要锁的名字一样,就是同一把锁,"my-lock"是锁名,也是Redis的哈希模型的对外key*/RLock lock = redissonClient.getLock("my-lock");//加锁/*** 阻塞式等待,默认加的锁等待时间为30s。每到20s(经过三分之一看门狗时间后)就会自动续借成30s* 1.锁的自动续期,如果在业务执行期间业务没有执行完成,redisson会为该锁自动续期* 2.加锁的业务只要运行完成,就不会自动续期,即使不手动解锁,锁在默认的30s后会自动删除*/// lock.lock();/*** (推荐)指定锁的过期时间,看门狗不会自动续期:* 在自定义锁的存在时间时不会自动解锁* 注意:* 设置的自动解锁时间一定要稳稳地大于业务时间*/lock.lock(30, TimeUnit.SECONDS);try {TSeckillProduct tSeckillProduct = mapper.selectById(2);if (tSeckillProduct.getStockCount()>0){//扣减库存mapper.updateStockCount( );}} finally {//释放锁lock.unlock();}}
分布式公平锁、读写锁、信号量、闭锁请查看文档
https://www.yuque.com/manmushanhe-kfrkq/pgm4gc/lpmvfoqmo1yzq8yv
吞吐量
RedLock
分布式锁算法,通过多个Redis实例来实现高可用的分布式锁服务,确保分布式系统中的数据安全和一致性。
适用于redis 集群模式下
依赖
<dependency><groupId>org.redisson</groupId><artifactId>redisson-spring-boot-starter</artifactId><version>3.16.2</version>
</dependency>
配置类
@Configuration
public class RedLockConfig {@Bean(destroyMethod="shutdown")RedissonClient redissonClient() {Config config = new Config();// 可以配置多个Redis节点config.useClusterServers().setScanInterval(2000).addNodeAddress("redis://127.0.0.1:7000", "redis://127.0.0.1:7001");return Redisson.create(config);}
}
案例代码
@Autowiredprivate RedissonClient redissonClient;public void yourMethod() {// 获取RedLockRedLock redLock = new RedLock(redissonClient);// 加锁boolean lockAcquired = redLock.lock("yourLockKey");try {if (lockAcquired) {// 业务逻辑} else {// 处理锁竞争}} finally {// 释放锁redLock.unlock("yourLockKey");}}
异步Servlet
传统Servlet请求示意图
同步阻塞: 传统Servlet采用同步阻塞的方式处理请求。每个请求到达时,Servlet容器会为其分配一个线程,
直到请求处理完成
或超时才释放线程
。
线程资源消耗: 每个请求都需要一个独立的线程,当请求需要等待IO操作或其他资源时,线程会被阻塞,导致资源浪费。
**简单易用:**开发和理解成本较低,符合传统的Servlet编程模型,适用于一般性的Web应用。
适用场景:
小规模并发请求的应用场景。
对响应时间和吞吐量要求不高的应用。
异步Servlet请求示意图
非阻塞异步:异步Servlet通过异步处理请求,不会阻塞服务器的主线程,可以在处理请求的过程中释放线程资源,等待IO操作完成后再继续处理。
性能优化:能够在高并发情况下显著提高系统的吞吐量和响应速度,有效地利用系统资源。
适用场景:
高并发请求的应用场景,特别是需要处理大量IO操作的情况,如文件上传、推送服务等。
对于需要提高系统性能和响应速度的要求较高的应用。
传统Servlet和异步Servlet虽然都不能
提高用户的等待时间
,但是能够提高系统的线程的利用率
作用:异步Servlet允许Servlet容器在处理请求时不阻塞线程
,从而提高服务器的并发处理能力和资源利用率
。它特别适合处理需要长时间计算或等待外部资源响应的请求,如文件上传、图像处理等。
实现方式:通过AsyncContext接口实现,允许Servlet在处理请求时将其放入异步处理模式,然后在后台线程完成处理,并在处理完成后返回响应。
测试异步Servlet
创建线程池
@Configuration
public class AsyncConfig {@Bean(name = "threadPoolTaskExecutor")public ThreadPoolTaskExecutor threadPoolTaskExecutor() {ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();executor.setCorePoolSize(5); // 核心线程数executor.setMaxPoolSize(10); // 最大线程数executor.setQueueCapacity(20); // 队列大小executor.setThreadNamePrefix("MyExecutor-"); // 线程名前缀executor.initialize();return executor;}}
**测试 使用CompletableFuture开启异步线程 **
@Autowiredprivate ThreadPoolTaskExecutor executor;/*** 异步请求* @return*/@GetMapping("/asynd")@Async("threadPoolTaskExecutor") // 指定使用哪个TaskExecutorpublic CompletableFuture<String> handleAsyncRequest() {return CompletableFuture.supplyAsync(() -> {String threadName = Thread.currentThread().getName();System.out.println("阿里 - " +threadName);// 模拟一个耗时操作try {Thread.sleep(5000);} catch (InterruptedException e) {Thread.currentThread().interrupt();throw new RuntimeException("Sleep interrupted", e);}return "Hello from async method!";}, executor);}
库存预热
库存每次都去查询,这样会造成每次连接数据库,就会造成数据库的压力。
可以使用redisd
的increment
方法 实现递增的功能
increment()
increment() 方法是原子性的,确保多线程或多个客户端同时对同一个字段进行递增操作时的线程安全性 如果Redis中不存在这个key
的值, 会将其初始化为0然后执行递增操作。
putIfAbsent 初始化库存
opsForHash().putIfAbsent() 方法的作用是在哈希表中设置字段的值,
仅当字段不存在时才会设置成功
。如果字段已经存在,这个方法不会更新现有的值,而是保持当前的值不变。
如果想要改变key的值
可以直接使用PUT 方法
代码初始化库存和redis递减秒杀
初始化库存 Controller
/*** 初始化库存* @return*/@RequestMapping("/putIfAbsent")@ResponseBodypublic String putIfAbsent() {return service.putIfAbsent();}/*** 库存预热-秒杀* @return*/@RequestMapping("/preheat")@ResponseBodypublic String preheat() {return service.preheat();}
TseckillProductServiceImpl
@Autowiredprivate StringRedisTemplate redisTemplate;@Overridepublic String putIfAbsent() {/*** opsForHash().putIfAbsent() 方法的作用是在哈希表中设置字段的值,* 但仅当字段不存在时才会设置成功。如果字段已经存在,这个方法不会更新现有的值,而是保持当前的值不变。*/TSeckillProduct tSeckillProduct = mapper.selectById(2);Integer stockCount = tSeckillProduct.getStockCount();/*** 应该使用putIfAbsent 方法* redisTemplate.opsForHash().putIfAbsent("product:stock", "quantity", stockCount+"");* 也可以使用* redisTemplate.opsForValue().setIfAbsent("product:stock", stockCount + "");* 测试为了方便设置库存,从而采用put 方法*///redisTemplate.opsForHash().putIfAbsent("product:stock", "quantity", stockCount+"");redisTemplate.opsForHash().put("product:stock", "quantity", stockCount+"");return "库存"+stockCount ;}@Overridepublic String preheat() {Long result = redisTemplate.opsForHash().increment("product:stock", "quantity", -1);System.out.println(result);if (result >= 0) {//扣减库存mapper.updateStockCount();return result + "";} else {return "库存不足";}}
设置初始化库存
初始化库存----- http://localhost:8899/sec/putIfAbsent
压力测试访问秒杀 /sec/preheat
结果 发现库存没有多扣
同步吞吐量
使用异步线程池的方式进行秒杀
代码
Controller
@RequestMapping("/asynPreheat")@ResponseBodypublic CompletableFuture<String> asynPreheat() {return service.asynPreheat();}
TseckillProductServiceImpl
@Async("threadPoolTaskExecutor") // 指定使用哪个TaskExecutor@Overridepublic CompletableFuture<String> asynPreheat() {return CompletableFuture.supplyAsync(() -> {Long result = redisTemplate.opsForHash().increment("product:stock", "quantity", -1);// 模拟一个耗时操作if (result >= 0) {//扣减库存mapper.updateStockCount();return result + "";} else {return "库存不足";}}, executor);}}
异步吞吐量
使用redis的increment 方法实现递增递减的时候还需要加锁吗
是 Redis 提供的原子性操作,确保在并发情况下递增或递减哈希表中的字段值是线程安全的。这是因为 Redis 单个命令的执行是原子性的,不会被其他命令打断,所以递增或递减操作是安全的。
吞吐量分析
为什么使用线程池的asynPreheat的吞吐量是10.5/sec(低) 反而没有使用线程池preheat 的吞吐量是919/sec(较高)
asynPreheat() 方法:
使用了 @Async 注解,表明该方法会在一个单独的线程中执行,并且指定了一个自定义的线程池 (executor)。
异步方法通常会有额外的线程
切换和调度开销
,尤其是在高并发
情况下,线程池可能会面临任务排队和等待执行的情况。
preheat() 方法:
这是一个同步方法,每次调用都会在当前线程中执行,
没有额外的线程开销
。 吞吐量高达 919/sec
可能是因为每次请求都能快速响应,并且没有异步等待
或线程切换的开销
。
优化线程池配置
- 更改核心线程数
- 更改最大线程数
- 更改队列大小
@Bean(name = "threadPoolTaskExecutor")public ThreadPoolTaskExecutor threadPoolTaskExecutor() {ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();executor.setCorePoolSize(20); // 核心线程数executor.setMaxPoolSize(30); // 最大线程数executor.setQueueCapacity(90); // 队列大小executor.setThreadNamePrefix("MyExecutor-"); // 线程名前缀executor.initialize();return executor;}