java确认rabbitmq_RabbitMQ 消息确认机制

生产端 Confirm 消息确认机制

消息的确认,是指生产者投递消息后,如果 Broker 收到消息,则会给我们生产者一个应答。生产者进行接收应答,用来确定这条消息是否正常的发送到 Broker ,这种方式也是消息的可靠性投递的核心保障!

Confirm 确认机制流程图

c7f4ac74c2f410afb2113e39ede86cf8.png

如何实现Confirm确认消息?

第一步:在 channel 上开启确认模式: channel.confirmSelect()

第二步:在 channel 上添加监听: channel.addConfirmListener(ConfirmListener listener);, 监听成功和失败的返回结果,根据具体的结果对消息进行重新发送、或记录日志等后续处理!

import com.rabbitmq.client.Channel;

import com.rabbitmq.client.ConfirmListener;

import com.rabbitmq.client.Connection;

import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;

public class ConfirmProducer {

public static void main(String[] args) throws Exception {

ConnectionFactory factory = new ConnectionFactory();

factory.setHost("localhost");

factory.setVirtualHost("/");

factory.setUsername("guest");

factory.setPassword("guest");

Connection connection = factory.newConnection();

Channel channel = connection.createChannel();

String exchangeName = "test_confirm_exchange";

String routingKey = "item.update";

//指定消息的投递模式:confirm 确认模式

channel.confirmSelect();

//发送

final long start = System.currentTimeMillis();

for (int i = 0; i < 5 ; i++) {

String msg = "this is confirm msg ";

channel.basicPublish(exchangeName, routingKey, null, msg.getBytes());

System.out.println("Send message : " + msg);

}

//添加一个确认监听, 这里就不关闭连接了,为了能保证能收到监听消息

channel.addConfirmListener(new ConfirmListener() {

/**

* 返回成功的回调函数

*/

public void handleAck(long deliveryTag, boolean multiple) throws IOException {

System.out.println("succuss ack");

System.out.println(multiple);

System.out.println("耗时:" + (System.currentTimeMillis() - start) + "ms");

}

/**

* 返回失败的回调函数

*/

public void handleNack(long deliveryTag, boolean multiple) throws IOException {

System.out.printf("defeat ack");

System.out.println("耗时:" + (System.currentTimeMillis() - start) + "ms");

}

});

}

}

import com.rabbitmq.client.*;

import java.io.IOException;

public class ConfirmConsumer {

public static void main(String[] args) throws Exception {

ConnectionFactory factory = new ConnectionFactory();

factory.setHost("localhost");

factory.setVirtualHost("/");

factory.setUsername("guest");

factory.setPassword("guest");

factory.setAutomaticRecoveryEnabled(true);

factory.setNetworkRecoveryInterval(3000);

Connection connection = factory.newConnection();

Channel channel = connection.createChannel();

String exchangeName = "test_confirm_exchange";

String queueName = "test_confirm_queue";

String routingKey = "item.#";

channel.exchangeDeclare(exchangeName, "topic", true, false, null);

channel.queueDeclare(queueName, false, false, false, null);

//一般不用代码绑定,在管理界面手动绑定

channel.queueBind(queueName, exchangeName, routingKey);

//创建消费者并接收消息

Consumer consumer = new DefaultConsumer(channel) {

@Override

public void handleDelivery(String consumerTag, Envelope envelope,

AMQP.BasicProperties properties, byte[] body)

throws IOException {

String message = new String(body, "UTF-8");

System.out.println(" [x] Received '" + message + "'");

}

};

//设置 Channel 消费者绑定队列

channel.basicConsume(queueName, true, consumer);

}

}

我们此处只关注生产端输出消息

Send message : this is confirm msg

Send message : this is confirm msg

Send message : this is confirm msg

Send message : this is confirm msg

Send message : this is confirm msg

succuss ack

true

耗时:3ms

succuss ack

true

耗时:4ms

注意事项

我们采用的是异步 confirm 模式:提供一个回调方法,服务端 confirm 了一条或者多条消息后 Client 端会回调这个方法。除此之外还有单条同步 confirm 模式、批量同步 confirm 模式,由于现实场景中很少使用我们在此不做介绍,如有兴趣直接参考官方文档。

我们运行生产端会发现每次运行结果都不一样,会有多种情况出现,因为 Broker 会进行优化,有时会批量一次性 confirm ,有时会分开几条 confirm。

succuss ack

true

耗时:3ms

succuss ack

false

耗时:4ms

