(十三)RabbitMQ使用详解

RabbitMQ是基于AMQP的一款消息管理系统。AMQP(Advanced Message Queuing Protocol),是一个提供消息服务的应用层标准高级消息队列协议,其中RabbitMQ就是基于这种协议的一种实现。

常见mq:

  • ActiveMQ:基于JMS
  • RabbitMQ:基于AMQP协议,erlang语言开发,稳定性好
  • RocketMQ:基于JMS,阿里巴巴产品,目前交由Apache基金会
  • Kafka:分布式消息系统,高吞吐量

Java Client

生产者和消费者都属于客户端, rabbitMQ的java客户端如下

图片

创建 maven 工程

 

<dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>5.10.0</version>
</dependency>

AMQP协议的回顾

图片

1 消息模型

RabbitMq有5种常用的消息模型

1.1 基本消息模型

这是最简单的消息模型,如下图:

生产者将消息发送到队列,消费者从队列中获取消息,队列是存储消息的缓冲区。

再演示代码之前,我们先创建一个工程rabbitmq-demo,并编写一个工具类,用于提供与mq服务创建连接

public class ConnectionUtil {/*** 建立与RabbitMQ的连接* @return* @throws Exception*/public static Connection getConnection() throws Exception {//定义连接工厂ConnectionFactory factory = new ConnectionFactory();//设置服务地址factory.setHost("192.168.18.130");//端口factory.setPort(5672);//设置账号信息,用户名、密码、vhostfactory.setUsername("admin");factory.setPassword("admin");// 通过工程获取连接Connection connection = factory.newConnection();return connection;}
}

生产者发送消息

接下来是生产者发送消息,其过程包括:1.与mq服务建立连接,2.建立通道,3.声明队列(有相同队列则不创建,没有则创建),4.发送消息,代码如下:

public class Send {private static final String QUEUE_NAME = "basic_queue";public static void main(String[] args) throws Exception {//消息发送端与mq服务创建连接Connection connection = ConnectionUtil.getConnection();//建立通道Channel channel = connection.createChannel();//声明队列channel.queueDeclare(QUEUE_NAME, false, false, false, null);String message = "hello world";channel.basicPublish("", QUEUE_NAME, null, message.getBytes());System.out.println("生产者已发送:" + message);channel.close();connection.close();}
}

消费者接受消息

消费者在接收消息的过程需要经历如下几个步骤: 1.与mqfuwu建立连接,2.建立通道,3.声明队列,4,接收消息,代码如下:

public class Consumer1 {private static final String QUEUE_NAME = "basic_queue";public static void main(String[] args) throws Exception {//消息消费者与mq服务建立连接Connection connection = ConnectionUtil.getConnection();//建立通道Channel channel = connection.createChannel();channel.queueDeclare(QUEUE_NAME, false, false, false, null);DefaultConsumer consumer = new DefaultConsumer(channel) {// 获取消息,并且处理,这个方法类似事件监听,如果有消息的时候,会被自动调用@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,byte[] body) throws IOException {// body 即消息体String msg = new String(body);System.out.println("消费者1接收到消息:" + msg);}};channel.basicConsume(QUEUE_NAME, true, consumer);}
}

消息的接收与消费使用都需要在一个匿名内部类DefaultConsumer中完成

注意:队列需要提前声明,如果未声明就使用队列,则会报错。如果不清楚生产者和消费者谁先声明,为了保证不报错,生产者和消费者都声明队列,队列的创建会保证幂等性,也就是说生产者和消费者都声明同一个队列,则只会创建一个队列

1.2 Work Queues工作队列模型

在基本消息模型中,一个生产者对应一个消费者,而实际生产过程中,往往消息生产会发送很多条消息,如果消费者只有一个的话效率就会很低,因此rabbitmq有另外一种消息模型,这种模型下,一个生产发送消息到队列,允许有多个消费者接收消息,但是一条消息只会被一个消费者获取。

生产者发送消息

与基本消息模型基本一致,这里测试循环发布20条消息:

public class Send {private static final String QUEUE_NAME = "work_queue";public static void main(String[] args) throws Exception {Connection connection = ConnectionUtil.getConnection();Channel channel = connection.createChannel();channel.queueDeclare(QUEUE_NAME, false, false, false, null);// 循环发布任务for (int i = 1; i <= 20; i++) {// 消息内容String message = "task .. " + i;channel.basicPublish("", QUEUE_NAME, null, message.getBytes());System.out.println("生产者发送消息:" + message);Thread.sleep(500);}channel.close();connection.close();}
}

消费者1

public class Consumer1 {private static final String QUEUE_NAME = "work_queue";public static void main(String[] args) throws Exception {Connection connection = ConnectionUtil.getConnection();Channel channel = connection.createChannel();channel.queueDeclare(QUEUE_NAME, false, false, false, null);DefaultConsumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String msg = new String(body);System.out.println("消费者1接收到消息:" + msg);try {Thread.sleep(50);//模拟消费耗时} catch (InterruptedException e) {e.printStackTrace();}}};channel.basicConsume(QUEUE_NAME, true, consumer);}
}

消费者2

public class Consumer2 {private static final String QUEUE_NAME = "work_queue";public static void main(String[] args) throws Exception {Connection connection = ConnectionUtil.getConnection();Channel channel = connection.createChannel();channel.queueDeclare(QUEUE_NAME, false, false, false, null);DefaultConsumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,byte[] body) throws IOException {String msg = new String(body);System.out.println("消费者2接收到消息:" + msg);try {Thread.sleep(50);//模拟消费耗时} catch (InterruptedException e) {e.printStackTrace();}channel.basicAck(envelope.getDeliveryTag(), false);}};channel.basicConsume(QUEUE_NAME, true, consumer);}
}

此时有两个消费者监听同一个队列,当两个消费者都工作时,生成者发送消息,就会按照负载均衡算法分配给不同消费者,如下图:

1.3 订阅模型

在之前的模型中,一条消息只能被一个消费者获取,而在订阅模式中,可以实现一条消息被多个消费者获取。在这种模型下,消息传递过程中比之前多了一个exchange交换机,生产者不是直接发送消息到队列,而是先发送给交换机,经由交换机分配到不同的队列,而每个消费者都有自己的队列:

解读:

1、1个生产者,多个消费者

2、每一个消费者都有自己的一个队列

3、生产者没有将消息直接发送到队列,而是发送到了交换机

4、每个队列都要绑定到交换机

5、生产者发送的消息,经过交换机到达队列,实现一个消息被多个消费者获取的目的

X(exchange)交换机的类型有以下几种:

Fanout:广播,交换机将消息发送到所有与之绑定的队列中去Direct:定向,交换机按照指定的Routing Key发送到匹配的队列中去Topics:通配符,与Direct大致相同,不同在于Routing Key可以根据通配符进行匹配

注意:在发布订阅模型中,生产者只负责发消息到交换机,至于消息该怎么发,以及发送到哪个队列,生产者都不负责。一般由消费者创建队列,并且绑定到交换机

订阅模型之Fanout

在广播模式下,消息发送的流程如下:

