微服务技术栈SpringCloud+RabbitMQ+Docker+Redis+搜索+分布式(四):消息队列MQ

文章目录

  • 一、消息队列MQ
  • 二、RabbitMQ
    • 2.1 单机部署
    • 2.2 消息模型
  • 三、SpringAMAP
    • 3.1 简单消息队列
    • 3.2 工作消息队列
    • 3.3 发布-订阅模型:FanoutExchange 广播交换机
    • 3.4 发布-订阅模型:DirectExchange 路由交换机
    • 3.5 发布-订阅模型:TopicExchange 话题交换机
    • 3.6 消息转换器


一、消息队列MQ

同步调用的优点:时效性较强,可以立即得到结果;
同步调用的问题:耦合度高;性能和吞吐能力下降;有额外的资源消耗;有级联失败问题;

异步调用常见实现就是事件驱动模式
异步通信的优点:耦合度低;吞吐量提升;故障隔离;流量削峰;
异步通信的缺点:依赖于Broker的可靠性、安全性、吞吐能力;架构复杂了,业务没有明显的流程线,不好追踪管理;

在这里插入图片描述

二、RabbitMQ

RabbitMQ的官网:https://www.rabbitmq.com/
在这里插入图片描述
在这里插入图片描述

2.1 单机部署

我们在Centos7虚拟机中使用Docker来安装。

  1. 下载镜像

方式一:在线拉取

docker pull rabbitmq:3-management

方式二:从本地加载

在课前资料已经提供了镜像包:
上传到虚拟机中后,使用命令加载镜像即可:

docker load -i mq.tar
  1. 安装MQ

执行下面的命令来运行MQ容器:

docker run \-e RABBITMQ_DEFAULT_USER=itcast \-e RABBITMQ_DEFAULT_PASS=123321 \--name mq \--hostname mq1 \-p 15672:15672 \-p 5672:5672 \-d \rabbitmq:3-management
  1. 通过 主机IP:15672 访问RabbitMQ的管理界面
    在这里插入图片描述

2.2 消息模型

在这里插入图片描述

在官网 https://www.rabbitmq.com/ 中,选择文件 -> 入门,可看见案例demo
在这里插入图片描述
下面演示:官网的基本消息队列模型
在这里插入图片描述

publisher

