RabbitMQ 开发指南

连接RabbitMQ

连接方式一:
在这里插入图片描述
也可以选择使用URI的方式来实现

连接方式二:
在这里插入图片描述
Connection接口被用来创建一个Channel,在创建之后,Channel可以用来发送或者接收消息。

Channel channel = conn.createChannel();

使用交换器和队列

声明一个交换器和队列

channel.exchangeDeclare(exchangeName,"direct",true);
String queueName = channel.queueDeclare().getQueue();
channel.queueBind(queueName,exchangeName,routingKey);

上面创建了一个持久化的、非自动删除的、绑定类型为direct的交换器,同时也创建了一个非持久化的、排他的、自动删除的队列(队列名称由RabbitMQ自动生成),这里的交换器和队列都没有设置特殊的参数。

上面声明的队列具备如下特性:只对当前应用中同一个Connection层面可用,同一个Connection的不同Channel可共用,并且也会在应用连接断开时自动删除。

如果要在应用中共享一个队列,可做如下声明:

channel.exchangeDeclare(exchangeName,"direct",true);
channel.queueDeclare(queueName,true,false,false,null);
channel.queueBind(queueName,exchangeName,routingKey);

这里的队列被声明为持久化的、非排他的、非自动删除的,而且也被分配给另一个确定的已知名称。

exchangeDeclare方法详解

exchangeDeclare有多个重载方法,这些重载方法都是由下面这个方法中缺省的某些参数构成的。

Exchange.DeclareOK exchangeDeclare(String exchange,String type,
boolean durable,boolean autoDelete,boolean internal,Map<String,Object> arguments)throws Exception;
  • exchange: 交换器名称
  • type: 交换器类型,常见的有fanout、direct、topic
  • durable: 是否持久化,durable设置true表示持久化,持久化后可以将交换器存盘。
  • autoDelete:是否自动删除,表示一旦这个交换器上没有任何绑定(即没有任何队列与其相连),RabbitMQ会自动删除这个交换机。这对于临时的、生命周期与某些特定队列或应用程序密切相关的交换机非常有用。
  • internal :如果为true,表明是内置的交换器,客户端程序无法直接发送消息到这个交换器,只能通过交换器路由到交换器这种方式。
  • argument:其他一些结构化参数,比如alternate-exchange。

此外,其他声明交换机的方法

  • exchangeDeclareNoWait: 表示在声明exchange时候,不需要服务器返回。这可能会导致一种情况,在声明完一个交换器之后(实际服务器还并未完成交换器的创建),客户端紧接着使用这个交换器,必然会发生异常。
  • exchangeDeclarePassive:可以用来检测交换机是否存在,如果存在则正常返回,如果不存在则抛出异常。

queueDeclare方法详解

queueDeclare方法,只有如下两个重载方法

1. Queue.DeclareOK queueDeclare() throws IOException;
2. Queue.DeclareOK queueDeclare(String queue,boolean durable,
boolean exclusive,boolean autoDelete,Map<String,Object> arguments)
throws IOException;

不带任何参数的queueDeclare方法默认创建一个由RabbitMQ 自动生成名称 的、排他的自动删除非持久化 的队列。

方法参数详解

  • queue:队列名称
  • durable:设置是否持久化,持久化的队列会存盘,在服务器重启的时候保证不丢失消息。
  • exclusive:排他队列
  • autoDelete:自动删除,当队列中没有任何活跃的消费者,RabbitMQ会在一段时间后自动删除该队列。当一个队列删除时,其上持久化的消息也会随之被删除。
  • arguments : 设置队列的其他参数

排他队列的特点和行为

  • 独占性:一旦某个连接声明了一个排他队列,任何其他连接都无法访问或者声明同名的队列。
  • 自动删除:当声明排他队列的连接关闭时,RabbitMQ会自动删除这个队列,即使队列被声明为持久化的。
    同一连接通道共享:虽然排他队列对其他连接不可见,但同一连接内的不同通道可以共享访问这个排他队列。
  • 排他队列常用于那些希望队列仅被当前线程或应用实例使用的场景。

