深入解读RabbitMQ工作原理及简单使用

深入解读RabbitMQ工作原理及简单使用

RabbitMQ系列目录

  1. RabbitMQ在Ubuntu上的环境搭建
  2. 深入解读RabbitMQ工作原理及简单使用
  3. Rabbit的几种工作模式介绍与实践
  4. Rabbit事务与消息确认
  5. Rabbit集群搭建
  6. 使用HAProxy为RabbitMQ搭建负载均衡
  7. REST API控制Rabbit

RabbitMQ简介

在介绍RabbitMQ之前实现要介绍一下MQ,MQ是什么?

MQ全称是Message Queue,可以理解为消息队列的意思,简单来说就是消息以管道的方式进行传递。

RabbitMQ是一个实现了AMQP(Advanced Message Queuing Protocol)高级消息队列协议的消息队列服务,用Erlang语言的。

使用场景

在我们秒杀抢购商品的时候,系统会提醒我们稍等排队中,而不是像几年前一样页面卡死或报错给用户。

像这种排队结算就用到了消息队列机制,放入通道里面一个一个结算处理,而不是某个时间断突然涌入大批量的查询新增把数据库给搞宕机,所以RabbitMQ本质上起到的作用就是削峰填谷,为业务保驾护航。

为什么选择RabbitMQ

现在的市面上有很多MQ可以选择,比如ActiveMQ、ZeroMQ、Appche Qpid,那问题来了为什么要选择RabbitMQ?

  1. 除了Qpid,RabbitMQ是唯一一个实现了AMQP标准的消息服务器;
  2. 可靠性,RabbitMQ的持久化支持,保证了消息的稳定性;
  3. 高并发,RabbitMQ使用了Erlang开发语言,Erlang是为电话交换机开发的语言,天生自带高并发光环,和高可用特性;
  4. 集群部署简单,正是应为Erlang使得RabbitMQ集群部署变的超级简单;
  5. 社区活跃度高,根据网上资料来看,RabbitMQ也是首选;

工作机制

生产者、消费者和代理

在了解消息通讯之前首先要了解3个概念:生产者、消费者和代理。

生产者:消息的创建者,负责创建和推送数据到消息服务器;

消费者:消息的接收方,用于处理数据和确认消息;

代理:就是RabbitMQ本身,用于扮演“快递”的角色,本身不生产消息,只是扮演“快递”的角色。

消息发送原理

首先你必须连接到Rabbit才能发布和消费消息,那怎么连接和发送消息的呢?

你的应用程序和Rabbit Server之间会创建一个TCP连接,一旦TCP打开,并通过了认证,认证就是你试图连接Rabbit之前发送的Rabbit服务器连接信息和用户名和密码,有点像程序连接数据库,使用Java有两种连接认证的方式,后面代码会详细介绍,一旦认证通过你的应用程序和Rabbit就创建了一条AMQP信道(Channel)。

信道是创建在“真实”TCP上的虚拟连接,AMQP命令都是通过信道发送出去的,每个信道都会有一个唯一的ID,不论是发布消息,订阅队列或者介绍消息都是通过信道完成的。

为什么不通过TCP直接发送命令?

对于操作系统来说创建和销毁TCP会话是非常昂贵的开销,假设高峰期每秒有成千上万条连接,每个连接都要创建一条TCP会话,这就造成了TCP连接的巨大浪费,而且操作系统每秒能创建的TCP也是有限的,因此很快就会遇到系统瓶颈。

如果我们每个请求都使用一条TCP连接,既满足了性能的需要,又能确保每个连接的私密性,这就是引入信道概念的原因。

你必须知道的Rabbit

想要真正的了解Rabbit有些名词是你必须知道的。

包括:ConnectionFactory(连接管理器)、Channel(信道)、Exchange(交换器)、Queue(队列)、RoutingKey(路由键)、BindingKey(绑定键)。

ConnectionFactory(连接管理器):应用程序与Rabbit之间建立连接的管理器,程序代码中使用;

Channel(信道):消息推送使用的通道;

Exchange(交换器):用于接受、分配消息;

Queue(队列):用于存储生产者的消息;

RoutingKey(路由键):用于把生成者的数据分配到交换器上;

BindingKey(绑定键):用于把交换器的消息绑定到队列上;

看到上面的解释,最难理解的路由键和绑定键了,那么他们具体怎么发挥作用的,请看下图:

关于更多交换器的信息,我们在后面再讲。

消息持久化

Rabbit队列和交换器有一个不可告人的秘密,就是默认情况下重启服务器会导致消息丢失,那么怎么保证Rabbit在重启的时候不丢失呢?答案就是消息持久化。

