【RabbitMQ 实战】09 客户端连接集群生产和消费消息

一、部署一个三节点集群

下面的链接是最快最简单的一种集群部署方法
3分钟部署一个RabbitMQ集群
上的的例子中,没有映射端口,所以没法从宿主机外部连接容器,下面的yml文件中,暴露了端口。
每个容器应用都映射了宿主机的端口,分别是5602,5612,5622
docker compse文件如下

version: '3'services:stats:image: bitnami/rabbitmqenvironment:- RABBITMQ_NODE_TYPE=stats- RABBITMQ_NODE_NAME=rabbit@stats- RABBITMQ_ERL_COOKIE=s3cr3tc00ki3ports:- '15672:15672'- '5602:5672'volumes:- 'rabbitmqstats_data:/bitnami/rabbitmq/mnesia'queue-disc1:image: bitnami/rabbitmqenvironment:- RABBITMQ_NODE_TYPE=queue-disc- RABBITMQ_NODE_NAME=rabbit@queue-disc1- RABBITMQ_CLUSTER_NODE_NAME=rabbit@stats- RABBITMQ_ERL_COOKIE=s3cr3tc00ki3ports:- '5612:5672'volumes:- 'rabbitmqdisc1_data:/bitnami/rabbitmq/mnesia'queue-ram1:image: bitnami/rabbitmqenvironment:- RABBITMQ_NODE_TYPE=queue-ram- RABBITMQ_NODE_NAME=rabbit@queue-ram1- RABBITMQ_CLUSTER_NODE_NAME=rabbit@stats- RABBITMQ_ERL_COOKIE=s3cr3tc00ki3ports:- '5622:5672'volumes:- 'rabbitmqram1_data:/bitnami/rabbitmq/mnesia'volumes:rabbitmqstats_data:driver: localrabbitmqdisc1_data:driver: localrabbitmqram1_data:driver: local

通过docker-compose up命令,就可以启动三个集群的容器了

[root@localhost mycompose]# docker-compose up

二、配置文件

原来的单节点只配置host和port,现在集群节点,就要配置addresses了,如下所示:

server:port: 8080
spring:application:name: rabbitmq-demo#配置rabbitMq 服务器rabbitmq:
#单节点直接可以写host和port
#    host: 192.168.56.201
#    port: 5672#集群连接写ip和端口addresses: 192.168.56.202:5602,192.168.56.202:5612,192.168.56.202:5622username: userpassword: bitnami#虚拟hostvirtual-host: virtual01template:mandatory: true #当mandatory设置为true时,交换器无法根据自身的类型和路由键找到一个符合条件的队列,那么RabbitMQ会调用Basic.Return命令将消息返回给生产者。当为false时,则直接丢弃消息publisher-confirm-type: correlated #生产者回调确认机制,由回调来确定消息是否发布成功publisher-returns: true #是否开启生产者returnslistener:simple:acknowledge-mode: manual #手动回复方式,一般建议手动回复,即需要我们自己调用对应的ACK方法prefetch: 10 #每个消费者可拉取的,还未ack的消息数量concurrency: 3 #消费端(每个Listener)的最小线程数max-concurrency: 10 #消费端(每个Listener)的最大线程数

三、代码

生产者

和单节点的发送和消费代码一致,没有变化

