引言
在之前的文章中,我们介绍了如何使用 Redis 和 Lua 脚本来应对秒杀活动中的高并发请求,并通过引入阻塞队列实现异步下单来提升系统性能。然而,在高并发场景下,阻塞队列的容量和处理速度可能会成为瓶颈。这篇文章将介绍如何使用 Redis Stream 队列进一步优化秒杀系统,提升整体性能和稳定性。
方案设计
基本思路
- 用户发起秒杀请求,通过 Redis Lua 脚本进行资格判断和库存扣减。
- Lua 脚本将订单信息写入 Redis Stream 队列。
- 后台线程从 Redis Stream 队列中读取订单信息并处理订单,确保数据库操作的线程安全和高效。
- 返回订单 ID 给用户。
具体实现
Lua 脚本
Lua 脚本负责判断秒杀资格、扣减库存,并将订单信息写入 Redis Stream 队列。
-- 1.参数列表
-- 1.1.优惠券ID
local voucherId = ARGV[1]
-- 1.2.用户ID
local userId = ARGV[2]
local orderId = ARGV[3]-- 2.数据key
-- 2.1.库存key
local stockKey = 'seckill:stock:' .. voucherId
-- 2.2.订单key
local orderKey = 'seckill:order:' .. voucherId-- 3.脚本逻辑
-- 3.1.判断库存是否足够
if (tonumber(redis.call('get', stockKey)) <= 0) thenreturn 1
end
-- 3.2.判断用户是否已经抢购过
if (redis.call('sismember', orderKey, userId) == 1) thenreturn 2
end
-- 3.3.减少库存
redis.call('incrby', stockKey, -1)
-- 3.4.记录用户下单
redis.call('sadd', orderKey, userId)
-- 3.5.将订单信息写入Redis Stream
redis.call('xadd', 'stream.orders', '*', 'userId', userId, 'voucherId', voucherId, 'id', orderId)
return 0
Java 代码
在 Java 代码中,我们通过 Redis Stream 队列实现异步下单,并利用 Redisson 分布式锁确保订单操作的线程安全。
@Service
@Slf4j
public class VoucherOrderServiceImpl extends ServiceImpl<VoucherOrderMapper, VoucherOrder> implements IVoucherOrderService {@Resourceprivate ISeckillVoucherService seckillVoucherService;@Resourceprivate RedisIdWorker redisIdWorker;@Resourceprivate StringRedisTemplate stringRedisTemplate;@Resourceprivate RedissonClient redissonClient;private static final DefaultRedisScript<Long> SECKILL_SCRIPT;static {SECKILL_SCRIPT = new DefaultRedisScript<>();SECKILL_SCRIPT.setLocation(new ClassPathResource("seckill.lua"));SECKILL_SCRIPT.setResultType(Long.class);}private static final ExecutorService SECKILL_ORDER_EXECUTOR = Executors.newSingleThreadExecutor();@PostConstructprivate void init() {SECKILL_ORDER_EXECUTOR.submit(new VoucherOrderHandler());}private class VoucherOrderHandler implements Runnable {String queueName = "stream.orders";@Overridepublic void run() {while (true) {try {// 从Redis Stream队列中获取订单信息List<MapRecord<String, Object, Object>> list =stringRedisTemplate.opsForStream().read(Consumer.from("g1", "c1"),StreamReadOptions.empty().count(1).block(Duration.ofSeconds(2)),StreamOffset.create(queueName, ReadOffset.lastConsumed()));// 判断是否成功获取订单信息if (list == null || list.isEmpty()) {continue;}// 解析订单信息MapRecord<String, Object, Object> record = list.get(0);Map<Object, Object> values = record.getValue();VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(values, new VoucherOrder(), true);// 处理订单handleVoucherOrder(voucherOrder);// ACK确认stringRedisTemplate.opsForStream().acknowledge(queueName, "g1", record.getId());} catch (Exception e) {log.error("处理订单异常", e);handlePendingList();}}}private void handlePendingList() {while (true) {try {// 从Pending List中获取未处理的订单信息List<MapRecord<String, Object, Object>> list =stringRedisTemplate.opsForStream().read(Consumer.from("g1", "c1"),StreamReadOptions.empty().count(1),StreamOffset.create(queueName, ReadOffset.from("0")));// 判断是否成功获取订单信息if (list == null || list.isEmpty()) {break;}// 解析订单信息MapRecord<String, Object, Object> record = list.get(0);Map<Object, Object> values = record.getValue();VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(values, new VoucherOrder(), true);// 处理订单handleVoucherOrder(voucherOrder);// ACK确认stringRedisTemplate.opsForStream().acknowledge(queueName, "g1", record.getId());} catch (Exception e) {log.error("处理Pending List订单异常", e);try {Thread.sleep(50);} catch (InterruptedException ex) {ex.printStackTrace();}}}}}private void handleVoucherOrder(VoucherOrder voucherOrder) {Long userId = voucherOrder.getUserId();RLock lock = redissonClient.getLock("lock:order:" + userId);boolean isLock = lock.tryLock();if (!isLock) {log.error("不允许重复下单");return;}try {proxy.createVoucherOrder(voucherOrder);} finally {lock.unlock();}}private IVoucherOrderService proxy;@Overridepublic Result seckillVoucher(Long voucherId) {Long userId = UserHolder.getUser().getId();long orderId = redisIdWorker.nextId("order");Long result = stringRedisTemplate.execute(SECKILL_SCRIPT, Collections.emptyList(), voucherId.toString(),userId.toString(), String.valueOf(orderId));int r = result.intValue();if (r != 0) {return Result.fail(r == 1 ? "库存不足" : "不能重复下单");}proxy = (IVoucherOrderService) AopContext.currentProxy();return Result.ok(orderId);}@Override@Transactionalpublic void createVoucherOrder(VoucherOrder voucherOrder) {Long userId = voucherOrder.getUserId();int count = query().eq("user_id", userId).eq("voucher_id", voucherOrder.getVoucherId()).count();if (count > 0) {log.error("不允许重复下单!");return;}boolean success = seckillVoucherService.update().setSql("stock = stock - 1").eq("voucher_id",voucherOrder.getVoucherId()).gt("stock", 0).update();if (!success) {log.error("库存不足!");return;}save(voucherOrder);}
}
代码详解
初始化和启动订单处理线程
我们在服务启动时,通过 @PostConstruct
注解确保订单处理线程被初始化和启动。
订单处理逻辑
通过 VoucherOrderHandler
类,从 Redis Stream 队列中读取订单信息并处理订单,确保订单的创建和库存的扣减是原子操作。
订单处理方法
在订单处理方法 handleVoucherOrder
中,我们通过 Redisson 分布式锁确保同一用户不会重复下单。
结论
通过引入 Redis Stream 队列,我们进一步优化了秒杀系统的性能和稳定性。Stream 队列不仅解决了阻塞队列容量的限制问题,还提供了更强大的消息处理能力,适用于各种高并发场景。这种优化方法不仅适用于秒杀活动,还可以推广到其他需要高性能处理的系统中。希望这些改进对你的系统设计有所帮助。