RabbbitMQ基本使用及其五种工作模型

初识MQ

同步通讯和异步通讯

        什么是同步通讯呢?举个例子,你认识了一个小姐姐,聊的很火热,于是你们慢慢开始打电话,视频聊天,这种方式就成为同步通讯,那什么是一部通讯呢,同样的,你认识了多个小姐姐,你和他们进行文字聊天,这时候你一个人可以和多个人聊天,这就是异步通讯。我们之前进行服务间调用时使用的RestTemplaste,Feign就是同步调用。

同步调用的优缺点

        优点:时效性强,可以立即得到回复,就像你打视频一样

        缺点:假如你一个项目中存在很多业务,相互之间进行调用,如果你增加了新的需求,此时因为原本代码是同步调用,代码耦合度很高,于是乎修改代码变得十分繁琐,并且一个业务可能会调用很多服务,只有上一个服务调用完了,才能到下一个服务,等待的过程中就造成了资源浪费,性能下降,如果当前调用的服务失败,还可能会导致级联失败,服务雪崩。


异步调用的优缺点

优点:

1,代码耦合度低,因为异步调用是采取事件驱动来实现,当请求进来之后到达Broker之后Broker通知各自微服务去执行,服务间不在需要相互调用。

2,吞吐量提升,因为异步调用不像同步调用那样每个服务需要等待上游完成调用。

3,故障隔离,服务之间相互不进行调用,即使你挂了也跟我没关系。

4,流量削峰,假如同时又大量请求,但是你的服务处理请求能力是有限的,于是Broker会净请求先拦截,看服务又能力处理多少请求,就拿多少请求,不会一次性全部发布订阅。

缺点:

1,对Broker的依赖十分高,对他的可靠性,安全性,吞吐能力要求很高,万一他挂了......

2,服务之间相互调用关系不清晰,业务没有明显的流程线,代码出问题不容易排查。

所以,需要根据场景来选择同步还是异步,一般大多数需要同步。

什么是MQ?

MQ(MessageQueue),中文是消息队列,也就是存放消息的队列,也就是时间驱动中的Broker.

常见的MQ对比:

我们这里选择RabbitMQ

RabbitMQ安装

RabbitMQ是基于Erlang语言开发的开源消息中间件,因此它具备强大的并发处理能力

官网地址:RabbitMQ: One broker to queue them all | RabbitMQicon-default.png?t=N7T8https://www.rabbitmq.com/

这里我们只用Docker来安装RabbitMQ(快捷方便)

1,拉取RabbitMQ的镜像

docker pull rabbitmq:3-management 

2,运行RabbitMQ容器

docker run \-e RABBITMQ_DEFAULT_USER=你的账户名\-e RABBITMQ_DEFAULT_PASS=你的密码\--name mq \  --hostname mq1 \-p 15672:15672 \-p 5672:5672 \-d \rabbitmq:3-management

注意:账户名和密码是自己定义的,15672是RabbitMQ的管理端端口,5672是RabbitMQ通讯的端口。

3,在浏览器输入IP地址:15672,输入帐号名和密码登录

至此,安装成功

我们可以看到界面有好多目录,具体作用如下

channel:用来操作mq的工具

exchange:路由消息到队列中

queue:缓存消息

virtual host:虚拟主机,是对queue,exchange等资源的逻辑分组

MQ的整体结构:

消息发送者将消息发送到交换机,交换机将其路由到队列,消费者从队列中取走消息

MQ中常见消息模型

大致可以分为两类

第一类:基本消息队列(BasicQueue),工作消息队列(WorkQueue),他们都是最基本的消息队列

第二类:发布订阅(publish,Subscribe)根据交换机类型分为三种

Fanout Exchange(广播),Direct Exchange(路由),Topic Exchange(主题)

  

更多可参考官方文档: 

RabbitMQ Tutorials | RabbitMQ

RabbitMQ入门案例

我们使用RabbitMQ参考官方文档,完成一个hello world案例

引入依赖:

     <!--包含RabbitMQ--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency>