注意:生产者和消费者都能够使用queueDeclare声明一个队列,如果消费者在同一个信道上订阅了另一个队列,就无法在声明队列了一个消费者只能订阅一个队列)。必须先取消订阅,然后将channel设置为“传输模式”,之后才能声明队列。

交换机持久化和队列持久化的区别

  • 交换机持久化主要关注的是交换机的配置和元数据的长期存储,确保重启后配置不变。
  • 队列持久化关注队列自身及其消息的长期存储,需要结合消息的持久化设置来防止消息丢失。

queueBind方法详解

queueBind方法如下

1. Queue.BindOK queueBind(String queue,String exchange,String routingKey) 
throws IOException;2. Queue.BindOK queueBind(String queue,String exchange,String routingKey,
Map<String,Object> arguments) throws IOException;3. void queueBindNoWait(String queue,String exchange,String routingKey,
Map<String,Object> arguments) throws IOException;

参数详解

  • queue:队列名称
  • exchange:交换机名称
  • routingKey:用来绑定队列和交换机的路由键
  • argument:定义绑定的一些参数

exchangeBind方法详解

exchangeBind方法如下

 1. Exchange.BindOK exchangeBind(String destination,String source,String routingKey)throws IOException;2. Exchange.BindOK exchangeBind(String destination,String source,String routingKey,
Map<String,Object> arguments) throws IOException;3. Exchange.BindOK exchangeBindNoWait(String destination,String source,
String routingKey,Map<String,Object> arguments) throws IOException;

绑定以后,消息从source交换机转发到destination交换机。某种程度上可以将destination交换机看作一个队列。
在这里插入图片描述

发送消息

如果要发送一个消息,可以使用Channel类的basicPublish方法。

示例:

byte[] messageBodyBytes = "Hello,World!".getBytes();
channel.basicPublish(exchangeName,routingKey,null,messageBodyBytes);

对于basicPublish而言,有几个重载方法

1. void basicPublish(String exchange,String routingKey,
BasicProperties props,byte[] body) throws IOException;2.void basicPublish(String exchange,String routingKey,boolean mandatory,
BasicProperties props,byte[] body) throws IOException;3. void basicPublish(String exchange,String routingKey,boolean mandatory,
boolean immediate,BasicProperties props,byte[] body) throws IOException;
  • exchange :交换机的名称,指明消息需要发送到哪个交换机中,如果设置为空,则消息会被发送到RabbitMQ默认的交换机中。
  • routingKey:路由键,交换机根据路由键将消息存储到相应的队列中。
  • props:消息的基本属性集,包括contentType、deliveryMode等
  • byte[] body: 真正要发送的消息内容

消费消息

消费模式分为两种:Push模式和Pull模式。推模式采用Basic.Consume进行消费,拉模式采用Basic.Get进行消费。

推模式

当调用Consumer相关API方法时,不同的订阅采用不同的消费者标签(consumerTag)来区分彼此,在同一个Channel中的消费者也需要通过唯一的消费者标签以作区分,关键消费代码如下所示

在这里插入图片描述
对于消费者来说,显示的设置autoAck为false,接收消息之后进行显示ack操作,这个设置是非常必要的。可以防止消息不必要的丢失。

核心方法

String basicConsume(String queue,boolean autoack,String consumerTag,
boolean noLocal,boolean exclusive,Map<String,Object> arguments,Consumer callback) 
throws IOException;
  • queue: 队列的名称
  • autoAck: 设置是否自动确认
  • consumerTag:消费者标签,用来区分多个消费者
  • noLocal:设置true表示不能将一个Connection中生产者发送的消息发送给这个Connection中的消费者。
  • exclusive:设置是否排他,确保该队列仅对创建他的消费者可见。
  • arguments:设置消费者的其他参数
  • callback:设置消费者的回调函数。

每个Channel都拥有自己独立的线程,最常用的做法是一个Channel对应一个消费者,也就意味着消费者彼此之间没有关联,也可以在Channel中维持多个消费者,但是,如果Channel中一个消费者一直在运行,那其他消费者的callback会被耽搁。
在这里插入图片描述

拉模式

通过channel.basicGet方法可以单条获取消息,其返回值是GetResponse。核心方法如下:

GetResponse basicGet(String queue,boolean autoAck) throws IOException;

