RabbitMQ 消息队列安装及入门

市面常见消息队列中间件对比

技术名称吞吐量 /IO/并发时效性(类似延迟)消息到达时间可用性可靠性优势应用场景
activemq万级简单易学中小型企业、项目
rabbitmq万级极高(微秒)极高生态好(基本什么语言都支持)、时效性高、易学适合绝大数的分布式应用
kafka10万 QBS高(毫秒)极高极高吞吐量大、可靠性、可用性、强大的数据流处理能力适合大规模处理数据的场景、比如构建日志手机系统、实时数据传输、事件流收集传输
rocketmq10万 QBS高ms极高极高吞吐量大、可靠性、可用性、可扩展性适用于金融等可靠性要求较高的场景、适合大规模的消息处理。金融、电商、大规模社交
pulsar10万 QBS高ms极高可靠性、可用性很高、新兴(技术架构先进)适合大规模、高并发的分布式系统(云原生)适合实时分析、事件流处理、物联网数据处理。

RabbitMQ 

RabbitMQ 是基于 AMQP 高级消息队列协议的。

 实际使用可根据官方文档的 demo 。

官方文档:RabbitMQ Tutorials | RabbitMQ

模型

生产者:通俗就是发消息的人,比如在外卖软件上点餐的人

消费者:通俗就是处理消息的任务,比如外卖软件上的商家,需要根据顾客的要求制作餐

交换机:负责把消息转发到对应的队列中,比如有外卖的时候,系统给附近的外面小哥派单

队列:存放消息的地方,等待消费者消费,比如商家肯定不是只做一份餐,做好的餐放在一个指定的位置等待外卖小哥来取餐

路由:转发,就是怎么把消息从一个地方转到另一个地方,通常加在交换机和队列之间,比如系统指定某个范围的外卖小哥接这单

安装

1. 首先安装 RabbitMQ,直接官网下载即可,如果下载速度慢,可以换个网络,或者找找有没有国内镜像。(当初我下载的时候找了半天的镜像都是版本比较老的,结果想着挂一晚上下载,结果官网突然就快了,白折腾了。)

官方网站:Installing on Windows | RabbitMQ

国内镜像:Index of rabbitmq-server-local/v3.12.7

一路 next ,傻瓜式安装即可

安装之后检查服务中是否已经运行了。

2. 安装监控面板

在 RabbitMQ 安装目录下的 sbin 目录下的CMD 输入下面的命令

rabbitmq-plugins.bat enable rabbitmq_management

 安装成功:

默认端口号是 5672,webUI 是 15672

在浏览器输入地址打开管理界面:http://localhost:15672

默认账号密码是 guest

注意:1. 安装目录不能是中文,不能有空格等非法字符,否则页面打不开

           2. 如果想要在远程服务器访问 RabbitMQ 管理面板,需要创建管理员账号,比如在宝塔面板使用时宝塔面板提供的 admin账号,地址就是宝塔面板的 IP 

创建账号:access-control | RabbitMQ

入门

依赖引入

<!-- https://mvnrepository.com/artifact/com.rabbitmq/amqp-client -->
<dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>5.20.0</version>
</dependency>

单消费者和生产者

一对一的关系

1. 生产者代码