创建一个测试类publisher,用来发送消息代码如下:

public class PublisherTest {@Testpublic void testSendMessage() throws IOException, TimeoutException {// 1.建立连接ConnectionFactory factory = new ConnectionFactory();// 1.1.设置连接参数,分别是:主机名、端口号、vhost、用户名、密码factory.setHost("192.168.*.*");factory.setPort(5672);factory.setVirtualHost("/");factory.setUsername("***");factory.setPassword("****");// 1.2.建立连接Connection connection = factory.newConnection();// 2.创建通道ChannelChannel channel = connection.createChannel();// 3.创建队列String queueName = "simple.queue";channel.queueDeclare(queueName, false, false, false, null);// 4.发送消息String message = "hello, rabbitmq!";channel.basicPublish("", queueName, null, message.getBytes());System.out.println("发送消息成功:【" + message + "】");// 5.关闭通道和连接channel.close();connection.close();}
}

Debug执行观察:

1,连接工厂初始化完成之后,他会创建一个连接,连接上RabbitMQ ,这时界面显示如下

2,之后他会创建一个Channel ,用于操作RabbbitMQ

3,之后会创建一个我们定义好的消息队列simple.queue:

4,之后发送消息hello,rabbitmq!

5,完成之后关闭连接,但是消息依旧存在与消息队列中

之后创建一个测试类consumer,用来接受消息代码如下:

public class ConsumerTest {public static void main(String[] args) throws IOException, TimeoutException {// 1.建立连接ConnectionFactory factory = new ConnectionFactory();// 1.1.设置连接参数,分别是:主机名、端口号、vhost、用户名、密码factory.setHost("192.168.*.*");factory.setPort(5672);factory.setVirtualHost("/");factory.setUsername("****");factory.setPassword("****");// 1.2.建立连接Connection connection = factory.newConnection();// 2.创建通道ChannelChannel channel = connection.createChannel();// 3.创建队列String queueName = "simple.queue";channel.queueDeclare(queueName, false, false, false, null);// 4.订阅消息channel.basicConsume(queueName, true, new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope,AMQP.BasicProperties properties, byte[] body) throws IOException {// 5.处理消息String message = new String(body);System.out.println("接收到消息:【" + message + "】");}});System.out.println("等待接收消息。。。。");}
}

 跟publisher一样创建工厂,建立连接,这里需要说明的是,之所以还要创建一个队列是因为在实际执行过程中,发布者和消费者又可能执行顺序不一致,所以我们消费者也需要创建一个队列,不过这个队列只会有一个,如果创建了就不再创建。之后消费者接受消息处理,消息队列中消息清空。

根据上述代码我们可以看到官方的demo确实是有带你复杂繁琐,实在是很不友好啊,于是我们使用一种简单的方式来操作RabbitMQ!


SpringAMQP

AMQP(Advanced Message Queuing Protocal):高级消息队列协议,是应用程序之间传递业务消息的开放标准/规范,和语言和平台无关。

SpringAMQP:是基于AMQP协议定义的一套API规范,提供了模板用来发送和接受消息,包含两部分,其中spring-amop是基础抽象,spring-rabbit是底层的默认实现。

SpringAMQP实现基础消息队列功能

消息发送

1,引入依赖

  <!--AMQP依赖,包含RabbitMQ--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency>

2,在配置文件中配置RabbitMQ的信息

spring:rabbitmq:host: 192.168.121.10 #主机名port: 5672 #端口virtual-host: /  #虚拟主机username: *** #用户名password: **** #密码

3,使用RabbitTemplate来发送消息(spring提供的发送消息的模板)

这里我们编写一个测试类来测试

@SpringBootTest
@RunWith(SpringRunner.class)
public class AMQPTest {@Autowiredprivate RabbitTemplate rabbitTemplate;@Testpublic void sendMessage(){String queueName="simple.queue";//定义队列名String message="hello SpringAMQP!!";rabbitTemplate.convertAndSend(queueName,message);}
}