或者

succuss ack

true

耗时:3ms

Return 消息机制

Return Listener 用于处理一-些不可路 由的消息!

消息生产者,通过指定一个 Exchange 和 Routingkey,把消息送达到某一个队列中去,然后我们的消费者监听队列,进行消费处理操作!

但是在某些情况下,如果我们在发送消息的时候,当前的 exchange 不存在或者指定的路由 key 路由不到,这个时候如果我们需要监听这种不可达的消息,就要使用 Return Listener !

在基础API中有一个关键的配置项:Mandatory:如果为 true,则监听器会接收到路由不可达的消息,然后进行后续处理,如果为 false,那么 broker 端自动删除该消息!

Return 消息机制流程图

0ccebe402d0ff9cb0e9aa9d19cd48c38.png

Return 消息示例

首先我们需要发送三条消息,并且故意将第 0 条消息的 routing Key设置为错误的,让他无法正常路由到消费端。

mandatory 设置为 true 路由不可达的消息会被监听到,不会被自动删除.即channel.basicPublish(exchangeName, errRoutingKey, true,null, msg.getBytes());

最后添加监听即可监听到不可路由到消费端的消息channel.addReturnListener(ReturnListener r))

import com.rabbitmq.client.*;

import java.io.IOException;

public class ReturnListeningProducer {

public static void main(String[] args) throws Exception {

ConnectionFactory factory = new ConnectionFactory();

factory.setHost("localhost");

factory.setVirtualHost("/");

factory.setUsername("guest");

factory.setPassword("guest");

Connection connection = factory.newConnection();

Channel channel = connection.createChannel();

String exchangeName = "test_return_exchange";

String routingKey = "item.update";

String errRoutingKey = "error.update";

//指定消息的投递模式:confirm 确认模式

channel.confirmSelect();

//发送

for (int i = 0; i < 3 ; i++) {

String msg = "this is return——listening msg ";

//@param mandatory 设置为 true 路由不可达的消息会被监听到,不会被自动删除

if (i == 0) {

channel.basicPublish(exchangeName, errRoutingKey, true,null, msg.getBytes());

} else {

channel.basicPublish(exchangeName, routingKey, true, null, msg.getBytes());

}

System.out.println("Send message : " + msg);

}

//添加一个确认监听, 这里就不关闭连接了,为了能保证能收到监听消息

channel.addConfirmListener(new ConfirmListener() {

/**

* 返回成功的回调函数

*/

public void handleAck(long deliveryTag, boolean multiple) throws IOException {

System.out.println("succuss ack");

}

/**

* 返回失败的回调函数

*/

public void handleNack(long deliveryTag, boolean multiple) throws IOException {

System.out.printf("defeat ack");

}

});

//添加一个 return 监听

channel.addReturnListener(new ReturnListener() {

public void handleReturn(int replyCode, String replyText, String exchange, String routingKey, AMQP.BasicProperties properties, byte[] body) throws IOException {

System.out.println("return relyCode: " + replyCode);

System.out.println("return replyText: " + replyText);

System.out.println("return exchange: " + exchange);

System.out.println("return routingKey: " + routingKey);

System.out.println("return properties: " + properties);

System.out.println("return body: " + new String(body));

}

});

}

}

import com.rabbitmq.client.*;

import java.io.IOException;

public class ReturnListeningConsumer {

public static void main(String[] args) throws Exception {

//1. 创建一个 ConnectionFactory 并进行设置

ConnectionFactory factory = new ConnectionFactory();

factory.setHost("localhost");

factory.setVirtualHost("/");

factory.setUsername("guest");

factory.setPassword("guest");

factory.setAutomaticRecoveryEnabled(true);

factory.setNetworkRecoveryInterval(3000);

//2. 通过连接工厂来创建连接

Connection connection = factory.newConnection();

//3. 通过 Connection 来创建 Channel

Channel channel = connection.createChannel();

//4. 声明

String exchangeName = "test_return_exchange";

String queueName = "test_return_queue";

String routingKey = "item.#";

channel.exchangeDeclare(exchangeName, "topic", true, false, null);

channel.queueDeclare(queueName, false, false, false, null);

//一般不用代码绑定,在管理界面手动绑定

channel.queueBind(queueName, exchangeName, routingKey);

//5. 创建消费者并接收消息

Consumer consumer = new DefaultConsumer(channel) {

@Override

public void handleDelivery(String consumerTag, Envelope envelope,

AMQP.BasicProperties properties, byte[] body)

throws IOException {

String message = new String(body, "UTF-8");

System.out.println(" [x] Received '" + message + "'");

}

};

//6. 设置 Channel 消费者绑定队列

channel.basicConsume(queueName, true, consumer);

}

}

