1.消息的可靠性
RabbitMQ提供了Confirm的确认机制。
Confirm机制用于确认消息是否已经发送给了交换机
2.Java的实现
1.导入依赖
<dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>5.6.0</version></dependency>
2.Confirm机制的生产者
package com.qf.mq2302.hello;import com.qf.mq2302.utils.MQUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;public class Send {//声明队列名字 public static final String QUEUE_NAME="queueA";public static void main(String[] args) throws Exception {//1.获取连接对象Connection conn = MQUtils.getConnection();//2. 创建一个channel对象,对于MQ的大部分操作,都定义在了channel对象上Channel channel = conn.createChannel();//3 开启confirmchannel.confirmSelect();//3.声明了一个队列/*** queue – the name of the queue* durable – true代表创建的队列是持久化的(当mq重启后,该队列依然存在)* exclusive – 该队列是不是排他的 (该对立是否只能由当前创建该队列的连接使用)* autoDelete – 该队列是否可以被mq服务器自动删除* arguments – 队列的其他参数,可以为null*/
// channel.queueDeclare(QUEUE_NAME, false, false, false, null);String message = "Hello doubleasdasda!";//生产者如何发送消息,使用下面的方法即可/*** exchange – 交换机的名字 ,如果是空串,说明是把消息发给了默认交换机* routingKey – 路由的key,当发送消息给默认交换机时,routingkey代表队列的名字* other properties - 消息的其他属性,可以为null* body – 消息的内容,注意,要是有 字节数组*/channel.basicPublish("", QUEUE_NAME, null, message.getBytes());System.out.println(" [x] Sent '" + message + "'");//检查消息是否发送成功了try {/*** 判断是否发送到交换机上,如果发送到了返回true,* 如果因为交换机名字错了,发送不到交换机,则会抛出异常,会自动关闭channel*/if (channel.waitForConfirms()) {//如果返回true,代表交换机成功接收到了消息System.out.println("消息已经成功发送给了交换机");//关闭资源channel.close();}else {System.out.println("消息发送给交换机失败了");//关闭资源channel.close();}} catch (InterruptedException e) {System.out.println("消息发送给交换机失败了");System.out.println("失败的消息为:"+message);}conn.close();}
}
3.confirm 机制的消费者
package com.qf.mq2302.hello;import com.qf.mq2302.utils.MQUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DeliverCallback;
import com.rabbitmq.client.Delivery;import java.io.IOException;public class Recv {private final static String QUEUE_NAME="hello-queue";public static void main(String[] args) throws Exception {//1.获取连接对象Connection conn = MQUtils.getConnection();//2. 创建一个channel对象,对于MQ的大部分操作,都定义在了channel对象上Channel channel = conn.createChannel();/*** 第一个参数队列名称* 第二个参数,耐用性* 第三个参数排外性* 第四个参数是否自动删除* 第五个参数,可以定义什么类型的队列*/channel.queueDeclare(QUEUE_NAME, false, false, false, null);//3.该消费者收到消息之后的处理逻辑,写在DeliverCallback对象中DeliverCallback deliverCallback =new DeliverCallback() {@Overridepublic void handle(String consumerTag, Delivery message) throws IOException {System.out.println(consumerTag);//从Delivery对象中可以获取到生产者,发送的消息的字节数组byte[] body = message.getBody();String msg = new String(body, "utf-8");//在这里写消费者的业务逻辑,例如,发送邮件System.out.println(msg);}};//4.让当前消费者开始消费(QUEUE_NAME)队列中的消息/*** queue – the name of the queue* autoAck – true 代表当前消费者是不是自动确认模式。true代表自动确认。* deliverCallback – 当有消息发送给该消费者时,消费者如何处理消息的逻辑* cancelCallback – 当消费者被取消掉时,如果要执行代码,写到这里*/channel.basicConsume(QUEUE_NAME,true,deliverCallback,consumerTag -> {});}}
3.整合springboot实现
1.导入依赖
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency>
2.yml配置文件
spring:rabbitmq:host: 8.140.244.227port: 6786username: testpassword: testvirtual-host: /testpublisher-confirm-type: correlated #在springboot 项目下开启生产者的confirm机制
3.RabbitMQ配置文件
package com.qf.bootmq2302.config;import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class RabbitConfig {@Beanpublic RabbitTemplate rabbitTemplate(CachingConnectionFactory cachingConnectionFactory){RabbitTemplate rabbitTemplate = new RabbitTemplate();//设置连接工厂对象rabbitTemplate.setConnectionFactory(cachingConnectionFactory);rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {@Overridepublic void confirm(CorrelationData correlationData, boolean ack, String cause) {System.out.println("correlationData:"+correlationData.getId());System.out.println("correlationData:"+new String(correlationData.getReturnedMessage().getBody()));//通过id可以去redis 里取 value消息//代表消息是否发送给交换机成功,发送失败false ,发送成功 trueSystem.out.println("ack:"+ack);//代表错误的原因System.out.println("cause:"+cause);}});return rabbitTemplate;}}
4.生产者写一个Controller
@AutowiredRabbitTemplate rabbitTemplate;@GetMapping("/test1")public String test1(String msg,String routkey){System.out.println(msg);String exchangeName = "";//默认交换机String routingkey = routkey;//队列名字//创建一个 CorrelationData 对象CorrelationData correlationData = new CorrelationData();correlationData.setId("001");Message message = new Message(msg.getBytes(), null);correlationData.setReturnedMessage(message);//要把消息的内容和消息的编号 存放到redis中, key=消息编号,value=消息内容//key = bootmq:failmessage:001//生产者发送消息//第四个参数,可以携带自定义的correlationDatarabbitTemplate.convertAndSend(exchangeName,routingkey,msg,correlationData);return "ok";}
5.消费者写一个接收队列消息
@RabbitListener(queues = "queueA")public void getMsg1(Map<String,Object> data, Channel channel,Message message) throws IOException {System.out.println(data);//手动ack//若开启手动ack,不给手动ack,就按照 prefetch: 1 #等价于basicQos(1)的量,就这么多,不会多给你了,因为你没有确认。确认一条,就给你一条channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);}
6.消费者的配置文件
spring:rabbitmq:host: 8.140.244.227port: 6786username: testpassword: testvirtual-host: /test#手动ACKlistener:simple:acknowledge-mode: manual # 手动ackprefetch: 1 #等价于basicQos(1)