 可以看到消息成功发送

消息接收

依旧是先引依赖,添加RabbitMQ相关配置

之后书写监听消息代码,创建一个类,交给spring管理,定义一个方法加@RabbitListener注解指定接受消息的队列,可以传递数组,指定多个队列

@Component
public class ListenerMessage {@RabbitListener(queues = {"simple.queue"})//可以监听多个队列public void ListenerSimpleQueue(String message){System.out.println("接收到的消息是;"+message);}
}

可以看到成功接受消息

 WordQueue 工作队列

Work模型-多个消费者绑定到同一个队列,同一个消息只会被同一个消费者处理

我们来做一个测试,在一秒内发送50条消息,定义两个消费者,同时处理,一个消费者每秒处理50条消息,另一个消费者每秒处理20个消息,按照常理来说应该是多劳多得,也就是处理能力强的处理更多消息,弱的处理更少消息,我们修改之前的代码:

修改之后的消息发送代码:

@Testpublic void sendWorkMessage() throws InterruptedException {String queueName="simple.queue";//定义队列名String message="Work Message__";for (int i = 1; i <= 50; i++) {rabbitTemplate.convertAndSend(queueName,message+i);Thread.sleep(20);}}

修改之后的消息接收代码:

@Component
public class ListenerMessage {@RabbitListener(queues = {"simple.queue"})//可以监听多个队列public void ListenerWorkSimpleQueue1(String message) throws InterruptedException {System.out.println("consumer1接收到的消息是;"+message+"  时间:"+LocalDateTime.now());Thread.sleep(20);}@RabbitListener(queues = {"simple.queue"})//可以监听多个队列public void ListenerWorkSimpleQueue2(String message) throws InterruptedException {System.err.println("consumer2接收到的消息是;"+message+"  时间:"+LocalDateTime.now());Thread.sleep(200);}
}

代码执行结果显示,consumer1和consumer2处理的消息是一样的并不会向我们设想的那样,能力强的处理多能力弱的处理少,是什么原因导致的呢??

这是由于RabbitMQ的消息预取机制,就是说在消息到达消息队列的时候两个消费者会分别从消息队列中一次性取完所有的消息理论上来说是无上限的,所以我们需要修改机制让消费者一次性例如说取一个消息,等这个消息处理完之后在去取下一个即可,

在配置文件中修改如下:

spring:rabbitmq:host: 192.168.101.100 #主机名port: 5672 #端口virtual-host: /  #虚拟主机username: qmlx #用户名password: QMLX-9999 #密码listener:direct:prefetch: 1  #每次取一个消息,取完之后在取

 发布(Publish)订阅(Subscribe)

发布订阅模式和之前案例的区别就是上述模型一个消息只能发送给一个consumer,而发布订阅模型则是将同一个消息发送给多个消费者,实现方式是假如exchange(交换机)

 

常见的exchange类型包括:

Fanout:广播

Direct:路由

Topic:话题

注意:exchange只会负责消息路由,而不是存储,路由失败则消息丢失

🧇发布订阅-Fanout Exchange

Fanout Exchange他会将接收到的消息路由到每一个与其绑定的queue,广播模式。

实现思路:

1,在consumer服务中,编写一个配置类,声明两个队列,一个交换机,并将交换机绑定到队列上


@Configuration
public class FanoutConfig {@Beanpublic FanoutExchange fanoutExchange(){return  new FanoutExchange("fanout.exchange");}//定义两个队列@Beanpublic Queue fanoutQueue1(){return new Queue("fanout.queue1");}@Beanpublic Queue fanoutQueue2(){return new Queue("fanout.queue2");}//将队列绑定在交换机上面@Beanpublic Binding bindingQueue1(Queue fanoutQueue1,FanoutExchange fanoutExchange){//按照类型和名称传入参数return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange);}@Beanpublic Binding bindingQueue2(Queue fanoutQueue2,FanoutExchange fanoutExchange){return BindingBuilder.bind(fanoutQueue2).to(fanoutExchange);}}

启动项目之后spring会自动加载交换机和队列

 2,编写consumer代码,监听定义好的两个队列


@Component
public class ListenerMessage {@RabbitListener(queues = {"fanout.queue1"})public void ListenerFanoutQueue1(String message){System.out.println("consumer1接受到的消息是:"+message);}@RabbitListener(queues = {"fanout.queue2"})public void ListenerFanoutQueue2(String message){System.err.println("consumer2接受到的消息是:"+message);}
}

3,编写publisher代码,发送消息

之前消息是发送到队列中,现在消息发送给交换机

    @Testpublic void sendFanoutMessage() throws InterruptedException {String exchangeName="fanout.exchange";//定义队列名String message="FanoutExchange Message!!";rabbitTemplate.convertAndSend(exchangeName,null,message);}

可以看到两个消费者同时接受到了消息!!!

交换机的作用:

1,接受publisher发送的消息

2,将消息按照规则路由发送给每一个与之绑定的队列

3,不能缓存信息,路由失败则消息丢失

🧇发布订阅-Direct Exchange

Direct Exchange:交换机将接收到的消息根据路由规则到指定的Queue,称之为路由模式(routes)

        每一个Queue都和Exchange设定一个BindingKey

        发布者发送消息时,指定消息的BindingKey

        Exchange将消息路由到BindKey和消息的BindingKey一致的队列

实现思路:

1,指定消息接收者绑定交换机和队列,但是需要指定BindingKey

这次直接使用注解实现,不必那莫繁琐

@Component
public class ListenerDirectMessage {@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "direct.queue1"),exchange = @Exchange(name = "direct.exchange",type = ExchangeTypes.DIRECT),key = {"red","blue"}))public void listensterDirectQueue1(String message){System.out.println("消费者1接收到Direct的消息是:"+message);}@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "direct.queue2"),exchange = @Exchange(name = "direct.exchange",type = ExchangeTypes.DIRECT),key = {"red","blue"}))public void listensterDirectQueue2(String message){System.out.println("消费者2接收到Direct的消息是:"+message);}
}