我们只关注生产端结果,消费端只收到两条消息。

Send message : this is return——listening msg

Send message : this is return——listening msg

Send message : this is return——listening msg

return relyCode: 312

return replyText: NO_ROUTE

return exchange: test_return_exchange

return routingKey: error.update

return properties: #contentHeader(content-type=null, content-encoding=null, headers=null, delivery-mode=null, priority=null, correlation-id=null, reply-to=null, expiration=null, message-id=null, timestamp=null, type=null, user-id=null, app-id=null, cluster-id=null)

return body: this is return——listening msg

succuss ack

succuss ack

succuss ack

消费端 Ack 和 Nack 机制

消费端进行消费的时候,如果由于业务异常我们可以进行日志的记录,然后进行补偿!如果由于服务器宕机等严重问题,那我们就需要手工进行ACK保障消费端消费成功!消费端重回队列是为了对没有处理成功的消息,把消息重新会递给Broker!一般我们在实际应用中,都会关闭重回队列,也就是设置为False。

参考 api

void basicNack(long deliveryTag, boolean multiple, boolean requeue) throws IOException;

void basicAck(long deliveryTag, boolean multiple) throws IOException;

如何设置手动 Ack 、Nack 以及重回队列

首先我们发送五条消息,将每条消息对应的循环下标 i 放入消息的 properties 中作为标记,以便于我们在后面的回调方法中识别。

其次, 我们将消费端的 ·channel.basicConsume(queueName, false, consumer); 中的 autoAck属性设置为 false,如果设置为true的话 将会正常输出五条消息。

我们通过 Thread.sleep(2000)来延时一秒,用以看清结果。我们获取到properties中的num之后,通过channel.basicNack(envelope.getDeliveryTag(), false, true);将 num为0的消息设置为 nack,即消费失败,并且将 requeue属性设置为true,即消费失败的消息重回队列末端。

import com.rabbitmq.client.*;

import java.util.HashMap;

import java.util.Map;

public class AckAndNackProducer {

public static void main(String[] args) throws Exception {

ConnectionFactory factory = new ConnectionFactory();

factory.setHost("localhost");

factory.setVirtualHost("/");

factory.setUsername("guest");

factory.setPassword("guest");

Connection connection = factory.newConnection();

Channel channel = connection.createChannel();

String exchangeName = "test_ack_exchange";

String routingKey = "item.update";

String msg = "this is ack msg";

for (int i = 0; i < 5; i++) {

Map headers = new HashMap();

headers.put("num" ,i);

AMQP.BasicProperties properties = new AMQP.BasicProperties().builder()

.deliveryMode(2)

.headers(headers)

.build();

String tem = msg + ":" + i;

channel.basicPublish(exchangeName, routingKey, true, properties, tem.getBytes());

System.out.println("Send message : " + msg);

}

channel.close();

connection.close();

}

}

import com.rabbitmq.client.*;

import java.io.IOException;

public class AckAndNackConsumer {

public static void main(String[] args) throws Exception {

ConnectionFactory factory = new ConnectionFactory();

factory.setHost("localhost");

factory.setVirtualHost("/");

factory.setUsername("guest");

factory.setPassword("guest");

factory.setAutomaticRecoveryEnabled(true);

factory.setNetworkRecoveryInterval(3000);

Connection connection = factory.newConnection();

final Channel channel = connection.createChannel();

String exchangeName = "test_ack_exchange";

String queueName = "test_ack_queue";

String routingKey = "item.#";

channel.exchangeDeclare(exchangeName, "topic", true, false, null);

channel.queueDeclare(queueName, false, false, false, null);

//一般不用代码绑定,在管理界面手动绑定

channel.queueBind(queueName, exchangeName, routingKey);

Consumer consumer = new DefaultConsumer(channel) {

@Override

public void handleDelivery(String consumerTag, Envelope envelope,

AMQP.BasicProperties properties, byte[] body)

throws IOException {

String message = new String(body, "UTF-8");

System.out.println(" [x] Received '" + message + "'");

try {

Thread.sleep(2000);

} catch (InterruptedException e) {

e.printStackTrace();

}

if ((Integer) properties.getHeaders().get("num") == 0) {

channel.basicNack(envelope.getDeliveryTag(), false, true);

} else {

channel.basicAck(envelope.getDeliveryTag(), false);

}

}

};

//6. 设置 Channel 消费者绑定队列

channel.basicConsume(queueName, false, consumer);

}

}