public class SingleProducer {private final static String QUEUE_NAME = "hello";public static void main(String[] argv) throws Exception {//创建连接ConnectionFactory factory = new ConnectionFactory();//设置了本地连接,如果修改了用户名和密码,需要设置/*factory.setPassword();factory.setUsername();*/factory.setHost("localhost");//建立连接、创建频道//频道,类似客户端,用于调用serverConnection connection = factory.newConnection();Channel channel = connection.createChannel();//创建队列,队列持久化,第二份参数设置为 truechannel.queueDeclare(QUEUE_NAME, false, false, false, null);String message = "Hello World!";//发送消息channel.basicPublish("",QUEUE_NAME,null,message.getBytes(StandardCharsets.UTF_8));System.out.println(" [*] Waiting for messages. To exit press CTRL+C");}
}

channel 频道:理解为操作消息队列的 Client,通过 channel 收发消息,提供了和消息对了 server 建立通信的传输方法

channel.queueDeclare 方法参数:

queue:这是一个字符串参数,代表要声明的队列的名称。如果队列不存在,则会自动创建一个新的队列。

durable:这是一个布尔值参数,表示队列是否持久化。如果设置为true,则队列会在服务器重启后仍然存在;如果设置为false,则队列在服务器重启后会被删除。默认值为false。

exclusive:这也是一个布尔值参数,表示队列是否为独占模式。如果设置为true,则只有当前连接可以访问该队列;如果设置为false,则其他连接也可以访问该队列。默认值为false。

autoDelete:这是另一个布尔值参数,表示队列是否自动删除。如果设置为true,则当最后一个消费者取消订阅时,队列将被删除;如果设置为false,则队列将一直存在,直到手动删除或服务器重启。默认值为false。

arguments:这是一个可选参数,用于设置队列的其他属性,比如消息的最大长度、最大优先级等。

channel.basicPublish 参数:

exchange:这是一个字符串参数,代表交换机的名称。如果不需要使用特定的交换机,可以传递一个空字符串("")。交换机是RabbitMQ中用于接收生产者发送的消息并根据绑定规则路由到队列的组件。

routingKey:这也是一个字符串参数,它指定了发布消息的队列。无论通道绑定到哪个队列,最终发布的消息都会包含这个指定的路由键。路由键是用来确定消息应该发送到哪个队列的重要信息。

message:这是要发布的消息本身,通常是字节数组的形式。

properties:这是一个可选参数,用于设置消息的属性,比如消息的优先级、过期时间等。

在使用channel.basicPublish时,需要注意以下几点:

exchange和routingKey不能为空:在AMQImpl类中的实现要求这两个参数都不能为null,否则会抛出异常。

交换机类型:根据不同的需求,可以选择不同类型的交换机,如fanout、direct或topic。每种类型的交换机都有其特定的路由规则。

非命名队列:在某些情况下,比如日志系统,可以使用非命名队列,这样消费者可以接收到所有相关的日志消息,而不是特定的部分。

2. 消费者代码

public class SingleConsumer {private final static String QUEUE_NAME = "hello";public static void main(String[] argv) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");Connection connection = factory.newConnection();Channel channel = connection.createChannel();//声明队列,同一个消息队列参数必须一致channel.queueDeclare(QUEUE_NAME, false, false, false, null);//定义了如何处理消息DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), "UTF-8");System.out.println(" [x] Received '" + message + "'");};//接收、消费消息channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { });System.out.println(" [*] Waiting for messages. To exit press CTRL+C");}
}

 channel.basicConsume 参数:

  1. queue:这是一个字符串参数,代表要消费的队列的名称。如果队列不存在,则会抛出异常。
  2. onMessage:这是一个回调函数,当有新的消息到达时会被调用。该函数需要接收两个参数:一个表示消息内容的Delivery对象和一个表示通道的Channel对象。
  3. consumerTag:这是一个可选参数,用于标识消费者。如果没有指定,则会自动生成一个唯一的标识符。
  4. autoAck:这是一个布尔值参数,表示是否自动确认消息。如果设置为true,则在消息被处理后会自动发送确认信息;如果设置为false,则需要手动发送确认信息。默认值为false。
  5. arguments:这是一个可选参数,用于设置消费者的其他属性,比如消息的最大长度、最大优先级等。

在使用channel.basicConsume时,需要注意以下几点:

  1. 队列名称:队列名称应该是唯一的,否则会抛出异常。
  2. 消息处理:在onMessage回调函数中,需要对消息进行处理,并根据需要发送确认信息。
  3. 消费者标识符:可以通过设置consumerTag来标识消费者,以便在后续操作中进行识别和管理。
  4. 消费者属性:可以通过设置消费者的其他属性来控制消费者的行为,比如设置消息的最大长度、最大优先级等。

多消费者

多个消费者,比如一个工厂生产商品,一个商店卖不完,分给多个商店一起卖

生产者代码和上面一样


public class MultiProducer {private static final String TASK_QUEUE_NAME = "multi_queue";public static void main(String[] argv) throws Exception {//创建连接ConnectionFactory factory = new ConnectionFactory();//设置本地连接factory.setHost("localhost");//创建队列,创建频道,类似客户端try (Connection connection = factory.newConnection();Channel channel = connection.createChannel()) {//队列持久化channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);//设置消息Scanner scanner = new Scanner(System.in);while(scanner.hasNext()){//输入消息String message = scanner.nextLine();//发送消息channel.basicPublish("", TASK_QUEUE_NAME,MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes("UTF-8"));System.out.println(" [x] Sent '" + message + "'");}}}
}

控制处理任务的积压数,最多同时处理任务数

channel.basicQos(1); //最多处理1个

消息确认机制

ack 确认、nack 消息失败、reject 拒绝

当消息拿走之后会有一个确认机制,保证消息成功被消费。当消费者接收消息会给一个反馈,确认消息的状态,成功消息才会被移除。

支持配置 autoack ,建议修改为 false,根据实际情况手动确认。

//手动确认
channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);
//手动拒绝
channel.basicNack(delivery.getEnvelope().getDeliveryTag(),false,false);