当你把消息发送到Rabbit服务器的时候,你需要选择你是否要进行持久化,但这并不能保证Rabbit能从崩溃中恢复,想要Rabbit消息能恢复必须满足3个条件:

  1. 投递消息的时候durable设置为true,消息持久化;

  2. 消息已经到达持久化交换器上;

  3. 消息已经到达持久化的队列;

持久化工作原理

Rabbit会将你的持久化消息写入磁盘上的持久化日志文件,等消息被消费之后,Rabbit会把这条消息标识为等待垃圾回收。

持久化的缺点

消息持久化的优点显而易见,但缺点也很明显,那就是性能,因为要写入硬盘要比写入内存性能较低很多,从而降低了服务器的吞吐量,尽管使用SSD硬盘可以使事情得到缓解,但他仍然吸干了Rabbit的性能,当消息成千上万条要写入磁盘的时候,性能是很低的。

所以使用者要根据自己的情况,选择适合自己的方式。

虚拟主机

每个Rabbit都能创建很多vhost,我们称之为虚拟主机,每个虚拟主机其实都是mini版的RabbitMQ,拥有自己的队列,交换器和绑定,拥有自己的权限机制。

环境搭建

前文我们已经介绍了Ubuntu搭建RabbitMQ的步骤:RabbitMQ在Ubuntu上的环境搭建

如果你是在Windows10上去安装那就更简单了,先放下载地址:

Erlang/Rabbit Server百度网盘链接:https://pan.baidu.com/s/1TnKDV-ZuXLiIgyK8c8f9dg 密码:wct9

当然也可去Erlang和Rabbit官网去下,就是速度比较慢。我的百度云Rabbit最新版本:3.7.6,Erlang版本:20.2,注意:不要下载最新的Erlang,在Windows10上打开扩展插件有问题,打不开。

  1. 安装Erlang;

  2. 安装Rabbit Server;

  3. 进入安装目录\sbin下,使用命令“rabbitmq-plugins enable rabbitmq_management”启动网页管理插件;

  4. 重启Rabbit服务;

使用:http://localhost:15672进行测试,默认的登陆账号为:guest,密码为:guest

重复安装Rabbit Server的坑

如果不是第一次在Windows上安装Rabbit Server一定要把Rabbit和Erlang卸载干净之后,找到注册表:HKEY_LOCAL_MACHINE\SOFTWARE\Ericsson\Erlang\ErlSrv 删除其下的所有项。

不然会出现Rabbit安装之后启动不了的情况,理论上卸载的顺序也是先Rabbit在Erlang。

代码实现

java版实现,使用maven项目,创建可以查看:MyEclipse2017破解设置与maven项目搭建

项目创建成功之后,添加Rabbit Client jar包,只需要在pom.xml里面配置,如下信息:

 <dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>4.7.0</version>
</dependency>

java实现代码分为两个类,第一个是创建Rabbit连接,第二是应用类使用最简单的方式发布和消费消息。

Rabbit的连接,两种方式:

方式一:

public static Connection GetRabbitConnection() {ConnectionFactory factory = new ConnectionFactory();factory.setUsername(Config.UserName);factory.setPassword(Config.Password);factory.setVirtualHost(Config.VHost);factory.setHost(Config.Host);factory.setPort(Config.Port);Connection conn = null;try {conn = factory.newConnection();} catch (Exception e) {e.printStackTrace();}return conn;
}

方式二:

public static Connection GetRabbitConnection2() {ConnectionFactory factory = new ConnectionFactory();// 连接格式:amqp://userName:password@hostName:portNumber/virtualHostString uri = String.format("amqp://%s:%s@%s:%d%s", Config.UserName, Config.Password, Config.Host, Config.Port,Config.VHost);Connection conn = null;try {factory.setUri(uri);factory.setVirtualHost(Config.VHost);conn = factory.newConnection();} catch (Exception e) {e.printStackTrace();}return conn;
}

第二部分:应用类,使用最简单的方式发布和消费消息

