RabbitMQ交换器Exchange介绍与实践

导读

有了Rabbit的基础知识之后(基础知识详见:深入解读RabbitMQ工作原理及简单使用),本章我们重点学习一下Rabbit里面的exchange(交换器)的知识。

交换器分类

RabbitMQ的Exchange(交换器)分为四类:

  • direct(默认)
  • headers
  • fanout
  • topic

其中headers交换器允许你匹配AMQP消息的header而非路由键,除此之外headers交换器和direct交换器完全一致,但性能却很差,几乎用不到,所以我们本文也不做讲解。

**注意:**fanout、topic交换器是没有历史数据的,也就是说对于中途创建的队列,获取不到之前的消息。

1、direct交换器

direct为默认的交换器类型,也非常的简单,如果路由键匹配的话,消息就投递到相应的队列,如图:

使用代码:channel.basicPublish(“”, QueueName, null, message)推送direct交换器消息到对于的队列,空字符为默认的direct交换器,用队列名称当做路由键。

direct交换器代码示例

发送端:

Connection conn = connectionFactoryUtil.GetRabbitConnection();
Channel channel = conn.createChannel();
// 声明队列【参数说明:参数一:队列名称,参数二:是否持久化;参数三:是否独占模式;参数四:消费者断开连接时是否删除队列;参数五:消息其他参数】
channel.queueDeclare(config.QueueName, false, false, false, null);
String message = String.format("当前时间:%s", new Date().getTime());
// 推送内容【参数说明:参数一:交换机名称;参数二:队列名称,参数三:消息的其他属性-路由的headers信息;参数四:消息主体】
channel.basicPublish("", config.QueueName, null, message.getBytes("UTF-8"));

接收端,持续接收消息:

Connection conn = connectionFactoryUtil.GetRabbitConnection();
Channel channel = conn.createChannel();
// 声明队列【参数说明:参数一:队列名称,参数二:是否持久化;参数三:是否独占模式;参数四:消费者断开连接时是否删除队列;参数五:消息其他参数】
channel.queueDeclare(config.QueueName, false, false, false, null);
Consumer defaultConsumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,byte[] body) throws IOException {String message = new String(body, "utf-8"); // 消息正文System.out.println(workName + "收到消息 => " + message);channel.basicAck(envelope.getDeliveryTag(), false); // 手动确认消息【参数说明:参数一:该消息的index;参数二:是否批量应答,true批量确认小于当前id的消息】}
};
channel.basicConsume(config.QueueName, false, "", defaultConsumer);

接收端,获取单条消息

Connection conn = connectionFactoryUtil.GetRabbitConnection();
Channel channel = conn.createChannel();
channel.queueDeclare(config.QueueName, false, false, false, null);
GetResponse resp = channel.basicGet(config.QueueName, false);
String message = new String(resp.getBody(), "UTF-8");
channel.basicAck(resp.getEnvelope().getDeliveryTag(), false); // 消息确认

持续消息获取使用:basic.consume;单个消息获取使用:basic.get。

注意:不能使用for循环单个消息消费来替代持续消息消费,因为这样性能很低;

消息的发后既忘特性

发后既往只的是接受者不知道消息的来源是谁发送的,如果想要指定消息的发送者,需要包含在发送内容里面,这点就像我们在信件里面注明自己的姓名一样,只有这样才能知道发送者是谁。

消息确认

看了上面的代码我们可以知道,消息接收到之后必须使用channel.basicAck()方法手动确认(非自动确认删除模式下),那么问题来了。

消息收到未确认会怎么样?

如果应用程序接收了消息,因为bug忘记确认接收的话,消息在队列的状态会从“Ready”变为“Unacked”,如图:

如果消息收到却未确认,Rabbit将不会再给这个应用程序发送更多的消息了,这是因为Rabbit认为你没有准备好接收下一条消息。

此条消息会一直保持Unacked的状态,直到你确认了消息,或者断开与Rabbit的连接,Rabbit会自动把消息改完Ready状态,分发给其他订阅者。

当然你可以利用这一点,让你的程序延迟确认该消息,直到你的程序处理完相应的业务逻辑,这样可以有效的防治Rabbit给你过多的消息,导致程序崩溃。

消息确认Demo:

Connection conn = connectionFactoryUtil.GetRabbitConnection();
Channel channel = conn.createChannel();
channel.queueDeclare(config.QueueName, false, false, false, null);
GetResponse resp = channel.basicGet(config.QueueName, false);
String message = new String(resp.getBody(), "UTF-8");
channel.basicAck(resp.getEnvelope().getDeliveryTag(), false);

