使用 RabbitMQ 实现消息队列
- 导入依赖
<!--AMQP依赖,包含RabbitMQ-->
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<!--防止消息转换时的乱码-->
<dependency><groupId>com.fasterxml.jackson.core</groupId><artifactId>jackson-databind</artifactId>
</dependency>
- 配置RabbitMQ
spring:rabbitmq:host: 192.168.72.100port: 5672 # 端口virtual-host: / # 虚拟主机username: zhl36 # 用户名password: zhl15737979065 # 密码
- 定义消息转换器,将 需要发送的消息对象 转成 json,否则在 RabbitMQ 的控制台获取的消息可能乱码
@Configuration
public class MessageConvert {@Beanpublic MessageConverter messageConverter(){return new Jackson2JsonMessageConverter();}
}
- 编写生产者和消费者
@Service(value = "VoucherOrderServiceImplPlus")
public class VoucherOrderServiceImplPlus extends ServiceImpl<VoucherOrderMapper, VoucherOrder> implements IVoucherOrderService {@Autowiredprivate ISeckillVoucherService seckillVoucherService;/*** 全局id生成器*/@Autowiredprivate RedisIdWorker redisIdWorker;@Autowiredprivate StringRedisTemplate stringRedisTemplate;@Autowiredprivate RedissonClient redissonClient;// 使用 RabbitMQ 做消息队列@Autowiredprivate RabbitTemplate rabbitTemplate;private static final DefaultRedisScript<Long> SECKILL_SCRIPT;// 在类加载时加载lua脚本static {SECKILL_SCRIPT = new DefaultRedisScript<>();SECKILL_SCRIPT.setLocation(new ClassPathResource("seckill.lua"));SECKILL_SCRIPT.setResultType(Long.class);}/*** 生产者处理消息* @param voucherOrder*/public void handleVoucherOrder(VoucherOrder voucherOrder) {Long userId = voucherOrder.getUserId();RLock lock = redissonClient.getLock("lock:order:" + userId);boolean isLock = lock.tryLock();if (!isLock){log.error("请勿重复下单");}try {rabbitTemplate.convertAndSend(EXCHANGE_DIRECT,VOUCHERORDER_ROUTE_KEY,voucherOrder);} finally {lock.unlock();}}@Overridepublic Result secKillVoucher(Long voucherId) {/*** 执行lua脚本,判断返回值是否=0* ==0,可以正常下单* ==1,库存不足* ==2,用户重复下单*/Long userId = UserHolder.getUser().getId();Long result = stringRedisTemplate.execute(SECKILL_SCRIPT,Collections.emptyList(),voucherId.toString(),userId.toString());int r = result.intValue();if (r != 0) {return Result.fail(r == 1 ? "库存不足" : "不能重复下单");}long orderId = redisIdWorker.nextId("order");VoucherOrder voucherOrder = new VoucherOrder();voucherOrder.setId(orderId);voucherOrder.setUserId(userId);voucherOrder.setVoucherId(voucherId);// 把有购买资格的,把下单信息保存到 RabbitMQhandleVoucherOrder(voucherOrder);return Result.ok(orderId);}/*** 消费者处理优惠券订单 * 交换机使用的是默认的 DIRECT 方式* @param voucherOrder*/@RabbitListener(bindings = @QueueBinding(value = @Queue("voucherOrder.queue1"),exchange = @Exchange(name = EXCHANGE_DIRECT, type = ExchangeTypes.DIRECT),key = {VOUCHERORDER_ROUTE_KEY}))@Overridepublic void createVocherOrder(VoucherOrder voucherOrder) {Long userId = voucherOrder.getUserId();// 查询订单int count = query().eq("user_id", userId).eq("voucher_id", voucherOrder.getVoucherId()).count();if (count > 0) {log.error("您已购买过");}// 库存--boolean success = seckillVoucherService.update().setSql("stock = stock - 1")// 实际上 stock > 0 是 CAS 以乐观锁的方式防止超卖 where voucher_id = ? and stock > 0 ?.eq("voucher_id", voucherOrder.getVoucherId()).gt("stock", 0).update();if (!success) {log.error("库存不足");}// 订单++save(voucherOrder);}
基于Redis实现消息队列
基于List 的消息队列
Redis 的List 数据结构是一个双向链表,很容易模拟出队列效果。
https://redis.io/docs/latest/commands/?group=list
可以使用 BLPOP 或 BRPOP 实现阻塞效果,当有消息时就处理并返回。BLPOP 结合 RPUSH 或 BRPOP 结合 LPUSH 来实现。
基于List的消息队列的优缺点
优点:
- 利用Redis 存储,不受限于JVM内存上限
- 基于Redis的持久化机制,数据安全性有保证
- 可以满足消息有序性
缺点:
- 无法避免消息丢失
- 只支持单消费者
基于PubSub的消息队列
SUBSCRIBE channel [channel] :订阅一个或多个频道
PUBLISH channel msg :向一个频道发送消息
PSUBSCRIBE pattern[pattern] :订阅与pattern格式匹配的所有频道
基于PubSub的消息队列的优缺点
优点:
- 采用发布订阅模型,支持多生产、多消费
缺点:
- 不支持数据持久化
- 无法避免消息丢失
- 消息堆积有上限,超出时数据丢失
基于Stream的消息队列
发送消息的命令:XADD
读取消息的方式之一:XREAD
个人感觉太麻烦了,不如简单又好用的RabbitMQ😘