1. 消息队列
1.1. MQ 的相关概念
1.1.1. 什么是MQ
MQ(message queue)
,从字面意思上看,本质是个队列,FIFO 先入先出,只不过队列中存放的内容是message 而已,还是一种跨进程的通信机制,用于上下游传递消息。在互联网架构中,MQ 是一种非常常见的上下游“逻辑解耦+物理解耦”的消息通信服务。使用了 MQ 之后,消息发送上游只需要依赖 MQ,不用依赖其他服务。
1.1.2. 为什么要用 MQ
1. 流量消峰
- 举个例子,如果订单系统最多能处理一万次订单,这个处理能力应付正常时段的下单时绰绰有余,正常时段我们下单一秒后就能返回结果。但是在高峰期,如果有两万次下单操作系统是处理不了的,只能限制订单超过一万后不允许用户下单。使用消息队列做缓冲,我们可以取消这个限制,把一秒内下的订单分散成一段时间来处理,这时有些用户可能在下单十几秒后才能收到下单成功的操作,但是比不能下单的体验要好。
2. 应用解耦
- 以电商应用为例,应用中有订单系统、库存系统、物流系统、支付系统。用户创建订单后,如果耦合调用库存系统、物流系统、支付系统,任何一个子系统出了故障,都会造成下单操作异常。当转变成基于消息队列的方式后,系统间调用的问题会减少很多,比如物流系统因为发生故障,需要几分钟来修复。在这几分钟的时间里,物流系统要处理的内存被缓存在消息队列中,用户的下单操作可以正常完成。当物流系统恢复后,继续处理订单信息即可,中单用户感受不到物流系统的故障,提升系统的可用性。
3. 异步处理
- 有些服务间调用是异步的,例如 A 调用 B,B 需要花费很长时间执行,但是 A 需要知道 B 什么时候可以执行完,以前一般有两种方式,A 过一段时间去调用 B 的查询 api 查询。或者 A 提供一个 callback api,B 执行完之后调用 api 通知 A 服务。这两种方式都不是很优雅,使用消息总线,可以很方便解决这个问题,A 调用 B 服务后,只需要监听 B 处理完成的消息,当 B 处理完成后,会发送一条消息给 MQ,MQ 会将此消息转发给 A 服务。这样 A 服务既不用循环调用 B 的查询 api,也不用提供 callback api。同样 B 服务也不用做这些操作。A 服务还能及时的得到异步处理成功的消息。
1.1.3. MQ 的分类
1. ActiveMQ
- 优点:单机吞吐量万级,时效性 ms 级,可用性高,基于主从架构实现高可用性,消息可靠性较低的概率丢失数据。
- 缺点:官方社区现在对 ActiveMQ 5.x 维护越来越少,高吞吐量场景较少使用。
2. Kafka
- 大数据的杀手锏,谈到大数据领域内的消息传输,则绕不开 Kafka,这款为大数据而生的消息中间件,以其百万级 TPS 的吞吐量名声大噪,迅速成为大数据领域的宠儿,在数据采集、传输、存储的过程中发挥着举足轻重的作用。目前已经被 LinkedIn,Uber, Twitter, Netflix 等大公司所采纳。
- 优点: 性能卓越,单机写入 TPS 约在百万条/秒,最大的优点,就是吞吐量高。时效性 ms 级可用性非常高,kafka 是分布式的,一个数据多个副本,少数机器宕机,不会丢失数据,不会导致不可用,消费者采用 Pull 方式获取消息, 消息有序, 通过控制能够保证所有消息被消费且仅被消费一次;有优秀的第三方Kafka Web 管理界面 Kafka-Manager;在日志领域比较成熟,被多家公司和多个开源项目使用;功能支持:功能较为简单,主要支持简单的 MQ 功能,在大数据领域的实时计算以及日志采集被大规模使用
- 缺点:Kafka 单机超过 64 个队列/分区,Load 会发生明显的飙高现象,队列越多,load 越高,发送消息响应时间变长,使用短轮询方式,实时性取决于轮询间隔时间,消费失败不支持重试;支持消息顺序,但是一台代理宕机后,就会产生消息乱序,社区更新较慢;
3. RocketMQ
-
RocketMQ 出自阿里巴巴的开源产品,用 Java 语言实现,在设计时参考了 Kafka,并做出了自己的一些改进。被阿里巴巴广泛应用在订单,交易,充值,流计算,消息推送,日志流式处理,binglog 分发等场景。
-
优点:单机吞吐量十万级,可用性非常高,分布式架构,消息可以做到 0 丢失,MQ 功能较为完善,还是分布式的,扩展性好,支持 10 亿级别的消息堆积,不会因为堆积导致性能下降,源码是 java 我们可以自己阅读源码,定制自己公司的 MQ。
-
缺点:支持的客户端语言不多,目前是 java 及 c++,其中 c++不成熟;社区活跃度一般,没有在 MQ核心中去实现 JMS 等接口,有些系统要迁移需要修改大量代码。
4. RabbitMQ
-
2007 年发布,是一个在 AMQP(高级消息队列协议)基础上完成的,可复用的企业消息系统,是当前最主流的消息中间件之一。
-
优点:由于 erlang 语言的高并发特性,性能较好;吞吐量到万级,MQ 功能比较完备,健壮、稳定、易用、跨平台、支持多种语言 如:Python、Ruby、.NET、Java、JMS、C、PHP、ActionScript、XMPP、STOMP等,支持 AJAX 文档齐全;开源提供的管理界面非常棒,用起来很好用,社区活跃度高;更新频率相当高
-
https://www.rabbitmq.com/news.html。
-
缺点:商业版需要收费,学习成本较高。
1.1.4. MQ 的选择
1. Kafka
- Kafka 主要特点是基于 Pull 的模式来处理消息消费,追求高吞吐量,一开始的目的就是用于日志收集和传输,适合产生大量数据的互联网服务的数据收集业务。大型公司建议可以选用,如果有日志采集功能,肯定是首选 kafka 了。
2. RocketMQ
- 天生为金融互联网领域而生,对于可靠性要求很高的场景,尤其是电商里面的订单扣款,以及业务削峰,在大量交易涌入时,后端可能无法及时处理的情况。RoketMQ 在稳定性上可能更值得信赖,这些业务场景在阿里双 11 已经经历了多次考验,如果你的业务有上述并发场景,建议可以选择 RocketMQ。
3. RabbitMQ
- 结合 erlang 语言本身的并发优势,性能好时效性微秒级,社区活跃度也比较高,管理界面用起来十分方便,如果你的数据量没有那么大,中小型公司优先选择功能比较完备的 RabbitMQ。
1.2. RabbitMQ
1.2.1. RabbitMQ 的概念
- RabbitMQ 是一个消息中间件:它接受并转发消息。你可以把它当做一个快递站点,当你要发送一个包裹时,你把你的包裹放到快递站,快递员最终会把你的快递送到收件人那里,按照这种逻辑 RabbitMQ 是一个快递站,一个快递员帮你传递快件。RabbitMQ 与快递站的主要区别在于,它不处理快件而是接收,存储和转发消息数据。
1.2.2. 四大核心概念
-
生产者
- 产生数据发送消息的程序是生产者
-
交换机
- 交换机是 RabbitMQ 非常重要的一个部件,一方面它接收来自生产者的消息,另一方面它将消息推送到队列中。交换机必须确切知道如何处理它接收到的消息,是将这些消息推送到特定队列还是推送到多个队列,亦或者是把消息丢弃,这个得有交换机类型决定。
-
队列
- 队列是 RabbitMQ 内部使用的一种数据结构,尽管消息流经 RabbitMQ 和应用程序,但它们只能存储在队列中。队列仅受主机的内存和磁盘限制的约束,本质上是一个大的消息缓冲区。许多生产者可以将消息发送到一个队列,许多消费者可以尝试从一个队列接收数据。这就是我们使用队列的方式。
-
消费者
- 消费与接收具有相似的含义。消费者大多时候是一个等待接收消息的程序。请注意生产者,消费者和消息中间件很多时候并不在同一机器上。同一个应用程序既可以是生产者又是可以是消费者。
1.2.3. RabbitMQ 核心部分
1.2.4. 各个名词介绍
-
Broker:接收和分发消息的应用,RabbitMQ Server 就是 Message Broker。
-
Virtual host:出于多租户和安全因素设计的,把 AMQP 的基本组件划分到一个虚拟的分组中,类似于网络中的 namespace 概念。当多个不同的用户使用同一个 RabbitMQ server 提供的服务时,可以划分出多个 vhost,每个用户在自己的 vhost 创建 exchange/queue 等。
-
Connection:publisher/consumer 和 broker 之间的 TCP 连接。
-
Channel:如果每一次访问 RabbitMQ 都建立一个 Connection,在消息量大的时候建立 TCP Connection 的开销将是巨大的,效率也较低。Channel 是在 connection 内部建立的逻辑连接,如果应用程序支持多线程,通常每个 thread 创建单独的 channel 进行通讯,AMQP method 包含了 channel id 帮助客户端和 message broker 识别 channel,所以 channel 之间是完全隔离的。Channel 作为轻量级的Connection 极大减少了操作系统建立 TCP connection 的开销。
-
Exchange:message 到达 broker 的第一站,根据分发规则,匹配查询表中的 routing key,分发消息到 queue 中去。常用的类型有:direct (point-to-point), topic (publish-subscribe) and fanout (multicast)。
-
Queue:消息最终被送到这里等待 consumer 取走。
-
Binding:exchange 和 queue 之间的虚拟连接,binding 中可以包含 routing key,Binding 信息被保存到 exchange 中的查询表中,用于 message 的分发依据。
2. Rabbitmq单机安装部署
- 官网地址:https://www.rabbitmq.com/download.html
#准备gpgkey密钥
rpm --import https://github.com/rabbitmq/signing-keys/releases/download/2.0/rabbitmq-release-signing-key.asc
rpm --import https://packagecloud.io/rabbitmq/erlang/gpgkey
rpm --import https://packagecloud.io/rabbitmq/rabbitmq-server/gpgkey#配置yum仓库
vim /etc/yum.repos.d/rabbitmq.repo[rabbitmq_erlang]
name=rabbitmq_erlang
baseurl=https://packagecloud.io/rabbitmq/erlang/el/7/$basearch
repo_gpgcheck=1
gpgcheck=1
enabled=1
# PackageCloud's repository key and RabbitMQ package signing key
gpgkey=https://packagecloud.io/rabbitmq/erlang/gpgkeyhttps://github.com/rabbitmq/signing-keys/releases/download/2.0/rabbitmq-release-signing-key.asc
sslverify=1
sslcacert=/etc/pki/tls/certs/ca-bundle.crt
metadata_expire=300[rabbitmq_erlang-source]
name=rabbitmq_erlang-source
baseurl=https://packagecloud.io/rabbitmq/erlang/el/7/SRPMS
repo_gpgcheck=1
gpgcheck=0
enabled=1
# PackageCloud's repository key and RabbitMQ package signing key
gpgkey=https://packagecloud.io/rabbitmq/erlang/gpgkeyhttps://github.com/rabbitmq/signing-keys/releases/download/2.0/rabbitmq-release-signing-key.asc
sslverify=1
sslcacert=/etc/pki/tls/certs/ca-bundle.crt
metadata_expire=300##
## RabbitMQ server
##[rabbitmq_server]
name=rabbitmq_server
baseurl=https://packagecloud.io/rabbitmq/rabbitmq-server/el/7/$basearch
repo_gpgcheck=1
gpgcheck=0
enabled=1
# PackageCloud's repository key and RabbitMQ package signing key
gpgkey=https://packagecloud.io/rabbitmq/rabbitmq-server/gpgkeyhttps://github.com/rabbitmq/signing-keys/releases/download/2.0/rabbitmq-release-signing-key.asc
sslverify=1
sslcacert=/etc/pki/tls/certs/ca-bundle.crt
metadata_expire=300[rabbitmq_server-source]
name=rabbitmq_server-source
baseurl=https://packagecloud.io/rabbitmq/rabbitmq-server/el/7/SRPMS
repo_gpgcheck=1
gpgcheck=0
enabled=1
gpgkey=https://packagecloud.io/rabbitmq/rabbitmq-server/gpgkey
sslverify=1
sslcacert=/etc/pki/tls/certs/ca-bundle.crt
metadata_expire=300#安装rabbitmq和erlangyum -y install erlang rabbitmq-server-3.9.15systemctl start rabbitmq-server.servicesystemctl enable rabbitmq-server.service#安装 RabbitMQ Web 插件rabbitmq-plugins enable rabbitmq_managementsystemctl restart rabbitmq-server#创建账号密码并设置用户的权限rabbitmqctl add_user ys ys1234rabbitmqctl set_permissions -p / ys ".*" ".*" ".*"#创建web管理用户
rabbitmqctl add_user admin admin1234
rabbitmqctl set_user_tags admin administrator
rabbitmqctl set_permissions -p / admin ".*" ".*" ".*"
2.1. 访问rabbitmq管理界面
3. RabbitMQ 集群
3.1. 使用集群的原因:
- 前面介绍了如何安装及运行 RabbitMQ 服务,不过这些是单机版的,无法满足目前真实应用的要求。如果 RabbitMQ 服务器遇到内存崩溃、机器掉电或者主板故障等情况,该怎么办?单台 RabbitMQ服务器可以满足每秒 1000 条消息的吞吐量,那么如果应用需要 RabbitMQ 服务满足每秒 10 万条消息的吞吐量呢?购买昂贵的服务器来增强单机 RabbitMQ 务的性能显得捉襟见肘,搭建一个 RabbitMQ 集群才是解决实际问题的关键。
3.2. Rabbitmq集群安装部署
- RabbitMQ支持高可用性和可扩展性的集群部署。在RabbitMQ集群中,至少需要3个节点来确保高可用性和数据冗余。
- 主机
主机名 | IP地址 |
---|---|
node1 | 192.168.10.10 |
node2 | 192.168.10.20 |
node3 | 192.168.10.30 |
3.2.1. 修改主机名
#3台主机操作一样
#添加hosts解析
vim /etc/hosts
192.168.10.10 node1
192.168.10.20 node2
192.168.10.30 node3
#修改主机名
#每个节点执行这条命令即可,注意这条命令网卡名需要根据实际情况填写
hostname `cat /etc/hosts|grep $(ifconfig ens33|grep broadcast|awk '{print $2}')|awk '{print $2}'`;su
3.2.2. 以确保各个节点的 cookie 文件使用的是同一个值
- 在 node1 上执行远程操作命令
scp -r /var/lib/rabbitmq/.erlang.cookie root@node2:/var/lib/rabbitmq/.erlang.cookie
scp /var/lib/rabbitmq/.erlang.cookie root@node3:/var/lib/rabbitmq/.erlang.cookie
3.2.3. 在node2、node3启动服务
systemctl start rabbitmq-server.service
3.2.4. 在node2节点执行以下操作
#rabbitmqctl stop 会将 Erlang 虚拟机关闭,rabbitmqctl stop_app 只关闭 RabbitMQ 服务
rabbitmqctl stop_app
rabbitmqctl reset
rabbitmqctl join_cluster rabbit@node1
rabbitmqctl start_app
3.2.5. 在node3节点执行以下操作
rabbitmqctl stop_app
rabbitmqctl reset
rabbitmqctl join_cluster rabbit@node2
rabbitmqctl start_app
3.2.6. 查看集群状态:
rabbitmqctl cluster_status
3.2.7. 解除集群节点(node2 和 node3 机器分别执行)
rabbitmqctl stop_app
rabbitmqctl reset
rabbitmqctl start_app
rabbitmqctl cluster_status
rabbitmqctl forget_cluster_node rabbit@node2(node1 机器上执行)
3.3. 镜像队列
3.3.1. 使用镜像的原因
- 如果 RabbitMQ 集群中只有一个 Broker 节点,那么该节点的失效将导致整体服务的临时性不可用,并且也可能会导致消息的丢失。可以将所有消息都设置为持久化,并且对应队列的durable属性也设置为true,但是这样仍然无法避免由于缓存导致的问题:因为消息在发送之后和被写入磁盘井执行刷盘动作之间存在一个短暂却会产生问题的时间窗。通过 publisherconfirm 机制能够确保客户端知道哪些消息己经存入磁盘,尽管如此,一般不希望遇到因单点故障导致的服务不可用。
- 引入镜像队列(Mirror Queue)的机制,可以将队列镜像到集群中的其他 Broker 节点之上,如果集群中的一个节点失效了,队列能自动地切换到镜像中的另一个节点上以保证服务的可用性。
3.3.2. 搭建步骤
-
登录rabbitmq管理界面
-
点击admin–>Policies添加以下内容:
4. Haproxy+Keepalive 实现高可用负载均衡
- 整体架构图
4.1. Haproxy 实现负载均衡
- HAProxy 提供高可用性、负载均衡及基于 TCPHTTP 应用的代理,支持虚拟主机,它是免费、快速并且可靠的一种解决方案,包括 Twitter,Reddit,StackOverflow,GitHub 在内的多家知名互联网公司在使用。HAProxy 实现了一种事件驱动、单一进程模型,此模型支持非常大的井发连接数。
4.2. 搭建步骤
# 10.40、10.50安装软件
yum -y install haproxy keepalived
#修改 10.40、10.50 的 haproxy.cfg
vim /etc/haproxy/haproxy.cfg
global
#定义了haproxy的日志输出方式,这里使用了local0和local1两个facility,其中local0表示输出到syslog的local0日志文件中
#local1表示输出到syslog的local1日志文件中,级别为notice。log /dev/log local0log /dev/log local1 noticepidfile /var/run/haproxy.pid #定义了haproxy进程的pid文件路径chroot /var/lib/haproxy #定义了haproxy要使用的chroot目录,这里使用了/var/lib/haproxy
#定义了haproxy的统计信息socket文件路径和权限,这里指定了/var/run/haproxy-admin.sock,权限为660,级别为adminstats socket /var/run/haproxy-admin.sock mode 660 level adminstats timeout 30s #定义了haproxy统计信息的超时时间,这里设置为30秒
#定义了haproxy进程要使用的用户和组,这里使用了haproxy用户和haproxy组user haproxygroup haproxydaemon #定义了haproxy以守护进程方式运行nbproc 1 #定义了haproxy的工作进程数,这里设置为1
defaultslog global #定义了默认的日志输出方式,这里使用了global,表示使用全局日志配置timeout connect 5000 #定义了连接超时时间,这里设置为5000毫秒timeout client 10m #定义了客户端超时时间,这里设置为10分钟timeout server 10m #定义了服务器超时时间,这里设置为10分钟
listen admin_statsbind 0.0.0.0:10080 #定义了监听地址和端口,这里使用了0.0.0.0:10080,表示监听所有IP地址的10080端口mode http #定义了监听模式,这里使用了http模式log 127.0.0.1 local0 err #定义了日志输出方式,这里将日志输出到127.0.0.1的local0设备,级别为errstats refresh 30s #定义了统计信息刷新时间,这里设置为30秒stats uri /status #定义了统计信息页面的URI,这里设置为/statusstats realm welcome login\ Haproxy #定义了统计信息页面的realm,这里设置为"welcome login Haproxy"stats auth admin:123456 #定义了统计信息页面的用户名和密码,这里设置为admin和123456stats hide-version #定义了是否隐藏haproxy的版本信息,这里设置为隐藏stats admin if TRUE #定义了是否启用统计信息页面的管理功能,这里设置为启用
listen rabbitmqbind 0.0.0.0:5672 #定义了监听地址和端口,这里使用了0.0.0.0:8443,表示监听所有IP地址的8443端口mode tcp #定义了监听模式,这里使用了tcp模式option tcplog #定义了是否启用TCP日志记录,这里设置为启用balance source #定义了负载均衡算法,这里使用了source算法
#定义了后端服务器,这里定义了三个服务器,分别是master1、master2和master3
#它们的IP地址和端口分别为192.168.2.10:5672、192.168.2.20:5672和192.168.2.30:5672
#check表示启用健康检查,inter 2000表示每2秒进行一次健康检查
#fall 2表示检查失败的阈值为2,rise 2表示检查成功的阈值为2,weight 1表示权重为1server node1 192.168.10.10:5672 check inter 2000 fall 2 rise 2 weight 1server node2 192.168.10.20:5672 check inter 2000 fall 2 rise 2 weight 1server node3 192.168.10.30:5672 check inter 2000 fall 2 rise 2 weight 1listen rabbitmq_webbind 0.0.0.0:15672 #定义了监听地址和端口,这里使用了0.0.0.0:8443,表示监听所有IP地址的8443端口mode tcp #定义了监听模式,这里使用了tcp模式option tcplog #定义了是否启用TCP日志记录,这里设置为启用balance source #定义了负载均衡算法,这里使用了source算法server node1 192.168.10.10:15672 check inter 2000 fall 2 rise 2 weight 1server node2 192.168.10.20:15672 check inter 2000 fall 2 rise 2 weight 1server node3 192.168.10.30:15672 check inter 2000 fall 2 rise 2 weight 1
- 启动haproxy
systemctl start haproxy.service
systemctl enable haproxy.service
4.3. 配置keepalived
#10.40修改配置文件keepalived.conf
vim /etc/keepalived/keepalived.conf
! Configuration File for keepalived
global_defs { #全局定义部分,用于设置全局参数notification_email { #设置通知邮件的收件人邮箱yangshuangsxy@163.com }notification_email_from root@localhost #设置通知邮件的发件人邮箱smtp_server 127.0.0.1 #设置 SMTP 服务器的地址smtp_connect_timeout 30 #设置 SMTP 连接超时时间router_id LVS_DEVEL #设置路由器的标识符
}
vrrp_script chk_haproxy { #定义一个 VRRP 脚本,用于检查 HAProxy 的状态script "/data/sh/check_haproxy.sh" #指定要执行的脚本路径interval 2 #设置脚本执行的间隔时间weight 2 #设置脚本的权重}
# VIP1
vrrp_instance VI_1 { #定义一个 VRRP 实例state MASTER #设置实例的状态,可以是 MASTER 或 BACKUPinterface ens33 #指定 VRRP 实例绑定的网络接口virtual_router_id 51 #设置虚拟路由器的 IDpriority 100 #设置实例的优先级advert_int 5 #设置 VRRP 广告间隔时间nopreempt #禁止抢占模式,即不允许备用节点抢占主节点authentication { #设置认证参数auth_type PASS #设置认证类型,可以是 PASS 或 AHauth_pass 1111 #设置认证密码}virtual_ipaddress { #设置虚拟 IP 地址192.168.10.100}track_script { #设置要跟踪的脚本chk_haproxy}
}#10.50修改配置文件keepalived.conf如下:
state BACKUP
priority 90
#其他的配置文件内容和上面的一样
4.4. 创建检测脚本,10.40和10.50都执行以下操作
# 检测是否安装psmisc,如果没有安装,需要安装
rpm -qa | grep psmisc
psmisc-22.20-17.el7.x86_64vim /data/sh/check_haproxy.sh
#!/bin/bash
#auto check nginx process
#2024年1月16日15:16:29
#by author XiaoYuEr
killall -0 haproxy
if [[ $? -ne 0 ]];then
service keepalived stop
fi
#添加执行权限
chmod +x /data/sh/check_haproxy.sh
4.5. 启动服务,并验证
systemctl restart haproxy.service
systemctl enable haproxy.service
systemctl restart keepalived.service
systemctl enable keepalived.service#在10.40主机上验证:
ip add1: lo: <LOOPBACK,UP,LOWER_UP> mtu 65536 qdisc noqueue state UNKNOWN group default qlen 1000link/loopback 00:00:00:00:00:00 brd 00:00:00:00:00:00inet 127.0.0.1/8 scope host lovalid_lft forever preferred_lft foreverinet6 ::1/128 scope hostvalid_lft forever preferred_lft forever
2: ens33: <BROADCAST,MULTICAST,UP,LOWER_UP> mtu 1500 qdisc pfifo_fast state UP group default qlen 1000link/ether 00:0c:29:de:b6:0a brd ff:ff:ff:ff:ff:ffinet 192.168.10.40/16 brd 192.168.255.255 scope global noprefixroute ens33valid_lft forever preferred_lft foreverinet 192.168.10.100/32 scope global ens33valid_lft forever preferred_lft foreverinet6 fe80::3cd3:afe6:494b:44e6/64 scope link noprefixroutevalid_lft forever preferred_lft forever