channel.basicAck(long deliveryTag, boolean multiple)为消息确认,参数1:消息的id;参数2:是否批量应答,true批量确认小于次id的消息。

总结:消费者消费的每条消息都必须确认。

消息拒绝

消息在确认之前,可以有两个选择:

选择1:断开与Rabbit的连接,这样Rabbit会重新把消息分派给另一个消费者;

选择2:拒绝Rabbit发送的消息使用channel.basicReject(long deliveryTag, boolean requeue),参数1:消息的id;参数2:处理消息的方式,如果是true,Rabbib会重新分配这个消息给其他订阅者,如果设置成false的话,Rabbit会把消息发送到一个特殊的“死信”队列,用来存放被拒绝而不重新放入队列的消息。

消息拒绝Demo:

Connection conn = connectionFactoryUtil.GetRabbitConnection();
Channel channel = conn.createChannel();
channel.queueDeclare(config.QueueName, false, false, false, null);
GetResponse resp = channel.basicGet(config.QueueName, false);
String message = new String(resp.getBody(), "UTF-8");
channel.basicReject(resp.getEnvelope().getDeliveryTag(), true); //消息拒绝

2、fanout交换器——发布/订阅模式

fanout有别于direct交换器,fanout是一种发布/订阅模式的交换器,当你发送一条消息的时候,交换器会把消息广播到所有附加到这个交换器的队列上。

比如用户上传了自己的头像,这个时候图片需要清除缓存,同时用户应该得到积分奖励,你可以把这两个队列绑定到图片上传的交换器上,这样当有第三个、第四个上传完图片需要处理的需求的时候,原来的代码可以不变,只需要添加一个订阅消息即可,这样发送方和消费者的代码完全解耦,并可以轻而易举的添加新功能了。

和direct交换器不同,我们在发送消息的时候新增channel.exchangeDeclare(ExchangeName, “fanout”),这行代码声明fanout交换器。

发送端:

final String ExchangeName = "fanoutec"; // 交换器名称
Connection conn = connectionFactoryUtil.GetRabbitConnection();
Channel channel = conn.createChannel();
channel.exchangeDeclare(ExchangeName, "fanout"); // 声明fanout交换器
String message = "时间:" + new Date().getTime();
channel.basicPublish(ExchangeName, "", null, message.getBytes("UTF-8"));

接受消息不同于direct,我们需要声明fanout路由器,并使用默认的队列绑定到fanout交换器上。

接收端:

Connection conn = connectionFactoryUtil.GetRabbitConnection();
Channel channel = conn.createChannel();
channel.exchangeDeclare(ExchangeName, "fanout"); // 声明fanout交换器
String queueName = channel.queueDeclare().getQueue(); // 声明队列
channel.queueBind(queueName, ExchangeName, "");
Consumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,byte[] body) throws IOException {String message = new String(body, "UTF-8");}
};
channel.basicConsume(queueName, true, consumer);

fanout和direct的区别最多的在接收端,fanout需要绑定队列到对应的交换器用于订阅消息。

其中channel.queueDeclare().getQueue()为随机队列,Rabbit会随机生成队列名称,一旦消费者断开连接,该队列会自动删除。

注意:对于fanout交换器来说routingKey(路由键)是无效的,这个参数是被忽略的。

3、topic交换器——匹配订阅模式

最后介绍的是topic交换器,topic交换器运行和fanout类似,但是可以更灵活的匹配自己想要订阅的信息,这个时候routingKey路由键就排上用场了,使用路由键进行消息(规则)匹配。

假设我们现在有一个日志系统,会把所有日志级别的日志发送到交换器,warning、log、error、fatal,但我们只想处理error以上的日志,要怎么处理?这就需要使用topic路由器了。

topic路由器的关键在于定义路由键,定义routingKey名称不能超过255字节,使用“.”作为分隔符,例如:com.mq.rabbit.error。

消费消息的时候routingKey可以使用下面字符匹配消息:

  • “*”可以匹配所有内容;
  • “#”匹配0和多个字符;

例如发布了一个“com.mq.rabbit.error”的消息:

能匹配上的路由键:

  • cn.mq.rabbit.*

  • cn.mq.rabbit.#

  • .error

  • cn.mq.#

  • #

不能匹配上的路由键:

  • cn.mq.*
  • *.error
  • *

所以如果想要订阅所有消息,可以使用“#”匹配。

**注意:**fanout、topic交换器是没有历史数据的,也就是说对于中途创建的队列,获取不到之前的消息。

发布端:

