背景
日常开发离不开分布式系统,自然避免不了分布式事务问题,Seata 是一款阿里开源的主流分布式事务解决方案,但实际工作引入seata感觉有点重,本人之前在商业银行做开发也很少团队使用。今天趁此机会做个demo,玩一玩了解下。
前置工作
seata下载安装
官网下载地址:
https://seata.apache.org/zh-cn/unversioned/download/seata-server
本人下载安装的是seata-server-1.4.2版本
修改/seata-server-1.4.2/conf/file.conf配置文件
改为db, 改成自己数据库用户名、密码
## transaction log store, only used in seata-server
store {## store mode: file、db、redismode = "db"## rsa decryption public keypublicKey = ""db {datasource = "druid"dbType = "mysql"driverClassName = "com.mysql.cj.jdbc.Driver"url = "jdbc:mysql://127.0.0.1:3306/seata?rewriteBatchedStatements=true"user = "root"password = "rootroot"minConn = 5maxConn = 100globalTable = "global_table"branchTable = "branch_table"lockTable = "lock_table"queryLimit = 100maxWait = 5000}
}
修改/seata-server-1.4.2/conf/registry.conf配置文件,写上nacos的配置参数
registry {# file 、nacos 、eureka、redis、zk、consul、etcd3、sofatype = "nacos"nacos {application = "seata-server"serverAddr = "127.0.0.1:8848"group = "DEFAULT_GROUP"namespace = ""cluster = "default"username = "nacos"password = "nacos"}file {name = "file.conf"}
}config {# file、nacos 、apollo、zk、consul、etcd3type = "nacos"nacos {serverAddr = "127.0.0.1:8848"namespace = ""group = "DEFAULT_GROUP"username = "nacos"password = "nacos"dataId = "seataServer.properties"}file {name = "file.conf"}
}
请务必留意上文配置中的dataId = seataServer.properties
这个dataId需要配到nacos中,下面我们开始配置这个
由于namespace未指定特殊值,留空表示使用public
我们在nacos配置列表项的public命名空间中,新增配置文件"seataServer.properties",配置类型选择为"Properties",新建时确保名称如下:
Data ID: seataServer.properties
Group: DEFAULT_GROUP
完事后启动seata的server服务
sh seata-server.sh
然后在编辑区贴入示例内容页中的内容。
贴入的内容有部分关键地方需要修改,请留意
nacos下载安装
从官网下载的nacos,配置standlone模式运行,完毕后环境如下:
访问地址:http://192.168.5.210:8848/nacos
用户名:nacos
密码:nacos
配置内容:
#For details about configuration items, see https://seata.io/zh-cn/docs/user/configurations.html
#Transport configuration, for client and server
transport.type=TCP
transport.server=NIO
transport.heartbeat=true
transport.enableTmClientBatchSendRequest=false
transport.enableRmClientBatchSendRequest=true
transport.enableTcServerBatchSendResponse=false
transport.rpcRmRequestTimeout=30000
transport.rpcTmRequestTimeout=30000
transport.rpcTcRequestTimeout=30000
transport.threadFactory.bossThreadPrefix=NettyBoss
transport.threadFactory.workerThreadPrefix=NettyServerNIOWorker
transport.threadFactory.serverExecutorThreadPrefix=NettyServerBizHandler
transport.threadFactory.shareBossWorker=false
transport.threadFactory.clientSelectorThreadPrefix=NettyClientSelector
transport.threadFactory.clientSelectorThreadSize=1
transport.threadFactory.clientWorkerThreadPrefix=NettyClientWorkerThread
transport.threadFactory.bossThreadSize=1
transport.threadFactory.workerThreadSize=default
transport.shutdown.wait=3
transport.serialization=seata
transport.compressor=none#Transaction routing rules configuration, only for the client
service.vgroupMapping.order-seata-service-group=default
service.vgroupMapping.goods-seata-service-group=default
#If you use a registry, you can ignore it
service.default.grouplist=127.0.0.1:8091
service.enableDegrade=false
service.disableGlobalTransaction=false#Transaction rule configuration, only for the client
client.rm.asyncCommitBufferLimit=10000
client.rm.lock.retryInterval=10
client.rm.lock.retryTimes=30
client.rm.lock.retryPolicyBranchRollbackOnConflict=true
client.rm.reportRetryCount=5
client.rm.tableMetaCheckEnable=true
client.rm.tableMetaCheckerInterval=60000
client.rm.sqlParserType=druid
client.rm.reportSuccessEnable=false
client.rm.sagaBranchRegisterEnable=false
client.rm.sagaJsonParser=fastjson
client.rm.tccActionInterceptorOrder=-2147482648
client.tm.commitRetryCount=5
client.tm.rollbackRetryCount=5
client.tm.defaultGlobalTransactionTimeout=60000
client.tm.degradeCheck=false
client.tm.degradeCheckAllowTimes=10
client.tm.degradeCheckPeriod=2000
client.tm.interceptorOrder=-2147482648
client.undo.dataValidation=true
client.undo.logSerialization=jackson
client.undo.onlyCareUpdateColumns=true
server.undo.logSaveDays=7
server.undo.logDeletePeriod=86400000
client.undo.logTable=undo_log
client.undo.compress.enable=true
client.undo.compress.type=zip
client.undo.compress.threshold=64k
#For TCC transaction mode
tcc.fence.logTableName=tcc_fence_log
tcc.fence.cleanPeriod=1h#Log rule configuration, for client and server
log.exceptionRate=100#Transaction storage configuration, only for the server. The file, DB, and redis configuration values are optional.
store.mode=db
store.lock.mode=file
store.session.mode=file
#Used for password encryption
store.publicKey=#If `store.mode,store.lock.mode,store.session.mode` are not equal to `file`, you can remove the configuration block.
store.file.dir=file_store/data
store.file.maxBranchSessionSize=16384
store.file.maxGlobalSessionSize=512
store.file.fileWriteBufferCacheSize=16384
store.file.flushDiskMode=async
store.file.sessionReloadReadSize=100#These configurations are required if the `store mode` is `db`. If `store.mode,store.lock.mode,store.session.mode` are not equal to `db`, you can remove the configuration block.
store.db.datasource=druid
store.db.dbType=mysql
store.db.driverClassName=com.mysql.cj.jdbc.Driver
store.db.url=jdbc:mysql://127.0.0.1:3306/seata?useUnicode=true&rewriteBatchedStatements=true
store.db.user=root
store.db.password=rootroot
store.db.minConn=5
store.db.maxConn=30
store.db.globalTable=global_table
store.db.branchTable=branch_table
store.db.distributedLockTable=distributed_lock
store.db.queryLimit=100
store.db.lockTable=lock_table
store.db.maxWait=5000#These configurations are required if the `store mode` is `redis`. If `store.mode,store.lock.mode,store.session.mode` are not equal to `redis`, you can remove the configuration block.
store.redis.mode=single
store.redis.single.host=127.0.0.1
store.redis.single.port=6379
store.redis.sentinel.masterName=
store.redis.sentinel.sentinelHosts=
store.redis.maxConn=10
store.redis.minConn=1
store.redis.maxTotal=100
store.redis.database=0
store.redis.password=
store.redis.queryLimit=100#Transaction rule configuration, only for the server
server.recovery.committingRetryPeriod=1000
server.recovery.asynCommittingRetryPeriod=1000
server.recovery.rollbackingRetryPeriod=1000
server.recovery.timeoutRetryPeriod=1000
server.maxCommitRetryTimeout=-1
server.maxRollbackRetryTimeout=-1
server.rollbackRetryTimeoutUnlockEnable=false
server.distributedLockExpireTime=10000
server.xaerNotaRetryTimeout=60000
server.session.branchAsyncQueueSize=5000
server.session.enableBranchAsyncRemove=false#Metrics configuration, only for the server
metrics.enabled=false
metrics.registryType=compact
metrics.exporterList=prometheus
metrics.exporterPrometheusPort=9898
准备数据库表
我用的本地mysql数据库
/*Navicat Premium Data TransferSource Server : localhostSource Server Type : MySQLSource Server Version : 80033 (8.0.33)Source Host : localhost:3306Source Schema : mail_orderTarget Server Type : MySQLTarget Server Version : 80033 (8.0.33)File Encoding : 65001Date: 25/04/2024 07:23:32
*/SET NAMES utf8mb4;
SET FOREIGN_KEY_CHECKS = 0;-- ----------------------------
-- Table structure for t_order
-- ----------------------------
DROP TABLE IF EXISTS `t_order`;
CREATE TABLE `t_order` (`id` bigint NOT NULL AUTO_INCREMENT COMMENT '主键id',`order_no` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci DEFAULT NULL COMMENT '订单编号',`goods_name` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci DEFAULT NULL COMMENT '商品名称',`goods_total_count` int DEFAULT NULL COMMENT '下单商品数量',`sku_number` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci DEFAULT NULL COMMENT '商品sku编码',`create_time` datetime DEFAULT NULL COMMENT '创建时间',`modify_time` datetime DEFAULT NULL COMMENT '修改时间',PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=44 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;-- ----------------------------
-- Records of t_order
-- ----------------------------
BEGIN;
COMMIT;-- ----------------------------
-- Table structure for undo_log
-- ----------------------------
DROP TABLE IF EXISTS `undo_log`;
CREATE TABLE `undo_log` (`branch_id` bigint NOT NULL COMMENT 'branch transaction id',`xid` varchar(100) NOT NULL COMMENT 'global transaction id',`context` varchar(128) NOT NULL COMMENT 'undo_log context,such as serialization',`rollback_info` longblob NOT NULL COMMENT 'rollback info',`log_status` int NOT NULL COMMENT '0:normal status,1:defense status',`log_created` datetime(6) NOT NULL COMMENT 'create datetime',`log_modified` datetime(6) NOT NULL COMMENT 'modify datetime',UNIQUE KEY `ux_undo_log` (`xid`,`branch_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb3 COMMENT='AT transaction mode undo table';-- ----------------------------
-- Records of undo_log
-- ----------------------------
BEGIN;
COMMIT;SET FOREIGN_KEY_CHECKS = 1;
/*Navicat Premium Data TransferSource Server : localhostSource Server Type : MySQLSource Server Version : 80033 (8.0.33)Source Host : localhost:3306Source Schema : mail_goodsTarget Server Type : MySQLTarget Server Version : 80033 (8.0.33)File Encoding : 65001Date: 25/04/2024 07:24:42
*/SET NAMES utf8mb4;
SET FOREIGN_KEY_CHECKS = 0;-- ----------------------------
-- Table structure for t_goods
-- ----------------------------
DROP TABLE IF EXISTS `t_goods`;
CREATE TABLE `t_goods` (`id` bigint NOT NULL COMMENT '主键id',`goods_name` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NOT NULL COMMENT '商品名称',`goods_price` int NOT NULL COMMENT '商品价格',`goods_type` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NOT NULL COMMENT '商品类型',`goods_stock` int NOT NULL COMMENT '商品库存',`sku_number` varchar(255) NOT NULL COMMENT 'sku编码',PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;-- ----------------------------
-- Records of t_goods
-- ----------------------------
BEGIN;
INSERT INTO `t_goods` (`id`, `goods_name`, `goods_price`, `goods_type`, `goods_stock`, `sku_number`) VALUES (1, '可乐', 2, 'DRINK', 100, '022000');
COMMIT;-- ----------------------------
-- Table structure for undo_log
-- ----------------------------
DROP TABLE IF EXISTS `undo_log`;
CREATE TABLE `undo_log` (`branch_id` bigint NOT NULL COMMENT 'branch transaction id',`xid` varchar(100) NOT NULL COMMENT 'global transaction id',`context` varchar(128) NOT NULL COMMENT 'undo_log context,such as serialization',`rollback_info` longblob NOT NULL COMMENT 'rollback info',`log_status` int NOT NULL COMMENT '0:normal status,1:defense status',`log_created` datetime(6) NOT NULL COMMENT 'create datetime',`log_modified` datetime(6) NOT NULL COMMENT 'modify datetime',UNIQUE KEY `ux_undo_log` (`xid`,`branch_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb3 COMMENT='AT transaction mode undo table';-- ----------------------------
-- Records of undo_log
-- ----------------------------
BEGIN;
COMMIT;SET FOREIGN_KEY_CHECKS = 1;
详细步骤
项目结构
基于springBoot快速搭建3个微服务,网关gateway,订单服务order,商品服务goods,,RPC之间通讯用的openfeign,浏览器会先通过网关访问订单服务,商品服务。
引入seata相关jar包依赖
订单,商品相关业务服务,引入seata依赖
<!-- seata --><dependency><groupId>com.alibaba.cloud</groupId><artifactId>spring-cloud-alibaba-seata</artifactId><version>2.2.0.RELEASE</version><exclusions><exclusion><groupId>io.seata</groupId><artifactId>seata-spring-boot-starter</artifactId></exclusion></exclusions></dependency><dependency><groupId>io.seata</groupId><artifactId>seata-spring-boot-starter</artifactId><version>1.4.2</version></dependency>
自定义SeataInterceptor
package com.goods.interceptor;import io.seata.core.context.RootContext;
import lombok.extern.slf4j.Slf4j;
import org.springframework.web.servlet.handler.HandlerInterceptorAdapter;import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;/*** Copyright 2022 skyworth** @Author: wuhao* @CreateTime: 2023-06-22 10:52* @Description: 事务传播拦截器 从请求header中获取远程调用xid* @Version: 1.0**/
@Slf4j
public class SeataInterceptor extends HandlerInterceptorAdapter {@Overridepublic boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) {String xid = RootContext.getXID();// 获取全局事务编号String rpcXid = request.getHeader("TX_XID");log.info("xid in RootContext[{}] xid in HttpContext[{}]", xid, rpcXid);if (xid == null && rpcXid != null) {//设置全局事务编号, 应用开启一个全局事务后,RootContext会自动绑定当前事务的XID,事务结束后也会自动解绑XID。所以在应用运行的过程中可以直接调用 RootContext.getXID()方法获取全局事务的唯一标识。//将XID和当前进程绑定RootContext.bind(rpcXid);log.info("bind[{}] to RootContext", rpcXid);}return true;}
}
自定义WebMvcConfig
、package com.goods.config;import com.goods.interceptor.SeataInterceptor;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.servlet.config.annotation.InterceptorRegistry;
import org.springframework.web.servlet.config.annotation.WebMvcConfigurer;/*** Copyright 2022 skyworth** @Author: wuhao* @CreateTime: 2023-06-22 10:57* @Description: 启动的时候添加一个拦截器 拦截针对的路径为:/*** @Version: 1.0**/
@Configuration
public class WebMvcConfig implements WebMvcConfigurer {@Overridepublic void addInterceptors(InterceptorRegistry registry) {registry.addInterceptor(new SeataInterceptor()).addPathPatterns("/**").excludePathPatterns("/login");}
}
核心业务代码:从order服务发起一个请求,在一个函数内模拟下订单场景:先创建订单,再扣减库存,然后mock异常,让seata进行数据回滚
/*** 下订单场景:先创建订单,再扣减库存,* @return*///@Transactional(rollbackFor = Exception.class)@GlobalTransactionalpublic String addOrder() {log.info("创建订单,开始");Order order = new Order();order.setOrderNo("20230617");order.setGoodsTotalCount(10);order.setGoodsName("可乐");order.setSkuNumber("33220011");order.setModifyTime(LocalDateTime.now());order.setCreateTime(LocalDateTime.now());orderDao.save(order);log.info("创建订单,结束");ReduceGoodsRequest request = ReduceGoodsRequest.builder().reduceCount(10).id(1L).build();log.info("商品扣减库存,开始,rpc调用商品服务,入参:{}",JSONObject.toJSONString(request));ReduceGoodsResponse response = goodsFeignClient.reduce(request);log.info("商品扣减库存,完成,rpc调用商品服务,出参:{}",JSONObject.toJSONString(response));//mock异常,检查让分布式事务全回滚int i = 10/0;return "执行完成";}
测试
启动gateway, order, goods, 3分微服务,执行上述下订单函数,异常时,是否数据回滚。
查看order服务日志得出:RmBranchRollbackProcessor已经对全局事务xid:2711681301535092746的事务分支进行回滚操作了
branchType=AT,这表示分支事务的类型是AT(可能是基于补偿的自动事务模式)。resourceId=jdbc:mysql://127.0.0.1:3306/mail_order,这表示分支事务关联的资源ID
再回头看mysql数据库:订单表,商品表无数据变更,证明seata已经自动把数据回滚了。
参考文献
steata官网:https://seata.apache.org/zh-cn/docs/overview/what-is-seata/```