‘2,定义publisher发布消息

@Testpublic void sendDirectMessage() throws InterruptedException {String exchangeName="direct.exchange";//定义队列名String message="DirectExchange Message!!";String routinfKey="blue";rabbitTemplate.convertAndSend(exchangeName,routinfKey,message);rabbitTemplate.convertAndSend(exchangeName,null,message);}

发送时需要指定routingKey即可,同一个队列定义时可指定多个BindingKey

注意:Direct Exchange只会发送给routingKeyBindingKey一致的队列

🧇发布订阅-Topic Exchange

Topic Exchange:Topic Exchange和Direct key类似,区别在于他的routingKey必须是多个单词的列表,并且必须是以 分割,并且Queue和Exchange指定时课使用通配符

实现思路:

1,实现消费者代码


@Component
public class ListenerTopicMessage {@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "topic.queue1"),exchange = @Exchange(name = "topic.exchange",type = ExchangeTypes.TOPIC),key = "china.#"))public void listensterDirectQueue1(String message){System.out.println("消费者1接收到topic的消息是:"+message);}@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "topic.queue2"),exchange = @Exchange(name = "topic.exchange",type = ExchangeTypes.TOPIC),key = "#.news"))public void listensterDirectQueue2(String message){System.out.println("消费者2接收到topic的消息是:"+message);}
}

2,实现publisher代码