public static void main(String[] args) {Publisher(); // 推送消息Consumer(); // 消费消息
}/*** 推送消息*/
public static void Publisher() {// 创建一个连接Connection conn = ConnectionFactoryUtil.GetRabbitConnection();if (conn != null) {try {// 创建通道Channel channel = conn.createChannel();// 声明队列【参数说明:参数一:队列名称,参数二:是否持久化;参数三:是否独占模式;参数四:消费者断开连接时是否删除队列;参数五:消息其他参数】channel.queueDeclare(Config.QueueName, false, false, false, null);String content = String.format("当前时间:%s", new Date().getTime());// 发送内容【参数说明:参数一:交换机名称;参数二:队列名称,参数三:消息的其他属性;参数四:消息主体】channel.basicPublish("", Config.QueueName, null, content.getBytes("UTF-8"));System.out.println("已发送消息:" + content);// 关闭连接channel.close();conn.close();} catch (Exception e) {e.printStackTrace();}}
}/*** 消费消息*/
public static void Consumer() {// 创建一个连接Connection conn = ConnectionFactoryUtil.GetRabbitConnection();if (conn != null) {try {// 创建通道Channel channel = conn.createChannel();// 声明队列【参数说明:参数一:队列名称,参数二:是否持久化;参数三:是否独占模式;参数四:消费者断开连接时是否删除队列;参数五:消息其他参数】channel.queueDeclare(Config.QueueName, false, false, false, null);// 创建订阅器,并接受消息channel.basicConsume(Config.QueueName, false, "", new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,byte[] body) throws IOException {String routingKey = envelope.getRoutingKey(); // 队列名称String contentType = properties.getContentType(); // 内容类型String content = new String(body, "utf-8"); // 消息正文System.out.println("消息正文:" + content);channel.basicAck(envelope.getDeliveryTag(), false); // 手动确认消息【参数说明:参数一:该消息的index;参数二:是否批量应答,true批量确认小于index的消息】}});} catch (Exception e) {e.printStackTrace();}}
}

代码里面已经写了很详细的注释,在这里也不过多的介绍了。

执行效果,如图:

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/546918.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

python根据坐标点拟合曲线绘图

python根据坐标点拟合曲线绘图 任何程序错误&#xff0c;以及技术疑问或需要解答的&#xff0c;请添加 import os import numpy as np from scipy import log from scipy.optimize import curve_fit import matplotlib.pyplot as plt import math from sklearn.metrics impor…

RabbitMQ交换器Exchange介绍与实践

导读 有了Rabbit的基础知识之后&#xff08;基础知识详见&#xff1a;深入解读RabbitMQ工作原理及简单使用&#xff09;&#xff0c;本章我们重点学习一下Rabbit里面的exchange&#xff08;交换器&#xff09;的知识。 交换器分类 RabbitMQ的Exchange&#xff08;交换器&…

二、WIN10 64位下Pycharm打包.py程序为可执行文件exe

Win10在开发环境下,我们可以直接通过IDE (Pycharm)直接运行,当我们完成一个程序时,我们需要在独立环境下执行,因此我们需要将.py程序打包成windows环境下可直接执行的exe文件。 操作步骤如下: 1.在Pycharm中安装插件PyInstaller 搜索需要添加的PyInstaller模块,并安…

受限玻尔兹曼机(RBM)与python在Tensorflow的实现

简介 受限玻尔兹曼机是一种无监督&#xff0c;重构原始数据的一个简单的神经网络。 受限玻尔兹曼机先把输入转为可以表示它们的一系列输出&#xff1b;这些输出可以反向重构这些输入。通过前向和后向训练&#xff0c;训练好的网络能够提取出输入中最重要的特征。 为什么RBM很…

RabbitMQ事务和Confirm发送方消息确认——深入解读

引言 根据前面的知识&#xff08;深入了解RabbitMQ工作原理及简单使用、Rabbit的几种工作模式介绍与实践&#xff09;我们知道&#xff0c;如果要保证消息的可靠性&#xff0c;需要对消息进行持久化处理&#xff0c;然而消息持久化除了需要代码的设置之外&#xff0c;还有一个…

十四、PyCharm开发Python利用WMI修改电脑IP、DNS

1.在PyCharm开发中安装WMI插件 搜索需要添加的WM插件,并安装,安装成功后会有提示!

Python 3深度置信网络(DBN)在Tensorflow中的实现MNIST手写数字识别

任何程序错误&#xff0c;以及技术疑问或需要解答的&#xff0c;请扫码添加作者VX&#xff1a;1755337994 使用DBN识别手写体 传统的多层感知机或者神经网络的一个问题&#xff1a; 反向传播可能总是导致局部最小值。 当误差表面(error surface)包含了多个凹槽&#xff0c;当你…

PHP安装ZIP扩展

2019独角兽企业重金招聘Python工程师标准>>> 下载ZIP扩展包 wget http://pecl.php.net/get/zip-1.10.2.tgztar zxvf zip-1.10.2.tgz 进入解压后的目录&#xff0c;执行 /usr/local/php/bin/phpize 编译 ./configure --with-php-config/usr/local/php/bin/php-config…

使用Docker部署RabbitMQ集群

