使用redis命令创建消息队列
在redis-cli中执行如下指令
XGROUP CREATE key groupName ID [MKSTREAM]
key:队列名称
groupName:消费者组名称
ID:起始ID标示,$代表队列中最后一个消息,0代表队列中第一个消息
MKSTREAM:队列不存在时自动创建队列
示例:
XGROUP CREATE streams.orders g1 0 MKSTREAM
编写Lua脚本,向redis消息队列中发送消息
-- lua脚本中其他事项处理部分-- 获取调用的参数列表
-- 优惠卷id
local voucherId = ARGV[1]
-- 用户id
local userId = ARGV[2]
-- 订单id
local orderId = ARGV[3]-- key
-- 库存key
local stockKey = 'seckill:stock:' .. voucherId
-- 订单key
local orderKey = 'seckill:order:' .. voucherId-- 业务
-- 判断库存是否充足
if(tonumber(redis.call('get', stockKey)) <= 0) then-- 库存不足return 1
end
-- 判断用户是否已经下单 SISMEMBER orderKey userId
if(redis.call('sismember', orderKey, userId) == 1) then-- 存在说明重复下单return 2
end
-- 扣库存,下单
redis.call('incrby', stockKey, -1)
redis.call('sadd', orderKey, userId)
-- 发消息到队列, XADD stream.orders * k1 v1 k2 v2 ...
redis.call('xadd', 'stream.orders', '*', 'userId', userId, 'voucherId', voucherId, 'id', orderId)return 0
业务代码——执行Lua脚本
private static final DefaultRedisScript<Long> SECKILL_SCRIPT;static {SECKILL_SCRIPT = new DefaultRedisScript<>();// 从resources目录下加载脚本SECKILL_SCRIPT.setLocation(new ClassPathResource("seckill.lua"));// lua脚本执行返回值SECKILL_SCRIPT.setResultType(Long.class);}@Overridepublic Result seckillVoucher(Long voucherId) {// 获取用户Long userId = UserHolder.getUser().getId();// 订单Idlong orderId = redisIdWorker.nextId("order");// 执行lua脚本int result = stringRedisTemplate.execute(SECKILL_SCRIPT,Collections.emptyList(),voucherId.toString(),userId.toString(),String.valueOf(orderId)).intValue();// 判断结果为0if (result != 0) {// 不为0,没有购买资格return Result.fail(result == 1 ? "库存不足" : "不能重复下单");}// 获取代理对象(事务)proxy = (IVoucherOrderService) AopContext.currentProxy();// 返回订单信息return Result.ok(orderId);}
业务代码——从消息队列获取消息并处理
// 线程池private static final ExecutorService SECKILL_ORDER_EXECUTOR = Executors.newSingleThreadExecutor();// 注解含义,在Bean被创建完毕后执行@PostConstructprivate void init() {// SECKILL_ORDER_EXECUTOR.submit(new VoucherOrderHandler());}// 从消息队列中获取消息,异步下单private class VoucherOrderHandler implements Runnable {String queueName = "stream.orders";@Overridepublic void run() {while (true) {try {// 获取消息队列中的订单信息 XREADGROUP GROUP g1 c1 count 1 block 2000 STREAMS stream.order >List<MapRecord<String, Object, Object>> list = stringRedisTemplate.opsForStream().read(Consumer.from("g1", "c1"),StreamReadOptions.empty().count(1).block(Duration.ofSeconds(2)),StreamOffset.create(queueName, ReadOffset.lastConsumed()));// 判断消息是否获取成功if (list == null || list.isEmpty()) {// 如果获取失败,说明没有消息,继续下一次循环continue;}// 解析消息中的订单消息MapRecord<String, Object, Object> record = list.get(0);Map<Object, Object> values = record.getValue();VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(values, new VoucherOrder(), true);// 如果获取成功,执行下单handleVoucherOrder(voucherOrder);// ACK确认,SACK stream.orders g1 idstringRedisTemplate.opsForStream().acknowledge(queueName, "g1", record.getId());} catch (Exception e) {log.error("订单处理异常", e);// 发生异常后去pending-list中处理消息handlePendingList();}}}private void handlePendingList() {while (true) {try {// 获取pending-list中的订单信息 XREADGROUP GROUP g1 c1 count 1 STREAMS stream.order 0List<MapRecord<String, Object, Object>> list = stringRedisTemplate.opsForStream().read(Consumer.from("g1", "c1"),StreamReadOptions.empty().count(1),StreamOffset.create(queueName, ReadOffset.from("0")));// 判断消息是否获取成功if (list == null || list.isEmpty()) {// 如果获取失败,说明pending-list没有异常消息,结束循环break;}// 解析消息中的订单消息MapRecord<String, Object, Object> record = list.get(0);Map<Object, Object> values = record.getValue();VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(values, new VoucherOrder(), true);// 如果获取成功,执行下单handleVoucherOrder(voucherOrder);// ACK确认,SACK stream.orders g1 idstringRedisTemplate.opsForStream().acknowledge(queueName, "g1", record.getId());} catch (Exception e) {log.error("处理pending-list订单处理异常", e);}}}}