消息队列-RabbitMQ(二)

接上文《消息队列-RabbitMQ(一)》

在这里插入图片描述

@Configuration
public class RabbitMqConfig {// 消息的消费方json数据的反序列化@Beanpublic RabbitListenerContainerFactory<?> rabbitListenerContainerFactory(ConnectionFactory connectionFactory){SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();factory.setConnectionFactory(connectionFactory);factory.setMessageConverter(new Jackson2JsonMessageConverter());return factory;}// 定义使用json的方式转换数据@Beanpublic RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {RabbitTemplate amqpTemplate = new RabbitTemplate();amqpTemplate.setConnectionFactory(connectionFactory);amqpTemplate.setMessageConverter(new Jackson2JsonMessageConverter());return amqpTemplate;}}
@Service
public class TestConsumer {@RabbitListener(queuesToDeclare = {@Queue("simpleQueue")})public void simpleModel(User user) {System.out.println("接收消息message=" + user);}// value=@Queue 创建临时队列// exchange创建交换机@RabbitListener(bindings = {@QueueBinding(value = @Queue,exchange = @Exchange(value = "fanout-ex", type = ExchangeTypes.FANOUT))})public void receiveMessage1(User user) {System.out.println(String.format("消费者 【one】: %s", user));}@RabbitListener(bindings = {@QueueBinding(value = @Queue,exchange = @Exchange(value = "fanout-ex", type = ExchangeTypes.FANOUT))})public void receiveMessage2(User user) {System.out.println(String.format("消费者 【two】: %s", user));}}
@RestController
@RequestMapping("/rabbitMqReq")
public class RabbitMqController {@Autowiredprivate TestProducer testProducer;@PostMapping("/simple")public void simple(@RequestBody User user) {testProducer.simpleMessageSend();}@PostMapping("/fanoutMessageSend")public void fanoutMessageSend(@RequestBody User user) {testProducer.fanoutMessageSend();}
}
@Data
public class User {private String userId; //用户编号private String userName; //用户姓名
}
@Service
public class TestProducer {@Resourceprivate RabbitTemplate rabbitTemplate;public void simpleMessageSend() {System.out.println("simpleMessageSend");User user = new User();user.setUserId("1001");user.setUserName("张三");rabbitTemplate.convertAndSend("simpleQueue", user);}// fanout模型public void fanoutMessageSend() {for (int i = 0; i < 5; i++) {User user = new User();user.setUserId("1001");user.setUserName("张三");rabbitTemplate.convertAndSend("fanout-ex", "", user);}try {TimeUnit.SECONDS.sleep(5);} catch (InterruptedException e) {e.printStackTrace();}}}
@SpringBootApplication(exclude = { DataSourceAutoConfiguration.class })
public class App
{public static void main(String[] args){SpringApplication.run(App.class, args);System.out.println("(♥◠‿◠)ノ゙  启动成功   ლ(´ڡ`ლ)゙ ");}
}
# 开发环境配置
server:# 服务器的HTTP端口,默认为8080port: 8899servlet:# 应用的访问路径context-path: /tomcat:# tomcat的URI编码uri-encoding: UTF-8# tomcat最大线程数,默认为200max-threads: 800# Tomcat启动初始化的线程数,默认值25min-spare-threads: 30
<?xml version="1.0" encoding="UTF-8"?><project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>org.example</groupId><artifactId>RabbitMQDemo</artifactId><version>1.0-SNAPSHOT</version><name>RabbitMQDemo</name><!-- FIXME change it to the project's website --><url>http://www.example.com</url><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.1.1.RELEASE</version><relativePath /></parent><properties><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding><project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding><java.version>1.8</java.version><fastjson.version>1.2.70</fastjson.version></properties><dependencies><!-- SpringBoot 核心包 --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><!-- AMQP客户端 --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><!-- 阿里JSON解析器 --><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>${fastjson.version}</version></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><version>1.18.10</version></dependency></dependencies><repositories><!--阿里云镜像仓库--><repository><id>public</id><name>aliyun nexus</name><url>http://maven.aliyun.com/nexus/content/groups/public/</url><releases><enabled>true</enabled></releases></repository><!--oracle驱动没有发布到中央仓库,只能从此仓库下载--><repository><id>jeecg</id><name>jeecg Repository</name><url>http://maven.jeewx.com/nexus/content/repositories/jeecg</url><snapshots><enabled>false</enabled></snapshots></repository></repositories></project>

什么是事务?
数据库事务。原子性、一致性、隔离性、持久性。

本地事务?
整个服务操作只能涉及一个单一的数据库资源。

分布式事务?
分布式事务指事务的参与者、支持事务的服务器、资源服务器以及事务管理器分别位于不同的分布式系统的不同节点之上。分布式事务就是为了保证不同数据库的数据一致性。

分布式事务应用架构?
微服务架构的分布式应用环境下,越来越多的应用要求对多个数据库资源,多个服务的访问都能纳入到同一个事务当中,分布式事务应运而生。

分布式事务有几种解决方案: 两阶段提交/XA MQ
第一阶段(prepare):即所有的参与者RM准备执行事务并锁住需要的资源。参与者ready时,向TM报告已准备就绪。
第二阶段 (commit/rollback):当事务管理者™确认所有参与者(RM)都ready后,向所有参与者发送commit命令。

https://blog.csdn.net/qq_43410878/article/details/123656765 RabbitMQ使用详解

RabbitMQ与springboot整合
1、添加依赖 spring-boot-starter-amqp
2、配置RabbitConfig,包含RabbitListenerContainerFactory、RabbitTemplate

简单模型
@SpringBootTest
class SpringbootRabbitmqApplicationTests {
@Resource
private RabbitTemplate rabbitTemplate;

@Test
public void simpleMessageSend() {rabbitTemplate.convertAndSend("simpleQueue", new User(1, "张"));
}

}

@RabbitListener(queuesToDeclare = {@Queue(“simpleQueue”)})
public void simpleModel(User user) {
log.info(“message: {}”, user);
}

发布订阅模型
// fanout模型
@Test
public void fanoutMessageSend() {
for (int i = 0; i < 5; i++) {
rabbitTemplate.convertAndSend(“fanout-ex”, “”, new User(i, “张三”));
}
try {
TimeUnit.SECONDS.sleep(5);
} catch (InterruptedException e) {
e.printStackTrace();
}
}

// value=@Queue 创建临时队列
// exchange创建交换机
@RabbitListener(bindings = {
@QueueBinding(value = @Queue,
exchange = @Exchange(value = “fanout-ex”, type = ExchangeTypes.FANOUT))
})
public void receiveMessage1(User user) {
System.out.println(String.format(“消费者 【one】: %s”, user));
}

@RabbitListener(bindings = {
@QueueBinding(value = @Queue,
exchange = @Exchange(value = “fanout-ex”, type = ExchangeTypes.FANOUT))
})
public void receiveMessage2(User user) {
System.out.println(String.format(“消费者 【two】: %s”, user));
}

直连模式(direct)
@Test
public void directMessageSend() {
//rabbitTemplate.convertAndSend(“direct-ex”, “success”, new User(2, “张三”));
rabbitTemplate.convertAndSend(“direct-ex”, “error”, new User(2, “张三”));
try {
TimeUnit.SECONDS.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
}

@RabbitListener(bindings = {
@QueueBinding(value = @Queue,
key = {“error”, “success”},
exchange = @Exchange(value = “direct-ex”, type = ExchangeTypes.DIRECT))
})
public void receiveMessage1(User user) {
System.out.println(String.format(“消费者 【one】: %s”, user));
}

@RabbitListener(bindings = {
@QueueBinding(value = @Queue,
key = {“error”},
exchange = @Exchange(value = “direct-ex”, type = ExchangeTypes.DIRECT))
})
public void receiveMessage2(User user) {
System.out.println(String.format(“消费者 【two】: %s”, user));
}

topic模型
@Test
public void topicMessageSend() {
//rabbitTemplate.convertAndSend(“topic-ex”, “company”, new User(2, “张三”));
rabbitTemplate.convertAndSend(“topic-ex”, “company.java”, new User(2, “张三”));
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
}

@RabbitListener(bindings = {
@QueueBinding(value = @Queue,
key = {“company.#”},
exchange = @Exchange(value = “topic-ex”, type = ExchangeTypes.TOPIC))
})
public void receiveMessage1(User user) {
System.out.println(String.format(“消费者 【one】: %s”, user));
}

@RabbitListener(bindings = {
@QueueBinding(value = @Queue,
key = {“company.java.#”},
exchange = @Exchange(value = “topic-ex”, type = ExchangeTypes.TOPIC))
})
public void receiveMessage2(User user) {
System.out.println(String.format(“消费者 【two】: %s”, user));
}

@RabbitListener(bindings = {
@QueueBinding(value = @Queue,
key = {“company.html.*”},
exchange = @Exchange(value = “topic-ex”, type = ExchangeTypes.TOPIC))
})
public void receiveMessage3(User user) {
System.out.println(String.format(“消费者 【three】: %s”, user));
}

消息的手动确认
spring:
rabbitmq:
listener:
simple:
# 提交方式为手动
acknowledge-mode: MANUAL

factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);

面试:a. 如何保证消息不丢失?

​ b. 如何保证消息的不重复消费?

​ c. 如何使用mq来是实现分布式事务?

​ d. 在工作中mq用在哪里?支付回调。

RabbitMQ如何做到消息不丢失?

1.持久化

发送消息时设置delivery_mode属性为2,使消息被持久化保存到磁盘,即使RabbitMQ服务器宕机也能保证消息不丢失。同时,创建队列时设置durable属性为True,以确保队列也被持久化保存。

2.确认机制

消费者通过basic.ack命令向RabbitMQ服务器确认已经消费了消息。如果消费者处理消息时发生错误或宕机,RabbitMQ会重新将消息发送给其他消费者。RabbitMQ在接收到消费者确认消息前会将消息保存在内存中,在确认后才会删除消息。

3.发布者确认

RabbitMQ支持发布者确认机制,即发布者在将消息发送到队列后,等待RabbitMQ服务器的确认消息。成功保存到队列的消息会返回确认消息给发布者,如果无法保存则返回Nack(Negative Acknowledgement)消息。通过发布者确认机制,可以确保消息成功发送到RabbitMQ服务器。

4.备份队列

RabbitMQ支持备份队列(Alternate Exchange)机制,即在消息发送到队列之前,先将消息发送到备份队列。如果主队列无法接收消息,RabbitMQ会将消息发送到备份队列中。备份队列通常是一个交换机,在创建队列时可以通过x-dead-letter-exchange属性指定备份队列。

三、注意事项 在使用RabbitMQ消息确认机制时,需注意以下几点:

1、确认模式的选择: 根据具体业务需求选择合适的确认模式。如果对消息传递的可靠性要求较高,建议使用手动确认模式或批量确认模式。

2、设置消息持久化: 在发布消息时,通过设置消息的持久化属性,可以确保即使RabbitMQ服务器意外关闭或重启,消息仍然能够被保存下来。

3、处理未确认消息: 如果消费者在处理消息时发生异常,导致无法发送确认信号给RabbitMQ,那么这些消息将保持在队列中,并可能被重新投递。需要设定适当的重试策略和错误处理机制来处理这些未确认的消息。

4、监听确认超时: 在使用批量确认模式时,如果长时间没有收到确认信号,需要设置合理的超时时间,并采取相应措施来处理超时的情况。

RabbitMQ消息确认机制详解:确保交付成功

消息确认机制是通过发布者(Producer)和消费者(Consumer)之间的交互来实现的。
当发布者发送消息到RabbitMQ后,会等待确认结果。如果消息成功被消费者接收并处理,消费者会发送一个确认信号给RabbitMQ,告知消息已经处理完成。而RabbitMQ则会根据接收到的确认信号,判断消息是否成功交付。
消息确认机制一般有两种模式:简单模式和批量模式。在简单模式中,每发送一条消息后就等待确认;而在批量模式中,可以一次发送多条消息,然后等待批量确认。无论是简单模式还是批量模式,都可以提高消息传递的可靠性。

https://blog.csdn.net/weixin_52278591/article/details/128841155 RabbitMq 消息确认机制详解

https://blog.csdn.net/u011942456/article/details/128198956 总结RabbitMq消息丢失和消息重复消费问题

https://www.zhihu.com/question/54756562?utm_id=0 rabbitmq是什么,主要用于哪些方面?

https://zhuanlan.zhihu.com/p/583520436?utm_id=0 用了8年MQ!聊聊消息队列的技术选型,哪个最香!

https://blog.csdn.net/qq_43410878/article/details/123656765 RabbitMQ使用详解

https://zhuanlan.zhihu.com/p/590948541 绝对详细的 RabbitMQ入门,看完本系列就够了

MQ的优势和劣势

https://cloud.tencent.com/developer/article/2113514 面试百问:使用MQ的优势、劣势以及问题

rabbitmq basicAck

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/94284.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Open Cascade旋转变换平行线

在本人开发的弯管自动CAM软件中&#xff0c;有一个问题一直没有解决&#xff0c;就是180度平行管路需要做角度微调&#xff0c;以便进行YBC预览。研究了一番后&#xff0c;搞定了这个问题&#xff0c;关键在于采用OCC库实现拓扑变换。 本文将介绍如何使用OpenCASCADE库来实现平…

3.物联网射频识别,(高频)RFID应用ISO14443-2协议,(校园卡)Mifare S50卡

问题&#xff1a; 1) 14443协议&#xff0c;RFID标签的默认通信速率是 106kbps&#xff0c;也可以通过协商&#xff0c;调整为 &#xff08;fc/6413.56M/64&#xff09;212、424、 848kbps。 2) 14443-3 A类卡&#xff0c;上电后&#xff0c;读写器发送REQA命令&#xff0c;标签…

激光雷达中实现F-P标准具高热稳定性的帕尔贴精密温控解决方案

摘要&#xff1a;法布里-珀罗标准具作为一种具有高温度敏感性的精密干涉分光器件&#xff0c;在具体应用中对热稳定性具有很高的要求&#xff0c;如温度波动不能超过0.01℃&#xff0c;为此本文提出了相应的高精度恒温控制解决方案。解决方案具体针对温度控制精度和温度均匀性控…

计算机竞赛 题目: 基于深度学习的疲劳驾驶检测 深度学习

文章目录 0 前言1 课题背景2 实现目标3 当前市面上疲劳驾驶检测的方法4 相关数据集5 基于头部姿态的驾驶疲劳检测5.1 如何确定疲劳状态5.2 算法步骤5.3 打瞌睡判断 6 基于CNN与SVM的疲劳检测方法6.1 网络结构6.2 疲劳图像分类训练6.3 训练结果 7 最后 0 前言 &#x1f525; 优…

angularjs开发环境搭建

Angularjs是一个前端页面应用开发框架&#xff0c;其使用TypeScript作为开发语言&#xff0c;Angularjs的特性包括&#xff0c;使用组件、模板以及依赖注入的开发框架构建可扩展的web应用&#xff0c;使用易于集成的类库支持页面路由、页面表单、前后端接口交互等各种不同特性&…

JVM:经典垃圾收集器

经典垃圾收集器 如果说收集算法是内存回收的方法论&#xff0c;那垃圾收集器就是内存回收的实践者 《Java虚拟机规范》中对垃圾收集器应该如何实现并没有做出任何规定&#xff0c;因此不同的厂商、不同版本的虚拟机所包含的垃圾收集器都可能会有很大差别&#xff0c;不同的虚拟…

【Java】微服务——Nacos注册中心

目录 1.Nacos快速入门1.1.服务注册到nacos1&#xff09;引入依赖2&#xff09;配置nacos地址3&#xff09;重启 2.服务分级存储模型2.1.给user-service配置集群2.2.同集群优先的负载均衡 3.权重配置4.环境隔离4.1.创建namespace4.2.给微服务配置namespace 5.Nacos与Eureka的区别…

Day 04 python学习笔记

Python数据容器 元组 元组的声明 变量名称&#xff08;元素1&#xff0c;元素2&#xff0c;元素3&#xff0c;元素4…….&#xff09; &#xff08;元素类型可以不同&#xff09; eg: tuple_01 ("hello", 1, 2,-20,[11,22,33]) print(type(tuple_01))结果&#x…

uniapp实现微信小程序隐私协议组件封装

uniapp实现微信小程序隐私协议组件封装。 <template><view class"diygw-modal basic" v-if"showPrivacy" :class"showPrivacy?show:" style"z-index: 1000000"><view class"diygw-dialog diygw-dialog-modal bas…

求各区域热门商品Top3 - HiveSQL

背景&#xff1a;这是尚硅谷SparkSQL练习题&#xff0c;本文用HiveSQL进行了实现。 数据集&#xff1a;用户点击表&#xff0c;商品表&#xff0c;城市表 题目: ① 求每个地区点击量前三的商品&#xff1b; ② 在①的基础上&#xff0c;求出每个地区点击量前三的商品后&a…

MySQL-MVCC(Multi-Version Concurrency Control)

MySQL-MVCC&#xff08;Multi-Version Concurrency Control&#xff09; MVCC&#xff08;多版本并发控制&#xff09;&#xff1a;为了解决数据库并发读写和数据一致性的问题&#xff0c;是一种思想&#xff0c;可以有多种实现方式。 核心思想&#xff1a;写入时创建行的新版…

Windows安装Docker并创建Ubuntu环境及运行神经网络模型

目录 前言在Windows上安装Docker在Docker上创建Ubuntu镜像并运行容器创建Ubuntu镜像配置容器&#xff0c;使其可以在宿主机上显示GUI 创建容器并运行神经网络模型创建容器随便找一个神经网络模型试试 总结 前言 学生党一般用个人电脑玩神经网络&#xff0c;估计很少有自己的服…

TouchGFX之后端通信

在大多数应用中&#xff0c;UI需以某种方式连接到系统的其余部分&#xff0c;并发送和接收数据。 它可能会与硬件外设&#xff08;传感器数据、模数转换和串行通信等&#xff09;或其他软件模块进行交互通讯。 Model类​ 所有TouchGFX应用都有Model类&#xff0c;Model类除了存…

【计算机】CPU,芯片以及操作系统概述

1.CPU 什么是CPU? CPU&#xff08;Central Processing Unit&#xff09;是计算机系统的运算和控制核心&#xff0c;是信息处理、程序运行的最终执行单元&#xff0c;相当于系统的“大脑”。 CPU的工作流程&#xff1f; CPU 的工作流程分为以下 5 个阶段&#xff1a;取指令…

苹果ios系统ipa文件企业签名是什么?优势是什么?什么场合需要应用到?

企业签名是苹果开发者计划中的一种签名类型&#xff0c;允许企业开发者签署和分发企业内部使用的应用程序&#xff0c;而无需通过App Store进行公开发布。通过企业签名&#xff0c;企业可以在内部部署自己的应用程序&#xff0c;以满足特定的业务需求。 企业签名能够做到以下…

【JVM】 类加载机制、类加载器、双亲委派模型详解

文章目录 前言一、类加载机制二、类加载器三、双亲委派模型总结 前言 &#x1f4d5;各位读者好, 我是小陈, 这是我的个人主页 &#x1f4d7;小陈还在持续努力学习编程, 努力通过博客输出所学知识 &#x1f4d8;如果本篇对你有帮助, 烦请点赞关注支持一波, 感激不尽 &#x1f4d…

【改进哈里鹰算法(NCHHO)】使用混沌和非线性控制参数来提高哈里鹰算法的优化性能,解决车联网相关的路由问题(Matlab代码实现)

&#x1f4a5;&#x1f4a5;&#x1f49e;&#x1f49e;欢迎来到本博客❤️❤️&#x1f4a5;&#x1f4a5; &#x1f3c6;博主优势&#xff1a;&#x1f31e;&#x1f31e;&#x1f31e;博客内容尽量做到思维缜密&#xff0c;逻辑清晰&#xff0c;为了方便读者。 ⛳️座右铭&a…

嵌入式Linux应用开发-驱动大全-同步与互斥①

嵌入式Linux应用开发-驱动大全-同步与互斥① 第一章 同步与互斥①1.1 内联汇编1.1.1 C语言实现加法1.1.2 使用汇编函数实现加法1.1.3 内联汇编语法1.1.4 编写内联汇编实现加法1.1.5 earlyclobber的例子 1.2 同步与互斥的失败例子1.2.1 失败例子11.2.2 失败例子21.2.3 失败例子3…

互联网Java工程师面试题·MyBatis 篇·第二弹

目录 16、Xml 映射文件中&#xff0c;除了常见的 select|insert|updae|delete标签之外&#xff0c;还有哪些标签&#xff1f; 17、Mybatis 的 Xml 映射文件中&#xff0c;不同的 Xml 映射文件&#xff0c;id 是否可以重复&#xff1f; 18、为什么说 Mybatis 是半自动 ORM 映射…

2023年中国体育赛事行业现状及趋势分析:体育与科技逐步融合,推动产业高质量发展[图]

体育赛事运营是指组织体育赛事或获取赛事版权&#xff0c;并进行赛事推广营销、运营管理等一系列商业运作的运营活动。体育赛事运营相关业务主要包括赛事运营与营销、赛事版权运营两个部分。 体育赛事运营行业分类 资料来源&#xff1a;共研产业咨询&#xff08;共研网&#x…