package cn.itcast.mq.helloworld;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import org.junit.Test;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class PublisherTest {@Testpublic void testSendMessage() throws IOException, TimeoutException {// 1.建立连接ConnectionFactory factory = new ConnectionFactory();// 1.1.设置连接参数,分别是:主机名、端口号、vhost、用户名、密码factory.setHost("192.168.101.6");factory.setPort(5672);factory.setVirtualHost("/");  //虚拟主机factory.setUsername("itcast");factory.setPassword("123321");// 1.2.建立连接Connection connection = factory.newConnection();// 2.创建通道ChannelChannel channel = connection.createChannel();// 3.创建队列String queueName = "simple.queue";channel.queueDeclare(queueName, false, false, false, null);// 4.发送消息String message = "hello, rabbitmq!";channel.basicPublish("", queueName, null, message.getBytes());System.out.println("发送消息成功:【" + message + "】");// 5.关闭通道和连接channel.close();connection.close();}
}

consumer

package cn.itcast.mq.helloworld;import com.rabbitmq.client.*;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class ConsumerTest {public static void main(String[] args) throws IOException, TimeoutException {// 1.建立连接ConnectionFactory factory = new ConnectionFactory();// 1.1.设置连接参数,分别是:主机名、端口号、vhost、用户名、密码factory.setHost("192.168.101.6");factory.setPort(5672);factory.setVirtualHost("/");factory.setUsername("itcast");factory.setPassword("123321");// 1.2.建立连接Connection connection = factory.newConnection();// 2.创建通道ChannelChannel channel = connection.createChannel();// 3.创建队列String queueName = "simple.queue";channel.queueDeclare(queueName, false, false, false, null);// 4.订阅消息channel.basicConsume(queueName, true, new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope,AMQP.BasicProperties properties, byte[] body) throws IOException {// 5.处理消息String message = new String(body);System.out.println("接收到消息:【" + message + "】");}});System.out.println("等待接收消息。。。。");}
}

在这里插入图片描述

三、SpringAMAP

在这里插入图片描述

3.1 简单消息队列

在这里插入图片描述

流程如下:

  1. 在父工程中引入spring-amqp的依赖
<!--AMQP依赖,包含RabbitMQ--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency>
  1. 在publisher和consumer服务中编写application.yml,添加mq连接信息
spring:rabbitmq:host: 192.168.150.101 # rabbitMQ的ip地址port: 5672 # 端口username: itcastpassword: 123321virtual-host: /  # 虚拟主机
  1. 在publisher服务中新建一个测试类,编写测试方法,然后运行测试方法发送消息
@RunWith(SpringRunner.class)
@SpringBootTest
public class SpringAmqpTest {@Autowiredprivate RabbitTemplate rabbitTemplate;@Testpublic void testSendMessage2SimpleQueue() {String queueName = "simple.queue";  // 队列名称String message = "hello, spring amqp!"; // 消息rabbitTemplate.convertAndSend(queueName, message);}
}
  1. 在consumer服务中新建一个类,编写消费逻辑,然后启动服务。 定义类,添加@Component注解;类中声明方法,添加@RabbitListener注解,方法参数就时消息。注意:消息一旦消费就会从队列删除,RabbitMQ没有消息回溯功能
@Component
public class SpringRabbitListener {@RabbitListener(queues = "simple.queue")public void listenSimpleQueue(String msg) {System.out.println("消费者接收到simple.queue的消息:【" + msg + "】");}
}

3.2 工作消息队列

在这里插入图片描述
在这里插入图片描述

  1. 前面步骤同上,编写publisher
@RunWith(SpringRunner.class)
@SpringBootTest
public class SpringAmqpTest {@Autowiredprivate RabbitTemplate rabbitTemplate;@Testpublic void testSendMessage2WorkQueue() throws InterruptedException {String queueName = "simple.queue";String message = "hello, message__";for (int i = 1; i <= 50; i++) {rabbitTemplate.convertAndSend(queueName, message + i);Thread.sleep(20);}}
}
  1. 设置两个消费者
@Component
public class SpringRabbitListener {@RabbitListener(queues = "simple.queue")public void listenWorkQueue1(String msg) throws InterruptedException {System.out.println("消费者1接收到消息:【" + msg + "】" + LocalTime.now());Thread.sleep(20);}@RabbitListener(queues = "simple.queue")public void listenWorkQueue2(String msg) throws InterruptedException {System.err.println("消费者2........接收到消息:【" + msg + "】" + LocalTime.now());Thread.sleep(200);}
  1. 由于消费者存在消息预取机制【即:消费者会将队列中的消息提前取出来,再处理】导致两个消费者处理消息的数量一致【即:一半一半】,因此需要在消费者的application设置prefetch=1如下【用来保证每次处理完一条消息再取消息】,这样消费者1比消费者2处理的消息更多。
spring:rabbitmq:host: 192.168.150.101 # rabbitMQ的ip地址port: 5672 # 端口username: itcastpassword: 123321virtual-host: /  # 虚拟主机listener:simple:prefetch: 1 # 每次只能获取一条消息,处理完成后才能获取下一个消息,该用于解决消息预取机制

3.3 发布-订阅模型:FanoutExchange 广播交换机

在这里插入图片描述
在这里插入图片描述

在这里插入图片描述

  1. 前面步骤同上,编写publisher
@RunWith(SpringRunner.class)
@SpringBootTest
public class SpringAmqpTest {@Autowiredprivate RabbitTemplate rabbitTemplate;@Testpublic void testSendFanoutExchange() {// 交换机名称String exchangeName = "itcast.fanout";// 消息String message = "hello, every one!";// 发送消息rabbitTemplate.convertAndSend(exchangeName, "", message);}
}
  1. 声明一个交换机,两个消息队列,并完成绑定,然后设置两个消费者接收消息。最后测试发现两个消费者可以接收发布者的消息
package cn.itcast.mq.config;import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class FanoutConfig {// 1.声明FanoutExchange交换机@Beanpublic FanoutExchange fanoutExchange(){return new FanoutExchange("itcast.fanout");}// 2.声明第1个队列 注意:此方法名是该队列的唯一ID@Beanpublic Queue fanoutQueue1(){return new Queue("fanout.queue1");}// 3.绑定队列1到交换机 注意:参数名要与上述定义的方法名保持一致@Beanpublic Binding fanoutBinding1(Queue fanoutQueue1, FanoutExchange fanoutExchange){return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange);}// 声明第2个队列 注意:此方法名是该队列的唯一ID@Beanpublic Queue fanoutQueue2(){return new Queue("fanout.queue2");}// 绑定队列2到交换机 注意:参数名要与上述定义的方法名保持一致@Beanpublic Binding fanoutBinding2(Queue fanoutQueue2, FanoutExchange fanoutExchange){return BindingBuilder.bind(fanoutQueue2).to(fanoutExchange);}
}
@Component
public class SpringRabbitListener {@RabbitListener(queues = "fanout.queue1")public void listenFanoutQueue1(String msg) {System.out.println("消费者接收到fanout.queue1的消息:【" + msg + "】");}@RabbitListener(queues = "fanout.queue2")public void listenFanoutQueue2(String msg) {System.out.println("消费者接收到fanout.queue2的消息:【" + msg + "】");}

在这里插入图片描述

3.4 发布-订阅模型:DirectExchange 路由交换机

在这里插入图片描述

在这里插入图片描述

  1. 前面步骤同上,编写publisher
@RunWith(SpringRunner.class)
@SpringBootTest
public class SpringAmqpTest {@Autowiredprivate RabbitTemplate rabbitTemplate;@Testpublic void testSendDirectExchange() {// 交换机名称String exchangeName = "itcast.direct";// 消息String message = "hello, blue!";// 发送消息rabbitTemplate.convertAndSend(exchangeName, "blue", message);}
}
  1. 设置两个消费者,并且用@RabbitListener注解的方式声明 Binding Queue Exchange Key。当发送者发送key=blue的消息时,只有消费者1收到。
@Component
public class SpringRabbitListener {/*** 4.发布-订阅模型:Direct 路由* 用注解的方式声明 Binding Queue Exchange Key*/@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "direct.queue1"),exchange = @Exchange(name = "itcast.direct", type = ExchangeTypes.DIRECT),key = {"red", "blue"}))   // type表示哪种交换机public void listenDirectQueue1(String msg){System.out.println("消费者接收到direct.queue1的消息:【" + msg + "】");}@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "direct.queue2"),exchange = @Exchange(name = "itcast.direct", type = ExchangeTypes.DIRECT),key = {"red", "yellow"}))public void listenDirectQueue2(String msg){System.out.println("消费者接收到direct.queue2的消息:【" + msg + "】");}
}

在这里插入图片描述

3.5 发布-订阅模型:TopicExchange 话题交换机

在这里插入图片描述
在这里插入图片描述
实现消费者1接收中国的所有消息,消费者2接收所有的新闻

  1. 前面步骤同上,编写publisher
@RunWith(SpringRunner.class)
@SpringBootTest
public class SpringAmqpTest {@Testpublic void testSendTopicExchange() {// 交换机名称String exchangeName = "itcast.topic";// 消息String message = "今天天气不错,我的心情好极了!";// 发送消息rabbitTemplate.convertAndSend(exchangeName, "china.wearther", message);}
}
  1. 设置两个消费者,并且用@RabbitListener注解的方式声明 Binding Queue Exchange Key。当发送者发送key=china.wearther的消息时,只有消费者1收到。
@Component
public class SpringRabbitListener {/*** 5.发布-订阅模型:topic 路由* 用注解的方式声明 Binding Queue Exchange Key*/@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "topic.queue1"),exchange = @Exchange(name = "itcast.topic", type = ExchangeTypes.TOPIC),key = "china.#"))public void listenTopicQueue1(String msg){System.out.println("消费者接收到topic.queue1的消息:【" + msg + "】");}@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "topic.queue2"),exchange = @Exchange(name = "itcast.topic", type = ExchangeTypes.TOPIC),key = "#.news"))public void listenTopicQueue2(String msg){System.out.println("消费者接收到topic.queue2的消息:【" + msg + "】");}
}