  1. 可以有多个消费者,每个消费者都有自己的队列
  2. 每个队列都要与exchange绑定
  3. 生产者发送消息到exchange
  4. exchange将消息把消息发送到所有绑定的队列中去
  5. 消费者从各自的队列中获取消息

生产者发送消息

public class Send {private static final String EXCHANGE_NAME = "fanout_exchange";public static void main(String[] args) throws Exception {Connection connection = ConnectionUtil.getConnection();Channel channel = connection.createChannel();// 声明exchange,指定类型为fanoutchannel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);String message = "hello world";channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());System.out.println("生产者发送消息:" + message);channel.close();connection.close();}
}

消费者

public class Consumer1 {private static final String QUEUE_NAME = "fanout_queue_1";private static final String EXCHANGE_NAME = "fanout_exchange";public static void main(String[] args) throws Exception {Connection connection = ConnectionUtil.getConnection();Channel channel = connection.createChannel();//消费者声明自己的队列channel.queueDeclare(QUEUE_NAME, false, false, false, null);// 声明exchange,指定类型为directchannel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);//消费者将队列与交换机进行绑定channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");channel.basicConsume(QUEUE_NAME, true, new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body)throws IOException{String msg = new String(body);System.out.println("消费者1获取到消息:" + msg);}});}
}

其他消费者只需修改QUEUE_NAME即可

注意:exchange与队列一样都需要提前声明,如果未声明就使用交换机,则会报错。如果不清楚生产者和消费者谁先声明,为了保证不报错,生产者和消费者都声明交换机,同样的,交换机的创建也会保证幂等性。

订阅模型之Direct

在fanout模型中,生产者发布消息,所有消费者都可以获取所有消息。在路由模式(Direct)中,可以实现不同的消息被不同的队列消费,在Direct模式下,交换机不再将消息发送给所有绑定的队列,而是根据Routing Key将消息发送到指定的队列,队列在与交换机绑定时会设定一个Routing Key,而生产者发送的消息时也需要携带一个Routing Key。

如图所示,消费者C1的队列与交换机绑定时设置的Routing Key是“error”, 而C2的队列与交换机绑定时设置的Routing Key包括三个:“info”,“error”,“warning”,假如生产者发送一条消息到交换机,并设置消息的Routing Key为“info”,那么交换机只会将消息发送给C2的队列。

生产者发送消息

public class Send {private static final String EXCHANGE_NAME = "direct_exchange";public static void main(String[] args) throws Exception {Connection connection = ConnectionUtil.getConnection();Channel channel = connection.createChannel();// 声明exchange,指定类型为directchannel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);String message = "新增一个订单";//生产者发送消息时,设置消息的Routing Key:"insert"channel.basicPublish(EXCHANGE_NAME, "insert", null, message.getBytes());System.out.println("生产者发送消息:" + message);channel.close();connection.close();}
}

消费者1

public class Consumer1 {private static final String QUEUE_NAME = "direct_queue_1";private static final String EXCHANGE_NAME = "direct_exchange";public static void main(String[] args) throws Exception {Connection connection = ConnectionUtil.getConnection();Channel channel = connection.createChannel();//消费者声明自己的队列channel.queueDeclare(QUEUE_NAME, false, false, false, null);//声明交换机channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);//消费者将队列与交换机进行绑定,并且设置Routing Key:"insert"channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "insert");channel.basicConsume(QUEUE_NAME, true, new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body)throws IOException{String msg = new String(body);System.out.println("消费者1获取到消息:" + msg);}});}
}

其他消费者需要修改队列名QUEUE_NAME和Routing Key,上述生成者发送的消息,消费者1是可以获取到的

发布订阅之Topics

Topic类型的Exchange与Direct相比,都是可以根据RoutingKey把消息路由到不同的队列。只不过Topic类型Exchange可以让队列在绑定Routing key 的时候使用通配符

Routingkey 一般都是有一个或多个单词组成,多个单词之间以”.”分割,例如: item.insert

通配符规则:

