文章目录
- 消息队列(Message Queue)
- 什么场景下,使用消息队列?
- 消息队列 概述
- RabbitMQ 消息队列
- RabbitMQ 概念
- 名词 概念
- RabbitMQ 流程
- RabbitMQ 安装
- RabbitMQ 页面介绍
- Exchange 交换机类型
- Spring Boot 整合RabbitMQ
- AmqpAdmin 与 RabbitTemplate 的使用
- 整合的 序列化问题
- Spring Boot 整合的 RabbitListener 监听
- @RabbitListener 注解
- @RabbitHandler 注解
- @PostConstruct 注解 和 @Primary 注解 使用
- RabbitMQ 发送端(生产者) 消息确认机制
- 事务消息(了解即可)
- RabbitMQ消息确认的 三个阶段
- 第一个阶段:可靠抵达 - confirmCallback
- 第二个阶段:可靠抵达 - ReturnCallback
- 可靠抵达 消费端(消费者) Ack消息确认机制
- 第三个阶段:ack 消息确认机制
- 电商 订单中心
- 订单中心 的 重要经验
- 订单登录 拦截
- Feign 远程调用丢失请求头问题(丢失请求头 等同于 登录失效了)
- Feign 异步情况丢失上下文问题
- 接口幂等性 处理(防重复提交)
- 幂等性 概念
- 幂等性 考虑情况
- 幂等解决方案
- token机制
- 各种锁机制
- 数据库悲观锁
- 数据库乐观锁
- 业务层分布式锁
- 各种唯一约束
- 数据库唯一约束
- redis set 防重
- 防重表
- 全局请求唯一id
- 分布式事务
- 本地事务
- 分布式事务的 问题 以及 理论
- CAP定理
- BASE 理论
- 分布式事务的 多种方案
- 2PC模式
- 柔性事务 - TCC 事务补偿型 方案(常用)
- 柔性事务 - 最大努力通知型方案(常用)
- 柔性事务 - 可靠消息 + 最终一致性方案(常用)
- Seata 框架
- Seata 介绍
- Seata 环境搭建
- Seata 的几个模式
- 订单服务采用 最终一致性方案 解决分布式事务问题
- RabbitMQ 延时队列(实现定时任务)
- 延迟队列的 设计和实现
- 锁和解锁库存 架构实现
- 订单服务 库存解锁 场景
- 订单服务 定时关单 场景
- RabbitMQ 消息积压、丢失、重复解决方案
- 如何保证消息可靠性 - 消息丢失
- 消息丢失场景 一:消息发送出去,由于网络原因没有抵达服务器。
- 消息丢失场景 二:消息抵达Broker,Broker要将消息写入磁盘(持久化)才算成功。此时,Broker尚未持久化完成,宕机。
- 消息丢失场景 三:自动ACK的状态下。消费者收到消息,但没来得及处理消息,服务器宕机了。
- 如何保证消息可靠性 - 消息重复
- 如何保证消息可靠性 - 消息积压
消息队列(Message Queue)
什么场景下,使用消息队列?
- 需要
异步处理
的内容。
- 可能某个操作要调用多个服务流程去完成,这几者之间也没有强依赖性,无需等待返回。
应用解耦
没有使用消息队列:订单系统 调用 库存系统接口,库存系统维护升级,那么订单系统调用的接口也要维护升级, 所以很麻烦!
使用消息队列:订单系统 无需依靠着库存系统的接口,只需要给消息队列发送消息,库存系统去消息队列消费消息即可,实现应用上的解耦。
流量控制
(也叫做 流量削峰)
业务中涉及到某一时间段内,有大量请求需要处理,可以通过消息队列来做到流量控制。
例如:电商系统里面的 秒杀业务。
消息队列 概述
消息服务中两个重要概念:
消息代理(message broker)和 目的地(destination)
当消息发送者发送消息以后,将由消息代理接管,消息代理保证消息传递到指定目的地。
消息队列主要有两种形式的目的地
:
- 队列(queue):点对点消息通信(point - to - point)
- 过程:消息发送给消息代理,消息代理将其放入一个队列,接收者从里面获取消息,消毒读取后移除队列。
- 消息只有唯一的发送者 ,一个或多个接收者。
- 主题(topic):发布(publish)/订阅(subscribe)消息通信。
- 过程:消息发送到主题,多个接收者(订阅者)监听(订阅)这个主题,那么就会在消息到达同时收到消息。
消息队列的 两种协议:(遵循了协议就能够调用 消息队列中间件)
Spring 集成支持:
Spring Boot 继承自动配置:
市面上的MQ产品:ActiveMQ、RabbitMQ、RocketMQ、Kafka
本次项目采用AMQP的Rabbitmq来做消息队列。
RabbitMQ 消息队列
RabbitMQ 概念
名词 概念
Message:消息,由消息头和消息体组成。消息体是不透明的,消息头则由一系列的可选属性组成,这些属性包括routing-key(路由键)、priority(相对于其他消息的优先权)、delivery-mode(指出该消息可能需要持久性存储)等。
Publisher:消息的生产者,也是一个向交换器发布消息的客户端应用程序。
Exchange:交换器,用来接受生产者发送的消息并将这些消息路由给服务器中的队列。四种交换器类型:direct(默认)、fanout、topic、headers,不同类型转发消息的策略也不同。
Queue:消息队列。
Binding:绑定,用于消息队列和交换机之间的关联。
Connection:网络连接,比如:TCP连接。
Channel:信道,多路复用连接中的独立的双向数据流通道。信道是建立在真实的TCP连接内的虚拟连接,AMQP命令都是通过信道发出去的,不管是发布消息、订阅队列还是接受消息,这些动作都是通过信道完成。因为对于操作系统来说建立和销毁TCP都是非常昂贵的开销,所以引入了信道的概念,以服用一条TCP连接。
Consumer:消费者
Virtual Host:虚拟主机,表示一批交换器、消息队列和相关对象。每个虚拟主机本质上就是一个mini版的RabbitMQ服务器,拥有自己的队列、交换器、绑定和权限机制。目的:起到隔离效果,比如:一个RabbitMQ中,有一台针对Java调用的虚拟主机,有一台针对python调用的虚拟主机,这样Java虚拟主机如果出现问题,也不会影响python这台(也有按照开发、测试、生产环境来的)。
Broker:表示消息队列服务器实体。
RabbitMQ 流程
RabbitMQ流程图:
RabbitMQ 安装
- 执行docker命令。
# 1. 启动 rabbitmq:management 容器
docker run -d --name rabbitmq -p 5671:5671 -p 5672:5672 \
-p 4369:4369 -p 25672:25672 -p 15671:15671 -p 15672:15672 rabbitmq:management
# 2. 自动重启
docker update rabbitmq --restart=always
rabbitmq:management 集成了:web管理后台的端口。
端口 解释:
- 启动成功后,访问 IP:15672 查看页面即可。
- 账号密码默认都是:guest
RabbitMQ 页面介绍
Overview 概述:
Admin 管理:
Exchanges 交换机:
Queues 队列:
Exchange 交换机类型
Direct Exchange:直接模式,直接 交换机。
- 将消息交给指定队列,路由(routing key)按照绑定(Binding)关系,将消息发到对应的队列中,这个是完全匹配,完全按照路由绑定关系,去找对应的消息队列。
- 这种叫做完全匹配、单播模式。例如:routing key 为 dog 的消息,不会转发 dog.puppy … 其他的。
Fanout Exchange:广播模式, 扇形 交换机
- 每个发到fanout类型交换器的消息都会分到所有绑定的队列上去。
Topic Exchange:主题模式 , 主题 交换机
- topic 交换机通过模式匹配分配消息的路由键属性,将路由键和某个模式进行匹配。
- 就相当于会区分路由键,不同的路由会走向不同的队列。会涉及到:通配符(#、*)
Spring Boot 整合RabbitMQ
- 引入amqp启动类,RabbitAutoConfiguration就会自动生效。
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
- RabbitAutoConfiguration类,自动配置了RabbitTemplate、AmqpAdmin 等实例对象。
- @EnableRabbit 标识启动注解。
package com.atguigu.gulimall.order;import org.springframework.amqp.rabbit.annotation.EnableRabbit;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;// 启动 注解
@EnableRabbit
@SpringBootApplication
public class GulimallOrderApplication {public static void main(String[] args) {SpringApplication.run(GulimallOrderApplication.class, args);}}
- 配置rabbitmq属性。
# rabbitmq 配置信息
spring.rabbitmq.host=www.gulimall.com
spring.rabbitmq.port=5672
spring.rabbitmq.virtual-host=/
AmqpAdmin 与 RabbitTemplate 的使用
package com.atguigu.gulimall.order;import com.atguigu.gulimall.order.entity.OrderReturnReasonEntity;
import org.junit.jupiter.api.Test;
import org.springframework.amqp.core.AmqpAdmin;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;import java.util.Date;@SpringBootTest
class GulimallOrderApplicationTests {@AutowiredAmqpAdmin amqpAdmin;@AutowiredRabbitTemplate rabbitTemplate;// 发送message@Testpublic void sendMessage(){OrderReturnReasonEntity orderReturnReasonEntity = new OrderReturnReasonEntity();orderReturnReasonEntity.setId(1l);orderReturnReasonEntity.setCreateTime(new Date());orderReturnReasonEntity.setName("测试实体类");// 1. 发送消息// 如果发送的消息是个对象,我们会使用序列化机制,将对象写出去。对象必须实现Serializable!!rabbitTemplate.convertAndSend("hello-java-exchange","hello.java",orderReturnReasonEntity);}// 创建交换机@Testvoid createExchange() {// 参数:名字、是否持久化、是否自动删除DirectExchange directExchange = new DirectExchange("hello-java-exchange",true,false);amqpAdmin.declareExchange(directExchange);}// 创建队列@Testvoid createQueue() {Queue queue = new Queue("hello-java-queue",true,false,false);amqpAdmin.declareQueue(queue);}// 创建绑定@Testvoid createBinding() {// 将exchange指定的交换机和destination目的地进行绑定,使用routingKey作为指定的路由键Binding binding = new Binding("hello-java-queue", // 目的地Binding.DestinationType.QUEUE, // 目的地类型"hello-java-exchange", // 指定交换机"hello.java", // 路由keynull // 参数);amqpAdmin.declareBinding(binding);}}
整合的 序列化问题
对于实体类序列化,默认为jdk序列化:
可以通过配置消息转换来实现Jackson2JSON序列化:
package com.atguigu.gulimall.order.config;import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;/* @author: xuyanbo* @description: TODO* @date: 2023/10/27 14:43*/
@Configuration
public class MyRabbitConfig {@Beanpublic MessageConverter messageConverter(){return new Jackson2JsonMessageConverter();}}
Spring Boot 整合的 RabbitListener 监听
@RabbitListener 注解
@RabbitListener注解,参数:
- queues:声明需要监听的所有队列。
@RabbitListener注解:标注在 类 或者 方法上。(看源码注解能看出)
@RabbitHandler注解:标注在 方法上。
被@RabbitListener注解 ,标注方法上
:
- 第一个参数 Message message:原生消息相信信息,消息头 + 消息体
- 第二个参数 T<发送的消息的类型> T content :消息体里面对应的实体信息(一般为实体类)
- 第三个参数 Channel channel : 当前传输数据的通道。
注意:要引入rabbitmq的Channel。
package com.atguigu.gulimall.order.service.impl;import com.alibaba.fastjson.JSON;
import com.atguigu.common.utils.PageUtils;
import com.atguigu.common.utils.Query;
import com.atguigu.gulimall.order.dao.OrderItemDao;
import com.atguigu.gulimall.order.entity.OrderItemEntity;
import com.atguigu.gulimall.order.entity.OrderReturnReasonEntity;
import com.atguigu.gulimall.order.service.OrderItemService;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;import com.rabbitmq.client.Channel;
import java.util.Map;@Service("orderItemService")
public class OrderItemServiceImpl extends ServiceImpl<OrderItemDao, OrderItemEntity> implements OrderItemService {@Overridepublic PageUtils queryPage(Map<String, Object> params) {IPage<OrderItemEntity> page = this.page(new Query<OrderItemEntity>().getPage(params),new QueryWrapper<OrderItemEntity>());return new PageUtils(page);}/* @RabbitListener参数:* queues:声明需要监听的所有队列。** 被标注方法参数:* 第一个参数 Message message:原生消息相信信息,消息头 + 消息体* 第二个参数 T<发送的消息的类型> T content :消息体里面对应的实体信息(一般为实体类)* 第三个参数 Channel channel : 当前传输数据的通道。(有问题,没有Channel类)*/@RabbitListener(queues = {"hello-java-queue"})public void receiveMessage(Message message, OrderReturnReasonEntity content, Channel channel) {// body里面存储的是:发送的消息内容byte[] body = message.getBody();// fixme 这种方式太麻烦,可以用参数直接来接受消息体里面的内容。// OrderReturnReasonEntity orderReturnReasonEntity = JSON.parseObject(body.toString(), OrderReturnReasonEntity.class);System.out.println("body:" + content);// properties存储的是:发过来的消息头属性MessageProperties properties = message.getMessageProperties();System.out.println("接收到消息...内容:" + message.toString() + ",类型:" + message.getClass());}}
💡Tip:Queue:可以很多人都来监听。只要收到消息,队列删除消息,而且只能有一个收到此消息场景:
- 订单服务启动多个;同一个消息,只能有一个客户端收到。
- 单个服务,一次只能处理一个消息,等消息完全处理完,方法运行结束,才可以接受到下一个消息。
@RabbitHandler 注解
被@RabbitListener注解 ,标注类上
参数:
- 重载方法,区分不同的消息的作用。
- 针对 接受的消息类型 会有多种的情况,可以使用@RabbitHandler来标识,不同方法来处理不同情况。
碰壁了,此处有问题,没有Channel类可能跟配置序列化有关系。
package com.atguigu.gulimall.order.service.impl;import com.atguigu.common.utils.PageUtils;
import com.atguigu.common.utils.Query;
import com.atguigu.gulimall.order.dao.OrderItemDao;
import com.atguigu.gulimall.order.entity.OrderEntity;
import com.atguigu.gulimall.order.entity.OrderItemEntity;
import com.atguigu.gulimall.order.entity.OrderReturnReasonEntity;
import com.atguigu.gulimall.order.service.OrderItemService;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;import com.rabbitmq.client.Channel;
import java.util.Map;@RabbitListener(queues = {"hello-java-queue"})
@Service("orderItemService")
public class OrderItemServiceImpl extends ServiceImpl<OrderItemDao, OrderItemEntity> implements OrderItemService {@Overridepublic PageUtils queryPage(Map<String, Object> params) {IPage<OrderItemEntity> page = this.page(new Query<OrderItemEntity>().getPage(params),new QueryWrapper<OrderItemEntity>());return new PageUtils(page);}/* @RabbitListener参数:* queues:声明需要监听的所有队列。** 被标注方法参数:* 第一个参数 Message message:原生消息相信信息,消息头 + 消息体* 第二个参数 T<发送的消息的类型> T content :消息体里面对应的实体信息(一般为实体类)* 第三个参数 Channel channel : 当前传输数据的通道。(有问题,没有Channel类)*/@RabbitHandlerpublic void receiveMessageOrderReturnReasonEntity(Channel channel, Message message, OrderReturnReasonEntity content) {System.out.println("接收到消息..." + content);try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("消息处理完成..." + content);}@RabbitHandlerpublic void receiveMessageForOrderEntity(Channel channel, Message message,OrderEntity content) {System.out.println("接收到消息..." + content);try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("消息处理完成..." + content);}}
@PostConstruct 注解 和 @Primary 注解 使用
@PostConstruct注解:相当于创建完当前这个对象后,之后调用的方法。可以翻译为:构造器之后。
例如:下面就是等 MyRabbitConfig 创建实例后,执行initRabbitTemplate方法,给RestTemplate对象配置相关内容。
package com.atguigu.gulimall.order.config;import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Lazy;
import org.springframework.context.annotation.Primary;import javax.annotation.PostConstruct;
import javax.annotation.Resource;/* @author: xuyanbo* @description: TODO* @date: 2023/10/27 14:43*/
@Configuration
public class MyRabbitConfig {@Autowird // 此处报错,闭环错误。RabbitTemplate rabbitTemplate;// 使用JSON序列化机制,进行消息转换@Beanpublic MessageConverter messageConverter(){return new Jackson2JsonMessageConverter();}/* @PostConstruct注解:相当于创建完MyRabbitConfig对象,之后调用的方法。翻译为构造器之后。*/@PostConstructpublic void initRabbitTemplate(){// 设置确认回调rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {@Overridepublic void confirm(CorrelationData correlationData, boolean ack, String cause) {System.out.println("confirm...");System.out.println("CorrelationData:" + correlationData);System.out.println("ack:" + ack);System.out.println("cause:" + cause);}});}}
@Primary注解在Spring框架中表示当有多个相同类型的bean时,使用该注解赋予bean更高的优先级。比如在Spring IOC容器中有多个相同类型的bean时,当要注入该类型的bean,就可以使用@Primary来标注注入bean的优先,优先级高的bean先被注入。
RabbitMQ 发送端(生产者) 消息确认机制
事务消息(了解即可)
事务消息:
- 将所有的过程都锁定到一个事务中,一起成功,一起失败。
为了保证消息不丢失,可靠抵达,可以使用事务消息,但是性能却下降250倍,所以事务消息是不推荐使用的,为此引入确认机制
。
RabbitMQ消息确认的 三个阶段
先看官方文档:Reliability Guide — RabbitMQ
一个完整效果图:
- p:生产者,c:消费者。
p(provider) -> b(Broker):需要 confirmCallback
e(Exchange) -> q(Queue):需要 returnCallback
q(Queue) -> c(Consumer):需要 ack
第一个阶段:可靠抵达 - confirmCallback
实现 可靠抵达 步骤:
- 可靠抵达 配置,注意:不同版本的SpringBoot可能会不同。
## 开启发送端消息抵达Broker确认(使用的SpringBoot版本不支持弃用了)
# spring.rabbitmq.publisher-confirms=true
## 开启发送端消息抵达Queue确认
spring.rabbitmq.publisher-returns=true
## 只要消息抵达Queue,就会异步发送优先回调returnfirm
spring.rabbitmq.template.mandatory=true
spring.rabbitmq.publisher-confirm-type=correlated
在Spring框架中,spring.rabbitmq.publisher-confirms 属性是用于开启或关闭消息发布确认的。从Spring AMQP 2.0开始,这个属性已经被弃用,并推荐使用 spring.rabbitmq.confirm-interval 和 spring.rabbitmq.publisher-returns 这两个新的属性来代替。
- rabbitTemplate的容器对象,配置好对应的setConfirmCallback方法,方便测试。
package com.atguigu.gulimall.order.config;import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;/* @author: xuyanbo* @description: TODO* @date: 2023/10/27 14:43*/
@Configuration
public class MyRabbitConfig {RabbitTemplate rabbitTemplate;@Primary@Beanpublic RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);this.rabbitTemplate = rabbitTemplate;rabbitTemplate.setMessageConverter(messageConverter());initRabbitTemplate();return rabbitTemplate;}// 使用JSON序列化机制,进行消息转换@Beanpublic MessageConverter messageConverter(){return new Jackson2JsonMessageConverter();}public void initRabbitTemplate(){// 设置确认回调rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {/* 第一个参数 correlationData:当前消息的唯一关联数据(这个是消息的唯一ID)* 第二个参数 ack:消息是否成功收到* 第三个参数 cause:失败原因*/@Overridepublic void confirm(CorrelationData correlationData, boolean ack, String cause) {System.out.println("confirm...correlationData:" + correlationData + ",ack:" + ack + ",cause:" + cause);}});}}
- 总结:只要消息抵达Broker就ack=true。
第二个阶段:可靠抵达 - ReturnCallback
- SpringBoot 配置
## 开启发送端消息抵达Broker确认(使用的SpringBoot版本不支持弃用了)
# spring.rabbitmq.publisher-confirms=true
## 开启发送端消息抵达Queue确认
spring.rabbitmq.publisher-returns=true
## 只要消息抵达Queue,就会异步发送优先回调returnfirm
spring.rabbitmq.template.mandatory=true
## 手动ack消息,不使用默认的消费端确认
spring.rabbitmq.listener.simple.acknowledge-mode=manual
- 添加消息的唯一ID方便测试。
- rabbitTemplate.convertAndSend方法,new CorrelationData(UUID.randomUUID().toString())的作用使用UUID来创建该消息的唯一ID。
@Autowired
RabbitTemplate rabbitTemplate;// new CorrelationData(UUID.randomUUID().toString()) 添加消息唯一ID,方便测试
rabbitTemplate.convertAndSend("hello-java-exchange","hello.java",orderEntity,new CorrelationData(UUID.randomUUID().toString()));
- 配置 rabbitTemplate.setReturnsCallback方法。
package com.atguigu.gulimall.order.config;import org.springframework.amqp.core.ReturnedMessage;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;/* @author: xuyanbo* @description: TODO* @date: 2023/10/27 14:43*/
@Configuration
public class MyRabbitConfig {RabbitTemplate rabbitTemplate;@Primary@Beanpublic RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);this.rabbitTemplate = rabbitTemplate;rabbitTemplate.setMessageConverter(messageConverter());initRabbitTemplate();return rabbitTemplate;}// 使用JSON序列化机制,进行消息转换@Beanpublic MessageConverter messageConverter(){return new Jackson2JsonMessageConverter();}public void initRabbitTemplate(){// 设置确认回调rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {/* 1、 只要消息抵达Broker就ack=true* 第一个参数 correlationData:当前消息的唯一关联数据(这个是消息的唯一ID)* 第二个参数 ack:消息是否成功收到* 第三个参数 cause:失败原因*/@Overridepublic void confirm(CorrelationData correlationData, boolean ack, String cause) {System.out.println("confirm...correlationData:" + correlationData + ",ack:" + ack + ",cause:" + cause);}});// 设置消息抵达队列的确认回调rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {/* 该方法只有失败才会调用,成功不会被调用。* ReturnedMessage对象里面的参数解释:* private final Message message; 投递失败的消息详情信息* private final int replyCode; 回复的状态码* private final String replyText; 回复的文本内容* private final String exchange; 当时这个消息发给哪个交换机* private final String routingKey; 当时这个消息用哪个路由键*/@Overridepublic void returnedMessage(ReturnedMessage returnedMessage) {System.out.println("失败 消息,Message:" + returnedMessage.getMessage()+ ",replyCode:" + returnedMessage.getReplyCode() + ",replyText:" + returnedMessage.getReplyText()+ ",exchange:" + returnedMessage.getExchange() + ",routingKey:" + returnedMessage.getRoutingKey());}});}}
失败测试不成功,待解决。
可靠抵达 消费端(消费者) Ack消息确认机制
第三个阶段:ack 消息确认机制
💡Tip:ack全称:acknowledge 收到通知。
- 默认自动ack确认的,只要消息接收到,客户端会自动确认,(队列)服务端就会自动移除这个消息。
- 存在问题:当收到很多消息,自动回复给服务器ack,然而,服务器宕机了。这就导致了消息的丢失。
- 将ack设置为手动确认,之后进行测试。
- 手动确认模式,只要我们没有明确的告诉mq,货物被签收。没有Ack,消息就一直是Unacked状态。即使出现了宕机,消息也不会丢失,会重新变为ready。
## 将ack确认设置为手动模式
spring.rabbitmq.listener.simple.acknowledge-mode=manual
- 通过Channel来手动确认消息。
- channel.basicAck(deliveryTag, false) 肯定确认
- channel.basicNack(deliveryTag,false,true); 否定确认
- channel.basicReject(deliveryTag,true) 也是否定确认,但是不能批量操作。
package com.atguigu.gulimall.order.service.impl;import com.atguigu.common.utils.PageUtils;
import com.atguigu.common.utils.Query;
import com.atguigu.gulimall.order.dao.OrderItemDao;
import com.atguigu.gulimall.order.entity.OrderEntity;
import com.atguigu.gulimall.order.entity.OrderItemEntity;
import com.atguigu.gulimall.order.entity.OrderReturnReasonEntity;
import com.atguigu.gulimall.order.service.OrderItemService;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;import java.io.IOException;
import java.util.Map;@RabbitListener(queues = {"hello-java-queue"})
@Service("orderItemService")
public class OrderItemServiceImpl extends ServiceImpl<OrderItemDao, OrderItemEntity> implements OrderItemService {@RabbitHandlerpublic void receiveMessageOrderReturnReasonEntity(Channel channel, Message message, OrderReturnReasonEntity content) {System.out.println("接收到消息..." + content);try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("消息处理完成..." + content);long deliveryTag = message.getMessageProperties().getDeliveryTag();System.out.println("deliveryTag: " + deliveryTag);/* channel.basicAck 等同于 签收获取,手动确认。* 第一个参数:可以理解为一个货物标签,就是消息的标签,是哪个消息被确认了。* 第二个参数:是否批量确认。*/// 签收货物,非批量模式try {if (deliveryTag % 2 == 0) {// 肯定确认channel.basicAck(deliveryTag, false);} else {// 否定确认/* channel.basicNack 等同于 拒绝签收,拒绝确认。* 第一个参数:可以理解为一个货物标签,就是消息的标签,是哪个消息被确认了。* 第二个参数:是否批量确认。* 第三个参数:是否重新回归队列。*/channel.basicNack(deliveryTag,false,true);// channel.basicReject(deliveryTag,true);// 效果一样,但是不能批量操作。 }} catch (IOException e) {// 网络中断e.printStackTrace();}}}
- 总结:
电商 订单中心
订单中心 的 重要经验
在电商系统中,订单中心很重要,涉及到3流,分别是信息流、资金流、物流。订单中心就相当于是三者的中间整合商。
订单的作用:把感兴趣的商品整合一起,生成一个支付单,然后完成一个发货的物流过程。
所以,订单模块是电商系统的枢纽,在订单这个环节商需求获取多个模块的数据和信息。同时对这多个信息进行加工处理,流向下个环节。
订单所涉及到的信息如下:
订单总流程:
- 名词:实物订单、虚拟订单(话费)、库存锁定(下了单没支付,需要锁定库存)、库存解锁(超时未支付,解锁)
- 正常流程:订单生成 -》 支付订单 -》 卖家发货 -》 确认收货 -》 交易成功。
订单生成 流程包括:
- 创建订单 -》 验令牌(幂等性) -》 验价格(优惠、扣减等等) -》 锁库存(只要有异常回滚订单数据)
锁库存方式:
- 通过SQL条件来控制即可。
update `wms_ware_sku` set stock_locked = stock_locked + #{num}
where sku_id = #{skuId}
and ware_id = #{wareId}
and stock - stock_locked >= #{num}
订单登录 拦截
- 创建拦截器。
- 也是用到了ThreadLocal存储用户信息。
package com.atguigu.gulimall.order.interceptor;import com.atguigu.common.constant.AuthServerConstant;
import com.atguigu.common.vo.MemberRespVo;
import org.springframework.stereotype.Component;
import org.springframework.web.servlet.HandlerInterceptor;import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;/* @author: xuyanbo* @description: TODO* @date: 2023/10/31 11:37*/
@Component
public class LoginUserInterceptor implements HandlerInterceptor {public static ThreadLocal<MemberRespVo> loginUser = new ThreadLocal<>();@Overridepublic boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception {MemberRespVo userInfo = (MemberRespVo) request.getSession().getAttribute(AuthServerConstant.LOGIN_USER);if (userInfo != null) {loginUser.set(userInfo);return true;} else {// 没登录去登录request.getSession().setAttribute("msg","请先进行登录");response.sendRedirect("http://auth.gulimall.com/login.html");return false;}}}
- 注册拦截器,并且配置路径。
package com.atguigu.gulimall.order.config;import com.atguigu.gulimall.order.interceptor.LoginUserInterceptor;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.servlet.config.annotation.InterceptorRegistry;
import org.springframework.web.servlet.config.annotation.WebMvcConfigurer;/* @author: xuyanbo* @description: TODO* @date: 2023/10/31 11:39*/
@Configuration
public class OrderWebConfiguration implements WebMvcConfigurer {@AutowiredLoginUserInterceptor interceptor;@Overridepublic void addInterceptors(InterceptorRegistry registry) {registry.addInterceptor(interceptor).addPathPatterns("/");}}
Feign 远程调用丢失请求头问题(丢失请求头 等同于 登录失效了)
问题原因:
解决办法:
feign在发起远程调用之前,会经过一大堆的拦截器,我们也可以添加一个拦截器,将相关信息维护上:
- 添加一个Feign远程调用拦截器。
- 在拦截器里面,使用RequestContextHolder获取到请求以及请求头相关信息。
- 案例代码如下:
- ServletRequestAttributes attributes = (ServletRequestAttributes)RequestContextHolder.getRequestAttributes(); 是重点。
RequestContextHolder的原理是ThreadLocal
package com.atguigu.gulimall.order.config;import feign.RequestInterceptor;
import feign.RequestTemplate;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.context.request.RequestAttributes;
import org.springframework.web.context.request.RequestContextHolder;
import org.springframework.web.context.request.ServletRequestAttributes;import javax.servlet.http.HttpServletRequest;/* @author: xuyanbo* @description: TODO* @date: 2023/10/31 14:56*/
@Configuration
public class GuliFeignConfig {// 添加一个拦截器,并且声明名字@Bean("requestInterceptor")public RequestInterceptor requestInterceptor(){return new RequestInterceptor() {@Overridepublic void apply(RequestTemplate template) {// 1. 使用RequestContextHolder拿到刚进来的这个请求ServletRequestAttributes attributes = (ServletRequestAttributes)RequestContextHolder.getRequestAttributes();HttpServletRequest request = attributes.getRequest();// 2. 同步请求头数据,CookieString cookie = request.getHeader("Cookie");// 3. 给Feign的请求里面,同步当前请求的cookie信息template.header("Cookie",cookie);}};}}
Feign 异步情况丢失上下文问题
因为,RequestContextHolder 的原理是 ThreadLocal ,当我们使用异步的方式进行Feign的远程调用,相当于创建了多个子线程,而不是主线程了,这时RequestInterceptor拦截器里面的RequestContextHolder就无法获取到请求的相关信息了,因为请求信息在主线程的RequestContextHolder中。
解决办法:
- 将主线程的RequestContextHolder请求属性提前拿出来,赋值给多个子线程的RequestContextHolder请求属性就可以了。
@Override
public OrderConfirmVo confirmOrder() throws ExecutionException, InterruptedException {OrderConfirmVo confirmVo = new OrderConfirmVo();MemberRespVo memberRespVo = LoginUserInterceptor.loginUser.get();System.out.println("主线程..." + Thread.currentThread().getId());// 主线程RequestAttributes requestAttributes = RequestContextHolder.getRequestAttributes();CompletableFuture<Void> getAddressFuture = CompletableFuture.runAsync(() -> {System.out.println("member子线程..." + Thread.currentThread().getId());// 在member子线程,设置请求信息RequestContextHolder.setRequestAttributes(requestAttributes);// 1. 远程查询所有的收货地址列表List<MemberAddressVo> address = memberFeignService.getAddress(memberRespVo.getId());confirmVo.setAddress(address);}, executor);CompletableFuture<Void> cartFuture = CompletableFuture.runAsync(() -> {System.out.println("cart子线程..." + Thread.currentThread().getId());// 在cart子线程,设置请求信息RequestContextHolder.setRequestAttributes(requestAttributes);// 2. 远程查询购物车所有选中的购物项List<OrderItemVo> items = cartFeignService.getCurrentUserCartItems();confirmVo.setItems(items);}, executor);CompletableFuture.allOf(getAddressFuture,cartFuture).get();// 3. 查询用户积分Integer integration = memberRespVo.getIntegration();confirmVo.setIntegration(integration);// 4. 其他数据自动计算// TODO 5. 防重令牌return confirmVo;
}
接口幂等性 处理(防重复提交)
幂等性 概念
幂等性概念:幂等性是指对同一个资源的多个请求,在业务逻辑上具有相同的结果。
幂等性 考虑情况
场景案例:
- 订单业务:用户点了多次订单,发起多次订单请求,出现了多个订单。
- 支付场景:用户点了多次支付,发起了多次支付请求,结果扣款了多次。
哪些情况需要防止:
- 用户多次点击按钮。
- 用户页面回退再次提交。
- 微服务互相调用,由于网络问题,导致请求失败,feign触发重试机制。
- 其他业务情况。
什么情况需要幂等?
- 有一些操作是天然幂等的。
例如:
select * from tableA from id = ?
update tab1 set col1 = 1 where col2 = 2
delete from user where userId = 1
insert into user(userId,name) values (1,0) userId作为唯一主键,只会插入一条用户数据也是具备幂等性的。
无论执行多少次,结果都一样,不会改变状态,这些就是天然幂等的,具有幂等性的。
- 不具有幂等性的情况。
update tab1 set col1 = col1 + 1 where col2 = 2 ,每次操作执行结果都会发生变化,这就不是幂等性的。
insert into user(userId,name) values (1,0) userId,name都不是唯一主键,可以重复,这样的也是不具备幂等性的。
幂等解决方案
token机制
- 服务端提供了发送 token 的接口。我们在分析业务的时候,哪些业务是存在幂等问题的, 就必须在执行业务前,先去获取 token,服务器会把 token 保存到 redis 中。
- 然后调用业务接口请求时,把 token 携带过去,一般放在请求头部。
- 服务器判断 token 是否存在 redis 中,存在表示第一次请求,然后删除 token,继续执行业务。
- 如果判断 token 不存在 redis 中,就表示是重复操作,直接返回重复标记给 client,这样就保证了业务代码,不被重复执行。
例如:本项目,就是thymleaf渲染页面前,就给页面封装了一个防重令牌(token),这样就是前端一个令牌,后端redis存了一个令牌。
危险性:
- 先删除 token 还是后删除 token;
- (1) 先删除可能导致,业务确实没有执行,重试还带上之前 token,由于防重设计导致, 请求还是不能执行。
- (2) 后删除可能导致,业务处理成功,但是服务闪断,出现超时,没有删除 token,别人继续重试,导致业务被执行两边
- (3) 我们最好设计为先删除 token,如果业务调用失败,就重新获取 token 再次请求。
- Token 获取、比较和删除必须是原子性
- (1) redis.get(token) 、token.equals、redis.del(token)如果这两个操作不是原子,可能导 致,高并发下,都 get 到同样的数据,判断都成功,继续业务并发执行
- (2) 可以在 redis 使用 lua 脚本完成这个操作
if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end
// 1. 验证令牌[令牌的对比和删除必须保证原子性]
// 0 令牌失败,1删除成功
String script = "if redis.call('get',KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";
String orderToken = vo.getOrderToken();
Long result = redisTemplate.execute(new DefaultRedisScript<Long>(script, Long.class), Arrays.asList(OrderConstant.USER_ORDER_TOKEN_PREFIX + memberRespVo.getId()), orderToken
);
// 原子验证令牌和删除令牌
if (result == 0l) {// 令牌验证失败return response;
} else {// 令牌验证成功// 去创建订单,验令牌,验价格,锁库存...
}
各种锁机制
数据库悲观锁
悲观锁使用时,一般伴随着事务一起使用,数据锁定时间可能会很长,需要根据实际情况选用。另外注意的时,id字段一定是主键或者唯一索引,不然可能造成锁表的结果,处理起来会非常麻烦。
数据库乐观锁
该方法适合更新的场景中:(带版本号)
update t_goods set count = count + 1 , version = version + 1
where good_id = 2 and version = 1
根据version版本,也就是再操作库存钱先获取到当前商品的version版本号,然后操作的时候带上此version号。
例如:我们第一次操作库存时,得到version为1,调用库存服务version变成了2;但返回给订单服务出现了问题,订单服务又一次发起调用库存服务,当订单服务传入的version还是1,在执行上面的sql语句时,就不会执行;因为version已经变为了2了,where条件就不成立。这样就保证了不管调用几次,只会真正的处理一次。
乐观锁主要使用于处理读多写少的问题。
业务层分布式锁
如果多个机器可能在同一时间同时处理相同的数据,比如:多台机器定时任务都拿到了相同数据处理,我们就可以加分布式锁,锁定此数据,处理完成后释放锁。获取到锁的必须先判断这个数据是否背处理过。
各种唯一约束
数据库唯一约束
插入数据,按照唯一索引来进行插入,比如:订单号,这样相同的订单不可能有两条记录插入。
redis set 防重
很多数据需要处理,只能被处理一次,比如:我们可以计算数据的MD5将其放入redis的set,每次处理数据,先看这个MD5是否已经存在,存在就不处理。
防重表
使用订单号orderNo作为去重表的唯一索引,把唯一索引插入去重表,在进行业务操作,且他们在同一事务中。这个保证了重复请求时,因为去重表有唯一约束,导致请求失败,避免了幂等问题。这里注意的是,去重表和业务表应该在同一库中,这样就保证了在同一个事务,即使业务操作失败了,也会把去重表的数据回滚。这个很好的保证了数据一致性。
全局请求唯一id
调用接口时,生成一个唯一id,redis将数据保存到集合中(去重),存在即处理过,可以使用nginx设置每一个请求的唯一id:
proxy_set_header X-Request-Id $request_id;
特别是 Feign服务,触发重发请求,拿着以前的老请求再重新发,这样可以给这个请求设置一个全局唯一ID,就算重复发了,也能检测出来是否处理过。
💡Tips:也适用于链路追踪
分布式事务
本地事务:在分布式系统,只能控制自己的回滚,控制不了其他服务的回滚。
分布式事务:最大的问题就是:网络问题 + 分布式机器。
本地事务
数据库事务的四个特性:ACID
- 原子性(atomicity)
- 一致性(Consistency)
- 隔离性(isolation)
- 持久性(Durability)
事务的隔离级别:
- read uncommitted 读未提交:别的事务会读到其他未提交事务的数据,问题:脏读。
- read committed 读已提交:一个事务可以读取另一个已提交的事务,但多次读取会造成不一样的结果,问题:不可重复读问题。Oracle 和 SQL server 默认隔离级别。
- repeatable read 可重复读:存在幻读问题。MySQL默认隔离级别。
- serializable 序列化:等同于 串行 效率低。
事务的传播行为:
- 一般都是required行为。
同一对象内事务方法互调默认失效,原因 绕过了代理对象,事务使用代理对象来控制的:
解决:使用代理对象来调用事务方法。
- 引入aop-starter ; spring-boot-starter-aop; 引入了aspectj
- @EnableAspectJAutoProxy(exposeProxy = true);开启aspectj 动态代理功能。以后所有动态代理都是对外暴露代理对象。
- 本类互调用调用对象
OrderServiceImpl orderService = (OrderServiceImpl)AopContext.currentProxy();
orderService.b(); // 调用orderService对象的b方法
orderService.c(); // 调用orderService对象的c方法。
分布式事务的 问题 以及 理论
CAP定理
CAP原则又称为CAP定理:
- Consistency 一致性:在分布式系统中的所有数据备份,在同一时刻是否同样的值。(等同于所有节点访问同一份最新的数据副本)
- Availability 可用性:在集群中一部分节点故障后,集群整体是否还能影响客户端的读写请求。(对数据更新具备高可用性)
- Partition tolerance 分区容错性:大多数分布式系统分布在多个自网络。每个子网络就叫做一个区(partition)分区容错意思是:区间通信可能失败,比如,一台服务器放在中国,另一台服务器放在美国,这就是两个区,他们之间可能无法通信。
CAP原则指的是,这三个要素最多只能同时实现两点,不可能三者兼顾。
分布式系统里面,分区容错肯定是要满足的,然而 一致性 和 可用性 这二者是相互冲突的。
分布式系统中实现一致性的raft算法:Raft (采用领导发布的效果原理)
还有paxos算法。
💡Tip:总结:一般市场上,都是基于AP的,无法保证c(强一致性),但是可以保证最终一致性。
BASE 理论
是对CAP理论的延伸,思想是即使无法做到强一致性(CAP的一致性就是强一致性
),但可以采用适当的采用弱一致性,即最终一致性
。
BASE是指:
- Basically Available 基本可用:是指分布式系统在出现故障的时候,允许损失部分可用性(例如:响应时间、功能上的可用性),允许损失部分可用性。需要注意的是:基本可用绝不等价于系统不可用。
- Soft State 软状态:软状态是指允许系统存在中间状态,而该中间状态不会影响系统整体可用性。
- Eventual Consistency 最终一致性(弱一致性):最终一致性是指系统中的所有数据副本经过一定时间后,最终能够达到一致的状态。
分布式事务的 多种方案
2PC模式
场景:不适用于高并发,适用于一般分布式事务,该模式已经被取代延申。
数据库支持的 2PC【2 phase commit 二阶提交】,又叫做 XA Transactions。
MySQL 从 5.5 版本开始支持,SQL Server 2005 开始支持,Oracle 7 开始支持。
其中,XA 是一个两阶段提交协议,该协议分为以下两个阶段:
第一阶段:事务协调器要求每个涉及到事务的数据库预提交(precommit)此操作,并反映是 否可以提交.
第二阶段:事务协调器要求每个数据库提交数据。 其中,如果有任何一个数据库否决此次提交,那么所有数据库都会被要求回滚它们在此事务 中的那部分信息。
后来,出现了3PC模式,了解即可。
柔性事务 - TCC 事务补偿型 方案(常用)
场景:不推荐在高并发场景下,也是常用的分布式事务解决。
刚性事务:遵循 ACID 原则,强一致性。
柔性事务:遵循 BASE 理论,最终一致性;
与刚性事务不同,柔性事务允许一定时间内,不同节点的数据不一致,但要求最终一致。
分三个阶段:
第一阶段 prepare 行为:调用各个服务的 Try 逻辑。
第二阶段 commit 行为:调用各个服务的 Confirm 逻辑。
第三阶段 rollback 行为:有一个服务异常,则进行回滚操作 调用各个服务的 Cancel 逻辑。
此处,所谓的 补偿 ,举个例子:try逻辑是 某个数据-2 了,那么Cancel 补偿逻辑里面就是 +2 .
柔性事务 - 最大努力通知型方案(常用)
场景:基于消息服务的,适用高并发场景。
按规律进行通知,不保证数据一定能通知成功,但会提供可查询操作接口进行核对。这种方案主要用在与第三方系统通讯时,比如:调用微信或支付宝支付后的支付结果通知。这种方案也是结合 MQ 进行实现,例如:通过 MQ 发送 http 请求,设置最大通知次数。达到通知次数后即不再通知。
案例:银行通知、商户通知等(各大交易业务平台间的商户通知:多次通知、查询校对、对账文件),支付宝的支付成功异步回调
就是不断的通知,告诉你结果。
柔性事务 - 可靠消息 + 最终一致性方案(常用)
场景:基于消息服务的,适用高并发场景。
实现:业务处理服务在业务事务提交之前,向实时消息服务请求发送消息,实时消息服务只记录消息数据,而不是真正的发送。业务处理服务在业务事务提交之后,向实时消息服务确认发送。只有在得到确认发送指令后,实时消息服务才会真正发送。
也是出现问题,发送消息,服务接受到消息后就进行回滚,与上面那个相比多了消息这一步。
Seata 框架
Seata 介绍
官方地址:https://seata.io/zh-cn/
Seata是一款开源的分布式事务解决方案,致力于提高高性能和简单易用的分布式事务服务。提供了多种模式:AT、TCC、SAGA 和 XA事务模式。
AT:auto自动模式
TCC:Try、Confirm、Cancel
Seata很好理解,首先,弄明白下面三个名词:
TC:事务协调者,维护全局和分支。
TM:事务管理器,处理全局事务的 开始、提交、回滚操作。(所谓的全局事务是 谁是主业务发起的远程调用,那么TM就在谁的上面。)
RM:资源管理器,维护分支事务。
Seata 环境搭建
- 创建 UNDO_LOG 表
SEATA AT 模式需要 UNDO_LOG 表:(文档描述很清楚)
- 给每个微服务对应的 数据库 都创建一个 UNDO_LOG 表。
- 目的:记录日志的状态,确定是否回滚。
- 安装事务协调器:seata-server。
- 根据指示来就行,此处我们下载的1.0.0 GA 版本的seata。
- 在common-server公共服务中,导入Seata相关依赖:
<!-- 分布式事务seata -->
<dependency><groupId>com.alibaba.cloud</groupId><artifactId>spring-cloud-starter-alibaba-seata</artifactId>
</dependency>
- 查找一个seata-all的依赖,查看版本:确保与事务协调器版本一致。也可以去SpringCloud 官方去看对应版本。
- 本项目是 1.5.2 ,那么事务协调器对应的也是1.5.2版本!
- 从github上面,下载seata-server(事务协调器),先解压。
- 相关配置文件解释:
register.conf 注册相关配置:
- registry 配置注册中心,type配置什么类型的注册中心。
config 配置配置中心,type也是用的什么类型的配置中心:
默认:file类型,seata服务默认有个file.conf文件。也可以改成nacos。
store 配置:配置seata的存储方式。
- 本次采用file文件方式存储。
不同版本可能不太一样,但是效果都差不多的:
- 下面是1.5.2 seata 版本:
- 配置好config、registry、store后,进入bin,启动seata项目。
- 并查看nacos是否注册成功。
- 所有想要用到分布式事务的客户端微服务,都要适用seata DataSourceProxy代理自己的数据源。(seata github上面有介绍)
- 参考官方地址:https://github.com/seata/seata-samples/tree/master/springcloud-jpa-seata
package com.atguigu.gulimall.order.config;import com.zaxxer.hikari.HikariDataSource;
import io.seata.rm.datasource.DataSourceProxy;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.jdbc.DataSourceProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.util.StringUtils;import javax.sql.DataSource;/* @author: xuyanbo* @date: 2023/11/2 18:57*/
@Configuration
public class MySeataConfig {// 数据源自带的配置属性@AutowiredDataSourceProperties dataSourceProperties;@Beanpublic DataSource dataSource(DataSourceProperties dataSourceProperties){// 根据源码创建数据源以及设置相关配置HikariDataSource dataSource = dataSourceProperties.initializeDataSourceBuilder().type(HikariDataSource.class).build();if (StringUtils.hasText(dataSourceProperties.getName())){dataSource.setPoolName(dataSourceProperties.getName());}// 用seata的代理数据源,来配置即可。return new DataSourceProxy(dataSource);}}
- 每个客户端微服务,配置seata的相关信息。
- 不同版本可能不太一样,旧版是引入file.conf 和 registry.conf文件。
- 1.5.2版本的seata,是通过配置application.yml实现:
💡Tips:seata.tx-service-group=服务名 和 service.vgroup-mapping.服务名 的配置,用来映射seata-server的识别。
spring:datasource:username: rootpassword: rooturl: jdbc:mysql://www.gulimall.com:3306/gulimall_omsdriver-class-name: com.mysql.cj.jdbc.Driver
mybatis-plus:mapper-locations: classpath:/mapper//*.xmlglobal-config:db-config:id-type: auto # 主键自增
server:port: 9000# Seata 配置
seata:tx-service-group: gulimall-order #这里每个服务都是对应不同的映射名,在配置中心可以看到registry:type: nacosnacos:server-addr: localhost:8848group: DEFAULT_GROUPservice:vgroup-mapping:#这里也要注意 key为映射名,gulimall-order: default
- 给主事务服务(订单服务)添加@GlobalTransactional 和 @Transactional注解。
- 子事务服务(仓储服务)添加 @Transactional注解 即可。
@GlobalTransactional
@Transactional
@Override
public SubmitOrderResponseVo submitOrder(OrderSubmitVo vo) {SubmitOrderResponseVo response = new SubmitOrderResponseVo();confirmVoThreadLocal.set(vo);
...
- 启动测试分布式事务。
Seata 的几个模式
Seata 默认是AT模式:是2PC的演变,属于补偿性质。
官方案例都已经给出,几种模式的案例:
AT模式:相当于自动解锁,不适用于高并发的分布式事务,仅仅适用于一般的分布式事务。
要根据实际情况来,应用属于自己的分布式事务。
订单服务采用 最终一致性方案 解决分布式事务问题
因为,订单服务属于高并发服务,使用其他分布式方案可能会出现严重问题。
因此,考虑使用 可靠消息 + 最终一致性方案
,进而保证高并发。
RabbitMQ 延时队列(实现定时任务)
场景:比如未付款订单,超过一定时间后,系统自动取消订单并释放占有物品。
旧版本:Spring的schedule定时任务轮询数据库
缺点:消耗系统内存、增加了数据库的压力、存在较大的时间误差。
新版本 RabbitMQ版本解决:rabbitmq的消息TTL和死信Exchange结合。
消息的TTL(Time To Live)就是消息的存活时间。
RabbitMQ可以对队列
和消息
分别设置TTL。
- 对队列设置就是队列没有消费者连着的保留时间,也可以对每一个单独的消息做单独的设置。超过了这个时间,我们认为这个消息就死了,称之为死信。
- 如果队列设置了,消息也设置了,那么会取小的。所以一个消息如果被路由到不同的队列中,这个消息死亡的事件有可能不一样(不同的队列设置)。这里单个消息的TTL,因为它才是实现延迟任务的关键。可以通过设置消息的expiration字段或者x-message-ttl属性来设置时间,两者是一样的效果。
理解DLX(Dead Letter Exchanges):
什么是死信(消息)?
- 一个消息被Consumer拒收了,并且reject方法的参数里requeue是false。也就是说不会被再次放在队列里,被其他消费者使用。(basic.reject/basic.nack)requeue=false
- 上面的消息的TTL到了,消息过期了。
- 队列的长度限制满了。排在前面的消息会被丢弃或者扔到死信路由上。
效果图:
- 涉及两种:消息设置过期时间 和 队列设置过期时间。
💡Tips:
推荐使用的是队列设置过期时间
。因为,在消息设置过期时间中,rabbitmq采用惰性检查机制
。例如:第一个消息5分钟过期,第二个消息1分钟过期,那么第二个消息就必须等第一个消息过期了才能被检测到。
延迟队列的 设计和实现
基本上是每一个微服务对应一个交换机就够了,交换机命名方式如:服务名-事件-exchange 对应的一系列服务。
业务流程图:
画设计流程图:(与业务流程图一个效果)
💡Tips:注意,交换机是被复用了,不要严格定义什么死信交换机之类的。其实,都是普通队列和交换机,只不过复用出了不同效果而已。
SpringCloud集成RabbitMQ了相关内容,直接通过@Bean进行注入,创建即可:
package com.atguigu.gulimall.order.config;import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.Exchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import java.util.HashMap;
import java.util.Map;/* @author: xuyanbo* @date: 2023/11/3 14:13*/
@Configuration
public class MyMQConfig {/* 通过@Bean方式,将Binding、Queue、Exchange 自动创建RabbitMQ对应的交换机、队列、绑定等。*/// 创建死信队列@Beanpublic Queue orderDelayQueue(){Map<String, Object> arguments = new HashMap<>();/* x-dead-letter-exchange 绑定死信交换机* x-dead-letter-routing-key 绑定死信路由* x-message-ttl 绑定过期时间*/arguments.put("x-dead-letter-exchange","order-event-exchange");arguments.put("x-dead-letter-routing-key","order.release.order");arguments.put("x-message-ttl",60000);Queue queue = new Queue("order.delay.queue",true,false,false,arguments);return queue;}// 普通队列@Beanpublic Queue orderReleaseOrderQueue(){Queue queue = new Queue("order.release.order.queue",true,false,false);return queue;}// 声明交换机(根据路由复用了,区分好路由和绑定关系即可)@Beanpublic Exchange orderEventExchange(){TopicExchange topicExchange = new TopicExchange("order-event-exchange",true,false);return topicExchange;}// 声明绑定死信队列关系@Beanpublic Binding orderCreateOrderBingding(){return new Binding("order.delay.queue",Binding.DestinationType.QUEUE,"order-event-exchange","order.create.order",null);}// 声明绑定正常消费队列关系@Beanpublic Binding orderReleaseOrderBingding(){return new Binding("order.release.order.queue",Binding.DestinationType.QUEUE,"order-event-exchange","order.release.order",null);}}
💡Tips:RabbitMQ中,已经创建了Binding、Queue、Exchange ,在重新启动微服务执行@Bean时,并不会重新创建也不会修改属性之类的。
解决办法:手动删除即可。
这样就可以测试一下,延迟队列的效果:
// 1. 随便写个接口:
@Autowired
RabbitTemplate rabbitTemplate;@GetMapping("/test/orderCreate")
@ResponseBody
public String createOrderTest(){// 订单下单成功OrderEntity entity = new OrderEntity();entity.setOrderSn(UUID.randomUUID().toString());entity.setModifyTime(new Date());rabbitTemplate.convertAndSend("order-event-exchange","order.create.order",entity);return "ok";
}
// 2. 写个监听器测试是否成功
@RabbitListener(queues = "order.release.order.queue")
public void listener(OrderEntity entity, Channel channel , Message message) throws IOException {System.out.println("收到的过期的订单信息,准备关闭订单:" + entity.getOrderSn());channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
}
锁和解锁库存 架构实现
业务流程图,如下:
- 引入rabbitmq依赖。
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
- rabbitmq相关配置
spring.rabbitmq.host=www.gulimall.com
spring.rabbitmq.virtual-host=/
- 添加 @EnableRabbit 启动类
- 添加rabbitmq的序列化机制转换
package com.atguigu.gulimall.ware.config;import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;/* @author: xuyanbo* @description: TODO* @date: 2023/10/27 14:43*/
@Configuration
public class MyRabbitConfig {// 使用JSON序列化机制,进行消息转换@Beanpublic MessageConverter messageConverter(){return new Jackson2JsonMessageConverter();}}
- 给库存服务,添加一系列的交换机、队列、绑定。
package com.atguigu.gulimall.ware.config;import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.Exchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import java.util.HashMap;
import java.util.Map;/* @author: xuyanbo* @description: TODO* @date: 2023/10/27 14:43*/
@Configuration
public class MyRabbitConfig {// 使用JSON序列化机制,进行消息转换@Beanpublic MessageConverter messageConverter(){return new Jackson2JsonMessageConverter();}// fixme 这个监听的作用就是为了触发一下rabbitmq,触发成功就会创建这些交换机或者绑定之类的。@RabbitListener(queues = "stock.release.stock.queue")public void handler(){}// 库存服务的默认交换机@Beanpublic Exchange stockEventExchange(){return new TopicExchange("stock-event-exchange",true,false);}// 普通队列@Beanpublic Queue stockReleaseStockQueue(){return new Queue("stock.release.stock.queue",true,false,false);}// 延迟队列(死信队列)@Beanpublic Queue stockDelayQueue(){Map<String, Object> arguments = new HashMap<>();/* x-dead-letter-exchange 绑定死信交换机* x-dead-letter-routing-key 绑定死信路由* x-message-ttl 绑定过期时间*/arguments.put("x-dead-letter-exchange","stock-event-exchange");arguments.put("x-dead-letter-routing-key","stock.release");// 2分钟arguments.put("x-message-ttl",120000);return new Queue("stock.delay.queue",true,false,false,arguments);}// 正常队列的绑定关系@Beanpublic Binding stockReleaseBinding(){return new Binding("stock.release.stock.queue",Binding.DestinationType.QUEUE,"stock-event-exchange","stock.release.#",null);}// 死信队列的绑定关系@Beanpublic Binding stockLockedBinding(){return new Binding("stock.delay.queue",Binding.DestinationType.QUEUE,"stock-event-exchange","stock.locked",null);}}
💡Tips:记得添加一个监听方法,不然没办法触发一下rabbitmq创建这些交换机或者绑定之类的。
- 启动服务查看创建。
订单服务 库存解锁 场景
库存解锁的场景:
- 下订单成功,订单过期没有支付被系统自动取消、被用户手动取消。都要解锁库存。
- 下订单成功,库存锁定成功,接下来的业务调用失败,导致订单回滚。之前锁定的库存就要自动解锁。
数据结构图:
查询数据库关于这个订单的锁定库存信息。两种情况:
- 有:证明库存锁定成功了。 解锁就要看订单情况。
- 没有这个订单。必须解锁。
- 有这个订单。就要看订单状态。订单已取消:解锁库存。 没取消:不能解锁。
- 没有:库存锁定失败了,库存回滚了。这种情况无需解锁。
还有重要一点,Rabbitmq监听必须设置为手动ack模式:
只要解锁库存的消息失败。一定告诉服务解锁失败,一定要启动手动ack模式,
# 配置:spring.rabbitmq.listener.simple.acknowledge-mode=manual
# 代码:channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
案例代码:
- 一定要使用这种try - catch 方式来手动回复,简洁又方便。
- 这样抛出异常等同于消息没有处理成功,进行拒绝并重新放回队列。
- 没有抛出异常等同于消息处理成功,ack手动返回true
package com.atguigu.gulimall.ware.listener;import com.atguigu.common.to.mq.StockLockedTo;
import com.atguigu.gulimall.ware.service.WareSkuService;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;import java.io.IOException;/* @author: xuyanbo* @description: TODO* @date: 2023/11/4 10:36*/
@Service
@RabbitListener(queues = "stock.release.stock.queue")
public class StockReleaseListener {@AutowiredWareSkuService wareSkuService;/* 1. 库存自动解锁:* 下订单成功,库存锁定成功。接下来的业务调用失败,导致订单回滚。之前锁定的库存就要自动解锁。* 2. 订单失败。* 锁库存失败** 只要解锁库存的消息失败。一定告诉服务解锁失败,一定要启动手动ack模式, channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);。* @param to* @param message*/@RabbitHandlerpublic void handleStockLockedRelease(StockLockedTo to, Message message , Channel channel) throws IOException {// 这样抛出异常等同于消息没有处理成功,进行拒绝并重新放回队列。// 没有抛出异常等同于消息处理成功,ack手动返回truetry {System.out.println("收到解决库存的消息");wareSkuService.unlockStock(to);channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);} catch (Exception e) {channel.basicReject(message.getMessageProperties().getDeliveryTag(),true);}}}
/* 解锁* 1. 查询数据库关于这个订单的锁定库存信息。* 两种情况:* 有:证明库存锁定成功了。* 解锁:看订单情况。* 1. 没有这个订单。必须解锁。* 2. 有这个订单。就要看订单状态。* 已取消:解锁库存。* 没取消:不能解锁。* 没有:库存锁定失败了,库存回滚了。这种情况无需解锁。*/
@Override
public void unlockStock(StockLockedTo to) {StockDetailTo detail = to.getDetail();Long detailId = detail.getId();WareOrderTaskDetailEntity byId = orderTaskDetailService.getById(detailId);if (byId != null) {// 解锁Long id = to.getId();WareOrderTaskEntity taskEntity = orderTaskService.getById(id);String orderSn = taskEntity.getOrderSn(); // 根据订单号查询订单状态R r = orderFeignService.getOrderStatus(orderSn);if (r.getCode() == 0) {// 订单数据返回成功OrderVo data = r.getData(new TypeReference<OrderVo>() {});// 订单不存在 或者 4订单已经取消 都要解锁库存if (data == null || data.getStatus() == 4) {// 订单已经被取消了。才能解锁库存if (byId.getLockStatus() == 1) {unLockStock(detail.getSkuId(), detail.getWareId(), detail.getSkuNum(), detailId);} }} else {// 消息拒接以后重新放到队列里面,让别人继续消息解锁throw new RuntimeException("远程服务失败");}} else {// 无需解锁}
}private void unLockStock(Long skuId, Long wareId, Integer num, Long taskDetailId) {// 库存解锁wareSkuDao.unlockStock(skuId, wareId, num);// update `wms_ware_sku` set stock_locked = stock_locked - #{num}// where sku_id = #{skuId} and ware_id = #{wareId}// 更新库存工作单的状态WareOrderTaskDetailEntity entity = new WareOrderTaskDetailEntity();entity.setId(taskDetailId);entity.setLockStatus(2); // 变为已解锁orderTaskDetailService.updateById(entity);
}
订单服务 定时关单 场景
架构图:
其实,就是设置一个延迟队列即可。
注意:订单关闭了,必须向库存服务发送一个消息,解锁库存。
整体流程也是通过rabbitmq监听器 等信息实现:
- 注意:此处用到了两个@RabbitHandler注解,来区分了哪个是订单到时后的主动解锁,哪个是锁库存之后的解锁。
RabbitMQ 消息积压、丢失、重复解决方案
如何保证消息可靠性 - 消息丢失
消息丢失场景 一:消息发送出去,由于网络原因没有抵达服务器。
解决办法:
- 做好容错方法(try - catch)发送消息可能会网络失败,失败后要有重试机制,可记录到数据库,采用定期扫描重发的方式。
// 发给MQ一个
try {// TODO 保证消息一定会发送出去,每一个消息都可以做好日志记录。给数据库保存每一个消息的详细信息。// TODO 定期扫描数据库将失败的消息再发送一遍。// 执行这个方法的时候,执行完了,网络延迟或者失败了。就要抛出异常,走catch。rabbitTemplate.convertAndSend("order-event-exchange","order.release.other",orderTo);
} catch (Exception e) {// TODO 将没发送成功的消息进行重试发送。...
}
- 做好日志记录,每个消息状态是否都被服务器收到都应该记录。
可以设置一个MQ消息表
,将每一个消息保存下来,定期扫描数据库将失败的消息再发送一遍
CREATE TABLE `mq_message` (`message_id` char(32) NOT NULL,`content` text,`to_exchane` varchar(255) DEFAULT NULL,`routing_key` varchar(255) DEFAULT NULL,`class_type` varchar(255) DEFAULT NULL,`message_status` int(1) DEFAULT '0' COMMENT '0-新建 1-已发送 2-错误抵达 3-已抵达',`create_time` datetime DEFAULT NULL,`update_time` datetime DEFAULT NULL,
PRIMARY KEY (`message_id`)
) ENGINE=InnoDB AUTO_INCREMENT=13 DEFAULT CHARSET=utf8mb4 COMMENT='MQ消息表';
- 做好定期重发,如果消息没有发送成功,定期去数据库扫描未成功的消息进行重发。
消息丢失场景 二:消息抵达Broker,Broker要将消息写入磁盘(持久化)才算成功。此时,Broker尚未持久化完成,宕机。
解决方式:
publisher也必须加入确认回调机制,通过生成者和消费者的确认机制解决该问题
,确认成功的消息,修改数据库消息状态。
消息丢失场景 三:自动ACK的状态下。消费者收到消息,但没来得及处理消息,服务器宕机了。
解决方式:
一定开启手动ACK,消费成功才移除,失败或者没来的及处理就noAck并重新入队。
如何保证消息可靠性 - 消息重复
消息重复场景:
- 消息消费成功,事务已经提交,ack时,机器宕机。导致没有ack成功,Broker的消息重新由unack变为ready,并发送给其他消费者。
- 消息消费事变, 由于重试机制,自动又将消息发送出去。
- 成功消费,ack时宕机,消息由unack变为ready,Broker又重新发送。
解决方法:
- 消费者的业务消费接口应该设置为
幂等性
的。比如:扣库存有工作单的状态标志。 - 使用
防重表(redis/mysql)
,发送消息每一个都有业务的唯一标识,处理过就不用处理过了。 - RabbitMQ的每一个消息都有
redelivered字段
,可以获取是否是被重新投递过来的,而不是第一次投递过来的。
如何保证消息可靠性 - 消息积压
消费积压场景:
- 消费者宕机积压。
- 消费者消费能力不足积压。
- 发送者发送流量太大。
解决方式:
- 上线更多的消费者,进行正常消费。
- 上线专门的队列消费服务,将消息先批量取出来,记录数据库,离线慢慢处理。