文章目录
- 6.消息重复消费问题
- 6.1问题介绍
- 6.2解决思路
- 6.3将该消息存储到Redis
- 6.3.1将id存入string(单消费者场景)
- (1)实现思路
- (2)问题
- 6.3.2将id存入list中(多消费场景)
- (1)实现思路
- 6.3.3将id以key增量存入string中并设置过期时间
- (1)实现思路
- 6.4总结
6.消息重复消费问题
6.1问题介绍
什么是消息重复消费?首先我们来看一下消息的传输流程。消息生产者–>MQ–>消息消费者;消息生产者发送消息到MQ服务器,MQ服务器存储消息,消息消费者监听MQ的消息,发现有消息就消费消息。
所以消息重复也就出现在 两个阶段
1 :生产者多发送了消息给MQ;
2 :MQ的一条消息被消费者消费了多次。
具体场景如下:
- 生产者发送消息给MQ,在MQ确认的时候出现了网络波动,生产者没有收到确认,这时候生产者就会重新发送这条消息,导致MQ会接收到重复消息。
- 消费者消费成功后,给MQ确认的时候出现了网络波动,MQ没有接收到确认,为了保证消息不丢失,MQ就会继续给消费者投递之前的消息。这时候消费者就接收到了两条一样的消息。由于重复消息是由于网络原因造成的,无法避免。
6.2解决思路
- 发送消息时让每个消息携带一个全局的唯一ID
- 在消费消息时先判断消息是否已经被消费过,保证消息消费逻辑的幂等性。具体消费过程为:
- 消费者获取到消息后先根据id去查询redis/db是否存在该消息
- 如果不存在,则正常消费,消费完毕后写入redis/db
- 如果存在,则证明消息被消费过,直接丢弃
6.3将该消息存储到Redis
6.3.1将id存入string(单消费者场景)
(1)实现思路
- 将id号存入value中,并且value类型为string
- 即以队列名称为key,以消息id为值
- 每次消息过来都覆盖之前的消息
@RabbitListener(queues = "queueName4")//发送的队列名称 @RabbitListener注解到类和方法都可以@RabbitHandlerpublic void receiveMessage1(Message message) throws UnsupportedEncodingException {//获取唯一idString messageId = message.getMessageProperties().getMessageId();String msg = new String(message.getBody(),"utf-8");//获取redis中该队列名称对应的value值String messageRedisValue = redisUtil.get("queueName4","");//检验唯一id是否存在if (messageRedisValue.equals(messageId)) {//存在return;}System.out.println("消息:"+msg+", id:"+messageId);//以队列为key,id为valueredisUtil.set("queueName4",messageId);}
(2)问题
- 并发冲突:如果多个消费者同时操作 Redis 中的已消费消息列表,由于 Redis 是单线程处理命令,可能会出现并发冲突导致数据不一致或丢失问题。特别是在高并发情况下,使用字符串类型的 ID 可能会增加并发冲突的风险
- 内存占用:字符串类型的 ID 在内存中占用空间相对较大,尤其是对于大量消息的情况下,会增加 Redis 的内存占用。
- 比较效率:字符串类型的 ID 比较起来相对复杂,需要进行字符串比较操作。
6.3.2将id存入list中(多消费场景)
(1)实现思路
- 以该队列名称为key,id为value
- 适合多消费场景的原因:
- 顺序性:List 是一个有序集合,可以按照消息的顺序存储消息 ID。在多消费者场景下,保持消息的顺序通常是很重要的,以确保消息按照正确的顺序被消费。
- 原子性操作:Redis 的 List 提供了多个原子性操作,比如从列表两端推入/弹出元素,这些操作可以确保多个消费者同时访问列表时不会出现数据竞争和并发问题。
- 支持阻塞操作:List 提供了阻塞式的弹出操作(如 BLPOP、BRPOP),可以在没有消息时阻塞等待新消息的到来,这对于实现消费者轮询机制非常有用。
@RabbitListener(queues = "queueName4")//发送的队列名称 @RabbitListener注解到类和方法都可以
@RabbitHandler
public void receiveMessage2(Message message) throws UnsupportedEncodingException {String messageId = message.getMessageProperties().getMessageId();String msg = new String(message.getBody(),"utf-8");//获取List<String> messageRedisValue = redisUtil.lrange("queueName4");if (messageRedisValue.contains(messageId)) {return;}System.out.println("消息:"+msg+", id:"+messageId);redisUtil.lpush("queueName4",messageId);//存入list
}
6.3.3将id以key增量存入string中并设置过期时间
(1)实现思路
以消息id为key,消息内容为value存入string中,设置过期时间( 可承受的redis服务器异常时间,比如设置过期时间为10分钟,如果redis服务器断了20分钟,那么未消费的数据都会丢了)
@RabbitListener(queues = "queueName4")//发送的队列名称 @RabbitListener注解到类和方法都可以@RabbitHandlerpublic void receiveMessage2(Message message) throws UnsupportedEncodingException {String messageId = message.getMessageProperties().getMessageId();String msg = new String(message.getBody(),"utf-8");String messageRedisValue = redisUtil.get(messageId,"");if (msg.equals(messageRedisValue)) {return;}System.out.println("消息:"+msg+", id:"+messageId);//以id为key,消息内容为value,过期时间10分钟redisUtil.set(messageId,msg,10L);}
6.4总结
该篇文章介绍了消息重复消费问题及解决方案,问题可能产生的两个阶段(生产消息多发、消费者重复消息);解决方案:将消息发送时携带一个唯一id,消费方拿到消息时先去reids/db中有没有该数据,若没有则可以消费,否则不可以消费;并介绍了基于Redsi解决消息重复消费问题,①以队列名称为key,消息id为value,且value为string类型(适合只有一个消费方)②以队列名称为key,消息id为value,且value为list类型(适合有多个消费方场景)③以消息id为key,内容为value,并设置过期时间