我们此处只关心消费端输出,可以看到第 0 条消费失败重新回到队列尾部消费。

[x] Received 'this is ack msg:1'

[x] Received 'this is ack msg:2'

[x] Received 'this is ack msg:3'

[x] Received 'this is ack msg:4'

[x] Received 'this is ack msg:0'

[x] Received 'this is ack msg:0'

[x] Received 'this is ack msg:0'

[x] Received 'this is ack msg:0'

[x] Received 'this is ack msg:0'

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/340137.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

hadoop源码分析_Spark2.x精通:Job触发流程源码深度剖析(一)

&#xff0c; 一、概述 之前几篇文章对Spark集群的Master、Worker启动流程进行了源码剖析&#xff0c;后面直接从客户端角度出发&#xff0c;讲解了spark-submit任务提交过程及driver的启动&#xff1b;集群启动、任务提交、SparkContext初始化等前期准备工作完成之后&am…

如何在Java中将String转换为int

在本教程中&#xff0c;我们将看到将Java中的String转换为int&#xff08;或Integer&#xff09;的各种方法。 您可以使用以下任何一种方式&#xff1a; –使用Integer.parseInt&#xff08;string&#xff09; –使用Integer.valueof&#xff08;string&#xff09; –使用…

jboss 程序位置_介绍JBoss BPM Suite安装程序

jboss 程序位置本周&#xff0c;我们想向您介绍JBoss BRMS和JBoss BPM Suite产品随附的一个鲜为人知的安装程序组件。 请注意&#xff0c;当前所有的演示项目都要求您下载JBoss BPM Suite可部署的eap zip产品文件和JBoss EAP 6.1.1 zip产品文件。 展望未来&#xff0c;我们将迁…

java换成中文_如果我们的编程替换成中文会变成怎样?

首先大概的看一下中文编码&#xff1a;你以为会写中文就会编程吗&#xff1f;这就像你以为会写汉字就会写出好文章一样。编程是和机器沟通&#xff0c;因此要用机器的语言而不是人类的语言。最早的程序就是0和1的数字&#xff0c;不是中文也不是英文。以前的程序员&#xff0c;…

高等数学公式大全_高中物理知识思维导图大全,赶紧收藏!

物理作为理综的重中之重&#xff0c;物理的学习一直是广大考生的难点。如何快捷高效的掌握物理知识点是高考复习的重点之一&#xff0c;根据高中物理三年知识点用思维导图的方式&#xff0c;来助大家掌握物理知识点。运动的描述 重力 基本相互作用 相互作用 牛顿运动定律 力的合…

go环境搭建_学习的golang第一步,搭建我们运行的环境,go! go! go

这是Golang教程系列中的第一个教程。本教程介绍了Go&#xff0c;并讨论了选择Go优于其他编程语言的优势。我们还将学习如何在Mac OS&#xff0c;Windows和Linux中安装Go。介绍Go也称为Golang是由Google开发的一种开源&#xff0c;编译和静态类型的编程语言。创造Go的关键人物是…

如何在Java中将数组转换为列表

你好朋友&#xff0c; 在本教程中&#xff0c;我们将学习将数组转换为List的各种方法。 package com.blogspot.javasolutionsguide;import com.google.common.collect.Lists; import org.apache.commons.collections4.CollectionUtils;import java.util.ArrayList; import ja…

html5网页制作代码_HTML5的网页设计教程

关注小编&#xff0c;教你如何制作网页HTML5是超文本标记语言(HyperText Markup Language)的第五代版本&#xff0c;它是书写网页代码的一种规范、一种标准。它通过标记符号来标记要显示的网页中的各个部分。浏览器根据这个标准显示其中的内容(如&#xff1a;文字如何处理&…

aop+注解 实现对实体类的字段校验_SpringBoot实现通用的接口参数校验

来自&#xff1a;掘金&#xff0c;作者&#xff1a;cipher链接&#xff1a;https://juejin.im/post/5af3c25b5188253064651c76原文链接&#xff1a;http://www.ciphermagic.cn/spring-boot-aop-param-check.html本文介绍基于Spring Boot和JDK8编写一个AOP&#xff0c;结合自定义…

java基础分享_一、java基础教程