使用Docker部署RabbitMQ集群 概述 本文重点介绍的Docker的使用&#xff0c;以及如何部署RabbitMQ集群&#xff0c;最基础的Docker安装&#xff0c;本文不做过多的描述&#xff0c;读者可以自行度娘。 Windows10上Docker的安装 因为本人用的是Windows系统&#xff0c;所有推…

Python官方中文开发文档

Python官方中文开发文档&#xff1a; https://docs.python.org/zh-cn/3/

perl anyevent socket监控web日志server

上篇已经讲过client端的CODE这部分code主要用来接收client端发送来的日志,从数据库中读取reglar然后去匹配.如果出现匹配则判断为XSS***.server端的SOCKET接收用了coro相关的模块.配置文件仿照前一篇博客读取即可.#!/usr/bin/perl use warnings; use strict; use AnyEvent; use…

Tensorflow No module named ‘tensorflow.examples.tutorials‘解决办法,有用

任何程序错误&#xff0c;以及技术疑问或需要解答的&#xff0c;请扫码添加作者VX&#xff1a;&#xff1a;1755337994 1 .利用TensorFlow代码下载MNIS丁 TensorFlow 提供了一个库&#xff0c; 可以直接用来自动下载与安装MNIST &#xff0c; 见如下代码&#xff1a; 代码5-1 M…

你不知道的RabbitMQ集群架构全解

你不知道的RabbitMQ集群架构全解 前言 本文将系统的介绍一下RabbitMQ集群架构的特点、异常处理、搭建和使用中要注意的一些细节。 知识点 一、为什么使用集群&#xff1f; 二、集群的特点 三、集群异常处理 四、集群节点类型 五、集群搭建方法 六、镜像队列 一、为什…

IPFS搭建HTTPS去中心化网站,真实可用

、 首先&#xff0c;我们需要知道IPFS是什么&#xff1f; 其实IPFS是一种协议&#xff0c;全称为Inter-Planetary File System&#xff0c;是一种点对点超媒体协议&#xff0c;旨在取代旧的HTTP&#xff0c;使网络更快&#xff0c;更安全&#xff0c;更开放。 我们平常都通过…

Ubuntu 18.04.1 搭建Java环境和HelloWorld

一、搭建Java环境 系统环境 Ubuntu 18.04.1JDK 8IDEA 2018.2 1.下载JDK 官网地址&#xff1a;http://www.oracle.com/technetwork/java/javase/downloads/index.html 选择相应的版本&#xff0c;点击jdk&#xff0c;进入下载页面&#xff0c;选择“Linux x64”版本的后缀为…

Python openpyxl打开有公式的excel表取值错误的解决办法,Python openpyxl获取excel有公式的单元格的数值错误,Python操作excel(.xlsx)封装类

Python openpyxl打开有公式的表格&#xff0c;如果直接读取&#xff0c;会出现有公式的单元格为空或零的情况。 参见&#xff1a; https://blog.csdn.net/weixin_45903952/article/details/105073611?utm_mediumdistribute.wap_relevant.none-task-blog-title-3 wb openpyxl…

Python实现GCS bucket断点续传功能,分块上传文件

Python实现GCS bucket断点续传功能&#xff0c;分块上传文件 环境&#xff1a;Python 3.6 我有一个关于使用断点续传到Google Cloud Storage的上传速度的问题。我已经编写了一个Python客户端&#xff0c;用于将大文件上传到GCS&#xff08;它具有一些特殊功能&#xff0c;这…

Spring Boot 最佳实践(一)快速入门

一、关于Spring Boot 在开始了解Spring Boot之前&#xff0c;我们需要先了解一下Spring&#xff0c;因为Spring Boot的诞生和Spring是息息相关的&#xff0c;Spring Boot是Spring发展到一定程度的一个产物&#xff0c;但并不是Spring的替代品&#xff0c;Spring Boot是为了让程…

Wo Cloud CentOS 挂载磁盘小计

为什么80%的码农都做不了架构师&#xff1f;>>> 涉及到的命令&#xff1a;fdisk/mkfs/mount 列出当前磁盘[rootvity ~]# fdisk -lDisk /dev/vda: 21.5 GB, 21474836480 bytes 16 heads, 63 sectors/track, 41610 cylinders Units cylinders of 1008 * 512 516096…

PC通过IE浏览器对华为S5700交换机进行WEB管理

1.PC和交换机通过网线连接,通过CONSOLE线缆连接华为S5700交换机,使用如下命令查看是否有web.7z文件 <Quidway>dir2.新建VLAN和配置VLAN的IP <Quidway>system-view [Quidway]<