String routingKey = "com.mq.rabbit.error";
Connection conn = connectionFactoryUtil.GetRabbitConnection();
Channel channel = conn.createChannel();
channel.exchangeDeclare(ExchangeName, "topic"); // 声明topic交换器
String message = "时间:" + new Date().getTime();
channel.basicPublish(ExchangeName, routingKey, null, message.getBytes("UTF-8"));

接收端:

Connection conn = connectionFactoryUtil.GetRabbitConnection();
Channel channel = conn.createChannel();
channel.exchangeDeclare(ExchangeName, "topic"); // 声明topic交换器
String queueName = channel.queueDeclare().getQueue(); // 声明队列
String routingKey = "#.error";
channel.queueBind(queueName, ExchangeName, routingKey);
Consumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,byte[] body) throws IOException {String message = new String(body, "UTF-8");System.out.println(routingKey + "|接收消息 => " + message);}
};
channel.basicConsume(queueName, true, consumer);

扩展部分—自定义线程池

如果需要更大的控制连接,用户可自己设置线程池,代码如下:

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;ExecutorService es = Executors.newFixedThreadPool(20);
Connection conn = factory.newConnection(es);

其实看过源码的同学可能知道,factory.newConnection本身默认也有线程池的机制,ConnectionFactory.class部分源码如下:

private ExecutorService sharedExecutor;
public Connection newConnection() throws IOException, TimeoutException {return newConnection(this.sharedExecutor, Collections.singletonList(new Address(getHost(), getPort())));
}
public void setSharedExecutor(ExecutorService executor) {this.sharedExecutor = executor;
}

其中this.sharedExecutor就是默认的线程池,可以通过setSharedExecutor()方法设置ConnectionFactory的线程池,如果不设置则为null。

用户如果自己设置了线程池,像本小节第一段代码写的那样,那么当连接关闭的时候,不会自动关闭用户自定义的线程池,所以用户必须自己手动关闭,通过调用shutdown()方法,否则可能会阻止JVM的终止。

官方的建议是只有在程序出现严重性能瓶颈的时候,才应该考虑使用此功能。

项目地址

GitHub:https://github.com/vipstone/rabbitmq-java.git

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/546916.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

二、WIN10 64位下Pycharm打包.py程序为可执行文件exe

Win10在开发环境下,我们可以直接通过IDE (Pycharm)直接运行,当我们完成一个程序时,我们需要在独立环境下执行,因此我们需要将.py程序打包成windows环境下可直接执行的exe文件。 操作步骤如下: 1.在Pycharm中安装插件PyInstaller 搜索需要添加的PyInstaller模块,并安…

受限玻尔兹曼机(RBM)与python在Tensorflow的实现

简介 受限玻尔兹曼机是一种无监督,重构原始数据的一个简单的神经网络。 受限玻尔兹曼机先把输入转为可以表示它们的一系列输出;这些输出可以反向重构这些输入。通过前向和后向训练,训练好的网络能够提取出输入中最重要的特征。 为什么RBM很…

RabbitMQ事务和Confirm发送方消息确认——深入解读

引言 根据前面的知识(深入了解RabbitMQ工作原理及简单使用、Rabbit的几种工作模式介绍与实践)我们知道,如果要保证消息的可靠性,需要对消息进行持久化处理,然而消息持久化除了需要代码的设置之外,还有一个…

十四、PyCharm开发Python利用WMI修改电脑IP、DNS

1.在PyCharm开发中安装WMI插件 搜索需要添加的WM插件,并安装,安装成功后会有提示!

Python 3深度置信网络(DBN)在Tensorflow中的实现MNIST手写数字识别

任何程序错误,以及技术疑问或需要解答的,请扫码添加作者VX:1755337994 使用DBN识别手写体 传统的多层感知机或者神经网络的一个问题: 反向传播可能总是导致局部最小值。 当误差表面(error surface)包含了多个凹槽,当你…

PHP安装ZIP扩展

2019独角兽企业重金招聘Python工程师标准>>> 下载ZIP扩展包 wget http://pecl.php.net/get/zip-1.10.2.tgztar zxvf zip-1.10.2.tgz 进入解压后的目录,执行 /usr/local/php/bin/phpize 编译 ./configure --with-php-config/usr/local/php/bin/php-config…

使用Docker部署RabbitMQ集群

使用Docker部署RabbitMQ集群 概述 本文重点介绍的Docker的使用,以及如何部署RabbitMQ集群,最基础的Docker安装,本文不做过多的描述,读者可以自行度娘。 Windows10上Docker的安装 因为本人用的是Windows系统,所有推…

Python官方中文开发文档

