RabbitMQ
- 一、RabbitMQ概述
- 1、什么是消息队列
- 2、为什么要使用消息队列
- 3、RabbitMQ特点
- 二、RabbitMQ安装
- 1、安装前准备
- 1.1 依赖包安装
- 1.2 安装Erlang
- 2、安装
- 3、常用命令
- 3.1. 启动和关闭
- 3.2. 插件管理
- 3.3. 用户管理
- 3.4. 权限管理
- 3.5. vhost管理
- 三、RabbitMQ消息发送和接收
- 1、 RabbitMQ的消息发送和接收机制
- 2、Exchange类型
- 3、RabbitMQ发送消息
- 四、SpringBoot集成RabbitMQ
- 五、RabbitMQ集群
一、RabbitMQ概述
1、什么是消息队列
消息(Message) 是指在应用间传递的数据。消息可以非常简单,比如只包含文本字符串,也可以很复杂,可能包含嵌入对象。
消息队列(Message Queue) 是一种应用间的通信方式,消息发送后可以立即返回,由消息系统来确保消息的可靠传递。消息发布者只管把消息发布到 MQ 中而不用管谁来取,消息使用者只管从 MQ 中取消息而不管是谁发布的。
这样发布者和使用者都不用知道对方的存在。
2、为什么要使用消息队列
从上面的描述中可以看出消息队列是一种应用间的异步协调机制
,那什么时候需要使用MQ呢?
以常见的订单系统为例,用户点击【下单】按钮之后的业务逻辑可能包括:扣减库存、生成响应单据、发红包、发短信通知。在业务发展初期这些逻辑可能放在一起同步执行,随着业务的发展订单量增长,需要提升系统服务的性能,这是可以将一些不需要立即生效的操作拆分出来异步执行,比如发送红包、发短信通知等。这种场景下就可以用MQ,在下单的主流程(比如扣减库存、生成响应单据)完成之后发送一条消息到MQ让主流程快速完成,而由另外的单独线程拉去MQ的消息(或由MQ推送消息),当发现MQ中有发红包或发短信之类的消息时,执行响应的业务逻辑。
以上是用于业务结偶
的情况,其他常见场景包括最终一致性
、广播
、错峰流控
等等。
强一致性:比如上面四个操作要么全部执行成功,要么全部不执行。
最终一致性:比如上面四个操作,主操作先执行,最后另外两个操作最终也执行完即可,不必同时完成。
广播:比如消息推送。
错峰流控:比如流量太大的时候,超过一定量的时候,再来访问就排队,防止服务器被整垮。
3、RabbitMQ特点
RabbitMQ 是一个由 Erlang 语言开发的 AMQP 的开源实现。
AMQP:Advanced Message Queue,高级消息队列协议。应用层协议的一个开放标准,为面向消息的中间件设计,基于此协议的客户端与消息中间件可传递消息,并不受产品、开发语言等条件的限制。
RabbitMQ 最初起源于金融系统,用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。具体特点包括:
- 可靠性(Reliability)
RabbitMQ 使用一些机制来保证可靠性,如持久化、传输确认、发布确认。 - 灵活的路由(Flexible Routing)
在消息进入队列之前,通过 Exchange 来路由消息的。对于典型的路由功能,RabbitMQ 已经提供了一些内置的 Exchange 来实现。针对更复杂的路由功能,可以将多个 Exchange 绑定在一起,也通过插件机制实现自己的 Exchange 。 - 消息集群(Clustering)
多个 RabbitMQ 服务器可以组成一个集群,形成一个逻辑 Broker 。 - 高可用(Highly Available Queues)
队列可以在集群中的机器上进行镜像,使得在部分节点出问题的情况下队列仍然可用。 - 多种协议(Multi-protocol)
RabbitMQ 支持多种消息队列协议,比如 STOMP、MQTT 等等。 - 多语言客户端(Many Clients)
RabbitMQ 几乎支持所有常用语言,比如 Java、.NET、Ruby 等等。 - 管理界面(Management UI)
RabbitMQ 提供了一个易用的用户界面,使得用户可以监控和管理消息 Broker 的许多方面。 - 跟踪机制(Tracing)
如果消息异常,RabbitMQ 提供了消息跟踪机制,使用者可以找出发生了什么。
kafka效率高但是不安全,RabbitMQ效率没他高但是安全,使用的企业也多。
二、RabbitMQ安装
一般来说安装 RabbitMQ 之前要安装 Erlang ,可以去Erlang官网下载。接着去RabbitMQ官网下载安装包,之后解压缩即可。
Erlang官方下载地址:https://www.erlang.org/downloads
RabbitMQ官方下载地址:https://www.rabbitmq.com/download.html
1、安装前准备
1.1 依赖包安装
安装RabbitMQ之前必须要先安装所需要的依赖包可以使用下面的一次性安装命令
yum install gcc glibc-devel make ncurses-devel openssl-devel xmlto
1.2 安装Erlang
- 将Erlang源代码包otp_src_19.3.tar.gz上传到Linux的/home目录下
- 解压erlang 源码包
tar -zxvf otp_src_19.3.tar.gz
- 手动创建erlang 的安装目录
mkdir /usr/local/erlang
- 进入erlang的解压目录
cd otp_src_19.3
- 配置erlang的安装信息
./configure --prefix=/usr/local/erlang --without-javac
- 编译并安装
make && make install
- 配置环境变量
vim /etc/profile
- 将这些配置填写到profile文件的最后
ERL_HOME=/usr/local/erlang
PATH=$ERL_HOME/bin:$PATH
export ERL_HOME PATH
- 启动环境变量配置文件
source /etc/profile
- 查看
erl -version
2、安装
1、 将RabbitMQ安装包 rabbitmq-server-3.7.2-1.el7.noarch.rpm 上传到 /home 目录
2、安装RabbitMQ(rpm文件相当于windows中的exe文件)
rpm -ivh --nodeps rabbitmq-server-3.7.2-1.el7.noarch.rpm
3、常用命令
3.1. 启动和关闭
- 启动RabbitMQ
rabbitmq-server start
注意:这里可能会出现错误,错误原因是/var/lib/rabbitmq/.erlang.cookie文件权限不够。
解决方案对这个文件授权:
chown rabbitmq:rabbitmq /var/lib/rabbitmq/.erlang.cookiechmod 400 /var/lib/rabbitmq/.erlang.cookie
- 停止服务
rabbitmqctl stop
- 查看启动后的进程
ps -ef | grep rabbitmq
3.2. 插件管理
- 添加插件
rabbitmq-plugins enable {插件名}
- 删除插件
rabbitmq-plugins disable {插件名}
注意:RabbitMQ启动以后可以使用浏览器进入管控台但是默认情况RabbitMQ不允许直接使用浏览器浏览器进行访问因此必须添加插件
rabbitmq-plugins enable rabbitmq_management
- 使用浏览器访问管控台
http://RabbitMQ服务器IP:15672
http://192.168.71.128:15672
3.3. 用户管理
RabbitMQ安装成功后使用默认用户名guest登录:
账号:guest
密码:guest
注意:这里guest只允许本机登录访问
需要创建用户并授权远程访问命令如下:
-
添加用户:
rabbitmqctl add_user {username} {password}
-
删除用户:
rabbitmqctl delete_user {username}
-
修改密码:
rabbitmqctl change_password {username} {newpassword}
-
设置用户角色:
rabbitmqctl set_user_tags {username} {tag}
tag用户角色取值为:management, monitoring, policymaker, administrator
各角色详解:
management:
用户可以通过AMQP做的任何事外加:
列出自己可以通过AMQP登入的virtual hosts
查看自己的virtual hosts中的queues, exchanges 和 bindings
查看和关闭自己的channels 和 connections
查看有关自己的virtual hosts的“全局”的统计信息,包含其他用户在这些virtual hosts中的活动。policymaker :
management可以做的任何事外加:
查看、创建和删除自己的virtual hosts所属的policies和parametersmonitoring :
management可以做的任何事外加:
列出所有virtual hosts,包括他们不能登录的virtual hosts
查看其他用户的connections和channels
查看节点级别的数据如clustering和memory使用情况
查看真正的关于所有virtual hosts的全局的统计信息administrator:
policymaker和monitoring可以做的任何事外加:
创建和删除virtual hosts
查看、创建和删除users
查看创建和删除permissions
关闭其他用户的connections
3.4. 权限管理
- 授权命令:
rabbitmqctl set_permissions [-p vhostpath] {user} {conf} {write} {read}-p vhostpath
用于指定一个资源的命名空间,例如 –p / 表示根路径命名空间
user:用于指定要为哪个用户授权填写用户名
conf:一个正则表达式match哪些配置资源能够被该用户配置。
write:一个正则表达式match哪些配置资源能够被该用户读。
read:一个正则表达式match哪些配置资源能够被该用户访问。
例如:rabbitmqctl set_permissions -p / root “.” “.” “.*”
用于设置root用户拥有对所有资源的 读写配置权限
- 查看用户权限
rabbitmqctl list_permissions [vhostpath]
例如查看根径经下的所有用户权限: rabbitmqctl list_permissions
查看指定命名空间下的所有用户权限: rabbitmqctl list_permissions /abc
- 查看指定用户下的权限
rabbitmqctl list_user_permissions {username}
例如查看root用户下的权限 rabbitmqctl list_user_permissions root
- 清除用户权限rabbitmqctl clear_permissions {username}
例如:清除root用户的权限 rabbitmqctl clear_permissions root
3.5. vhost管理
vhost是RabbitMQ中的一个命名空间,可以限制消息的存放位置利用这个命名空间可以进行权限的控制有点类似Windows中的文件夹一样,在不同的文件夹中存放不同的文件。
- 添加vhost:
rabbitmqctl add vhost {name}
例如 rabbitmqctl add vhost bjpowernode
- 删除vhost:
rabbitmqctl delete vhost {name}
例如 rabbitmqctl delete vhost bjpowernode
三、RabbitMQ消息发送和接收
1、 RabbitMQ的消息发送和接收机制
MQ的基本抽象模型:
RabbitMQ的内部接收:
举例说明:
2、Exchange类型
- direct
消息中的路由键(routing key)如果和 Binding 中的 binding key 一致, 交换器就将消息发到对应的队列中。它是完全匹配
、单播
的模式。
- fanout
每个发到 fanout 类型交换器的消息都会分到所有绑定的队列上去。fanout 类型是一种广播
模式,转发消息是最快的
。
这种模式就和我们收看电视直播一样,需要提前在消费者中监听队列,否则如果消息先发送了,那么消费者可能永远接收不到,就错过了。这种模式的交换机适合适合使用在一些消息数据不是很重要的应用中,用户接收到和接收不到都无所谓。例如:手机app的消息推送。 - topic
topic 交换器通过模式匹配
分配消息的路由键属性,将路由键和某个模式进行匹配,此时队列需要绑定到一个模式上。它有两个通配符:符号“#”和符号“*”。#
匹配0个或多个单词,*
匹配不多不少一个单词。
他也会丢失消息,也应该先启动消费者监听队列。
3、RabbitMQ发送消息
创建maven工程
- 添加依赖:
<dependencies>
<dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>5.1.1</version>
</dependency>
</dependencies>
- 编写Send类
public class Send{public static void main(String[] args) throws IOException, TimeoutException {//创建链接工厂对象ConnectionFactory factory=new ConnectionFactory();//设置RabbitMQ连接相关信息factory.setHost("192.168.171.143");//设置RabbitMQ的主机IPfactory.setPort(5672);//设置RabbitMQ的端口号factory.setUsername("root");//设置访问用户名factory.setPassword("root");//设置访问密码Connection connection=null;//定义链接对象Channel channel=null;//定义通道对象connection=factory.newConnection();//获取链接channel=connection.createChannel();//获取通道//指定Exchange的类型//参数1为 交换机名称//参数2为 交换机类型取值为 direct、fanout、topic、headers//参数3为 是否为持久化消息 true表示持久化消息 false表示非持久化channel.exchangeDeclare("myExchange", "direct", true);//发送消息到RabbitMQ//参数1 我们自定义的交换机名称//参数2 自定义的RoutingKey值//参数3 设置消息的属性,可以通过消息属性设置消息是否是持久化的//参数4 具体要发送的消息信息String message ="Hello World!3";channel.basicPublish("myExchange","myRoutingKey",null,message.getBytes("UTF-8"));System.out.println("消息发送成功: "+message);channel.close();connection.close();}
}