@Slf4j
@RestController
@RequestMapping("/rabbit")
public class RabbitSendController implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback {private static final String EXCHANGE_NAME = "my_exchange";private static final String ROUTING_KEY = "my_routing";@Autowiredprivate RabbitTemplate rabbitTemplate;/*** 正常发送并被broker接收* @return*/@RequestMapping("send")public String send() {for (int i = 0; i < 10; i++) {OrderInfo orderInfo = new OrderInfo();orderInfo.setAddress("成都市高新区");orderInfo.setOrderId(String.valueOf(i));orderInfo.setProductName("华为P60:" + i);//设置回调关联的一个idString messageId = UUID.randomUUID().toString();log.info("开始发送消息,当前消息关联id为:{}", messageId);CorrelationData correlationData = new CorrelationData(messageId);MessageProperties messageProperties = new MessageProperties();messageProperties.setDeliveryMode(MessageDeliveryMode.PERSISTENT);Message message = MessageBuilder.withBody(new Gson().toJson(orderInfo).getBytes(StandardCharsets.UTF_8)).andProperties(messageProperties).build();//设置ack回调rabbitTemplate.setConfirmCallback(this);//退回消息的回调rabbitTemplate.setReturnCallback(this);rabbitTemplate.convertAndSend(EXCHANGE_NAME, ROUTING_KEY, message, correlationData);}return "ok";}/*** 设置一个非法的路由键,模拟消息被broker退回的情况,前提是* spring.rabbitmq.template.mandatory=true 当mandatory设置为true时,交换器无法根据自身的类型和路由键找到一个符合条件的队列,那么RabbitMQ会调用Basic.Return命令将消息返回给生产者。当为false时,则直接丢弃消息* <p>* spring.rabbitmq.publisher-returns=true 生产者回调确认机制,由回调来确定消息是否发布成功** @return*/@RequestMapping("send-return")public String sendAndReturn() {OrderInfo orderInfo = new OrderInfo();orderInfo.setAddress("成都市高新区");orderInfo.setOrderId("111");orderInfo.setProductName("小米13");//设置回调关联的一个idString messageId = UUID.randomUUID().toString();log.info("开始发送消息,当前消息关联id为:{}", messageId);CorrelationData correlationData = new CorrelationData(messageId);MessageProperties messageProperties = new MessageProperties();messageProperties.setDeliveryMode(MessageDeliveryMode.PERSISTENT);Message message = MessageBuilder.withBody(new Gson().toJson(orderInfo).getBytes(StandardCharsets.UTF_8)).andProperties(messageProperties).build();//设置ack回调rabbitTemplate.setConfirmCallback(this);//退回消息的回调rabbitTemplate.setReturnCallback(this);//下面这个RoutingKey是没有绑定的,所以发不出去rabbitTemplate.convertAndSend(EXCHANGE_NAME, "error.routing", message, correlationData);return "ok";}@Overridepublic void confirm(CorrelationData correlationData, boolean ack, String cause) {if (correlationData == null) {return;}String messageId = correlationData.getId();if (ack) {log.info("【confirm回调方法】,消息发布成功,messageId={}", messageId);} else {log.info("【confirm回调方法】,消息发布失败,messageId={}", messageId);}}@Overridepublic void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {log.info("【returnedMessage回调方法】,消息被退回,message={},replyCode:{},replyText:{},exchange:{},routingKey:{}",new String(message.getBody()), replyCode, replyText, exchange, routingKey);}
}

消费者

@Slf4j
@Component
public class RabbitOrderConsumer {private static final String EXCHANGE_NAME = "my_exchange";private static final String QUEUE_NAME = "my_queue";private static final String ROUTING_KEY = "my_routing";@RabbitListener(bindings = {@QueueBinding(value = @Queue(value = QUEUE_NAME, durable = "true"),exchange = @Exchange(value = EXCHANGE_NAME, type = "topic", durable = "true"), key = ROUTING_KEY)})public void handleMessage(Message message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws IOException {//上面这个tag是这么写的么,为什么每次传过来都是1?导致channel被重新创建log.info("接收到消息:{},deliveryTag:{}", new String(message.getBody(), StandardCharsets.UTF_8), tag);channel.basicAck(tag, false);}
}

访问地址:http://localhost:8080/rabbit/send,然后就可以发送消息了,输出日志如下:

开始发送消息,当前消息关联id为:18049efe-a624-4288-a8f0-9c28fd776773
开始发送消息,当前消息关联id为:83d93f90-62f4-41cf-af02-03d496812561
开始发送消息,当前消息关联id为:f83257b2-95b6-408e-a5b9-74d0ec9f30b0
开始发送消息,当前消息关联id为:16a7e471-23ba-408b-9095-6add9ad1e270
开始发送消息,当前消息关联id为:152b0fb0-3a22-452d-93fe-662252c2fd8c
开始发送消息,当前消息关联id为:ade4f703-6075-485f-8e34-ec9b95bf59de
开始发送消息,当前消息关联id为:e4511f82-476a-4f4c-b704-4399baadeaf4
接收到消息:{"orderId":"1","productName":"华为P60:1","address":"成都市高新区"},deliveryTag:1
接收到消息:{"orderId":"0","productName":"华为P60:0","address":"成都市高新区"},deliveryTag:1
开始发送消息,当前消息关联id为:d8cd2dd6-bb9e-4d46-bc42-0d96df70748f
开始发送消息,当前消息关联id为:76950a93-5887-43c1-adef-edc1e29e2fab
开始发送消息,当前消息关联id为:f08a7a68-60da-4c5d-b1b8-c9e4d9453969
【confirm回调方法】,消息发布成功,messageId=18049efe-a624-4288-a8f0-9c28fd776773
【confirm回调方法】,消息发布成功,messageId=83d93f90-62f4-41cf-af02-03d496812561
接收到消息:{"orderId":"3","productName":"华为P60:3","address":"成都市高新区"},deliveryTag:2
接收到消息:{"orderId":"2","productName":"华为P60:2","address":"成都市高新区"},deliveryTag:1
接收到消息:{"orderId":"6","productName":"华为P60:6","address":"成都市高新区"},deliveryTag:3
接收到消息:{"orderId":"5","productName":"华为P60:5","address":"成都市高新区"},deliveryTag:2
接收到消息:{"orderId":"9","productName":"华为P60:9","address":"成都市高新区"},deliveryTag:4
接收到消息:{"orderId":"4","productName":"华为P60:4","address":"成都市高新区"},deliveryTag:2
接收到消息:{"orderId":"7","productName":"华为P60:7","address":"成都市高新区"},deliveryTag:3
接收到消息:{"orderId":"8","productName":"华为P60:8","address":"成都市高新区"},deliveryTag:3
【confirm回调方法】,消息发布成功,messageId=f83257b2-95b6-408e-a5b9-74d0ec9f30b0
【confirm回调方法】,消息发布成功,messageId=16a7e471-23ba-408b-9095-6add9ad1e270
【confirm回调方法】,消息发布成功,messageId=152b0fb0-3a22-452d-93fe-662252c2fd8c
【confirm回调方法】,消息发布成功,messageId=ade4f703-6075-485f-8e34-ec9b95bf59de
【confirm回调方法】,消息发布成功,messageId=e4511f82-476a-4f4c-b704-4399baadeaf4
【confirm回调方法】,消息发布成功,messageId=d8cd2dd6-bb9e-4d46-bc42-0d96df70748f
【confirm回调方法】,消息发布成功,messageId=76950a93-5887-43c1-adef-edc1e29e2fab
【confirm回调方法】,消息发布成功,messageId=f08a7a68-60da-4c5d-b1b8-c9e4d9453969

上述代码仓库:https://gitee.com/syk1234/mqdmo

四、后台管理

登录管理后台页面:http://192.168.56.202:15672/
在这里插入图片描述

共有三个节点,两个磁盘节点,一个内存节点。如果你还不清楚什么是磁盘节点,什么是内存节点,可以参考【RabbitMQ 实战】08 集群原理剖析

查看连接情况,发现是连接的是节点rabbit@stats节点在这里插入图片描述
查看队列的情况,队列是在rabbit@stats节点上
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/100852.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【ES6 03】变量解构赋值

变量解构赋值 数组解构赋值1 基操2 默认值 对象的解构赋值默认值注意 字符串的解构赋值数值与布尔值的解构赋值函数参数的解构赋值圆括号不得使用 作用 数组解构赋值 1 基操 ES6允许按照一定的模式从数组和对象中提取值从而对变量进行赋值&#xff0c;也即解构&#xff08;De…

Vscode进行远程开发

之前用的是pycharm&#xff0c;但是同事说pycharm太重了&#xff0c;连接远程服务器的时候给远程服务器的压力比较大&#xff0c;有时候远程服务器可能都扛不住&#xff0c;所以换成了vscode。 参考博客 手把手教你配置VS Code远程开发工具&#xff0c;工作效率提升N倍 - 知…

词云图大揭秘:如何从文本中挖掘热点词汇?

随着互联网的普及&#xff0c;大量的文本信息在网络上被产生和传播。如何从这些海量的文本中提取出有价值的信息&#xff0c;成为了人们关注的焦点。在这个信息爆炸的时代&#xff0c;词云图作为一种直观、形象的数据可视化手段&#xff0c;越来越受到人们的喜爱。本文手把手教…

设计模式 - 七大软件设计原则

目录 一、设计模式 1.1、软件设计原则 1.1.1、开闭原则 1.2.2、单一职责原则 1.2.3、里氏替换原则 1.2.4、迪米特原则 1.2.5、接口隔离原则 1.2.6、依赖倒转原则 1.2.7、合成/聚合复用原则 一、设计模式 1.1、软件设计原则 1.1.1、开闭原则 开闭原则&#xff1a;对扩…

双周赛114(模拟、枚举 + 哈希、DFS)

文章目录 双周赛114[2869. 收集元素的最少操作次数](https://leetcode.cn/problems/minimum-operations-to-collect-elements/)模拟 [2870. 使数组为空的最少操作次数](https://leetcode.cn/problems/minimum-number-of-operations-to-make-array-empty/)哈希 枚举 [2871. 将数…

LeetCode-503-下一个更大元素Ⅱ

题目描述&#xff1a; 给定一个循环数组 nums &#xff08; nums[nums.length - 1] 的下一个元素是 nums[0] &#xff09;&#xff0c;返回 nums 中每个元素的 下一个更大元素 。 数字 x 的 下一个更大的元素 是按数组遍历顺序&#xff0c;这个数字之后的第一个比它更大的数&am…

Docker--harbor私有仓库部署与管理

目录 一、Harbor简介 1、什么是Harbor 2、Harbor的特性 3、Haebor的构成 二、搭建本地私有仓库 1、本地私有仓库创建 2、将镜像上传至本地私有仓库 三、搭建Harbor仓库 1. 部署 Docker-Compose 服务 2、部署 Harbor 服务 3、启动Harbor 4、创建一个新项目 5、在其他…

Docker 容器管理

容器概念 docker容器相当于一个进程&#xff0c;性能接近于原生&#xff0c;几乎没有损耗&#xff1b; docker容器在单台主机上支持的数量成百上千&#xff1b; 容器与容器之间相互隔离&#xff1b; 镜像是创建容器的基础&#xff0c;可以理解镜像为一个压缩包 容器管理 …

并发、并行、同步、异步、阻塞、非阻塞

一、多核、多cpu &#xff08;一&#xff09;多核 Multicore 核是CPU最重要的部分。负责运算。核包括控制单元、运算单元、寄存器等单元。 多核就是指单个CPU中有多个核。 &#xff08;二&#xff09;多cpu Multiprocessor 多cpu就是一个系统拥有多个CPU。每个CPU可能有单个核…

5种排序算法

文章目录 一&#xff0c;排序算法时间复杂度比较二&#xff0c;插入排序三&#xff0c;冒泡排序四&#xff0c;快速排序五&#xff0c;堆排序六&#xff0c;二分归并排序 一&#xff0c;排序算法时间复杂度比较 算法最坏情况下平均情况下插入排序O(n )O(n)冒泡排序O(n)O(n)快速…

北京股票开户的佣金手续费是多少?北京股票开户选择哪家券商?

北京股票开户的佣金手续费是多少?北京股票开户选择哪家券商? 股票注册开户是非常简单的&#xff0c;在2015年前也就是互联网还不发达的时候&#xff0c;投资者只能去券商的营业部柜台办理&#xff0c;而自从各大券商都可以网上开户后&#xff0c;更多的投资者会选择网上开户…

【运维】一些团队开发相关的软件安装。

gitlab 安装步骤 (1) 下载镜像&#xff0c;并且上传到服务器 https://mirrors.tuna.tsinghua.edu.cn/gitlab-ce/yum/el7/gitlab-ce-16.2.8-ce.0.el7.x86_64.rpm &#xff08;2&#xff09;rpm -i gitlab-ce-16.2.8-ce.0.el7.x86_64.rpm &#xff08;3&#xff09;安装成功后…

观察者模式 行为型设计模式之七

1.定义 在GOF的《设计模式&#xff1a;可复用面向对象软件的基础》一书中对观察者模式是这样定义的&#xff1a;定义对象的一种一对多的依赖关系&#xff0c;当一个对象的状态发生改变时&#xff0c;所有依赖于它的对象都得到通知并被自动更新。当一个对象发生了变化&#xff0…

vue elementui <el-date-picker>日期选择框限制只能选择90天内的日期(包括今天)

之前也写过其他限制日期的语句&#xff0c;感觉用dayjs()的subtract()和add()也挺方便易懂的&#xff0c;以此记录 安装dayjs npm install dayjs --save dayjs().add(value : Number, unit : String); dayjs().add(7, day); //在当前的基础上加7天dayjs().subtract(value : N…

《TWS蓝牙耳机通信原理与接口技术》

+他V hezkz17进数字音频系统研究开发交流答疑群(课题组) 耳机BT与手机BT通信 主耳与从耳通信 耳机BLE盒手机BLE通信 充电盒与耳机通信 上位机与耳机通信 上位机与充电盒通信 1 耳机BT与手机BT通信 传输音频数据传递控制信息 (3) 耳机BLE与手机BLE通信 安卓/苹果app-耳机…

在硅云上主机搭建wordpress并使用Astra主题和avada主题

目录 前言 准备 操作 DNS解析域名 云主机绑定域名 安装wordpress网站程序 网站内Astra主题设计操作 安装主题 网站内avada主题安装 上传插件 上传主题 选择网站主题 前言 一开始以为云虚拟主机和云服务器是一个东西&#xff0c;只不过前者是虚拟的后者是不是虚拟的…

练[GYCTF2020]EasyThinking

[GYCTF2020]EasyThinking 文章目录 [GYCTF2020]EasyThinking掌握知识解题思路还得靠大佬正式开始 关键paylaod 掌握知识 ​ thinkphpV6任意文件操作漏洞&#xff0c;代码分析写入session文件的参数&#xff0c;源码泄露&#xff0c;使用蚁剑插件disable_functions绕过终端无回…

尚硅谷CSS学习笔记

什么是css css&#xff08;层叠样式表&#xff09; 它是一种标记语言&#xff0c;用于给HTML结构设置样式。简单理解css可以美化html&#xff0c;实现结构与样式的分离。 <link rel"shortcut icon" href"favicon.ico" type"image/x-icon"&g…

【调度算法】快速非支配排序算法

这段代码实现的是快速非支配排序算法&#xff08;Fast Non-dominated Sorting Algorithm&#xff09;。 算法输入和输出&#xff1a; 这个函数的输入是两个列表 values1 和 values2&#xff0c;分别表示多目标优化问题中每个解在两个目标函数下的取值。输入的两个列表应该具有…

encoding/json vs json-iterator

encoding/json vs json-iterator 100% Compatibility 默认情况下&#xff0c;jsoniter 不会像标准库那样对映射键进行排序。如果你想要 100% 的兼容性&#xff0c;就这样使用 m : map[string]interface{}{"3": 3,"1": 1,"2": 2, } json : json…