Python官方中文开发文档: https://docs.python.org/zh-cn/3/

perl anyevent socket监控web日志server

上篇已经讲过client端的CODE这部分code主要用来接收client端发送来的日志,从数据库中读取reglar然后去匹配.如果出现匹配则判断为XSS***.server端的SOCKET接收用了coro相关的模块.配置文件仿照前一篇博客读取即可.#!/usr/bin/perl use warnings; use strict; use AnyEvent; use…

Tensorflow No module named ‘tensorflow.examples.tutorials‘解决办法,有用

任何程序错误,以及技术疑问或需要解答的,请扫码添加作者VX::1755337994 1 .利用TensorFlow代码下载MNIS丁 TensorFlow 提供了一个库, 可以直接用来自动下载与安装MNIST , 见如下代码: 代码5-1 M…

你不知道的RabbitMQ集群架构全解

你不知道的RabbitMQ集群架构全解 前言 本文将系统的介绍一下RabbitMQ集群架构的特点、异常处理、搭建和使用中要注意的一些细节。 知识点 一、为什么使用集群? 二、集群的特点 三、集群异常处理 四、集群节点类型 五、集群搭建方法 六、镜像队列 一、为什…

IPFS搭建HTTPS去中心化网站,真实可用

、 首先,我们需要知道IPFS是什么? 其实IPFS是一种协议,全称为Inter-Planetary File System,是一种点对点超媒体协议,旨在取代旧的HTTP,使网络更快,更安全,更开放。 我们平常都通过…

Ubuntu 18.04.1 搭建Java环境和HelloWorld

一、搭建Java环境 系统环境 Ubuntu 18.04.1JDK 8IDEA 2018.2 1.下载JDK 官网地址:http://www.oracle.com/technetwork/java/javase/downloads/index.html 选择相应的版本,点击jdk,进入下载页面,选择“Linux x64”版本的后缀为…

Python openpyxl打开有公式的excel表取值错误的解决办法,Python openpyxl获取excel有公式的单元格的数值错误,Python操作excel(.xlsx)封装类

Python openpyxl打开有公式的表格,如果直接读取,会出现有公式的单元格为空或零的情况。 参见: https://blog.csdn.net/weixin_45903952/article/details/105073611?utm_mediumdistribute.wap_relevant.none-task-blog-title-3 wb openpyxl…

Python实现GCS bucket断点续传功能,分块上传文件

Python实现GCS bucket断点续传功能,分块上传文件 环境:Python 3.6 我有一个关于使用断点续传到Google Cloud Storage的上传速度的问题。我已经编写了一个Python客户端,用于将大文件上传到GCS(它具有一些特殊功能,这…

Spring Boot 最佳实践(一)快速入门

一、关于Spring Boot 在开始了解Spring Boot之前,我们需要先了解一下Spring,因为Spring Boot的诞生和Spring是息息相关的,Spring Boot是Spring发展到一定程度的一个产物,但并不是Spring的替代品,Spring Boot是为了让程…

Wo Cloud CentOS 挂载磁盘小计

为什么80%的码农都做不了架构师?>>> 涉及到的命令:fdisk/mkfs/mount 列出当前磁盘[rootvity ~]# fdisk -lDisk /dev/vda: 21.5 GB, 21474836480 bytes 16 heads, 63 sectors/track, 41610 cylinders Units cylinders of 1008 * 512 516096…

PC通过IE浏览器对华为S5700交换机进行WEB管理

1.PC和交换机通过网线连接,通过CONSOLE线缆连接华为S5700交换机,使用如下命令查看是否有web.7z文件 <Quidway>dir2.新建VLAN和配置VLAN的IP <Quidway>system-view [Quidway]<

最邻近插值、双线性插值、三次卷积插值最通俗入门理论解析,论文材料

如有任何问题&#xff0c;请联系VX&#xff1a;1755337994 前言 图像处理中有三种常用的插值算法&#xff1a; 最邻近插值 双线性插值 双立方&#xff08;三次卷积&#xff09;插值 其中效果最好的是双立方&#xff08;三次卷积&#xff09;插值&#xff0c;本文介绍它的原…

Spring Boot 最佳实践(二)集成Jsp与生产环境部署

一、简介 提起Java不得不说的一个开发场景就是Web开发&#xff0c;也是Java最热门的开发场景之一&#xff0c;说到Web开发绕不开的一个技术就是JSP&#xff0c;因为目前市面上仍有很多的公司在使用JSP&#xff0c;所以本文就来介绍一下Spring Boot 怎么集成JSP开发&#xff0c…