 消费者代码

public class MultiConsumer {private static final String TASK_QUEUE_NAME = "multi_queue";public static void main(String[] argv) throws Exception {//创建连接ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");for (int i = 0; i < 2; i++) {final Connection connection = factory.newConnection();final Channel channel = connection.createChannel();//队列持久化,参数要一致channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);System.out.println(" [*] Waiting for messages. To exit press CTRL+C");//控制处理任务的积压数,最多同时处理任务数channel.basicQos(1);//定义了如何处理消息int finalI = i;DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), "UTF-8");try {//处理工作的逻辑System.out.println(" [x] Received '" +"消费者:" + finalI + " 消息:"+ message + "'");//睡一定时间,模拟机器处理能力有限Thread.sleep(20000);channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);} catch (InterruptedException e) {throw new RuntimeException(e);} finally {System.out.println(" [x] Done");channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);}};//接收消息,消费消息,开启消息监听channel.basicConsume(TASK_QUEUE_NAME, false , deliverCallback, consumerTag -> {});}}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/diannao/14481.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

leetcode124 二叉树中的最大路径和-dp

题目 二叉树中的 路径 被定义为一条节点序列&#xff0c;序列中每对相邻节点之间都存在一条边。同一个节点在一条路径序列中 至多出现一次 。该路径 至少包含一个 节点&#xff0c;且不一定经过根节点。 路径和 是路径中各节点值的总和。 给你一个二叉树的根节点 root &…

【Crypto】Rabbit

文章目录 一、Rabbit解题感悟 一、Rabbit 题目提示很明显是Rabbit加密&#xff0c;直接解 小小flag&#xff0c;拿下&#xff01; 解题感悟 提示的太明显了

redis核心面试题二(实战优化)

文章目录 10. redis配置mysql实战优化[重要]11. redis之缓存击穿、缓存穿透、缓存雪崩12. redis实现分布式session 10. redis配置mysql实战优化[重要] // 最初实现OverrideTransactionalpublic Product createProduct(Product product) {productRepo.saveAndFlush(product);je…

MQTT 5.0 报文解析 05:DISCONNECT

欢迎阅读 MQTT 5.0 报文系列 的第五篇文章。在上一篇中&#xff0c;我们已经介绍了 MQTT 5.0 的 PINGREQ 和 PINGRESP 报文。现在&#xff0c;我们将介绍下一个控制报文&#xff1a;DISCONNECT。 在 MQTT 中&#xff0c;客户端和服务端可以在断开网络连接前向对端发送一个 DIS…

手把手教你搭建一个花店小程序商城

如果你是一位花店店主&#xff0c;想要为你的生意搭建一个精美的小程序商城&#xff0c;以下是你将遵循的五个步骤。 步骤1&#xff1a;登录乔拓云平台进入后台 首先&#xff0c;你需要登录乔拓云平台的后台管理页面。你可以在电脑或移动设备上的浏览器中输入乔拓云的官方网站…

2024.5.26 机器学习周报

目录 引言 Abstract 文献阅读 1、题目 2、引言 3、创新点 4、Motivation 5、naive Lite-HRNet 6、Lite-HRNet 7、实验 深度学习 解读SAM(Segment Anything Model) 1、SAM Task 2、SAM Model 2.1、Patch Embedding 2.2、Positiona Embedding 2.3、Transformer …

互联网医院开发:引领智慧医疗新时代

随着科技的迅猛发展和互联网的普及&#xff0c;传统医疗模式正在迎来一场深刻的变革。互联网医院的崛起&#xff0c;打破了时间和空间的限制&#xff0c;为患者和医疗机构带来了更加便捷、高效、安全的医疗服务体验。本文将从技术角度深入探讨互联网医院的开发&#xff0c;包括…

多线程(八)

一、wait和notify 等待 通知 机制 和join的用途类似,多个线程之间随机调度,引入 wait notify 就是为了能够从应用层面上,干预到多个不同线程代码的执行顺序.( 这里说的干预,不是影响系统的线程调度策略 内核里的线程调度,仍然是无序的. 相当于是在应用程序…

Pod容器资源限制和探针

目录 一、资源限制 1.Pod和容器的资源请求和限制 2.CPU 资源单位 案例一 案例二 二、健康检查&#xff0c;又称为探针&#xff08;Probe&#xff09; 1.探针的三种规则 2.Probe支持三种检查方法 3.探测获得的三种结果 案例一&#xff1a;exec 案例二&#xff1a;htt…

OneMO同行 心级服务:中移物联OneMO模组助力客户终端寒冷环境下的稳定运行

中移物联OneMO模组以客户为中心&#xff0c;基于中国移动心级服务要求&#xff0c;开展“OneMO同行 心级服务 标定一流”高标服务主题活动&#xff0c;升级“服务内容““服务方式”和“服务意识”&#xff0c;为行业客户提供全新的服务体验。 近日&#xff0c;某车载监控设备…

ACM实训第十七天

Is It A Tree? 问题 考试时应该做不出来&#xff0c;果断放弃 树是一种众所周知的数据结构&#xff0c;它要么是空的(null, void, nothing)&#xff0c;要么是一个或的集合满足以下属性的节点之间有向边连接的节点较多。 •只有一个节点&#xff0c;称为根节点&#xff0c;它…

【Crypto】摩丝

文章目录 一、摩斯解题感悟 一、摩斯 很明显莫尔斯密码 iloveyou还挺浪漫 小小flag&#xff0c;拿下 解题感悟 莫尔斯密码这种题还是比较明显的

智能锁千千万,谁是你的NO.1,亲身实测凯迪仕传奇大师K70旗舰新品

智能锁千千万&#xff0c;谁是你的NO.1。欢迎来到智哪儿评测室&#xff0c;这次我们为大家带来了凯迪仕传奇大师K70系列的一款重磅新品。 在科技的浪潮中&#xff0c;家居安全领域正经历着前所未有的变革。智能锁越来越成为家的安全守护神&#xff0c;以及智能生活的得力助手。…

Android 11 Framework实时监听Activity堆栈变化

核心类 Framework中有一个类SystemActivityMonitoringService专门用于监控Activity堆栈变化&#xff0c;属于隐藏Api&#xff0c;应用侧无法调用。此类位于 packages/services/Car/service/src/com/android/car/SystemActivityMonitoringService.java 方法 void registerTa…

大数据Hive中的UDF:自定义数据处理的利器(下)

在上一篇文章中&#xff0c;我们对第一种用户定义函数&#xff08;UDF&#xff09;进行了基础介绍。接下来&#xff0c;本文将带您深入了解剩余的两种UDF函数类型。 文章目录 1. UDAF1.1 简单UDAF1.2 通用UDAF 2. UDTF3. 总结 1. UDAF 1.1 简单UDAF 第一种方式是 Simple(简单…

每日一题《leetcode--382.链表随机结点》

https://leetcode.cn/problems/linked-list-random-node/ 这道题我们首先看到题目中的要求&#xff1a;在单链表中随机选取一个链表中的结点&#xff0c;要使每个结点被选取的概率是一样的。 当我们看到随机这两个字时&#xff0c;应该就会想起rand()这个函数。接着我们把使用这…

自己搭建内网穿透

本文介绍使用最新版frp搭建内网穿透&#xff0c;最新版本的frp在配置上与之前有很大不同&#xff0c;需要使用.toml文件进行配置。其中主要问题出现在toml文件内部。 一、云服务器配置 下载frp sudo apt update sudo apt install wget wget https://github.com/fatedier/frp…

求出这行英文中最后一个单词的长度

【题目描述】蓝宝看到了一行奇怪的英文&#xff0c;这行英文由若干单词组成&#xff0c;每个单词前后用一些字符*隔开请帮助蓝宝求出这行英文中最后一个单词的长度。【输入格式】 输入一行&#xff0c;就就是蓝宝看到的奇怪的英文。 【输出格式】 输出一行&#xff0c;是个整数…

文旅3d仿真数字人形象为游客提供全方位的便捷服务

在AI人工智能与VR虚拟现实技术的双重驱动下&#xff0c;文旅3D数字代言人正以其独特的魅力&#xff0c;频频亮相于各类文旅场景&#xff0c;为游客带来前所未有的个性化服务体验。他们不仅有趣有品&#xff0c;更能言善道&#xff0c;成为文旅业数字化发展的新亮点。 这些文旅3…

我的文章分类合集目录

文章目录 Java相关基础常规问题类Docker类RabbitMQ类分库分表 网络工程相关路由交换、Cisco Packet TracerIP地址 前端相关数据库 Java相关 基础 Java开发规范、项目开发流程 SpringBoot整合MyBatis实现增删改查(简单,详细) SpringBoot整合MybatisPlus&#xff08;详细&#…