消费者完成一个任务可能需要一段时间,如果其中一个消费者处理一个长时间的任务并且只完成了部分突然就挂掉了,会发生什么情况?
RabbitMQ一旦向消费者传递了一条消息,便立即将该消息标记为删除。这种情况下,突然有个消费者挂掉了,我们将丢失正在处理的消息。以及后续发送给该消费者的消息,也无法接收到。
为了保证消息在发送过程中不丢失,rabbitmq引入消息应答机制。
消息应答机制
消费者在接收到消息并且处理该消息之后,告诉rabbitmq它已经处理了,rabbitmq可以把该消息删除了。
1.自动应答
并不完善。需要一个良好的环境,不发生极端的情况。使用较少。容易产生消息都是。
2.手动应答
Channel.basicAck用于肯定确认,RabbitMQ已知道该消息并且成功的处理消息,可以将其丢弃。
Channel.basicNack用于否定确认。
Channel.basicReject用于否定确认,不处理该消息了直接拒绝,可以将其丢弃。比basicNack方法少了一个参数。
multiple 手动应答的好处在于可以批量应答且减少网络拥堵。
basicAck方法第二个参数,multiple为true表示批量。
批量应答,会将信道中的消息都进行应答。如果不是批量应答,只会应答信道中当前这条消息。
建议不批量应答,以免造成消息的丢失。
消息自动重新入队
如果消费者由于某些原因失去连接(通道已关闭,连接已关闭,TCP连接丢失),导致消息未发送ack确认,RabbitMQ发现消息未完全处理,并将其重新排队。如果此时其他消费者可以处理,它将很快将其重新分配给其他消费者。这样即使某个消费者偶尔死亡,也可以确保不会丢失任何消息。
如果防止消息丢失?
rabbitmq中有消息应答机制,不要采用自动应答,需要一个良好的条件以及不能发生极端的情况,不建议使用。而是应该采用手动应答,手动应答又分为批量应答和非批量应答。批量应答会将信道里的消息都进行应答,不建议使用,以免造成消息丢失。非批量应答只会应答当前这条消息。
如果消息真的发生了丢失怎么办?
应该将消息自动重新入队。
代码
package com.xkj.org.mq.ack;import com.rabbitmq.client.Channel;
import com.xkj.org.utils.RabbitMQUtil;import java.io.IOException;
import java.util.Scanner;public class Task01 {private static final String QUEUE_NAME = "ack_queue";public static void main(String[] args) throws IOException {Channel channel = RabbitMQUtil.getChannel();//声明队列channel.queueDeclare(QUEUE_NAME, false, false, false, null);//发送消息Scanner scanner = new Scanner(System.in);while(scanner.hasNext()) {String message = scanner.next();channel.basicPublish("", QUEUE_NAME, null, message.getBytes("UTF-8"));}}
}
package com.xkj.org.mq.ack;import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import com.xkj.org.utils.RabbitMQUtil;import java.io.IOException;public class Worker01 {private static final String QUEUE_NAME = "ack_queue";public static void main(String[] args) throws IOException {Channel channel = RabbitMQUtil.getChannel();DeliverCallback deliverCallback = (consumerTag, message) -> {try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("接受到消息:"+ new String(message.getBody(), "UTF-8"));//第一个参数,消息标记tag//第二个参数,false非批量应答channel.basicAck(message.getEnvelope().getDeliveryTag(), false);};CancelCallback cancelCallback = consumerTag -> {System.out.println("work1 消息消费被中断");};System.out.println("worker1等待1s接收消息.......");//设置手动应答channel.basicConsume(QUEUE_NAME, false, deliverCallback, cancelCallback);}}
package com.xkj.org.mq.ack;import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import com.xkj.org.utils.RabbitMQUtil;import java.io.IOException;public class Worker02 {private static final String QUEUE_NAME = "ack_queue";public static void main(String[] args) throws IOException {Channel channel = RabbitMQUtil.getChannel();DeliverCallback deliverCallback = (consumerTag, message) -> {try {Thread.sleep(10000);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("接受到消息:"+ new String(message.getBody(), "UTF-8"));//第一个参数,消息标记tag//第二个参数,false非批量应答channel.basicAck(message.getEnvelope().getDeliveryTag(), false);};CancelCallback cancelCallback = consumerTag -> {System.out.println("work2 消息消费被中断");};System.out.println("worker2等待10s接收消息.......");//设置手动应答channel.basicConsume(QUEUE_NAME, false, deliverCallback, cancelCallback);}}