微服务技术栈SpringCloud+RabbitMQ+Docker+Redis+搜索+分布式(四):消息队列MQ

文章目录

  • 一、消息队列MQ
  • 二、RabbitMQ
    • 2.1 单机部署
    • 2.2 消息模型
  • 三、SpringAMAP
    • 3.1 简单消息队列
    • 3.2 工作消息队列
    • 3.3 发布-订阅模型:FanoutExchange 广播交换机
    • 3.4 发布-订阅模型:DirectExchange 路由交换机
    • 3.5 发布-订阅模型:TopicExchange 话题交换机
    • 3.6 消息转换器


一、消息队列MQ

同步调用的优点:时效性较强,可以立即得到结果;
同步调用的问题:耦合度高;性能和吞吐能力下降;有额外的资源消耗;有级联失败问题;

异步调用常见实现就是事件驱动模式
异步通信的优点:耦合度低;吞吐量提升;故障隔离;流量削峰;
异步通信的缺点:依赖于Broker的可靠性、安全性、吞吐能力;架构复杂了,业务没有明显的流程线,不好追踪管理;

在这里插入图片描述

二、RabbitMQ

RabbitMQ的官网:https://www.rabbitmq.com/
在这里插入图片描述
在这里插入图片描述

2.1 单机部署

我们在Centos7虚拟机中使用Docker来安装。

  1. 下载镜像

方式一:在线拉取

docker pull rabbitmq:3-management

方式二:从本地加载

在课前资料已经提供了镜像包:
上传到虚拟机中后,使用命令加载镜像即可:

docker load -i mq.tar
  1. 安装MQ

执行下面的命令来运行MQ容器:

docker run \-e RABBITMQ_DEFAULT_USER=itcast \-e RABBITMQ_DEFAULT_PASS=123321 \--name mq \--hostname mq1 \-p 15672:15672 \-p 5672:5672 \-d \rabbitmq:3-management
  1. 通过 主机IP:15672 访问RabbitMQ的管理界面
    在这里插入图片描述

2.2 消息模型

在这里插入图片描述

在官网 https://www.rabbitmq.com/ 中,选择文件 -> 入门,可看见案例demo
在这里插入图片描述
下面演示:官网的基本消息队列模型
在这里插入图片描述

publisher

