一、概述
目前网上关于防止库存超卖,我没找到可以支持一次购买多件的,都是基于一次只能购买一件做的秒杀方案,但是实际场景中,一般秒杀活动都是支持1~5件的,因此为了补缺,写了此文,方便自己之后使用。
二、建表
1、商品表
CREATE TABLE `product_test` (`product_id` int(10) unsigned NOT NULL AUTO_INCREMENT COMMENT '商品ID',`stock` int(11) unsigned DEFAULT NULL COMMENT '商品库存',PRIMARY KEY (`product_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COMMENT='商品表';
2、订单记录表
CREATE TABLE `order_test` (`order_id` int(10) unsigned NOT NULL AUTO_INCREMENT COMMENT '订单ID',`product_id` int(10) unsigned NOT NULL COMMENT '商品ID',`sale` int(11) DEFAULT '0' COMMENT '下单数量',PRIMARY KEY (`order_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COMMENT='订单日志表';
3、向商品表内插入一条记录
INSERT INTO product_test (`stock`) VALUES (1300);
三、不同方案下测试报告
方案一:只将MySQL 库存字段设置了无符号
该方案存在超卖情况。测试总请求数20000,并发数8000的情况下,反应较慢。该方案在高并发下不可行,不是高并发情况下也有风险。
方案二:使用MySQL排它锁(for update)
该方案无超卖情况,但是响应时间过长。我使用20000请求,8000并发的测试情况下,时间均在11s ~ 13s 之间响应速度感人,不推荐高并发下采用该方案。
方案三:使用Redis队列【推荐方案】
该方案无超卖情况,相应速度较MySQL排它锁方案响应速度提高很多使用20000总请求,8000并发,每个请求平均响应时间5.38秒
使用30000总请求,1000并发,每个请求平均响应时间3.77秒
使用10000总请求,1000并发,每个请求平均响应时间1.23秒
使用5000总请求,1000并发,每个请求平均响应时间0.61秒需要注意,采用该方案,Redis中的商品库存数据一定要提前生成,而不是等查询时生成,应该增加商品数据时,就实时添加库存数据到Redis中,之后所有操作都从Redis操作(包括增删改查),之后持久化同步到数据库,可以采用异步消息队列方式。如果是旧系统,则应该写个脚本,先把数据库上只读锁,然后将商品库存预热到Redis中,再解开MySQL的只读锁,之后所有库存操作都在Redis中进行
方案四:使用Redis事务监听【待补充】
四、代码部分
方案一、只将库存字段设置为无符号
<?php
declare(strict_types = 1);class OperateStock
{protected $pdo = null;CONST REDUCE_STOCK = 1;// 减少库存操作CONST INCREASE_STOCK = 2;// 增加库存操作/*** 下订单扣减库存** @param int $productId* @param int $num*/public function placeOrder(int $productId, int $num){$stock = $this->getProductStock($productId);if ($num > $stock) {return $this->response(0, '超出库存,无法下单');}// 执行扣减库存操作$res = $this->changeStock($productId, $num);// 记录日志$this->recordOrderLog($productId, $num);return $res;}/*** 修改商品库存** @param int $productId* @param int $num* @param int $action* @return string*/private function changeStock(int $productId, int $num, int $action = self::REDUCE_STOCK){$operateAction = $action == self::REDUCE_STOCK ? '-' : '+';try {$sql = 'update product_test set stock = stock '.$operateAction.' '.$num. " where product_id = $productId";$this->getMySQL()->query($sql);} catch (\Exception $e) {return $this->response(0, $e->getMessage());}return $this->response(1, 'success');}/*** 记录销售日志** @param int $productId* @param int $num*/private function recordOrderLog(int $productId, int $num){$sql = "insert into order_test (product_id,sale) values ($productId,$num)";$this->getMySQL()->query($sql);}/*** 获取MySQL连接** @return PDO*/private function getMySQL(){if (false == $this->pdo) {$dsn = 'mysql:host=127.0.0.1;dbname=test';$this->pdo = new \PDO($dsn, 'root', '123456');}return $this->pdo;}/*** 获取商品库存数** @param $productId* @return mixed*/private function getProductStock($productId){// 查询库存$sql = "select stock from product_test where product_id = $productId limit 1";$stock = $this->getMySQL()->query($sql)->fetch(2);return $stock['stock'];}/*** 统一响应** @param int $statusCode* @param string $msg* @param array $data* @return string*/private function response(int $statusCode, string $msg, array $data = []){$data = ['status' => $statusCode,'msg' => $msg,'data' => $data];return json_encode($data);}
}
// 生成随机的商品下单数,使用ab压测工具测试并发下是否超卖 ab -n 20000 -c 8000 http://test.cn/
$num = rand(1, 300);
$object = new OperateStock();
print_r($object->placeOrder(1, $num));
2、使用 MySQL 排它锁(FOR UPDATE )方案
在事务中执行,并且在首次查商品剩余库存时,就将排它锁加上,注意,是查询时就加上,而不是操作时再加
<?php
declare(strict_types = 1);class OperateStock
{protected $pdo = null;CONST REDUCE_STOCK = 1;// 减少库存操作CONST INCREASE_STOCK = 2;// 增加库存操作/*** 下订单扣减库存** @param int $productId* @param int $num*/public function placeOrder(int $productId, int $num){try {// 开启事务$this->getMySQL()->beginTransaction();$stock = $this->getProductStock($productId);if ($num > $stock) {return $this->response(0, '超出库存,无法下单');}// 执行扣减库存操作$res = $this->changeStock($productId, $num);// 记录日志$this->recordOrderLog($productId, $num);$this->getMySQL()->commit();} catch (\Exception $e) {$this->getMySQL()->rollBack();$this->response(0, $e->getMessage());}return $res;}private function changeStock(int $productId, int $num, int $action = self::REDUCE_STOCK){$operateAction = $action == self::REDUCE_STOCK ? '-' : '+';try {$sql = 'update product_test set stock = stock '.$operateAction.' '.$num. " where product_id = $productId";$this->getMySQL()->query($sql);} catch (\Exception $e) {return $this->response(0, $e->getMessage());}return $this->response(1, 'success');}/*** 记录销售日志** @param int $productId* @param int $num*/private function recordOrderLog(int $productId, int $num){$sql = "insert into order_test (product_id,sale) values ($productId,$num)";$this->getMySQL()->query($sql);}/*** 获取MySQL连接** @return PDO*/private function getMySQL(){if (false == $this->pdo) {$dsn = 'mysql:host=127.0.0.1;dbname=test';$this->pdo = new \PDO($dsn, 'root', '123456');}return $this->pdo;}/*** 获取商品库存数** @param $productId* @return mixed*/private function getProductStock($productId){// 查询库存$sql = "select stock from product_test where product_id = $productId limit 1 for update";$stock = $this->getMySQL()->query($sql)->fetch(2);return $stock['stock'];}/*** 统一响应** @param int $statusCode* @param string $msg* @param array $data* @return string*/private function response(int $statusCode, string $msg, array $data = []){$data = ['status' => $statusCode,'msg' => $msg,'data' => $data];return json_encode($data);}/*** 获取日志表中销量总量** @author cyf*/public function getSaleSum(int $productId){$sql = 'select sum(sale) from order_test where product_id = '.$productId;$data = $this->getMySQL()->query($sql)->fetch(2);return $this->response(1, 'success', [$data]);}}
// 生成随机的商品下单数,使用ab压测工具测试并发下是否超卖 ab -n 10000 -c 5000 http://test.cn/
$num = rand(1, 300);
$object = new OperateStock();
print_r($object->placeOrder(1, $num));
//print_r($object->getSaleSum(1));
3、使用 Redis 队列方案【推荐】
<?php
declare(strict_types = 1);class OperateStock
{protected $pdo = null;protected $redis = null;protected $stockKeyPre = 'product_stock_';// 商品库存redis key前缀CONST REDUCE_STOCK = 1;// 减少库存操作CONST INCREASE_STOCK = 2;// 增加库存操作/*** 下订单扣减库存** @param int $productId* @param int $num* @return string* @throws Exception*/public function placeOrder(int $productId, int $num){try {$this->reduceStock($productId, $num);// 推送消息队列,对数据库中库存数据进行异步扣减// 记录订单销售日志$this->recordOrderLog($productId, $num);return $this->response(1, '下单成功');} catch (\Exception $e) {return $this->response(0, $e->getMessage());}}/*** 扣减库存** @param int $productId* @param int $num* @return bool* @throws Exception*/private function reduceStock(int $productId, int $num){$redis = $this->getRedis();$key = $this->stockKeyPre.$productId;$valueArray = [];try {for ($i = 0; $i < $num; $i++) {$res = $redis->rPop($key);if (false == $res) {throw new \Exception('库存不够啦');}$valueArray[] = $res;}return true;} catch (\Exception $e) {// 手动对已经下单的数据进行回滚,并抛出异常给上游调用方foreach ($valueArray as $v) {$redis->lPush($key, $v);}throw new \Exception('库存不够啦');}}/*** 增删改商品时,重置Redis内的该商品的库存【测试方法】** @author cyf*/public function resetStockToRedis(int $productId, int $num){$redis = $this->getRedis();$key = $this->stockKeyPre.$productId;for($i = 1; $i <= $num; $i++) {$redis->lpush($key, $i);}return $this->response(1, 'success');}/*** 记录销售日志** @param int $productId* @param int $num*/private function recordOrderLog(int $productId, int $num){$sql = "insert into order_test (product_id,sale) values ($productId,$num)";$this->getMySQL()->query($sql);}/*** 获取MySQL连接** @return PDO*/private function getMySQL(){if (false == $this->pdo) {$dsn = 'mysql:host=127.0.0.1;dbname=test';$this->pdo = new \PDO($dsn, 'root', '123456');}return $this->pdo;}/*** 获取Redis连接** @return null|Redis*/private function getRedis(){if (false == $this->redis) {$redis = new Redis();$redis->connect('127.0.0.1', 6379);$redis->auth('haveyb');$this->redis = $redis;}return $this->redis;}/*** 统一响应** @param int $statusCode* @param string $msg* @param array $data* @return string*/private function response(int $statusCode, string $msg, array $data = []){$data = ['status' => $statusCode,'msg' => $msg,'data' => $data];return json_encode($data);}/*** 获取日志表中销量总量** @param int $productId* @return string*/public function getSaleSum(int $productId){$sql = 'select sum(sale) from order_test where product_id = '.$productId;$data = $this->getMySQL()->query($sql)->fetch(2);return $this->response(1, 'success', [$data]);}}$object = new OperateStock();
// 先生成商品的队列结构库存,这个数据一定是抢购前就生成好的,而不是查询redis数据查不到时才去生成的,否则并发情况下会出错
//$object->resetStockToRedis(1, 2000);// 生成随机的商品下单数,使用ab压测工具测试并发下是否超卖 ab -n 30000 -c 6000 http://test.cn/
//$num = 1;
$num = rand(1, 299);
print_r($object->placeOrder(1, $num));// 获取订单日志中该商品实际销售总数,主要用于核对校验并发状况下,是否超卖
//print_r($object->getSaleSum(1));
方案四:使用Redis事务监听【待补充】
原文链接:老迟笔记-高并发下防止库存超卖解决方案