【RabbitMQ 实战】09 客户端连接集群生产和消费消息

一、部署一个三节点集群

下面的链接是最快最简单的一种集群部署方法
3分钟部署一个RabbitMQ集群
上的的例子中,没有映射端口,所以没法从宿主机外部连接容器,下面的yml文件中,暴露了端口。
每个容器应用都映射了宿主机的端口,分别是5602,5612,5622
docker compse文件如下

version: '3'services:stats:image: bitnami/rabbitmqenvironment:- RABBITMQ_NODE_TYPE=stats- RABBITMQ_NODE_NAME=rabbit@stats- RABBITMQ_ERL_COOKIE=s3cr3tc00ki3ports:- '15672:15672'- '5602:5672'volumes:- 'rabbitmqstats_data:/bitnami/rabbitmq/mnesia'queue-disc1:image: bitnami/rabbitmqenvironment:- RABBITMQ_NODE_TYPE=queue-disc- RABBITMQ_NODE_NAME=rabbit@queue-disc1- RABBITMQ_CLUSTER_NODE_NAME=rabbit@stats- RABBITMQ_ERL_COOKIE=s3cr3tc00ki3ports:- '5612:5672'volumes:- 'rabbitmqdisc1_data:/bitnami/rabbitmq/mnesia'queue-ram1:image: bitnami/rabbitmqenvironment:- RABBITMQ_NODE_TYPE=queue-ram- RABBITMQ_NODE_NAME=rabbit@queue-ram1- RABBITMQ_CLUSTER_NODE_NAME=rabbit@stats- RABBITMQ_ERL_COOKIE=s3cr3tc00ki3ports:- '5622:5672'volumes:- 'rabbitmqram1_data:/bitnami/rabbitmq/mnesia'volumes:rabbitmqstats_data:driver: localrabbitmqdisc1_data:driver: localrabbitmqram1_data:driver: local

通过docker-compose up命令,就可以启动三个集群的容器了

[root@localhost mycompose]# docker-compose up

二、配置文件

原来的单节点只配置host和port,现在集群节点,就要配置addresses了,如下所示:

server:port: 8080
spring:application:name: rabbitmq-demo#配置rabbitMq 服务器rabbitmq:
#单节点直接可以写host和port
#    host: 192.168.56.201
#    port: 5672#集群连接写ip和端口addresses: 192.168.56.202:5602,192.168.56.202:5612,192.168.56.202:5622username: userpassword: bitnami#虚拟hostvirtual-host: virtual01template:mandatory: true #当mandatory设置为true时,交换器无法根据自身的类型和路由键找到一个符合条件的队列,那么RabbitMQ会调用Basic.Return命令将消息返回给生产者。当为false时,则直接丢弃消息publisher-confirm-type: correlated #生产者回调确认机制,由回调来确定消息是否发布成功publisher-returns: true #是否开启生产者returnslistener:simple:acknowledge-mode: manual #手动回复方式,一般建议手动回复,即需要我们自己调用对应的ACK方法prefetch: 10 #每个消费者可拉取的,还未ack的消息数量concurrency: 3 #消费端(每个Listener)的最小线程数max-concurrency: 10 #消费端(每个Listener)的最大线程数

三、代码

生产者

和单节点的发送和消费代码一致,没有变化