如果autoAck为false,那么同样需要调用channel.basicAck来确认消息已经被成功接受。

GetResponse response = channel.basicGet(QueueName,false);System.out.println(new String(response.getBody()));channel.basicAck(response.getEnvelope().getDeliveryTag(),false);

注意:Basic.Consume将信道(Channel)置为接收模式,直到取消队列的订阅为止,接收模式期间,RabbitMQ会不断将消息推送给消费者,推送消息的个数受到Basic.QoS的限制。如果只想从队列里获取单条消息而不是持续订阅,建议使用Basic.Get进行消费。但是不能将Basic.Get放在一个循环里代替Basic.Consume,这样做会严重影响MQ性能。
在这里插入图片描述

消费端的确认和拒绝

消息确认

RabbitMQ为了保证消息从队列可靠的到达消费者,提供了消息确认机制。消费者在订阅队列时,可以指定autoAck参数,当autoAck等于false时,RabbitMQ会等待消费者显示的回复确认信号后才从内存(或者磁盘)中移除消息(即使消息配置了持久化,在被ack以后仍然要被删除)。当autoAck等于true时,RabbitMQ会自动把发送出去的消息设置为确认,然后从内存中删除,而不管消费者是否真正消费这些消息。

把消息确认设置为false,消费者就有足够的时间处理消息,不用担心处理过程中消费者进程挂掉后消息丢失的问题,因为RabbitMQ会一直等待持有消息直到消费者显示调用Basic.Ack命令为止。

当autoAck参数置为false,对于MQ服务端而言,队列中的消息分成了两个部分:

  1. 等待投递给消费者的消息
  2. 已经投递给消费者,但是还没有收到消费者确认信号的消息:如果一直没有收到确认信号,消费此消息的消费者已经断开连接,则MQ会重新安排该消息进入队列,等待投递给下一个消费者。

MQ不会为未确认的消息设置过期时间,他判断此消息是否需要重新投递给消费者的唯一依据是消费该消息的消费者连接是否已经断开。如此设计,是因为RabbitMQ允许消费者消费一条消息的时间可以很久很久。

消息拒绝

Channel类中的basicReject方法定义如下

void basicReject(long deliveryTag,boolean requeue) throws IOException;

其中,deliveryTag可以看作消息的编号,如果requeue为true,则RabbitMQ会重新将这条消息存入队列,以便发送给下一个订阅的消费者。

如果想要批量拒绝消息,可以使用Basic.Nack这个命令。

void basicNack(long deliveryTag,boolean multiple,boolean requeue) throws IOException;

multiple参数为false表示只拒绝编号为deliveryTag的这条消息,如果设置为true,表示拒绝编号deliveryTag之前所有未被消费者确认的消息。

关闭连接

在应用程序使用完毕之后,需要关闭连接,释放资源

channel.close();
connection.close();

AMQP协议中connection和channel采用同样的方式来管理网络失败、内部错误和显示关闭连接。connection和channel所具备的生命周期如下:

  • Open:开启状态,代表当前对象可以使用
  • Closing:正在关闭状态,当前对象被显示通知调用关闭方法,这样就产生了一个关闭请求让其内部对象进行相应的操作,并等待这些关闭操作的完成
  • Closed:已经关闭,当前对象已经接收到所有内部对象以完成关闭动作的通知,并且其也关闭了自身。

与关闭操作相关的方法

  • addShutdownListener(ShutdownListener listener): 当connection或者channel状态转变为closed的时候调用ShutdownListener,而如果将一个ShutdownListener注册到一个已经处于Closed状态的对象(特指Connection或者Channel对象),会立刻调用ShutdownListener。
  • removeShutdownListener(ShutdownListener listener)
  • getCloseReason: 获取connection或者channel关闭原因
  • isOpen:检测当前对象是否开启
  • close(int closeCode,String closeMessage):显示通知连接执行关闭操作。

代码清单
在这里插入图片描述
当触发ShutdownListener时候,可以获取到ShutdownSingalException,这个信号包含了关闭的原因。

ShutdownSingalException 提供多个方法来分析关闭原因,isHardError方法可以知道是Connection错误还是Channel错误;getReason可以获取Cause相关的信息。

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/web/33416.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