package cn.itcast.mq.helloworld;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import org.junit.Test;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class PublisherTest {@Testpublic void testSendMessage() throws IOException, TimeoutException {// 1.建立连接ConnectionFactory factory = new ConnectionFactory();// 1.1.设置连接参数,分别是:主机名、端口号、vhost、用户名、密码factory.setHost("192.168.101.6");factory.setPort(5672);factory.setVirtualHost("/");  //虚拟主机factory.setUsername("itcast");factory.setPassword("123321");// 1.2.建立连接Connection connection = factory.newConnection();// 2.创建通道ChannelChannel channel = connection.createChannel();// 3.创建队列String queueName = "simple.queue";channel.queueDeclare(queueName, false, false, false, null);// 4.发送消息String message = "hello, rabbitmq!";channel.basicPublish("", queueName, null, message.getBytes());System.out.println("发送消息成功:【" + message + "】");// 5.关闭通道和连接channel.close();connection.close();}
}

consumer

package cn.itcast.mq.helloworld;import com.rabbitmq.client.*;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class ConsumerTest {public static void main(String[] args) throws IOException, TimeoutException {// 1.建立连接ConnectionFactory factory = new ConnectionFactory();// 1.1.设置连接参数,分别是:主机名、端口号、vhost、用户名、密码factory.setHost("192.168.101.6");factory.setPort(5672);factory.setVirtualHost("/");factory.setUsername("itcast");factory.setPassword("123321");// 1.2.建立连接Connection connection = factory.newConnection();// 2.创建通道ChannelChannel channel = connection.createChannel();// 3.创建队列String queueName = "simple.queue";channel.queueDeclare(queueName, false, false, false, null);// 4.订阅消息channel.basicConsume(queueName, true, new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope,AMQP.BasicProperties properties, byte[] body) throws IOException {// 5.处理消息String message = new String(body);System.out.println("接收到消息:【" + message + "】");}});System.out.println("等待接收消息。。。。");}
}

在这里插入图片描述

三、SpringAMAP

在这里插入图片描述

3.1 简单消息队列

在这里插入图片描述

流程如下:

  1. 在父工程中引入spring-amqp的依赖
<!--AMQP依赖,包含RabbitMQ--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency>
  1. 在publisher和consumer服务中编写application.yml,添加mq连接信息
spring:rabbitmq:host: 192.168.150.101 # rabbitMQ的ip地址port: 5672 # 端口username: itcastpassword: 123321virtual-host: /  # 虚拟主机
  1. 在publisher服务中新建一个测试类,编写测试方法,然后运行测试方法发送消息
@RunWith(SpringRunner.class)
@SpringBootTest
public class SpringAmqpTest {@Autowiredprivate RabbitTemplate rabbitTemplate;@Testpublic void testSendMessage2SimpleQueue() {String queueName = "simple.queue";  // 队列名称String message = "hello, spring amqp!"; // 消息rabbitTemplate.convertAndSend(queueName, message);}
}
  1. 在consumer服务中新建一个类,编写消费逻辑,然后启动服务。 定义类,添加@Component注解;类中声明方法,添加@RabbitListener注解,方法参数就时消息。注意:消息一旦消费就会从队列删除,RabbitMQ没有消息回溯功能
@Component
public class SpringRabbitListener {@RabbitListener(queues = "simple.queue")public void listenSimpleQueue(String msg) {System.out.println("消费者接收到simple.queue的消息:【" + msg + "】");}
}

3.2 工作消息队列

在这里插入图片描述
在这里插入图片描述

  1. 前面步骤同上,编写publisher
@RunWith(SpringRunner.class)
@SpringBootTest
public class SpringAmqpTest {@Autowiredprivate RabbitTemplate rabbitTemplate;@Testpublic void testSendMessage2WorkQueue() throws InterruptedException {String queueName = "simple.queue";String message = "hello, message__";for (int i = 1; i <= 50; i++) {rabbitTemplate.convertAndSend(queueName, message + i);Thread.sleep(20);}}
}
  1. 设置两个消费者
@Component
public class SpringRabbitListener {@RabbitListener(queues = "simple.queue")public void listenWorkQueue1(String msg) throws InterruptedException {System.out.println("消费者1接收到消息:【" + msg + "】" + LocalTime.now());Thread.sleep(20);}@RabbitListener(queues = "simple.queue")public void listenWorkQueue2(String msg) throws InterruptedException {System.err.println("消费者2........接收到消息:【" + msg + "】" + LocalTime.now());Thread.sleep(200);}
  1. 由于消费者存在消息预取机制【即:消费者会将队列中的消息提前取出来,再处理】导致两个消费者处理消息的数量一致【即:一半一半】,因此需要在消费者的application设置prefetch=1如下【用来保证每次处理完一条消息再取消息】,这样消费者1比消费者2处理的消息更多。
spring:rabbitmq:host: 192.168.150.101 # rabbitMQ的ip地址port: 5672 # 端口username: itcastpassword: 123321virtual-host: /  # 虚拟主机listener:simple:prefetch: 1 # 每次只能获取一条消息,处理完成后才能获取下一个消息,该用于解决消息预取机制

3.3 发布-订阅模型:FanoutExchange 广播交换机

在这里插入图片描述
在这里插入图片描述

在这里插入图片描述

  1. 前面步骤同上,编写publisher
@RunWith(SpringRunner.class)
@SpringBootTest
public class SpringAmqpTest {@Autowiredprivate RabbitTemplate rabbitTemplate;@Testpublic void testSendFanoutExchange() {// 交换机名称String exchangeName = "itcast.fanout";// 消息String message = "hello, every one!";// 发送消息rabbitTemplate.convertAndSend(exchangeName, "", message);}
}
  1. 声明一个交换机,两个消息队列,并完成绑定,然后设置两个消费者接收消息。最后测试发现两个消费者可以接收发布者的消息
package cn.itcast.mq.config;import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class FanoutConfig {// 1.声明FanoutExchange交换机@Beanpublic FanoutExchange fanoutExchange(){return new FanoutExchange("itcast.fanout");}// 2.声明第1个队列 注意:此方法名是该队列的唯一ID@Beanpublic Queue fanoutQueue1(){return new Queue("fanout.queue1");}// 3.绑定队列1到交换机 注意:参数名要与上述定义的方法名保持一致@Beanpublic Binding fanoutBinding1(Queue fanoutQueue1, FanoutExchange fanoutExchange){return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange);}// 声明第2个队列 注意:此方法名是该队列的唯一ID@Beanpublic Queue fanoutQueue2(){return new Queue("fanout.queue2");}// 绑定队列2到交换机 注意:参数名要与上述定义的方法名保持一致@Beanpublic Binding fanoutBinding2(Queue fanoutQueue2, FanoutExchange fanoutExchange){return BindingBuilder.bind(fanoutQueue2).to(fanoutExchange);}
}
@Component
public class SpringRabbitListener {@RabbitListener(queues = "fanout.queue1")public void listenFanoutQueue1(String msg) {System.out.println("消费者接收到fanout.queue1的消息:【" + msg + "】");}@RabbitListener(queues = "fanout.queue2")public void listenFanoutQueue2(String msg) {System.out.println("消费者接收到fanout.queue2的消息:【" + msg + "】");}

在这里插入图片描述

3.4 发布-订阅模型:DirectExchange 路由交换机

在这里插入图片描述

在这里插入图片描述

  1. 前面步骤同上,编写publisher
@RunWith(SpringRunner.class)
@SpringBootTest
public class SpringAmqpTest {@Autowiredprivate RabbitTemplate rabbitTemplate;@Testpublic void testSendDirectExchange() {// 交换机名称String exchangeName = "itcast.direct";// 消息String message = "hello, blue!";// 发送消息rabbitTemplate.convertAndSend(exchangeName, "blue", message);}
}
  1. 设置两个消费者,并且用@RabbitListener注解的方式声明 Binding Queue Exchange Key。当发送者发送key=blue的消息时,只有消费者1收到。
@Component
public class SpringRabbitListener {/*** 4.发布-订阅模型:Direct 路由* 用注解的方式声明 Binding Queue Exchange Key*/@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "direct.queue1"),exchange = @Exchange(name = "itcast.direct", type = ExchangeTypes.DIRECT),key = {"red", "blue"}))   // type表示哪种交换机public void listenDirectQueue1(String msg){System.out.println("消费者接收到direct.queue1的消息:【" + msg + "】");}@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "direct.queue2"),exchange = @Exchange(name = "itcast.direct", type = ExchangeTypes.DIRECT),key = {"red", "yellow"}))public void listenDirectQueue2(String msg){System.out.println("消费者接收到direct.queue2的消息:【" + msg + "】");}
}

在这里插入图片描述

3.5 发布-订阅模型:TopicExchange 话题交换机

在这里插入图片描述
在这里插入图片描述
实现消费者1接收中国的所有消息,消费者2接收所有的新闻

  1. 前面步骤同上,编写publisher
@RunWith(SpringRunner.class)
@SpringBootTest
public class SpringAmqpTest {@Testpublic void testSendTopicExchange() {// 交换机名称String exchangeName = "itcast.topic";// 消息String message = "今天天气不错,我的心情好极了!";// 发送消息rabbitTemplate.convertAndSend(exchangeName, "china.wearther", message);}
}
  1. 设置两个消费者,并且用@RabbitListener注解的方式声明 Binding Queue Exchange Key。当发送者发送key=china.wearther的消息时,只有消费者1收到。
@Component
public class SpringRabbitListener {/*** 5.发布-订阅模型:topic 路由* 用注解的方式声明 Binding Queue Exchange Key*/@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "topic.queue1"),exchange = @Exchange(name = "itcast.topic", type = ExchangeTypes.TOPIC),key = "china.#"))public void listenTopicQueue1(String msg){System.out.println("消费者接收到topic.queue1的消息:【" + msg + "】");}@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "topic.queue2"),exchange = @Exchange(name = "itcast.topic", type = ExchangeTypes.TOPIC),key = "#.news"))public void listenTopicQueue2(String msg){System.out.println("消费者接收到topic.queue2的消息:【" + msg + "】");}
}

