8.Seata分布式事务
8.1. Seata简介
- Seata是一款开源的分布式事务解决方案,致力于提供高性能和简单易用的分布式事务服务。
8.2. Seata工作组件
- XID:全局事务的唯一标识,在微服务调用链中传递,绑定到服务的事务的上下文。
- TC:事务协调者,就是Seata,维护全局和分支事务的状态,驱动全局事务提交或回滚。(以Seata Server的形式独立部署)
- TM:事务管理器,标注全局@GlobalTransactional启动入口动作的微服务模块,是事务的发起者。定义全局事务的范围:开始全局事务、提交或回滚全局事务。
- RM:资源管理器,是MySQL数据库本身,管理分支事务处理的资源,与TC交谈以注册分支事务和报告分支事务的状态,并驱动分支事务提交或回滚。
8.3. Seata工作流程
1.TM向TC申请开启全局事务,创建成功生成全局唯一的XID。
2.XID在微服务调用链中传播。
3.RM向TC注册分支事务,将其纳入XID对应的全局事务的管辖。
4.TM向TC发起针对XID的全局提交或回滚决议。
5.TC调度XID下管辖的全部分支事务完成提交或回滚请求。
8.4. Seata安装
8.4.1.安装流程
-
下载解压
- https://github.com/seata/seata/releases
-
创建数据库
- seata的server端需要创建一个数据库,用于存储事务信息
CREATE DATABASE seata;
USE seata;
-
导入表
-
https://github.com/apache/incubator-seata/blob/2.x/script/server/db/mysql.sql
-
执行sql脚本
-
-
修改配置文件
- 修改application.yml
# Copyright 1999-2019 Seata.io Group.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.server:port: 7091spring:application:name: seata-serverlogging:config: classpath:logback-spring.xmlfile:path: ${log.home:${user.home}/logs/seata}extend:logstash-appender:destination: 127.0.0.1:4560kafka-appender:bootstrap-servers: 127.0.0.1:9092topic: logback_to_logstashconsole:user:username: seatapassword: seata
seata:config:# support: nacos, consul, apollo, zk, etcd3type: nacosnacos:server-addr: 127.0.0.1:8848namespace: ""group: SEATA_GROUPusername: nacospassword: nacosregistry:# support: nacos, eureka, redis, zk, consul, etcd3, sofatype: nacosnacos:application: seata-serverserver-addr: 127.0.0.1:8848group: SEATA_GROUPnamespace: ""cluster: defaultusername: nacospassword: nacosstore:# support: file 、 db 、 redis 、 raftmode: dbdb:datasource: druiddb-type: mysql driver-class-name: com.mysql.dj.jdbc.Driverurl: jdbc:mysql://localhost:3307/seata?characterEncoding=utf8&useUnicode=true&useSSL=false&serverTimezone=GMT%2B8&rewriteBatchedStatements=trueuser: rootpassword: 123456minConn: 10maxConn: 100global-table: global_tablebranch-table: branch_tablelock-table: lock_tabledistributed-lock-table: distributed_lockquery-limit: 1000maxWait: 5000# server:# service-port: 8091 #If not configured, the default is '${server.port} + 1000'security:secretKey: SeataSecretKey0c382ef121d778043159209298fd40bf3850a017tokenValidityInMilliseconds: 1800000ignore:urls: /,/**/*.css,/**/*.js,/**/*.html,/**/*.map,/**/*.svg,/**/*.png,/**/*.jpeg,/**/*.ico,/api/v1/auth/login,/metadata/v1/**
-
启动nacos
-
启动seata
- bin目录下执行seata-server.sh
8.4.2.验证
- 访问nacos,服务注册成功
- 访问seata,账号密码初始化均为seata,登录成功
8.5. Seata使用(订单下单案例)
1.创建三个数据库
CREATE DATABASE seata_order;
CREATE DATABASE seata_storage;
CREATE DATABASE seata_account;
2.创建表
3.mybatis一键生成实体和标准mapper
4.创建三个微服务
- 订单服务
- 库存服务
- 账户服务
5.新增库存和账户两个Feign接口
- 库存接口
@FeignClient(name = "seata-storage-service")
public interface StorageFeignApi {/*** 扣减库存*/@PostMapping("/storage/decrease")ResultData decrease(@RequestParam("productId") Long productId, @RequestParam("count") Integer count);
}
- 账户接口
@FeignClient(name = "seata-account-service")
public interface AccountFeignApi {/*** 扣减账户余额*/@PostMapping("/account/decrease")ResultData decrease(@RequestParam("userId") Long userId, @RequestParam("money") Long money);
}
6.订单微服务
- 配置文件
server:port: 2001spring:application:name: seata-order-servicecloud:nacos:server-addr: localhost:8848datasource:type: com.alibaba.druid.pool.DruidDataSourcedriver-class-name: com.mysql.cj.jdbc.Driverurl: jdbc:mysql://localhost:3307/seata_order?characterEncoding=utf-8&useSSL=false&serverTimezone=GMT%2B8&rewriteBatchedStatements=true&allowPublicKeyRetrieval=trueusername: rootpassword: 123456mybatis:mapper-locations: classpath:mapper/*.xmltype-aliases-package: org.example.cloud.entitiesconfiguration:map-underscore-to-camel-case: trueseata:registry:type: nacosnacos:server-addr: localhost:8848namespace: ""group: SEATA_GROUPapplication: seata-servertx-service-group: default_tx_group # 事务组名称,由他获得TC服务的集群名称service:vgroup-mapping:# 事务组名称与集群名称的映射关系,例如当前事务组为ProjectA,这个ProjectA的集群名称为default,如果当前集群down了,只需要修改集群名称即可启动备用集群进行事务管理default_tx_group: default data-source-proxy-mode: ATlogging:level:io:seata: info
- 业务代码
@Service
@Slf4j
public class OrderServiceImpl implements OrderService {@Resourceprivate OrderMapper orderMapper;@Resource// 订单微服务调用库存微服务private StorageFeignApi storageFeignApi;@Resource// 订单微服务调用账户微服务private AccountFeignApi accountFeignApi;@Overridepublic void create(Order order) {//xid全局事务id检查String xid = RootContext.getXID();//1. 新建订单log.info("----->开始新建订单: " + "\t" + "xid: " + xid);// 订单新建初始状态为0order.setStatus(0);int result = orderMapper.insertSelective(order);Order orderFromDB = null;if(result > 0) {// 从mysql查出记录orderFromDB = orderMapper.selectOne(order);log.info("----->新建订单成功: " + "\t" + "orderFromDB info: " + orderFromDB);System.out.println();log.info("----->调用storage: " + "\t");storageFeignApi.decrease(orderFromDB.getProductId(), orderFromDB.getCount());log.info("----->调用storage完成: " + "\t");log.info("----->调用account: " + "\t");accountFeignApi.decrease(orderFromDB.getUserId(), order.getMoney());log.info("----->调用account完成: " + "\t");System.out.println();// 修改订单状态,从0改为1log.info("----->修改订单状态: " + "\t");orderFromDB.setStatus(1);Example whereCondition = new Example(Order.class);Example.Criteria criteria = whereCondition.createCriteria();criteria.andEqualTo("userId", order.getId());criteria.andEqualTo("status", 0);int updateResult = orderMapper.updateByExampleSelective(orderFromDB, whereCondition);log.info("----->修改订单状态完成: " + "\t" + "updateResult: " + updateResult);log.info("----->orderFromDB info: " + orderFromDB);}System.out.println();System.out.println("----->结束新建订单: " + "\t" + "xid: " + xid);}
}
7.库存微服务
- mapper
public interface StorageMapper extends Mapper<Storage> {void decrease(@Param("productId") Long productId, @Param("count") Integer count);
}
<update id="decrease">update t_storageset used = used + #{count},residue = residue - #{count}where product_id = #{productId}
</update>
8.账户微服务
- mapper
public interface AccountMapper extends Mapper<Account> {void decrease(@Param("userId") Long userId, @Param("money") Long money);
}
<update id="decrease">update t_accountset used = used + #{money},residue = residue - #{money}where user_id = #{userId}
</update>
8.6. Seata测试(订单下单案例)
-
未添加@GlobalTransactional注解
- 账户微服务超时异常
- 账户微服务和库存微服务均未回滚
- 订单状态为0,未进行更新
- 账户微服务业务代码异常
- 账户微服务和库存微服务均未回滚
- 订单状态为0,未进行更新
- 账户微服务超时异常
// OrderServiceImpl.java
@GlobalTransactional(name = "zzyy-create-order", rollbackFor = Exception.class)
public void create(Order order) {//xid全局事务id检查String xid = RootContext.getXID();//1. 新建订单log.info("----->开始新建订单: " + "\t" + "xid: " + xid);// 订单新建初始状态为0order.setStatus(0);int result = orderMapper.insertSelective(order);Order orderFromDB = null;if(result > 0) {// 从mysql查出记录orderFromDB = orderMapper.selectOne(order);log.info("----->新建订单成功: " + "\t" + "orderFromDB info: " + orderFromDB);System.out.println();log.info("----->调用storage: " + "\t");storageFeignApi.decrease(orderFromDB.getProductId(), orderFromDB.getCount());log.info("----->调用storage完成: " + "\t");log.info("----->调用account: " + "\t");accountFeignApi.decrease(orderFromDB.getUserId(), orderFromDB.getMoney());log.info("----->调用account完成: " + "\t");System.out.println();// 修改订单状态,从0改为1log.info("----->修改订单状态: " + "\t");orderFromDB.setStatus(1);Example whereCondition = new Example(Order.class);Example.Criteria criteria = whereCondition.createCriteria();criteria.andEqualTo("id", orderFromDB.getId());criteria.andEqualTo("status", 0);int updateResult = orderMapper.updateByExampleSelective(orderFromDB, whereCondition);log.info("----->修改订单状态完成: " + "\t" + "updateResult: " + updateResult);log.info("----->orderFromDB info: " + orderFromDB);}System.out.println();System.out.println("----->结束新建订单: " + "\t" + "xid: " + xid);
}
- 添加@GlobalTransactional注解
- 账户微服务超时异常
-
超时前,账户微服务和库存微服务均未回滚,逻辑正常更新,但订单状态为0,未进行更新,且undolog记录存在
-
seata后台
-
超时后,账户微服务和库存微服务均回滚,订单记录消失,undolog记录被删除
-
seata后台
-
- 账户微服务超时异常
8.7.原理总结与面试题
8.7.1 AT模式如何做到对业务无侵入
1.两阶段提交协议的演变
-
一阶段:业务数据和回滚日志记录在同一个事务中提交,释放数据库锁资源。
-
二阶段:
- 提交异步化,非常快速地完成。
- 基于undo_log回滚日志,反向补偿。
2.一阶段加载
在一阶段,Seata会拦截业务SQL,
(1)解析SQL语义,找到业务SQL要更新的业务数据,在业务数据被更新前,将其保存成“before image”,
(2)执行业务SQL更新业务数据,在业务数据更新之后,
(3)将其保存成“after image”,最后生成行锁。
上述操作在一个数据库事务完成,保证原子性。
3.二阶段提交
- 二阶段如果是顺利提交,因为业务SQL在一阶段已经提交,所以Seata只需将一阶段保存的快照数据和行锁删掉,完成数据清理即可。
- 二阶段如果是回滚的话,Seata需要回滚一阶段已经执行的业务SQL,回滚的方式就是用“before image”还原业务数据,还原之前要首先校验脏写,对比“数据库当前业务数据”和“after image”,如果数据不一致,说明有脏写,需要转人工处理,如果数据一致,则用“before image”去覆盖“当前业务数据”,完成数据回滚。最后清理掉一阶段保存的快照数据和行锁。