汽车抬头显示器HUD阳光倒灌实验太阳光模拟器

简述 HUD阳光倒灌实验是评估汽车抬头显示器&#xff08;HUD&#xff09;在强烈日照条件下的性能表现的一种测试方法。该实验通过模拟太阳光照射&#xff0c;检测HUD在阳光直射下的显示效果&#xff0c;以确保驾驶者在强烈日照下仍能清晰地看到HUD显示的信息&#xff0c;从而提…

CentOS配置本地yum源

版本说明 操作系统版本&#xff1a;CentOS7.9 虚拟机版本 虚拟机打快照 首先给虚拟机打个快照&#xff0c;点击图下所示位置 命名快照之后&#xff0c;点击拍摄快照 可以参考图下所示进行管理和恢复快照 迁移原有yum源 先进入到/etc/yum.repos.d/ &#xff0c;可以看到有很多…

C++编程(一)C++与C语言的一些区别

文章目录 一、QtCreator基本使用&#xff08;一&#xff09;编码格式&#xff1a;&#xff08;二&#xff09;C编程1. 文件后缀2. 编译3. 头文件 二、名字空间&#xff08;一&#xff09;概念以及访问方式1. 概念2. 访问方式&#xff08;1&#xff09;通过作用域限定符进行访问…

手写方法实现字符串例如:“123“与整型例如:123相互转化(面试必会)

目录 二、字符串类型转化为整型 1. 初始化变量 2.定义字符串索引值 3.思考如何将字符1转化为数字1 4. 转化思路 5.考虑字符串转化负数例&#xff1a;-123456 6.完整代码 四、最后 一、前言 在c语言和c中&#xff0c;有许许多多的数据类型相互转化的方法&#xff0c;这里…

【面试题】面试小技巧:如果有人问你 xxx 技术是什么?_面试问你对什么技术特别了解

前端工程越来越大&#xff0c;前面几种方案不能很好的支持单元测试。 在这样的背景下&#xff0c;React 诞生了。React 带来了新的思维模式&#xff0c;UI fn(props)&#xff0c;React 中一个组件就是一个函数或者一个类&#xff0c;一个函数或者一个类就是一个基础单位&…

msvcp120.dll丢失的解决方法,总结几种有效的解决方法

最近&#xff0c;我在使用计算机时遇到了一个问题&#xff0c;系统提示我丢失了msvcp120.dll文件。这让我感到非常困扰&#xff0c;因为这个问题导致我无法正常运行一些程序。经过一番搜索和尝试&#xff0c;我找到了几种修复这个问题的方法&#xff0c;并成功解决了这个问题。…

三人同行乐享模式:社交电商的新趋势

在数字化时代&#xff0c;社交电商正以其独特的优势崭露头角。其中&#xff0c;“三人同行乐享模式”就是一种创新的购物激励机制&#xff0c;它通过消费者的社交互动和分享&#xff0c;不仅促进了产品的销售&#xff0c;更加强了品牌的推广和影响力。 一、模式简介 此模式的核…

RockChip Android12 Settings二级菜单

一:概述 本文将针对Android12 Settings的二级菜单System进行说明。 二:System 1、Activity packages/apps/Settings/AndroidManifest.xml <activityandroid:name=".Settings$SystemDashboardActivity"android:label="@string/header_category_system&quo…

【消息队列】六万字长文详细带你RabbitMQ从入门到精通

目录 1、基础篇1.1 为什么要用消息队列MQ1.2 什么是消息队列&#xff1f;1.3 RabbitMQ体系结构介绍1.4 RabbitMQ安装1.5 Hello World1.5.1 目标1.5.2 具体操作 1.6 RabbitMQ用法1.6.1 Work Queues1.6.2 Publish/Subscribe1.6.3 Routing1.6.4 Topics1.6.5 工作模式小结 2. 进阶篇…

推荐三款必备软件,个个五星好评,你一定不要错过

WiseCare365 WiseCare365是一款由WiseCleaner推出的综合性Windows系统优化和加速工具。它集成了多种功能&#xff0c;旨在帮助用户清理、优化和维护电脑系统&#xff0c;提升电脑性能和安全性。 WiseCare365的主要功能包括&#xff1a; 系统清理&#xff1a;它可以清理各种缓存…