3.6 消息转换器

上述传递的都是String类型的,而实际需要传递Object类型的数据,因此我么需要对消息进行转换

  1. 前面步骤同上,编写publisher
@RunWith(SpringRunner.class)
@SpringBootTest
public class SpringAmqpTest {@Testpublic void testSendObjectQueue() {Map<String, Object> msg = new HashMap<>();msg.put("name","小明");msg.put("age",12);// 发送消息rabbitTemplate.convertAndSend("object.queue", msg);}
}
  1. 设置两个消费者,并且用@RabbitListener注解的方式声明 Binding Queue Exchange Key。当发送者发送key=china.wearther的消息时,只有消费者1收到。
@Component
public class SpringRabbitListener {/*** 6.消息转换器**/@RabbitListener(queues = "object.queue")public void listenObjectQueue(Map<String,Object> msg){System.out.println("接收到object.queue的消息:" + msg);}
}
  1. 但这样存在一个问题publisher发布的数据被序列化,因此我们需要在publisher和consumer的pom文件(或者父工程的pom文件)中添加依赖,并且在Application中反序列化
        <!--JSON序列化--><dependency><groupId>com.fasterxml.jackson.core</groupId><artifactId>jackson-databind</artifactId></dependency>
@SpringBootApplication
public class PublisherApplication {public static void main(String[] args) {SpringApplication.run(PublisherApplication.class);}@Beanpublic MessageConverter messageConverter(){return new Jackson2JsonMessageConverter();}
}@SpringBootApplication
public class ConsumerApplication {public static void main(String[] args) {SpringApplication.run(ConsumerApplication.class, args);}@Beanpublic MessageConverter messageConverter(){return new Jackson2JsonMessageConverter();}
}

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/732412.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Day29:安全开发-JS应用DOM树加密编码库断点调试逆向分析元素属性操作

目录 JS原生开发-DOM树-用户交互 JS导入库开发-编码加密-逆向调试 思维导图 JS知识点&#xff1a; 功能&#xff1a;登录验证&#xff0c;文件操作&#xff0c;SQL操作&#xff0c;云应用接入&#xff0c;框架开发&#xff0c;打包器使用等 技术&#xff1a;原生开发&#x…

推房子游戏c++

这段代码是一个推箱子游戏的实现。游戏中有一个地图&#xff0c;地图上有墙壁、人、箱子和目标位置。玩家通过键盘输入WASD或方向键来控制人物的移动&#xff0c;目标是将所有的箱子推到相应的目标位置上。 代码中的dt数组表示地图&#xff0c;每个位置上的字符表示对应的元素…

JAVA后端开发面试基础知识(六)——Redis

1. 内存淘汰策略 noeviction&#xff1a;当内存不足以容纳新写入数据时&#xff0c;新写入操作会报错allkeys-lru&#xff1a;当内存不足以容纳新写入数据时&#xff0c;在键空间中&#xff0c;移除近少使用的key。(这个是最常用的)allkeys-random&#xff1a;当内存不足以容纳…

c语言在线聊天室

c语言基于tcp和多线程的在线聊天室(c语言通讯系统)功能需求 1.实现多线程 2.构建socke套接字实现一对一通信 3.实现多个电脑的通信 4.数据传输加密和解密 5.多人实时聊天 6.具备群聊和私聊的功能 实现原理: 服务端公网Ip暴露,客户端端口随机分配,通过服务端公网IP连接,服务端…

【开源】SpringBoot框架开发免税店商城管理系统

目录 一、摘要1.1 项目介绍1.2 项目录屏 二、系统设计2.1 功能模块设计2.2 研究方法 三、系统展示四、核心代码4.1 查询免税种类4.2 查询物品档案4.3 新增顾客4.4 新增消费记录4.5 审核免税 五、免责说明 一、摘要 1.1 项目介绍 基于JAVAVueSpringBootMySQL的免税店商城管理系…

【文本编辑】Typora v1.8.6 绿色版

下载地址 Typora v1.8.6 绿色版 简介 Typora 是一款简洁、直观的跨平台 Markdown 编辑器&#xff0c;旨在提供优雅的写作体验。与传统的 Markdown 编辑器不同&#xff0c;Typora 提供所见即所得的编辑界面&#xff0c;使用户可以即时预览 Markdown 文档的渲染效果&#xff0…

7. 阅读魔法书

题目&#xff1a;7.阅读魔法书 - 蓝桥云课 (lanqiao.cn) 代码&#xff1a; #include<iostream> #include<stdio.h> #include<algorithm> using namespace std; int main() {string str;cin>>str;int n,ans0;cin>>n;while(n--){string substr;ci…

prometheus 原理(架构,promql表达式,描点原理)

大家好&#xff0c;我是蓝胖子&#xff0c;提到监控指标&#xff0c;不得不说prometheus&#xff0c;今天这篇文章我会对prometheus 的架构设计&#xff0c;promql表达式原理和监控图表的绘图原理进行详细的解释。来让大家对prometheus的理解更加深刻。 架构设计 先来看看&am…

性能测试干2年,还不会这个技术点?

nmon是一种在AIX与各种Linux操作系统上广泛使用的监控与分析工具&#xff0c;记录的信息比较全面&#xff0c;结合nmon_analyzer工具产生数据文件与图形化结果。 nmon可监控的数据类型 内存使用情况、磁盘适配器、文件系统中的可用空间、CPU使用率等等数据信息 特点 ①占用…

java入门 类型学习

文章目录 一、比较float型与double型二、Java与C语言的不同三、float变量的声明四、字符串与字符类型五、算术混合运算的精度 一、比较float型与double型 float x 0.4f; double y 0.4;请问各位是不是感觉这两者好像是一样大的吧&#xff1f; 其实不然 因为&#xff1a;实际…

用logrote和split分割nohup.out日志

背景&#xff1a;用nohup运行jar包时候&#xff0c;会产生大量的日志文件&#xff0c;影响磁盘存储&#xff0c;生产环境大概1天有30个g 解决方案&#xff1a; 1、用split分割日志&#xff0c;代码在下面&#xff08;可以先测试一下&#xff09;&#xff0c;然后加入到定时任务…

Java零基础-数组的访问和遍历

哈喽&#xff0c;各位小伙伴们&#xff0c;你们好呀&#xff0c;我是喵手。 今天我要给大家分享一些自己日常学习到的一些知识点&#xff0c;并以文字的形式跟大家一起交流&#xff0c;互相学习&#xff0c;一个人虽可以走的更快&#xff0c;但一群人可以走的更远。 我是一名后…

LeetCode2834. Find the Minimum Possible Sum of a Beautiful Array

文章目录 一、题目二、题解 一、题目 You are given positive integers n and target. An array nums is beautiful if it meets the following conditions: nums.length n. nums consists of pairwise distinct positive integers. There doesn’t exist two distinct ind…

使用CSS制作动态的环形图/饼图

使用纯 CSS Animation conic-gradient 实现一个环形图。 饼图的实现思路和环形图一样&#xff0c;去掉中间的圆形遮盖 after 伪类元素即可。 一、构建基础样式 构建圆形节点和中间的遮盖元素。 <style>body {background-color: rgb(130, 226, 255);}.circle {top: 16…

持续更新 | 与您分享 Flutter 2024 年路线图

作者 / Michael Thomsen Flutter 是一个拥有繁荣社区的开源项目&#xff0c;我们致力于确保我们的计划公开透明&#xff0c;并将毫无隐瞒地分享从问题到设计规范的所有内容。我们了解到许多开发者对 Flutter 的功能路线图很感兴趣。我们往往会在一年中不断更改并调整这些计划&a…

python3.9 处理excel来实现类似excel中的vlookup功能

#本次工作中需要处理两个excel中的数据&#xff0c;使用vlookup查询后显示N/A然后就选择了python# import openpyxl# excel表格的绝对路径 path r"C:\Users\Administrator\Desktop\device.xlsx"# 打开表格对象 workbook openpyxl.load_workbook(path) # 打印Excel表…

PHP语言常见面试题:什么是PHP中的函数?如何定义和调用一个函数?

在PHP中&#xff0c;函数是一组可以重复使用的代码块&#xff0c;用于执行特定的任务。函数可以接收输入&#xff08;参数&#xff09;&#xff0c;执行一系列操作&#xff0c;并可能返回输出结果。通过使用函数&#xff0c;你可以将代码组织成可重用的模块&#xff0c;提高代码…

Clock Verification IP

Clock Verification IP IP 参数及接口 IP 例化界面 相关函数 start_clock //产生时钟 <hierarchy_path>.IF.start_clockstop_clock //停止时钟 <hierarchy_path>.IF.stop_clockset_initial_value //设置时钟初始值为 0 <hierarchy_path>IF.set_initia…

LeetCode 2386.找出数组的第 K 大和:逆向思维(小根堆)

【LetMeFly】2386.找出数组的第 K 大和&#xff1a;逆向思维&#xff08;小根堆&#xff09; 力扣题目链接&#xff1a;https://leetcode.cn/problems/find-the-k-sum-of-an-array/ 给你一个整数数组 nums 和一个 正 整数 k 。你可以选择数组的任一 子序列 并且对其全部元素求…

Android性能优化 - ANR的分析和解决

一、ANR概念 1.定义 “Application Not Responding”的缩写&#xff0c;即“应用程序无响应”。如果你应用程序在UI线程被阻塞太长时间&#xff0c;就会出现ANR。 2.类型 ① KeyDispatchTimeout&#xff08;常见&#xff09; input事件在5S内没有处理完成发生了ANR。logca…