     #:匹配一个或多个词*:匹配不多不少恰好1个词

举例:

     audit.#:能够匹配audit.irs.corporate 或者 audit.irsaudit.*:只能匹配audit.irs

Topics生产者代码与Direct大致相同,只不过子声明交换机时,将类型设为BuiltinExchangeType.TOPIC(topic),

消费者代码也与Direct大致相同,也是在声明交换机时设置类型为topic,代码不再演示

Spring AMQP

Spring AMQP是对AMQP的一种封装,目的是能够让我们更简便的使用消息队列,下面介绍一下Spring AMQP在Spring boot中的使用方法

依赖和配置

添加AMQP的启动器:

<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

在application.yml中添加RabbitMQ的地址:

spring:rabbitmq:host: 192.168.18.130username: adminpassword: admin

消费者

消费者需要定义一个类,类中定义监听队列的方法

@Component
public class Listener {@RabbitListener(bindings = @QueueBinding(value = @Queue(value = "spring.test.queue", durable = "false"),exchange = @Exchange(value = "spring.test.exchange", type = ExchangeTypes.DIRECT),key = "insert"))public void listen(String msg){System.out.println("消费者接受到消息:" + msg);}
}

注解:

@Component:保证监听类被spring扫描到

@RabbitListener:

@RabbitListener包含很多内容,在发布订阅模式中,我们可以使用其中的“QueueBinding[] bindings”,其中QueueBinding底层如下:

其中Queue表示队列,Exchange表示交换机,key表示Routing Key

@RabbitListener(bindings = @QueueBinding(value = @Queue(value = "spring.test.queue", durable = "false"),exchange = @Exchange(value = "spring.test.exchange", type = ExchangeTypes.DIRECT),key = "insert"))

@Queue会创建队列

@Exchange会创建交换机

@QueueBinding会绑定队列和交换机

生产者发送消息

可以通过注解引入AmqpTemplate:

@RunWith(SpringRunner.class)
@SpringBootTest
public class SpringAMQPTest {@Resourceprivate AmqpTemplate template;@Testpublic void testSendMsg() throws InterruptedException {String message = "hello spring";template.convertAndSend("spring.test.exchange", "insert", message);System.out.println("生产者发送消息:" + message);Thread.sleep(10000);//等待10s,让测试方法延迟结束,防止消费者未来得及获取消息}
}

RabbitMQ如何防止消息丢失

1. 消息确认机制(ACK)

RabbitMQ有一个ACK机制,消费者在接收到消息后会向mq服务发送回执ACK,告知消息已被接收。这种ACK分为两种情况:

