文章目录
- 一、安装RabbitMQ
- 1、前置要求
- 2、安装docker版
- 复制第一个节点的.erlang.cookie
- 进入各节点命令行
- 配置集群
- 检查集群状态
- 3、三台组合集群安装版
- rabbitmq节点
- rabbitmq-node2节点
- rabbitmq-node3节点
- 二、负载均衡:Management UI
- 1、说明
- 2、安装HAProxy
- 3、修改配置文件
- 三、负载均衡:核心功能
- 四、测试
- pom
- 生产者
- 消费者
一、安装RabbitMQ
1、前置要求
CentOS发行版的版本CentOS 8 Stream
下载:
官网:https://centos.org/download/
官网下载:https://wiki.centos.org/Download.html
阿里云镜像:http://mirrors.aliyun.com/centos/8-stream/isos/x86_64/
镜像下载:http://mirrors.aliyun.com/centos/8-stream/isos/x86_64/CentOS-Stream-8-x86_64-latest-boot.iso
2、安装docker版
拉取镜像
docker pull rabbitmq:3.13-management
启动容器
# 开三台修改端口地址 麻了,在这弄了好久,node3节点会去找node1和node2,所以需要两个别名都加上,注意hostname和node名称要一致(即下面name和hostname要一致)
docker run -d \
--name rabbitmq-node1 \
--hostname rabbitmq-node1 \
-e RABBITMQ_ERLANG_COOKIE='rabbitcookie' \
-p 5672:5672 \
-p 15672:15672 \
-v rabbitmq-plugin:/plugins \
-e RABBITMQ_DEFAULT_USER=guest \
-e RABBITMQ_DEFAULT_PASS=123456 \
rabbitmq:3.13-managementdocker run -d \
--name rabbitmq-node2 \
--hostname rabbitmq-node2 \
-e RABBITMQ_ERLANG_COOKIE='rabbitcookie' \
-p 5673:5672 \
-p 15673:15672 \
--link rabbitmq-node1:rabbitmq-node1 \
-v rabbitmq-plugin:/plugins \
-e RABBITMQ_DEFAULT_USER=guest \
-e RABBITMQ_DEFAULT_PASS=123456 \
rabbitmq:3.13-managementdocker run -d \
--name rabbitmq-node3 \
--hostname rabbitmq-node3 \
-e RABBITMQ_ERLANG_COOKIE='rabbitcookie' \
-p 5674:5672 \
-p 15674:15672 \
--link rabbitmq-node1:rabbitmq-node1 \
--link rabbitmq-node2:rabbitmq-node2 \
-v rabbitmq-plugin:/plugins \
-e RABBITMQ_DEFAULT_USER=guest \
-e RABBITMQ_DEFAULT_PASS=123456 \
rabbitmq:3.13-management
使用docker ps
检查是否成功启动
查看所有容器网络配置:默认网络配置为桥接模式
docker inspect --format='{{.Name}} - {{range .NetworkSettings.Networks}}{{.IPAddress}}{{end}}' $(docker ps -aq)
复制第一个节点的.erlang.cookie
# 先复制到本地/opt/test,不可直接在两个容器中复制会报错
docker cp rabbitmq-node1:/var/lib/rabbitmq/.erlang.cookie /opt/testdocker cp /opt/test/.erlang.cookie rabbitmq-node2:/var/lib/rabbitmq/.erlang.cookie
docker cp /opt/test/.erlang.cookie rabbitmq-node3:/var/lib/rabbitmq/.erlang.cookie
在rabbitmq-node1、rabbitmq-node2、rabbitmq-node3查看内容是否一致
more /var/lib/rabbitmq/.erlang.cookie
进入各节点命令行
docker exec -it rabbitmq-node1 /bin/bash
docker exec -it rabbitmq-node2 /bin/bash
docker exec -it rabbitmq-node3 /bin/bash
# 进入rabbitmq-node2容器命令行
apt update
apt install curl -y
curl rabbitmq-node1:15672
检查别名是否生效(因为在容器中命令行无法修改hosts文件,所以在启动时通过配置别名来访问另一个容器),使用curl查看是否有返回
配置集群
进入rabbitmq-node2、rabbitmq-node3容器命令行
# 执行代码
rabbitmqctl stop_app
rabbitmqctl reset
rabbitmqctl join_cluster rabbit@rabbitmq-node1
rabbitmqctl start_app
检查集群状态
rabbitmqctl cluster_status
其他博客参考案例:https://blog.csdn.net/want_you_gogo/article/details/120850605
3、三台组合集群安装版
rabbitmq节点
查看rabbitmq节点cookie值并记录
more /var/lib/rabbitmq/.erlang.cookie
AJRZLSANMKBYKOAXMSIA
重置节点应用
rabbitmqctl stop_app
rabbitmqctl reset
rabbitmqctl start_app
配置hostname
# 修改文件vim /etc/hosts,追加如下内容:
172.17.0.2 node1
172.17.0.3 node2
172.17.0.4 node3
# 退出rabbitmq节点容器
exit
rabbitmq-node2节点
修改rabbitmq节点cookie值和第一个节点一样
vim /var/lib/rabbitmq/.erlang.cookie
# 如果无法修改
# 先退出vim
:!e
:q
# 赋权
chmod 700 /var/lib/rabbitmq/.erlang.cookie
AJRZLSANMKBYKOAXMSIA
配置hostname
# 修改文件vim /etc/hosts,追加如下内容:
172.17.0.2 node1
172.17.0.3 node2
172.17.0.4 node3
重置节点应用(这里注意和第一个节点不同)
rabbitmqctl stop_app
rabbitmqctl reset
rabbitmqctl join_cluster rabbit@node1
rabbitmqctl start_app
# 退出rabbitmq节点容器
exit
rabbitmq-node3节点
修改rabbitmq节点cookie值和第一个节点一样
vim /var/lib/rabbitmq/.erlang.cookie
# 如果无法修改
# 先退出vim
:!e
:q
# 赋权
chmod 700 /var/lib/rabbitmq/.erlang.cookie
AJRZLSANMKBYKOAXMSIA
配置hostname
# 修改文件vim /etc/hosts,追加如下内容:
172.17.0.2 node1
172.17.0.3 node2
172.17.0.4 node3
重置节点应用
rabbitmqctl stop_app
rabbitmqctl reset
rabbitmqctl join_cluster rabbit@node1
rabbitmqctl start_app
# 退出rabbitmq节点容器
exit
二、负载均衡:Management UI
1、说明
- 其实访问任何一个RabbitMQ实例的管理界面都是对集群操作,所以配置负载均衡通过统一入口访问在我们学习期间就是锦上添花
- 先给管理界面做负载均衡,然后方便我们在管理界面上创建交换机、队列等操作
2、安装HAProxy
yum install -y haproxy
haproxy -v
systemctl start haproxy
systemctl enable haproxy
3、修改配置文件
配置文件位置:
/etc/haproxy/haproxy.cfg
在配置文件末尾增加如下内容:
frontend rabbitmq_ui_frontend
bind 192.168.200.100:22222
mode http
default_backend rabbitmq_ui_backendbackend rabbitmq_ui_backend
mode http
balance roundrobin
option httpchk GET /
server rabbitmq_ui1 192.168.200.100:15672 check
server rabbitmq_ui2 192.168.200.150:15672 check
server rabbitmq_ui3 192.168.200.200:15672 check
设置SELinux策略,允许HAProxy拥有权限连接任意端口:
setsebool -P haproxy_connect_any=1
SELinux是Linux系统中的安全模块,它可以限制进程的权限以提高系统的安全性。在某些情况下,SELinux可能会阻止HAProxy绑定指定的端口,这就需要通过设置域(domain)的安全策略来解决此问题。
通过执行
setsebool -P haproxy_connect_any=1
命令,您已经为HAProxy设置了一个布尔值,允许HAProxy连接到任意端口。这样,HAProxy就可以成功绑定指定的socket,并正常工作。
重启HAProxy:
systemctl restart haproxy
检查是否成功组建集群
三、负载均衡:核心功能
新增
vim /etc/haproxy/haproxy.cfg
在配置文件末尾增加如下内容:
frontend rabbitmq_frontend
bind 192.168.217.134:11111
mode tcp
default_backend rabbitmq_backendbackend rabbitmq_backend
mode tcp
balance roundrobin
server rabbitmq1 127.0.0.1:5672 check
server rabbitmq2 127.0.0.1:5673 check
server rabbitmq3 127.0.0.1:5674 check
四、测试
创建组件
- 交换机:exchange.cluster.test
- 队列:queue.cluster.test
- 路由键:routing.key.cluster.test
pom
<parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>3.1.5</version>
</parent><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId></dependency>
</dependencies>
生产者
主启动类
package com.atguigu.mq; import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication; @SpringBootApplication
public class RabbitMQProducerMainType {public static void main(String[] args) {SpringApplication.run(RabbitMQProducerMainType.class, args);}}
配置YAML
spring:rabbitmq:host: 192.168.217.134port: 11111username: guestpassword: 123456virtual-host: /publisher-confirm-type: CORRELATED # 交换机的确认publisher-returns: true # 队列的确认
logging:level:com.atguigu.mq.config.MQProducerAckConfig: info
配置类
package com.atguigu.mq.config;import jakarta.annotation.PostConstruct;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.ReturnedMessage;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration;@Configuration
@Slf4j
public class MQProducerAckConfig implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnsCallback{@Autowiredprivate RabbitTemplate rabbitTemplate;@PostConstructpublic void init() {rabbitTemplate.setConfirmCallback(this);rabbitTemplate.setReturnsCallback(this);}@Overridepublic void confirm(CorrelationData correlationData, boolean ack, String cause) {if (ack) {log.info("消息发送到交换机成功!数据:" + correlationData);} else {log.info("消息发送到交换机失败!数据:" + correlationData + " 原因:" + cause);}}@Overridepublic void returnedMessage(ReturnedMessage returned) {log.info("消息主体: " + new String(returned.getMessage().getBody()));log.info("应答码: " + returned.getReplyCode());log.info("描述:" + returned.getReplyText());log.info("消息使用的交换器 exchange : " + returned.getExchange());log.info("消息使用的路由键 routing : " + returned.getRoutingKey());}
}
测试类
package com.atguigu.mq.test;import jakarta.annotation.Resource;
import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.boot.test.context.SpringBootTest;@SpringBootTest
public class RabbitMQTest {@Resourceprivate RabbitTemplate rabbitTemplate;public static final String EXCHANGE_CLUSTER_TEST = "exchange.cluster.test";public static final String ROUTING_KEY_CLUSTER_TEST = "routing.key.cluster.test";@Testpublic void testSendMessage() {rabbitTemplate.convertAndSend(EXCHANGE_CLUSTER_TEST, ROUTING_KEY_CLUSTER_TEST, "message test cluster~~~");}}
消费者
主启动类
package com.atguigu.mq; import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication; @SpringBootApplication
public class RabbitMQProducerMainType {public static void main(String[] args) {SpringApplication.run(RabbitMQProducerMainType.class, args);}}
配置YAML
spring:rabbitmq:host: 192.168.217.134port: 11111username: guestpassword: 123456virtual-host: /listener:simple:acknowledge-mode: manual
logging:level:com.atguigu.mq.listener.MyProcessor: info
监听器
package com.atguigu.mq.listener;import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;import java.io.IOException;@Component
@Slf4j
public class MyProcessor {@RabbitListener(queues = {"queue.cluster.test"})public void processNormalQueueMessage(String data, Message message, Channel channel) throws IOException {log.info("消费端:" + data);channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);}}