消息队列-RabbitMQ(二)

接上文《消息队列-RabbitMQ(一)》

在这里插入图片描述

@Configuration
public class RabbitMqConfig {// 消息的消费方json数据的反序列化@Beanpublic RabbitListenerContainerFactory<?> rabbitListenerContainerFactory(ConnectionFactory connectionFactory){SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();factory.setConnectionFactory(connectionFactory);factory.setMessageConverter(new Jackson2JsonMessageConverter());return factory;}// 定义使用json的方式转换数据@Beanpublic RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {RabbitTemplate amqpTemplate = new RabbitTemplate();amqpTemplate.setConnectionFactory(connectionFactory);amqpTemplate.setMessageConverter(new Jackson2JsonMessageConverter());return amqpTemplate;}}
@Service
public class TestConsumer {@RabbitListener(queuesToDeclare = {@Queue("simpleQueue")})public void simpleModel(User user) {System.out.println("接收消息message=" + user);}// value=@Queue 创建临时队列// exchange创建交换机@RabbitListener(bindings = {@QueueBinding(value = @Queue,exchange = @Exchange(value = "fanout-ex", type = ExchangeTypes.FANOUT))})public void receiveMessage1(User user) {System.out.println(String.format("消费者 【one】: %s", user));}@RabbitListener(bindings = {@QueueBinding(value = @Queue,exchange = @Exchange(value = "fanout-ex", type = ExchangeTypes.FANOUT))})public void receiveMessage2(User user) {System.out.println(String.format("消费者 【two】: %s", user));}}
@RestController
@RequestMapping("/rabbitMqReq")
public class RabbitMqController {@Autowiredprivate TestProducer testProducer;@PostMapping("/simple")public void simple(@RequestBody User user) {testProducer.simpleMessageSend();}@PostMapping("/fanoutMessageSend")public void fanoutMessageSend(@RequestBody User user) {testProducer.fanoutMessageSend();}
}
@Data
public class User {private String userId; //用户编号private String userName; //用户姓名
}
@Service
public class TestProducer {@Resourceprivate RabbitTemplate rabbitTemplate;public void simpleMessageSend() {System.out.println("simpleMessageSend");User user = new User();user.setUserId("1001");user.setUserName("张三");rabbitTemplate.convertAndSend("simpleQueue", user);}// fanout模型public void fanoutMessageSend() {for (int i = 0; i < 5; i++) {User user = new User();user.setUserId("1001");user.setUserName("张三");rabbitTemplate.convertAndSend("fanout-ex", "", user);}try {TimeUnit.SECONDS.sleep(5);} catch (InterruptedException e) {e.printStackTrace();}}}
@SpringBootApplication(exclude = { DataSourceAutoConfiguration.class })
public class App
{public static void main(String[] args){SpringApplication.run(App.class, args);System.out.println("(♥◠‿◠)ノ゙  启动成功   ლ(´ڡ`ლ)゙ ");}
}
# 开发环境配置
server:# 服务器的HTTP端口,默认为8080port: 8899servlet:# 应用的访问路径context-path: /tomcat:# tomcat的URI编码uri-encoding: UTF-8# tomcat最大线程数,默认为200max-threads: 800# Tomcat启动初始化的线程数,默认值25min-spare-threads: 30
<?xml version="1.0" encoding="UTF-8"?><project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>org.example</groupId><artifactId>RabbitMQDemo</artifactId><version>1.0-SNAPSHOT</version><name>RabbitMQDemo</name><!-- FIXME change it to the project's website --><url>http://www.example.com</url><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.1.1.RELEASE</version><relativePath /></parent><properties><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding><project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding><java.version>1.8</java.version><fastjson.version>1.2.70</fastjson.version></properties><dependencies><!-- SpringBoot 核心包 --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><!-- AMQP客户端 --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><!-- 阿里JSON解析器 --><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>${fastjson.version}</version></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><version>1.18.10</version></dependency></dependencies><repositories><!--阿里云镜像仓库--><repository><id>public</id><name>aliyun nexus</name><url>http://maven.aliyun.com/nexus/content/groups/public/</url><releases><enabled>true</enabled></releases></repository><!--oracle驱动没有发布到中央仓库,只能从此仓库下载--><repository><id>jeecg</id><name>jeecg Repository</name><url>http://maven.jeewx.com/nexus/content/repositories/jeecg</url><snapshots><enabled>false</enabled></snapshots></repository></repositories></project>

什么是事务?
数据库事务。原子性、一致性、隔离性、持久性。

本地事务?
整个服务操作只能涉及一个单一的数据库资源。

分布式事务?
分布式事务指事务的参与者、支持事务的服务器、资源服务器以及事务管理器分别位于不同的分布式系统的不同节点之上。分布式事务就是为了保证不同数据库的数据一致性。

分布式事务应用架构?
微服务架构的分布式应用环境下,越来越多的应用要求对多个数据库资源,多个服务的访问都能纳入到同一个事务当中,分布式事务应运而生。

分布式事务有几种解决方案: 两阶段提交/XA MQ
第一阶段(prepare):即所有的参与者RM准备执行事务并锁住需要的资源。参与者ready时,向TM报告已准备就绪。
第二阶段 (commit/rollback):当事务管理者™确认所有参与者(RM)都ready后,向所有参与者发送commit命令。

https://blog.csdn.net/qq_43410878/article/details/123656765 RabbitMQ使用详解

RabbitMQ与springboot整合
1、添加依赖 spring-boot-starter-amqp
2、配置RabbitConfig,包含RabbitListenerContainerFactory、RabbitTemplate

简单模型
@SpringBootTest
class SpringbootRabbitmqApplicationTests {
@Resource
private RabbitTemplate rabbitTemplate;

@Test
public void simpleMessageSend() {rabbitTemplate.convertAndSend("simpleQueue", new User(1, "张"));
}

}

@RabbitListener(queuesToDeclare = {@Queue(“simpleQueue”)})
public void simpleModel(User user) {
log.info(“message: {}”, user);
}

发布订阅模型
// fanout模型
@Test
public void fanoutMessageSend() {
for (int i = 0; i < 5; i++) {
rabbitTemplate.convertAndSend(“fanout-ex”, “”, new User(i, “张三”));
}
try {
TimeUnit.SECONDS.sleep(5);
} catch (InterruptedException e) {
e.printStackTrace();
}
}

// value=@Queue 创建临时队列
// exchange创建交换机
@RabbitListener(bindings = {
@QueueBinding(value = @Queue,
exchange = @Exchange(value = “fanout-ex”, type = ExchangeTypes.FANOUT))
})
public void receiveMessage1(User user) {
System.out.println(String.format(“消费者 【one】: %s”, user));
}

@RabbitListener(bindings = {
@QueueBinding(value = @Queue,
exchange = @Exchange(value = “fanout-ex”, type = ExchangeTypes.FANOUT))
})
public void receiveMessage2(User user) {
System.out.println(String.format(“消费者 【two】: %s”, user));
}

直连模式(direct)
@Test
public void directMessageSend() {
//rabbitTemplate.convertAndSend(“direct-ex”, “success”, new User(2, “张三”));
rabbitTemplate.convertAndSend(“direct-ex”, “error”, new User(2, “张三”));
try {
TimeUnit.SECONDS.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
}

@RabbitListener(bindings = {
@QueueBinding(value = @Queue,
key = {“error”, “success”},
exchange = @Exchange(value = “direct-ex”, type = ExchangeTypes.DIRECT))
})
public void receiveMessage1(User user) {
System.out.println(String.format(“消费者 【one】: %s”, user));
}

@RabbitListener(bindings = {
@QueueBinding(value = @Queue,
key = {“error”},
exchange = @Exchange(value = “direct-ex”, type = ExchangeTypes.DIRECT))
})
public void receiveMessage2(User user) {
System.out.println(String.format(“消费者 【two】: %s”, user));
}

topic模型
@Test
public void topicMessageSend() {
//rabbitTemplate.convertAndSend(“topic-ex”, “company”, new User(2, “张三”));
rabbitTemplate.convertAndSend(“topic-ex”, “company.java”, new User(2, “张三”));
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
}

@RabbitListener(bindings = {
@QueueBinding(value = @Queue,
key = {“company.#”},
exchange = @Exchange(value = “topic-ex”, type = ExchangeTypes.TOPIC))
})
public void receiveMessage1(User user) {
System.out.println(String.format(“消费者 【one】: %s”, user));
}

@RabbitListener(bindings = {
@QueueBinding(value = @Queue,
key = {“company.java.#”},
exchange = @Exchange(value = “topic-ex”, type = ExchangeTypes.TOPIC))
})
public void receiveMessage2(User user) {
System.out.println(String.format(“消费者 【two】: %s”, user));
}

@RabbitListener(bindings = {
@QueueBinding(value = @Queue,
key = {“company.html.*”},
exchange = @Exchange(value = “topic-ex”, type = ExchangeTypes.TOPIC))
})
public void receiveMessage3(User user) {
System.out.println(String.format(“消费者 【three】: %s”, user));
}

消息的手动确认
spring:
rabbitmq:
listener:
simple:
# 提交方式为手动
acknowledge-mode: MANUAL

factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);

面试:a. 如何保证消息不丢失?

​ b. 如何保证消息的不重复消费?

​ c. 如何使用mq来是实现分布式事务?

​ d. 在工作中mq用在哪里?支付回调。

RabbitMQ如何做到消息不丢失?

1.持久化

发送消息时设置delivery_mode属性为2,使消息被持久化保存到磁盘,即使RabbitMQ服务器宕机也能保证消息不丢失。同时,创建队列时设置durable属性为True,以确保队列也被持久化保存。

2.确认机制

消费者通过basic.ack命令向RabbitMQ服务器确认已经消费了消息。如果消费者处理消息时发生错误或宕机,RabbitMQ会重新将消息发送给其他消费者。RabbitMQ在接收到消费者确认消息前会将消息保存在内存中,在确认后才会删除消息。

3.发布者确认

RabbitMQ支持发布者确认机制,即发布者在将消息发送到队列后,等待RabbitMQ服务器的确认消息。成功保存到队列的消息会返回确认消息给发布者,如果无法保存则返回Nack(Negative Acknowledgement)消息。通过发布者确认机制,可以确保消息成功发送到RabbitMQ服务器。

4.备份队列

RabbitMQ支持备份队列(Alternate Exchange)机制,即在消息发送到队列之前,先将消息发送到备份队列。如果主队列无法接收消息,RabbitMQ会将消息发送到备份队列中。备份队列通常是一个交换机,在创建队列时可以通过x-dead-letter-exchange属性指定备份队列。

三、注意事项 在使用RabbitMQ消息确认机制时,需注意以下几点:

1、确认模式的选择: 根据具体业务需求选择合适的确认模式。如果对消息传递的可靠性要求较高,建议使用手动确认模式或批量确认模式。

2、设置消息持久化: 在发布消息时,通过设置消息的持久化属性,可以确保即使RabbitMQ服务器意外关闭或重启,消息仍然能够被保存下来。

3、处理未确认消息: 如果消费者在处理消息时发生异常,导致无法发送确认信号给RabbitMQ,那么这些消息将保持在队列中,并可能被重新投递。需要设定适当的重试策略和错误处理机制来处理这些未确认的消息。

4、监听确认超时: 在使用批量确认模式时,如果长时间没有收到确认信号,需要设置合理的超时时间,并采取相应措施来处理超时的情况。

RabbitMQ消息确认机制详解:确保交付成功

消息确认机制是通过发布者(Producer)和消费者(Consumer)之间的交互来实现的。
当发布者发送消息到RabbitMQ后,会等待确认结果。如果消息成功被消费者接收并处理,消费者会发送一个确认信号给RabbitMQ,告知消息已经处理完成。而RabbitMQ则会根据接收到的确认信号,判断消息是否成功交付。
消息确认机制一般有两种模式:简单模式和批量模式。在简单模式中,每发送一条消息后就等待确认;而在批量模式中,可以一次发送多条消息,然后等待批量确认。无论是简单模式还是批量模式,都可以提高消息传递的可靠性。

https://blog.csdn.net/weixin_52278591/article/details/128841155 RabbitMq 消息确认机制详解

https://blog.csdn.net/u011942456/article/details/128198956 总结RabbitMq消息丢失和消息重复消费问题

https://www.zhihu.com/question/54756562?utm_id=0 rabbitmq是什么,主要用于哪些方面?

https://zhuanlan.zhihu.com/p/583520436?utm_id=0 用了8年MQ!聊聊消息队列的技术选型,哪个最香!

https://blog.csdn.net/qq_43410878/article/details/123656765 RabbitMQ使用详解

https://zhuanlan.zhihu.com/p/590948541 绝对详细的 RabbitMQ入门,看完本系列就够了

MQ的优势和劣势

https://cloud.tencent.com/developer/article/2113514 面试百问:使用MQ的优势、劣势以及问题

rabbitmq basicAck

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/94284.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Open Cascade旋转变换平行线

在本人开发的弯管自动CAM软件中&#xff0c;有一个问题一直没有解决&#xff0c;就是180度平行管路需要做角度微调&#xff0c;以便进行YBC预览。研究了一番后&#xff0c;搞定了这个问题&#xff0c;关键在于采用OCC库实现拓扑变换。 本文将介绍如何使用OpenCASCADE库来实现平…

3.物联网射频识别,(高频)RFID应用ISO14443-2协议,(校园卡)Mifare S50卡

问题&#xff1a; 1) 14443协议&#xff0c;RFID标签的默认通信速率是 106kbps&#xff0c;也可以通过协商&#xff0c;调整为 &#xff08;fc/6413.56M/64&#xff09;212、424、 848kbps。 2) 14443-3 A类卡&#xff0c;上电后&#xff0c;读写器发送REQA命令&#xff0c;标签…

激光雷达中实现F-P标准具高热稳定性的帕尔贴精密温控解决方案

摘要&#xff1a;法布里-珀罗标准具作为一种具有高温度敏感性的精密干涉分光器件&#xff0c;在具体应用中对热稳定性具有很高的要求&#xff0c;如温度波动不能超过0.01℃&#xff0c;为此本文提出了相应的高精度恒温控制解决方案。解决方案具体针对温度控制精度和温度均匀性控…

利用LVM制作swap交换分区

首先把一个磁盘进行分区制作成物理卷&#xff0c;也可以直接将一整块磁盘做成物理卷,我这里使用的是磁盘分区&#xff1a;pvcreate /dev/sdb1 然后将这个物理卷制作成卷组&#xff1a;vgcreate vg1 /dev/sdb1; 将这个卷组制作成逻辑卷&#xff1a;lvcreate -L 900M -n lv1 vg…

计算机竞赛 题目: 基于深度学习的疲劳驾驶检测 深度学习

文章目录 0 前言1 课题背景2 实现目标3 当前市面上疲劳驾驶检测的方法4 相关数据集5 基于头部姿态的驾驶疲劳检测5.1 如何确定疲劳状态5.2 算法步骤5.3 打瞌睡判断 6 基于CNN与SVM的疲劳检测方法6.1 网络结构6.2 疲劳图像分类训练6.3 训练结果 7 最后 0 前言 &#x1f525; 优…

angularjs开发环境搭建

Angularjs是一个前端页面应用开发框架&#xff0c;其使用TypeScript作为开发语言&#xff0c;Angularjs的特性包括&#xff0c;使用组件、模板以及依赖注入的开发框架构建可扩展的web应用&#xff0c;使用易于集成的类库支持页面路由、页面表单、前后端接口交互等各种不同特性&…

JVM:经典垃圾收集器

经典垃圾收集器 如果说收集算法是内存回收的方法论&#xff0c;那垃圾收集器就是内存回收的实践者 《Java虚拟机规范》中对垃圾收集器应该如何实现并没有做出任何规定&#xff0c;因此不同的厂商、不同版本的虚拟机所包含的垃圾收集器都可能会有很大差别&#xff0c;不同的虚拟…

【Java】微服务——Nacos注册中心

目录 1.Nacos快速入门1.1.服务注册到nacos1&#xff09;引入依赖2&#xff09;配置nacos地址3&#xff09;重启 2.服务分级存储模型2.1.给user-service配置集群2.2.同集群优先的负载均衡 3.权重配置4.环境隔离4.1.创建namespace4.2.给微服务配置namespace 5.Nacos与Eureka的区别…

Day 04 python学习笔记

Python数据容器 元组 元组的声明 变量名称&#xff08;元素1&#xff0c;元素2&#xff0c;元素3&#xff0c;元素4…….&#xff09; &#xff08;元素类型可以不同&#xff09; eg: tuple_01 ("hello", 1, 2,-20,[11,22,33]) print(type(tuple_01))结果&#x…

安装python中tensorflow和keras==2.2.0的路程

1.python中安装Keras2.3.0 你可以使用pip来安装特定版本的Keras。在命令行中运行以下命令&#xff1a; pip install keras2.3.0这将会下载并安装Keras的2.3.0版本及其相应的依赖项。请确保你的Python环境已经配置好&#xff0c;并且有足够的权限来安装软件包。2.python 中安装…

python中类的内置函数有哪些

在 Python 中&#xff0c;类的内置函数&#xff08;也称为魔术方法或特殊方法&#xff09;以双下划线开头和结尾&#xff0c;用于自定义类的行为。以下是一些常见的内置函数及其作用&#xff1a; __init__(self, ...): 构造函数&#xff0c;用于初始化对象。创建一个类的实例时…

javascript二维数组(3):指定数组元素的特定属性进行搜索

js中对数组&#xff0c; var data [{“name”: “《西游记》”, “author”: “吴承恩”, “cat”: “A级书刊”, “num”: 3},{“name”: “《三国演义》”, “author”: “罗贯中”, “cat”: “A级书刊”, “num”: 8},{“name”: “《红楼梦》”, “author”: “曹雪芹”,…

uniapp实现微信小程序隐私协议组件封装

uniapp实现微信小程序隐私协议组件封装。 <template><view class"diygw-modal basic" v-if"showPrivacy" :class"showPrivacy?show:" style"z-index: 1000000"><view class"diygw-dialog diygw-dialog-modal bas…

求各区域热门商品Top3 - HiveSQL

背景&#xff1a;这是尚硅谷SparkSQL练习题&#xff0c;本文用HiveSQL进行了实现。 数据集&#xff1a;用户点击表&#xff0c;商品表&#xff0c;城市表 题目: ① 求每个地区点击量前三的商品&#xff1b; ② 在①的基础上&#xff0c;求出每个地区点击量前三的商品后&a…

MySQL-MVCC(Multi-Version Concurrency Control)

MySQL-MVCC&#xff08;Multi-Version Concurrency Control&#xff09; MVCC&#xff08;多版本并发控制&#xff09;&#xff1a;为了解决数据库并发读写和数据一致性的问题&#xff0c;是一种思想&#xff0c;可以有多种实现方式。 核心思想&#xff1a;写入时创建行的新版…

Windows安装Docker并创建Ubuntu环境及运行神经网络模型

目录 前言在Windows上安装Docker在Docker上创建Ubuntu镜像并运行容器创建Ubuntu镜像配置容器&#xff0c;使其可以在宿主机上显示GUI 创建容器并运行神经网络模型创建容器随便找一个神经网络模型试试 总结 前言 学生党一般用个人电脑玩神经网络&#xff0c;估计很少有自己的服…

大数据开发面试笔记Day1

面试优先级 眼前一亮的人&#xff1a;所有项目都需要能快速入手开发的年轻人&#xff0c;而这种能力一定程度上来自于对开源项目的贡献&#xff1b;另外&#xff0c;找到该公司该部门的研发重点&#xff0c;以及与周围部门的协同研发项目&#xff0c;如果某一个项目碰巧非常适合…

TouchGFX之后端通信

在大多数应用中&#xff0c;UI需以某种方式连接到系统的其余部分&#xff0c;并发送和接收数据。 它可能会与硬件外设&#xff08;传感器数据、模数转换和串行通信等&#xff09;或其他软件模块进行交互通讯。 Model类​ 所有TouchGFX应用都有Model类&#xff0c;Model类除了存…

Python--控制台获取输入与正则表达式

前言一、控制台获取输入1.1 字符串输入1.2 整数输入1.3 浮点数输入1.4 布尔值输入1.5 列表输入1.6 汇总 二、正则表达式2.1 匹配数字2.2 模式检查2.3 替换字符2.4 切分字符串2.5 搜索并提取匹配的部分2.6 使用捕获组提取匹配的部分2.7 非贪婪匹配2.8 忽略大小写匹配2.9 使用预定…

【Java高级技术】单元测试——概述和快速入门

单元测试——概述和Junit框架快速入门 1.概述 就是针对最小的功能单元&#xff08;方法&#xff09;&#xff0c;编写测试代码对其进行正确性测试 可以用来对方法进行测试&#xff0c;它是第三方公司开源出来的&#xff08;很多开发工具已经集成了Junit框架&#xff0c;比如I…