@Testpublic void sendtopictMessage() throws InterruptedException {String exchangeName="topic.exchange";//定义队列名String message="TopicExchange Message!!";String routinfKey="china.news";rabbitTemplate.convertAndSend(exchangeName,routinfKey,message);//rabbitTemplate.convertAndSend(exchangeName,null,message);}

至此RabbitMQ的安装使用及其五种基本工作模式搞定!!!!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/818489.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Ant Design 表单基础用法综合示例

Ant Design 的表单组件设计得非常出色,极大地简化了表单开发的复杂度,让开发者能够快速构建出功能丰富、交互友好的表单界面。 接下来总结一下 Ant Design 中表单的基本用法。 Form 组件 用于定义整个表单,可以设置表单的布局方式、提交行为等。通常会将表单字段组件嵌套在 F…

利用栈删除数组中重复元素

先将数据排序&#xff08;降序或升序&#xff09; 建立一个“栈”&#xff0c;三种情况&#xff1a; 1.栈为空&#xff1a;压入一个元素 2.栈不为空 且 栈顶元素不等于将入栈元素&#xff1a;压入一个元素 3.栈不为空 且 栈顶元素等于将入栈元素&#xff1a;删除将压入元素…

【学习笔记十一】EWM上架目标仓位确定过程及配置

一、EWM确定目标区域概述 1.EWM从仓库处理类型获取源仓库类型&#xff08;Source storage type&#xff09;和源仓位&#xff08;Source Bin&#xff09;2.EWM根据仓库类型&#xff08;storage type&#xff09;、仓库分区&#xff08;storage section&#xff09;和上架策略&a…

Matlab|基于广义Benders分解法的综合能源系统优化规划

目录 1 主要内容 广义benders分解法流程图&#xff1a; 优化目标&#xff1a; 约束条件&#xff1a; 2 部分代码 3 程序结果 4 下载链接 1 主要内容 该程序复现文章《综合能源系统协同运行策略与规划研究》第四章内容基于广义Benders分解法的综合能源系统优化规划&…

Python开源工具库使用之词云Wordcloud

文章目录 前言一、基本使用1.1 文本生成词云1.2 配置项 二、进阶用法2.1 自定义形状2.2 自定义着色2.3 自定义词频2.4 中文 三、实际案例3.1 工作报告词云3.2 周杰伦歌词词云 四、总结4.1 优点和局限性4.2 展望未来发展 参考 前言 当我们需要将大量文本数据可视化展示时&#…

单链表和文件操作使用练习:通讯录

1. 项目文件组成&#xff08;vs2022&#xff09; 1. Contact.h和Contact.c分别为实现通讯录的头文件和源文件。 2. SList.h和SList.c分别为实现单链表的头文件和源文件。 3. test.c为测试用的源文件&#xff0c;用于调用通讯录提供的函数。 4. Contact.txt用于存储联系人信息。…

【热门话题】PyTorch:深度学习领域的强大工具

&#x1f308;个人主页: 鑫宝Code &#x1f525;热门专栏: 闲话杂谈&#xff5c; 炫酷HTML | JavaScript基础 ​&#x1f4ab;个人格言: "如无必要&#xff0c;勿增实体" 文章目录 PyTorch&#xff1a;深度学习领域的强大工具一、PyTorch概述二、PyTorch核心特性…

【华为OD机试】围棋的气【C卷|100分】

题目描述 围棋棋盘由纵横各19条线垂直相交组成,棋盘上一共19 x 19 = 361 个交点, 对弈双方一方执白棋,一方执黑棋,落子时只能将棋子置于交点上。 “气”是围棋中很重要的一个概念,某个棋子有几口气,是指其上下左右方向四个相邻的交叉点中, 有几个交叉点没有棋子,由此可…

RabbitMQ消息模型之Direct消息模型

Direct消息模型 * 路由模型&#xff1a; * 一个交换机可以绑定多个队列 * 生产者给交换机发送消息时&#xff0c;需要指定消息的路由键 * 消费者绑定队列到交换机时&#xff0c;需要指定所需要消费的信息的路由键 * 交换机会根据消息的路由键将消息转发到对应的队…

解锁创意无限,体验全新Adobe Illustrator 2021 for mac/Win中文版

在数字化创意的浪潮中&#xff0c;Adobe Illustrator 2021中文版无疑是设计师们的得力助手。这款软件集高效、便捷、创新于一体&#xff0c;无论是Mac还是Windows用户&#xff0c;都能在其中找到属于自己的创意空间。 Adobe Illustrator 2021中文版延续了其强大的矢量图形处理…

循环双链表的操作

归纳编程学习的感悟&#xff0c; 记录奋斗路上的点滴&#xff0c; 希望能帮到一样刻苦的你&#xff01; 如有不足欢迎指正&#xff01; 共同学习交流&#xff01; &#x1f30e;欢迎各位→点赞 &#x1f44d; 收藏⭐ 留言​&#x1f4dd; 每一个裂缝都是为透出光而努力&#…

FFmpeg: 自实现ijkplayer播放器--04消息队列设计

文章目录 播放器状态转换图播放器状态对应的消息&#xff1a; 消息对象消息队列消息队列api插入消息获取消息初始化消息插入消息加锁初始化消息设置消息参数消息队列初始化清空消息销毁消息启动消息队列终止消息队列删除消息 消息队列&#xff0c;用于发送&#xff0c;设置播放…

[lesson33]C++中的字符串类

C中的字符串类 历史遗留问题 C语言不支持真正意义上的字符串C语言用字符数组和一组函数实现字符串操作C语言不支持自定义类型&#xff0c;因此无法获得字符串类型 解决方案 从C到C的进化过程引入自定义类型在C中可以通过类完成字符串类型的定义 标准库中的字符串类 C语言直…

学鸿蒙开发的优劣势,你清楚吗?建议你了解一下!

随着科技的不断发展和智能设备的普及&#xff0c;鸿蒙系统作为华为自主研发的操作系统&#xff0c;正逐渐受到市场的关注。2024年&#xff0c;学鸿蒙开发是否有前途&#xff0c;成为了很多开发者关心的问题。本文将从多个角度分析鸿蒙系统的发展前景&#xff0c;以及学习鸿蒙开…

Android使用shape属性绘制边框内渐变色

目录 先上效果图实现方法shape属性介绍代码结果 先上效果图 这是使用AndroidStudio绘制的带有渐变色的边框背景色 实现方法 项目中由于UI设计需求&#xff0c;需要给按钮、控件设置带有背景色效果的。以下是UI效果图。 这里我们使用shape属性来绘制背景效果。 shape属性介…

Leetcode-48-旋转图像

题目说明 给定一个 n n 的二维矩阵表示一个图像。 将图像顺时针旋转 90 度。 说明&#xff1a;你必须在原地旋转图像&#xff0c;这意味着你需要直接修改输入的二维矩阵。请不要使用另一个矩阵来旋转图像。 示例 1: 给定 matrix [ [1,2,3], [4,5,6], [7,8,9] ], 原地旋转输入…

如何在横向渗透攻击中寻到一线生机

横向渗透&#xff0c;作为计算机网络中的一种攻击技术&#xff0c;展现出了攻击者如何巧妙地利用同一级别系统间的漏洞和弱点&#xff0c;扩大其网络访问权限。与纵向渗透不同&#xff0c;横向渗透不关注权限的垂直提升&#xff0c;而是更侧重于在同一层级内扩展影响力。 横向…

Python数据容器(一)

一.数据容器入门 1.Python中的数据容器&#xff1a;一种可以容纳多份数据的数据类型&#xff0c;容纳的每一份数据称之为1个元素&#xff0c;每一个元素&#xff0c;可以是任意类型的数据&#xff0c;如字符串、数字、布尔等。 2.数据容器根据特点的不同&#xff0c;如&#…

VTK —— 一、Windows10下编译VTK源码,并用Vs2017代码测试(附编译流程、附编译好的库、vtk测试源码)

效果 编译 1、下载VTK8.2.0源码        2、解压源码后&#xff0c;进入目录创建build目录&#xff0c;同时在build内创建install目录 (下图install目录是在cmake第一次后才手动创建&#xff0c;建议在创建build时创建)        3、打开CMake&#xff0c;如下图填入…

卷积神经网络结构组成与解释

卷积神经网络结构组成与解释 卷积神经网络是以卷积层为主的深度网路结构&#xff0c;网络结构包括有卷积层、激活层、BN层、池化层、FC层、损失层等。卷积操作是对图像和滤波矩阵做内积&#xff08;元素相乘再求和&#xff09;的操作。 1. 卷积层 常见的卷积操作如下&#x…