  • 自动ACK:消息一旦被接收,消费者会自动发送ACK
  • 手动ACK:消息接收后,不会自动发送ACK,而是需要手动发送ACK

如果消费者没有发送ACK,则消息会一直保留在队列中,等待下次接收。但这里存在一个问题,就是一旦消费者发送了ACK,如果消费者后面宕机,则消息会丢失。因此自动ACK不能保证消费者在接收到消息之后能够正常完成业务功能,因此需要在消息被充分利用之后,手动ACK确认

自动ACK,basicConsume方法中将autoAck参数设为true即可:

手动ack,在匿名内部类中,手动发送ACK:

当然,如果设置了手动ack,但又不手动发送ACK确认,消息会一直停留在队列中,可能造成消息的重复获取

2. 持久化

消息确认机制(ACK)能够保证消费者不丢失消息,但假如消费者在获取消息之前mq服务宕机,则消息也会丢失,因此要保证消息在服务端不丢失,则需要将消息进行持久化。队列、交换机、消息都要持久化。

队列持久化

exchange持久化

消息持久化

3. 生产者确认

生成者在发送消息过程中也可能出现错误或者网络延迟灯故障,导致消息未成功发送到交换机或者队列,或重复发送消息,为了解决这个问题,rabbitmq中有多个解决办法:

事务:

用事务将消息发送代码包围起来:

Confirm模式:

如下所示,在发送代码前执行channel.confirmSelect(),如果消息未正常发送,就会进入if代码块,可以进行重发也可以对失败消息进行记录

异步confirm方法:

顾名思义,就是生产者发送消息后不用等待服务端回馈发送状态,可以继续执行后面的代码,对于失败消息重发进行异步处理:

Spring AMQP中添加配置:

生产者确认机制,确保消息正确发送,如果发送失败会有错误回执,从而触发重试

spring:rabbitmq:publisher-confirms: true

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/322342.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

ASP.NET Core Web API下事件驱动型架构的实现(二):事件处理器中对象生命周期的管理

在ASP.NET Core Web API下事件驱动型架构的实现&#xff08;一&#xff09;&#xff1a;一个简单的实现中&#xff0c;我介绍了事件驱动型架构的一种简单的实现&#xff0c;并演示了一个完整的事件派发、订阅和处理的流程。这种实现太简单了&#xff0c;百十行代码就展示了一个…

和某ZYC巨佬和XXY巨佬的随机挑战2总结

前言 一切的起点在那个炎热的酷暑&#xff0c;菜的一批的WYCWYCWYC坐在最容易被∗*∗的左下角。这时他永远都想不到&#xff0c;他与巨佬之间的挑战&#xff0c;即将开始。 正题 规则 随机跳333到蓝题&#xff0c;然后写完。 完成记录 题目博客 T1:P3100−[USACO14JAN]T1:P31…

(十四)消息中间件MQ详解及四大MQ比较

一、消息中间件相关知识 1、概述 消息队列已经逐渐成为企业IT系统内部通信的核心手段。它具有低耦合、可靠投递、广播、流量控制、最终一致性等一系列功能&#xff0c;成为异步RPC的主要手段之一。当今市面上有很多主流的消息中间件&#xff0c;如老牌的ActiveMQ、RabbitMQ&a…

g4e基础篇#3 Git安装与配置

现在你已经对Git有了最基本的了解&#xff0c;现在让我们开始动手开始安装和配置Git环境。Git工具包括Git命令行工具&#xff0c;图形化工具和服务器环境&#xff1b;在我们这个教程中&#xff0c;我们会使用以下软件配置我们的环境&#xff1a;• Windows 操作系统&#xff08…

[认证授权] 6.Permission Based Access Control

在前面5篇博客中介绍了OAuth2和OIDC&#xff08;OpenId Connect&#xff09;&#xff0c;其作用是授权和认证。那么当我们得到OAuth2的Access Token或者OIDC的Id Token之后&#xff0c;我们的资源服务如何来验证这些token是否有权限来执行对资源的某一项操作呢&#xff1f;比如…

微软发布PowerShell Core第一个版本:支持多平台开发

微软旗下的PowerShell团队正式宣布推出PowerShell Core 6.0&#xff0c;非常诡异的是这明明是Core的第一个版本&#xff0c;但是却用了一个6.0后缀的版本号。“这是我们对PowerShell做出的最大最重要的改变&#xff01;”微软技术研究员兼PowerShell创始人Jeffrey Snover在Twit…

.NET Core单文件发布静态编译AOT CoreRT

.NET Core单文件发布静态编译AOT CoreRT&#xff0c;将.NET Core应用打包成一个可执行文件并包含运行时。支持Windows, MacOS and Linux x64 w/ RyuJIT codegen。示例项目&#xff1a;https://github.com/dotnet/corert/tree/master/samples/WebApi下面来实际体验。首先确保安装…

2019纪中暑假游记+总结

Travels总篇\texttt{Travels总篇}Travels总篇 7/4\texttt{7/4}7/4 下午才去纪中&#xff0c;早上就一大早和同学出去玩&#xff0c;看了蜘蛛侠然后到3点多才出发。 因为走南沙大桥所以很快就到了(具体有多快忘了&#xff0c;反正路上一点都不塞车)。就愉快的去整理宿舍洗个早…

使用xUnit为.net core程序进行单元测试(上)

一. 导读为什么要编写自动化测试程序&#xff08;Automated Tests&#xff09;&#xff1f;可以频繁的进行测试可以在任何时间进行测试&#xff0c;也可以按计划定时进行&#xff0c;例如&#xff1a;可以在半夜进行自动测试。肯定比人工测试要快。可以更快速的发现错误。基本上…

select2删除选中项,allowClear设置

转载自 select2删除选中项&#xff0c;allowClear设置 在使用select2过程中&#xff0c;有时候需要删除我们选中的选项&#xff0c;如下图&#xff1a; 这时候就需要设置select2的allowClear属性了。 有两种方法&#xff1a; 第一种&#xff1a; 直接用select2定义的一个c…

LeetCode算法总结-回溯法与深度优先搜索

转载自 LeetCode算法总结-回溯法与深度优先搜索 回溯法&#xff08;探索与回溯法&#xff09;是一种选优搜索法&#xff0c;又称为试探法&#xff0c;按选优条件向前搜索&#xff0c;以达到目标。但当探索到某一步时&#xff0c;发现原先选择并不优或达不到目标&#xff0c;就…

入门干货之用DVG打造你的项目主页-Docfx、Vs、Github

由于这三项技术涉及到的要点以及内容较多&#xff0c;希望大家有空能自己挖掘一下更多更深的用法。0x01、介绍VS&#xff0c;即VS2017以及以上版本&#xff0c;宇宙最好的IDE&#xff0c;集成了宇宙最有前景的平台&#xff0c;前阶段也支持了宇宙最好的语言。Github&#xff0c…

ASP.NET Core中使用IOC三部曲(一.使用ASP.NET Core自带的IOC容器)

前言本文主要是详解一下在ASP.NET Core中,自带的IOC容器相关的使用方式和注入类型的生命周期.这里就不详细的赘述IOC是什么 以及DI是什么了.. emm..不懂的可以自行百度.正文今天我们主要讲讲如何使用自带IOC容器,emm..虽然自带的功能不是那么强大,但是胜在轻量级..而且..不用引…

P4130,jzoj1214-[NOI2007]项链工厂【线段树】

正题 题目链接:https://www.luogu.org/problemnew/show/P4130 题目大意 一个环形颜色珠子链&#xff0c;位置(注意不是上面的珠子)从最上顺时针下来位置依次标号1∼n1\sim n1∼n。 然后要求支持以下操作 Rk:R\ k:R k:将所有珠子顺时针旋转kkk个。F:F:F:将所有珠子以111向下翻…

LeetCode常用算法模式大厂面试题整理

转载自 LeetCode常用算法模式&大厂面试题整理 文章目录 1、滑动窗口 2、双指针 3、快慢指针 4、合并区间 5、循环排序 6、就地反转链表 7、堆-优先队列问题 8、Top K 9、归并 10、单调栈 11、回溯法 BATJ等大厂面试真题汇总 1、滑动窗口 1 一个左指针&#xff0c;一个右…

ABPZero系列教程之拼多多卖家工具

此系列文章围绕着拼多多卖家工具来介绍ABPZero的使用&#xff0c;内容包括手机登录、手机注册、拼团提醒、微信公众号绑定帐号、有拼团发送消息到微信公众号&#xff08;只要关注过微信公众号并已绑定系统帐号&#xff09;。学习此系列必备&#xff1a;手机验证码&#xff1a;使…

g4e基础篇#4 了解Git存储库(Repo)

Git 存储库看上去就是一个文件夹&#xff0c;只是在这个文件夹中不仅仅保存了所有文件的当前版本&#xff0c;也同时保存了所有的历史记录&#xff0c;这些额外的信息都保存在当前文件夹下面的.git子目录中。因为前面我们所描述的git跟踪改动的特殊方式 &#xff0c;git可以在很…

net的retrofit--WebApiClient库

# 库简介WebApiClient是开源在github上的一个httpClient客户端库&#xff0c;内部基于HttpClient开发&#xff0c;是一个只需要定义c#接口(interface)&#xff0c;并打上相关特性&#xff0c;即可异步调用http-api的框架 &#xff0c;支持.net framework4.5、netcoreapp2.0和ne…

Sentinel(一)之简介

转载自 Sentinel: 分布式系统的流量防卫兵 Sentinel 是什么&#xff1f; 随着微服务的流行&#xff0c;服务和服务之间的稳定性变得越来越重要。Sentinel 是面向分布式服务架构的流量控制组件&#xff0c;主要以流量为切入点&#xff0c;从限流、流量整形、熔断降级、系统负…