1、java是一门比较纯粹的面向对象编程语言&#xff0c;所以java的所有代码都必须写在类的内部。1.1 java的可执行文件后缀名是".java"&#xff0c;例如HelloWorld.java&#xff0c;并且每个可执行文件内部&#xff0c;必须有且仅有一个public公共类/公共接口/公共抽象…

Spring Boot自定义横幅生成

每当我们启动Spring Boot应用程序时&#xff0c;都会显示如下所示的文本消息。 这称为横幅。 现在&#xff0c;如果我们可以创建一个特定于我们的Spring Boot应用程序的自定义横幅并使用它代替默认的Spring Boot横幅&#xff0c;那将不是一件很棒的事。 有很多方法可以生成和使…

java等待_Java学习:等待唤醒机制

等待唤醒机制线程的状态NEW 至今尚未启动的线程处于这种状态RUNNABLE 正在Java虚拟机中执行的线程处于这种状态BLOCKED 受阻塞并等待某个监视器锁的线程处于这种状态WAITING 无限期的等待另一个线程来执行某一待定操作的线程处于这种状态TIMED_WAITNG 等待另一个线程来执行取…

游戏ai 行为树_游戏AI –行为树简介

游戏ai 行为树游戏AI是一个非常广泛的主题&#xff0c;尽管有很多资料&#xff0c;但我找不到能以较慢且更易理解的速度缓慢介绍这些概念的东西。 本文将尝试解释如何基于行为树的概念来设计一个非常简单但可扩展的AI系统。 什么是AI&#xff1f; 人工智能是参与游戏的实体表现…

java构造器_Java构造器就是这么简单!

前言理解构造器之前&#xff0c;首先我们需要了解Java中为什么要引入构造器&#xff0c;以及构造器的作用。在很久之前&#xff0c;程序员们编写C程序总会忘记初始化变量&#xff08;这真的是一件琐碎但必须的事&#xff09;&#xff0c;C引入了 构造器(constructor) 的概念&am…

JavaFX技巧32:需要图标吗? 使用Ikonli!

动机 自2013年以来&#xff0c;我一直在编写JavaFX应用程序和库的代码&#xff0c;它们的共同点是&#xff0c;我需要找到可以用于它们的良好图标/图形。 作为前Swing开发人员&#xff0c;我首先使用图像文件&#xff0c;GIF或PNG。 通常&#xff0c;我会从IconExperience&…

java应用部署docker_Docker部署JavaWeb项目实战

摘要&#xff1a;本文主要讲了怎样在Ubuntu14.04 64位系统下来创建一个执行Java web应用程序的Docker容器。一、下载镜像、启动容器1、下载镜像先查看镜像docker images记住这个Image ID&#xff0c;下面我们启动容器须要用到它。假设看到以上输出&#xff0c;说明您能够使用“…

如何用Java创建不可变的Map

你好朋友&#xff0c; 在本教程中&#xff0c;我们将看到如何用Java创建不可变的Map。 –不可变的类或对象是什么意思&#xff1f; –什么是不可变地图&#xff1f; –如何在Java中创建不可变的Map&#xff1f; 不变的类或对象是什么意思&#xff1f; 不可变的类或对象是创…

quartz java 线程 不释放_java Quartz 内存泄漏

我用定时器启动应用的时候发现内存泄漏&#xff0c;具体报错如下&#xff1a;十月 30, 2015 2:30:12 下午 org.apache.catalina.startup.HostConfig undeploy信息: Undeploying context [/ChinaMoney Maven Webapp]十月 30, 2015 2:30:15 下午 org.apache.catalina.loader.Weba…

在ultraedit查找每行第二个单词_新手收藏!亚马逊关键字查找

亚马逊销售中最重要的是“排名”。而“关键字”对提高排名很重要。搜索结果对亚马逊的销售产生重大影响。要想让你的产品被显示在搜索结果的顶部&#xff0c;那你必须选择有效的关键字。搜索关键词排名一直上不去&#xff0c;你可能会这么想&#xff1a;“关键字不好吧......。…

java opencv磨皮算法_使用OPENCV简单实现具有肤质保留功能的磨皮增白算法

在一个美颜高手那里发现一个美颜算法&#xff0c;他写出了数学表达式&#xff0c;没有给出代码&#xff0c;正好在研究OPENCV&#xff0c;顺手实现之。具体过程就是一系列矩阵运算&#xff0c;据说是从一个PS高手那里研究 出来的&#xff0c;一并表示感谢。这是数学表达式&…