3.6 消息转换器

上述传递的都是String类型的,而实际需要传递Object类型的数据,因此我么需要对消息进行转换

  1. 前面步骤同上,编写publisher
@RunWith(SpringRunner.class)
@SpringBootTest
public class SpringAmqpTest {@Testpublic void testSendObjectQueue() {Map<String, Object> msg = new HashMap<>();msg.put("name","小明");msg.put("age",12);// 发送消息rabbitTemplate.convertAndSend("object.queue", msg);}
}
  1. 设置两个消费者,并且用@RabbitListener注解的方式声明 Binding Queue Exchange Key。当发送者发送key=china.wearther的消息时,只有消费者1收到。
@Component
public class SpringRabbitListener {/*** 6.消息转换器**/@RabbitListener(queues = "object.queue")public void listenObjectQueue(Map<String,Object> msg){System.out.println("接收到object.queue的消息:" + msg);}
}
  1. 但这样存在一个问题publisher发布的数据被序列化,因此我们需要在publisher和consumer的pom文件(或者父工程的pom文件)中添加依赖,并且在Application中反序列化
        <!--JSON序列化--><dependency><groupId>com.fasterxml.jackson.core</groupId><artifactId>jackson-databind</artifactId></dependency>
@SpringBootApplication
public class PublisherApplication {public static void main(String[] args) {SpringApplication.run(PublisherApplication.class);}@Beanpublic MessageConverter messageConverter(){return new Jackson2JsonMessageConverter();}
}@SpringBootApplication
public class ConsumerApplication {public static void main(String[] args) {SpringApplication.run(ConsumerApplication.class, args);}@Beanpublic MessageConverter messageConverter(){return new Jackson2JsonMessageConverter();}
}

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/732412.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Day29:安全开发-JS应用DOM树加密编码库断点调试逆向分析元素属性操作

目录 JS原生开发-DOM树-用户交互 JS导入库开发-编码加密-逆向调试 思维导图 JS知识点&#xff1a; 功能&#xff1a;登录验证&#xff0c;文件操作&#xff0c;SQL操作&#xff0c;云应用接入&#xff0c;框架开发&#xff0c;打包器使用等 技术&#xff1a;原生开发&#x…

推房子游戏c++

这段代码是一个推箱子游戏的实现。游戏中有一个地图&#xff0c;地图上有墙壁、人、箱子和目标位置。玩家通过键盘输入WASD或方向键来控制人物的移动&#xff0c;目标是将所有的箱子推到相应的目标位置上。 代码中的dt数组表示地图&#xff0c;每个位置上的字符表示对应的元素…

c语言在线聊天室

c语言基于tcp和多线程的在线聊天室(c语言通讯系统)功能需求 1.实现多线程 2.构建socke套接字实现一对一通信 3.实现多个电脑的通信 4.数据传输加密和解密 5.多人实时聊天 6.具备群聊和私聊的功能 实现原理: 服务端公网Ip暴露,客户端端口随机分配,通过服务端公网IP连接,服务端…

【开源】SpringBoot框架开发免税店商城管理系统

目录 一、摘要1.1 项目介绍1.2 项目录屏 二、系统设计2.1 功能模块设计2.2 研究方法 三、系统展示四、核心代码4.1 查询免税种类4.2 查询物品档案4.3 新增顾客4.4 新增消费记录4.5 审核免税 五、免责说明 一、摘要 1.1 项目介绍 基于JAVAVueSpringBootMySQL的免税店商城管理系…

【文本编辑】Typora v1.8.6 绿色版

下载地址 Typora v1.8.6 绿色版 简介 Typora 是一款简洁、直观的跨平台 Markdown 编辑器&#xff0c;旨在提供优雅的写作体验。与传统的 Markdown 编辑器不同&#xff0c;Typora 提供所见即所得的编辑界面&#xff0c;使用户可以即时预览 Markdown 文档的渲染效果&#xff0…

prometheus 原理(架构,promql表达式,描点原理)

大家好&#xff0c;我是蓝胖子&#xff0c;提到监控指标&#xff0c;不得不说prometheus&#xff0c;今天这篇文章我会对prometheus 的架构设计&#xff0c;promql表达式原理和监控图表的绘图原理进行详细的解释。来让大家对prometheus的理解更加深刻。 架构设计 先来看看&am…

性能测试干2年,还不会这个技术点?

nmon是一种在AIX与各种Linux操作系统上广泛使用的监控与分析工具&#xff0c;记录的信息比较全面&#xff0c;结合nmon_analyzer工具产生数据文件与图形化结果。 nmon可监控的数据类型 内存使用情况、磁盘适配器、文件系统中的可用空间、CPU使用率等等数据信息 特点 ①占用…

Java零基础-数组的访问和遍历

哈喽&#xff0c;各位小伙伴们&#xff0c;你们好呀&#xff0c;我是喵手。 今天我要给大家分享一些自己日常学习到的一些知识点&#xff0c;并以文字的形式跟大家一起交流&#xff0c;互相学习&#xff0c;一个人虽可以走的更快&#xff0c;但一群人可以走的更远。 我是一名后…

使用CSS制作动态的环形图/饼图

使用纯 CSS Animation conic-gradient 实现一个环形图。 饼图的实现思路和环形图一样&#xff0c;去掉中间的圆形遮盖 after 伪类元素即可。 一、构建基础样式 构建圆形节点和中间的遮盖元素。 <style>body {background-color: rgb(130, 226, 255);}.circle {top: 16…

持续更新 | 与您分享 Flutter 2024 年路线图

作者 / Michael Thomsen Flutter 是一个拥有繁荣社区的开源项目&#xff0c;我们致力于确保我们的计划公开透明&#xff0c;并将毫无隐瞒地分享从问题到设计规范的所有内容。我们了解到许多开发者对 Flutter 的功能路线图很感兴趣。我们往往会在一年中不断更改并调整这些计划&a…

Clock Verification IP

Clock Verification IP IP 参数及接口 IP 例化界面 相关函数 start_clock //产生时钟 <hierarchy_path>.IF.start_clockstop_clock //停止时钟 <hierarchy_path>.IF.stop_clockset_initial_value //设置时钟初始值为 0 <hierarchy_path>IF.set_initia…

Python和Google Colab进行卫星图像二维小波变化和机器学习

2D 小波分解是图像处理中的一种流行技术,使用不同的滤波器将图像分解为不同的频率分量(“近似”和“细节”系数)。该技术对于各种图像处理任务特别有用,例如压缩、去噪、特征提取和边缘检测。 在本文中,我们将演示如何在 Google Colab 中使用 Python 下载高分辨率样本卫星…

解决火狐浏览器访问地址受限制问题(This address is restricted)

问题如下图&#xff1a; This address is restrictedThis address uses a network port which is normally used for purposes other than Web browsing. Firefox has canceled the request for your protection. 此地址受到限制 此地址使用通常用于 Web 浏览以外的目的的网…

【Pytorch】进阶学习:基于矩阵乘法torch.matmul()实现全连接层

【Pytorch】进阶学习&#xff1a;基于矩阵乘法torch.matmul()实现全连接层 &#x1f308; 个人主页&#xff1a;高斯小哥 &#x1f525; 高质量专栏&#xff1a;Matplotlib之旅&#xff1a;零基础精通数据可视化、Python基础【高质量合集】、PyTorch零基础入门教程&#x1f448…

深入了解304缓存原理:提升网站性能与加载速度

&#x1f90d; 前端开发工程师、技术日更博主、已过CET6 &#x1f368; 阿珊和她的猫_CSDN博客专家、23年度博客之星前端领域TOP1 &#x1f560; 牛客高级专题作者、打造专栏《前端面试必备》 、《2024面试高频手撕题》 &#x1f35a; 蓝桥云课签约作者、上架课程《Vue.js 和 E…

微信小程序开发系列(十八)·wxml语法·声明和绑定数据

目录 1. 双大括号写法用法一&#xff1a;展示内容 步骤一&#xff1a;创建一个data对象 步骤二&#xff1a;双大括号写法的使用 步骤三&#xff1a;拓展 2. 双大括号写法用法二&#xff1a;绑定属性值 步骤一&#xff1a;给对象赋一个属性值 步骤二&#xff1a;双大括…

激光打标机红光与激光不重合:原因及解决方案

激光打标机红光和激光不在一个位置的问题可能由多种原因导致。以下是一些可能的原因和解决方法&#xff1a; 1. 激光器光路调整不当&#xff1a;激光器光路调整不当会导致激光束偏移&#xff0c;从而使红光与激光不重合。解决方法是重新调整激光器的光路&#xff0c;确保激光束…

【文档智能】再谈基于Transformer架构的文档智能理解方法论和相关数据集

前言 文档的智能解析与理解成为为知识管理的关键环节。特别是在处理扫描文档时&#xff0c;如何有效地理解和提取表单信息&#xff0c;成为了一个具有挑战性的问题。扫描文档的复杂性&#xff0c;包括其结构的多样性、非文本元素的融合以及手写与印刷内容的混合&#xff0c;都…

C# winform 重启电脑

一、重启电脑指令 windows7系统的启动文件夹为“开始菜单”——“所有程序”里面就有“启动”文件夹&#xff0c;其位置是 “C:\Users\Administrator\AppData\Roaming\Microsoft\Windows\Start Menu\Programs\Startup” 如果没有&#xff0c;则需要将其中的"administrator…

【正点原子STM32探索者】CubeMX+Keil开发环境搭建

文章目录 一、简单开箱二、资料下载三、环境搭建3.1 安装Keil MDK3.2 激活Keil MDK3.3 安装STM32CubeMX3.4 安装STM32F4系列MCU的Keil支持包 四、GPIO点灯4.1 查阅开发板原理图4.2 创建STM32CubeMX项目4.3 配置系统时钟和引脚功能4.4 生成Keil项目4.5 打开Keil项目4.6 编译Keil…