CSC公派|哲学老师赴英国红砖大学访学交流

T老师申报CSC公派访问学者&#xff0c;要求世界排名Top200的英国大学。我们在一个月内先后获得了利物浦大学和兰卡斯特大学的邀请函&#xff0c;这两所高校均位列Top200。最终T老师选择英国红砖高校之一的利物浦大学并申报成功顺利出国。 T老师背景&#xff1a; 申请类型&…

Vue父组件mounted执行完后再执行子组件mounted

// 创建地图实例 this.map new BMap.Map(‘map’) } } ... 现在这样可能会报错&#xff0c;因为父组件中的 map 还没创建成功。必须确保父组件的 map 创建完成&#xff0c;才能使用 this.$parent.map 的方法。 那么&#xff0c;现在的问题是&#xff1a;如何保证父组件 mo…

端到端的全人体关键点检测:手把手实现从YOLOPose到YOLOWhole

目录 一、搭建yolopose平台二、迁移训练任务2.1 任务拓展数据准备训练模型测试训练模型结论To-do list: 1、数据集,COCO-whole, Halpe;下载好; 2、模型搭建,先基于yolov8来检测人体姿态,17个点; 3、迁移任务,17个点,把它拓展到133个点; 4、优化133个点的模型; 一、搭…

深入理解RLHF技术

在《LLM对齐“3H原则”》这篇文章中&#xff0c;我们介绍了LLM与人类对齐的“3H”原则&#xff0c;但是这些对齐标准主要是基于人类认知进行设计的&#xff0c;具有一定的主观性。因此&#xff0c;直接通过优化目标来建模这些对齐标准较为困难。本文将介绍基于人类反馈的强化学…

ONLYOFFICE 8.1:全面升级,PDF编辑与本地化加强版

目录 &#x1f4d8; 前言 &#x1f4df; 一、什么是 ONLYOFFICE 桌面编辑器&#xff1f; &#x1f4df; 二、ONLYOFFICE 8.1版本新增了那些特别的实用模块&#xff1f; 2.1. 轻松编辑器 PDF 文件 2.2. 用幻灯片版式快速修改幻灯片 2.3. 无缝切换文档编辑、审阅和查…

RS-485和RS-422通信的3.3V低功耗收发器MAX3483

描述 国产MAX3485外观和丝印 该MAX3483ESA为15kV ESD保护、3.3V、低功耗收发器&#xff0c;用于RS-485和RS-422通信。 每个设备包含一个驱动器和一个接收器。 该MAX3483ESA具有压摆率限制驱动器&#xff0c;可最大限度地降低 EMI 并减少因端接不当电缆引起的反射&#xff0c;从…

【BSCP系列第2期】XSS攻击的深度剖析和利用(文末送书)

文章目录 前言一、官方地址二、开始&#xff08;15个&#xff09;1&#xff1a;Lab: DOM XSS in document.write sink using source location.search inside a select element2&#xff1a;Lab: DOM XSS in AngularJS expression with angle brackets and double quotes HTML-e…

北邮《计算机网络》MAC子层笔记

文章目录 缩写复习MAC层所在层次动态分配信道算法们的简要介绍信道的五条基本假设多路访问的协议&#xff08;理论上的协议&#xff09;aloha协议CSMA协议其他冲突避免协议无线局域网协议 &#xff0c;MACA 以太网协议802.3&#xff08;实际协议&#xff0c;刚刚是理论&#xf…

小白学python(第一天)

在有了C语言的基础后&#xff0c;我们学python会变得相当容易&#xff0c;毕竟c生万物&#xff0c;废话不多说&#xff0c;直接进入我们的正题 课前准备 Python环境的搭建以及Pycharm的安装 python环境安装 Download Python | Python.org 因为我的电脑是windows&#xff0c;…

C++精解【6】

文章目录 eigenMatrix基础例编译时固定尺寸运行指定大小 OpenCV概述 eigen Matrix 基础 所有矩阵和向量都是Matrix模板类的对象。向量也是矩阵&#xff0c;单行或单列。Matrix模板类6个参数&#xff0c;常用就3个参数&#xff0c;其它3个参数有默认值。 Matrix<typename…