@Slf4j
@RestController
@RequestMapping("/rabbit")
public class RabbitSendController implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback {private static final String EXCHANGE_NAME = "my_exchange";private static final String ROUTING_KEY = "my_routing";@Autowiredprivate RabbitTemplate rabbitTemplate;/*** 正常发送并被broker接收* @return*/@RequestMapping("send")public String send() {for (int i = 0; i < 10; i++) {OrderInfo orderInfo = new OrderInfo();orderInfo.setAddress("成都市高新区");orderInfo.setOrderId(String.valueOf(i));orderInfo.setProductName("华为P60:" + i);//设置回调关联的一个idString messageId = UUID.randomUUID().toString();log.info("开始发送消息,当前消息关联id为:{}", messageId);CorrelationData correlationData = new CorrelationData(messageId);MessageProperties messageProperties = new MessageProperties();messageProperties.setDeliveryMode(MessageDeliveryMode.PERSISTENT);Message message = MessageBuilder.withBody(new Gson().toJson(orderInfo).getBytes(StandardCharsets.UTF_8)).andProperties(messageProperties).build();//设置ack回调rabbitTemplate.setConfirmCallback(this);//退回消息的回调rabbitTemplate.setReturnCallback(this);rabbitTemplate.convertAndSend(EXCHANGE_NAME, ROUTING_KEY, message, correlationData);}return "ok";}/*** 设置一个非法的路由键,模拟消息被broker退回的情况,前提是* spring.rabbitmq.template.mandatory=true 当mandatory设置为true时,交换器无法根据自身的类型和路由键找到一个符合条件的队列,那么RabbitMQ会调用Basic.Return命令将消息返回给生产者。当为false时,则直接丢弃消息* <p>* spring.rabbitmq.publisher-returns=true 生产者回调确认机制,由回调来确定消息是否发布成功** @return*/@RequestMapping("send-return")public String sendAndReturn() {OrderInfo orderInfo = new OrderInfo();orderInfo.setAddress("成都市高新区");orderInfo.setOrderId("111");orderInfo.setProductName("小米13");//设置回调关联的一个idString messageId = UUID.randomUUID().toString();log.info("开始发送消息,当前消息关联id为:{}", messageId);CorrelationData correlationData = new CorrelationData(messageId);MessageProperties messageProperties = new MessageProperties();messageProperties.setDeliveryMode(MessageDeliveryMode.PERSISTENT);Message message = MessageBuilder.withBody(new Gson().toJson(orderInfo).getBytes(StandardCharsets.UTF_8)).andProperties(messageProperties).build();//设置ack回调rabbitTemplate.setConfirmCallback(this);//退回消息的回调rabbitTemplate.setReturnCallback(this);//下面这个RoutingKey是没有绑定的,所以发不出去rabbitTemplate.convertAndSend(EXCHANGE_NAME, "error.routing", message, correlationData);return "ok";}@Overridepublic void confirm(CorrelationData correlationData, boolean ack, String cause) {if (correlationData == null) {return;}String messageId = correlationData.getId();if (ack) {log.info("【confirm回调方法】,消息发布成功,messageId={}", messageId);} else {log.info("【confirm回调方法】,消息发布失败,messageId={}", messageId);}}@Overridepublic void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {log.info("【returnedMessage回调方法】,消息被退回,message={},replyCode:{},replyText:{},exchange:{},routingKey:{}",new String(message.getBody()), replyCode, replyText, exchange, routingKey);}
}

消费者

@Slf4j
@Component
public class RabbitOrderConsumer {private static final String EXCHANGE_NAME = "my_exchange";private static final String QUEUE_NAME = "my_queue";private static final String ROUTING_KEY = "my_routing";@RabbitListener(bindings = {@QueueBinding(value = @Queue(value = QUEUE_NAME, durable = "true"),exchange = @Exchange(value = EXCHANGE_NAME, type = "topic", durable = "true"), key = ROUTING_KEY)})public void handleMessage(Message message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws IOException {//上面这个tag是这么写的么,为什么每次传过来都是1?导致channel被重新创建log.info("接收到消息:{},deliveryTag:{}", new String(message.getBody(), StandardCharsets.UTF_8), tag);channel.basicAck(tag, false);}
}

访问地址:http://localhost:8080/rabbit/send,然后就可以发送消息了,输出日志如下:

开始发送消息,当前消息关联id为:18049efe-a624-4288-a8f0-9c28fd776773
开始发送消息,当前消息关联id为:83d93f90-62f4-41cf-af02-03d496812561
开始发送消息,当前消息关联id为:f83257b2-95b6-408e-a5b9-74d0ec9f30b0
开始发送消息,当前消息关联id为:16a7e471-23ba-408b-9095-6add9ad1e270
开始发送消息,当前消息关联id为:152b0fb0-3a22-452d-93fe-662252c2fd8c
开始发送消息,当前消息关联id为:ade4f703-6075-485f-8e34-ec9b95bf59de
开始发送消息,当前消息关联id为:e4511f82-476a-4f4c-b704-4399baadeaf4
接收到消息:{"orderId":"1","productName":"华为P60:1","address":"成都市高新区"},deliveryTag:1
接收到消息:{"orderId":"0","productName":"华为P60:0","address":"成都市高新区"},deliveryTag:1
开始发送消息,当前消息关联id为:d8cd2dd6-bb9e-4d46-bc42-0d96df70748f
开始发送消息,当前消息关联id为:76950a93-5887-43c1-adef-edc1e29e2fab
开始发送消息,当前消息关联id为:f08a7a68-60da-4c5d-b1b8-c9e4d9453969
【confirm回调方法】,消息发布成功,messageId=18049efe-a624-4288-a8f0-9c28fd776773
【confirm回调方法】,消息发布成功,messageId=83d93f90-62f4-41cf-af02-03d496812561
接收到消息:{"orderId":"3","productName":"华为P60:3","address":"成都市高新区"},deliveryTag:2
接收到消息:{"orderId":"2","productName":"华为P60:2","address":"成都市高新区"},deliveryTag:1
接收到消息:{"orderId":"6","productName":"华为P60:6","address":"成都市高新区"},deliveryTag:3
接收到消息:{"orderId":"5","productName":"华为P60:5","address":"成都市高新区"},deliveryTag:2
接收到消息:{"orderId":"9","productName":"华为P60:9","address":"成都市高新区"},deliveryTag:4
接收到消息:{"orderId":"4","productName":"华为P60:4","address":"成都市高新区"},deliveryTag:2
接收到消息:{"orderId":"7","productName":"华为P60:7","address":"成都市高新区"},deliveryTag:3
接收到消息:{"orderId":"8","productName":"华为P60:8","address":"成都市高新区"},deliveryTag:3
【confirm回调方法】,消息发布成功,messageId=f83257b2-95b6-408e-a5b9-74d0ec9f30b0
【confirm回调方法】,消息发布成功,messageId=16a7e471-23ba-408b-9095-6add9ad1e270
【confirm回调方法】,消息发布成功,messageId=152b0fb0-3a22-452d-93fe-662252c2fd8c
【confirm回调方法】,消息发布成功,messageId=ade4f703-6075-485f-8e34-ec9b95bf59de
【confirm回调方法】,消息发布成功,messageId=e4511f82-476a-4f4c-b704-4399baadeaf4
【confirm回调方法】,消息发布成功,messageId=d8cd2dd6-bb9e-4d46-bc42-0d96df70748f
【confirm回调方法】,消息发布成功,messageId=76950a93-5887-43c1-adef-edc1e29e2fab
【confirm回调方法】,消息发布成功,messageId=f08a7a68-60da-4c5d-b1b8-c9e4d9453969

上述代码仓库:https://gitee.com/syk1234/mqdmo

四、后台管理

登录管理后台页面:http://192.168.56.202:15672/
在这里插入图片描述

共有三个节点,两个磁盘节点,一个内存节点。如果你还不清楚什么是磁盘节点,什么是内存节点,可以参考【RabbitMQ 实战】08 集群原理剖析

查看连接情况,发现是连接的是节点rabbit@stats节点在这里插入图片描述
查看队列的情况,队列是在rabbit@stats节点上
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/100852.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Vscode进行远程开发

之前用的是pycharm&#xff0c;但是同事说pycharm太重了&#xff0c;连接远程服务器的时候给远程服务器的压力比较大&#xff0c;有时候远程服务器可能都扛不住&#xff0c;所以换成了vscode。 参考博客 手把手教你配置VS Code远程开发工具&#xff0c;工作效率提升N倍 - 知…

词云图大揭秘:如何从文本中挖掘热点词汇?

随着互联网的普及&#xff0c;大量的文本信息在网络上被产生和传播。如何从这些海量的文本中提取出有价值的信息&#xff0c;成为了人们关注的焦点。在这个信息爆炸的时代&#xff0c;词云图作为一种直观、形象的数据可视化手段&#xff0c;越来越受到人们的喜爱。本文手把手教…

设计模式 - 七大软件设计原则

目录 一、设计模式 1.1、软件设计原则 1.1.1、开闭原则 1.2.2、单一职责原则 1.2.3、里氏替换原则 1.2.4、迪米特原则 1.2.5、接口隔离原则 1.2.6、依赖倒转原则 1.2.7、合成/聚合复用原则 一、设计模式 1.1、软件设计原则 1.1.1、开闭原则 开闭原则&#xff1a;对扩…

双周赛114(模拟、枚举 + 哈希、DFS)

文章目录 双周赛114[2869. 收集元素的最少操作次数](https://leetcode.cn/problems/minimum-operations-to-collect-elements/)模拟 [2870. 使数组为空的最少操作次数](https://leetcode.cn/problems/minimum-number-of-operations-to-make-array-empty/)哈希 枚举 [2871. 将数…

Docker--harbor私有仓库部署与管理

目录 一、Harbor简介 1、什么是Harbor 2、Harbor的特性 3、Haebor的构成 二、搭建本地私有仓库 1、本地私有仓库创建 2、将镜像上传至本地私有仓库 三、搭建Harbor仓库 1. 部署 Docker-Compose 服务 2、部署 Harbor 服务 3、启动Harbor 4、创建一个新项目 5、在其他…

并发、并行、同步、异步、阻塞、非阻塞

一、多核、多cpu &#xff08;一&#xff09;多核 Multicore 核是CPU最重要的部分。负责运算。核包括控制单元、运算单元、寄存器等单元。 多核就是指单个CPU中有多个核。 &#xff08;二&#xff09;多cpu Multiprocessor 多cpu就是一个系统拥有多个CPU。每个CPU可能有单个核…

北京股票开户的佣金手续费是多少?北京股票开户选择哪家券商?

北京股票开户的佣金手续费是多少?北京股票开户选择哪家券商? 股票注册开户是非常简单的&#xff0c;在2015年前也就是互联网还不发达的时候&#xff0c;投资者只能去券商的营业部柜台办理&#xff0c;而自从各大券商都可以网上开户后&#xff0c;更多的投资者会选择网上开户…

【运维】一些团队开发相关的软件安装。

gitlab 安装步骤 (1) 下载镜像&#xff0c;并且上传到服务器 https://mirrors.tuna.tsinghua.edu.cn/gitlab-ce/yum/el7/gitlab-ce-16.2.8-ce.0.el7.x86_64.rpm &#xff08;2&#xff09;rpm -i gitlab-ce-16.2.8-ce.0.el7.x86_64.rpm &#xff08;3&#xff09;安装成功后…

vue elementui <el-date-picker>日期选择框限制只能选择90天内的日期(包括今天)

之前也写过其他限制日期的语句&#xff0c;感觉用dayjs()的subtract()和add()也挺方便易懂的&#xff0c;以此记录 安装dayjs npm install dayjs --save dayjs().add(value : Number, unit : String); dayjs().add(7, day); //在当前的基础上加7天dayjs().subtract(value : N…

在硅云上主机搭建wordpress并使用Astra主题和avada主题

目录 前言 准备 操作 DNS解析域名 云主机绑定域名 安装wordpress网站程序 网站内Astra主题设计操作 安装主题 网站内avada主题安装 上传插件 上传主题 选择网站主题 前言 一开始以为云虚拟主机和云服务器是一个东西&#xff0c;只不过前者是虚拟的后者是不是虚拟的…

练[GYCTF2020]EasyThinking

[GYCTF2020]EasyThinking 文章目录 [GYCTF2020]EasyThinking掌握知识解题思路还得靠大佬正式开始 关键paylaod 掌握知识 ​ thinkphpV6任意文件操作漏洞&#xff0c;代码分析写入session文件的参数&#xff0c;源码泄露&#xff0c;使用蚁剑插件disable_functions绕过终端无回…

尚硅谷CSS学习笔记

什么是css css&#xff08;层叠样式表&#xff09; 它是一种标记语言&#xff0c;用于给HTML结构设置样式。简单理解css可以美化html&#xff0c;实现结构与样式的分离。 <link rel"shortcut icon" href"favicon.ico" type"image/x-icon"&g…

encoding/json vs json-iterator

encoding/json vs json-iterator 100% Compatibility 默认情况下&#xff0c;jsoniter 不会像标准库那样对映射键进行排序。如果你想要 100% 的兼容性&#xff0c;就这样使用 m : map[string]interface{}{"3": 3,"1": 1,"2": 2, } json : json…

线性代数 --- 矩阵的QR分解,A=QR

矩阵的QR分解&#xff0c;格拉姆施密特过程的矩阵表示 首先先简单的回顾一下Gram-Schmidt正交化过程的核心思想&#xff0c;如何把一组线性无关的向量构造成一组标准正交向量&#xff0c;即&#xff0c;如何把矩阵A变成矩阵Q的过程。 给定一组线性无关的向量a,b,c&#xff0c;我…

Transformer预测 | Pytorch实现基于mmTransformer多模态运动预测(堆叠Transformer)

文章目录 文章概述程序设计参考资料文章概述 Transformer预测 | Pytorch实现基于mmTransformer多模态运动预测(堆叠Transformer) 程序设计 Initialize virtual environment: conda create -n mmTrans python=3.7# -*- coding: utf-8 -*- import argparse import os

推荐高效的电脑磁盘备份解决方案!

该怎样实现电脑磁盘备份&#xff1f; 接下来&#xff0c;我们将为你介绍两种磁盘备份方法。一种是利用操作系统自带的功能&#xff0c;另一种则是通过第三方工具实现。 方法一. Windows自带的备份还原功能 要在Windows 11/10/8/7中备份软件&#xff0c;你可以使…

一文带你了解 Linux 的 Cache 与 Buffer

目录 前言一、Cache二、Buffer三、Linux 系统中的 Cache 与 Buffer总结 前言 内存的作用是什么&#xff1f;简单的理解&#xff0c;内存的存在是为了解决高速传输设备与低速传输设备之间数据传输速度不和谐而设立的中间层&#xff08;学过计算机网络的应该都知道&#xff0c;这…

【内网穿透】Docker部署Drupal并实现公网访问

目录 前言 1. Docker安装Drupal 2. 本地局域网访问 3 . Linux 安装cpolar 4. 配置Drupal公网访问地址 5. 公网远程访问Drupal 6. 固定Drupal 公网地址 前言 Dupal是一个强大的CMS&#xff0c;适用于各种不同的网站项目&#xff0c;从小型个人博客到大型企业级门户网站。…

【Python】实现excel文档中指定工作表数据的更新操作

在做数值计算时&#xff0c;个人比较习惯利用excel文档的公式做数值计算进行对比&#xff0c;检查异常&#xff0c;虽然计算量大后&#xff0c;excel计算会比较缓慢&#xff0c;但设计简单&#xff0c;易排错 但一般测试过程中使用到的数据都不是最终数值&#xff0c;会不停根据…

红队专题-从零开始VC++远程控制软件RAT-C/S-[1]远控介绍及界面编写

红队专题 招募六边形战士队员[1]---远控介绍及界面编写1.远程控制软件演示及教程简要说明主程序可执行程序 服务端生成器主机上线服务端程序 和 服务文件管理CMD进程服务自启动主程序主对话框操作菜单列表框配置信息 多线程操作非模式对话框 2.环境&#xff1a;3.界面编程新建项…