写在前面
1:分布式事务介绍
参考MySQL之分布式事务 。
2:seata实战
架构图:
可以看到seata在这里作为协调者的角色,协调所有事务的提交以及回滚,其中seata使用MySQL存储每个分支事务的执行状态信息,以便在需要回滚等操作时可以获取到相应的信息进行回滚操作。这里seata服务也是作为一个微服务节点来运行,因此也需要将信息注册到nacos中,这样可以方便的来做服务发现。
2.1:搭建seata服务
首先在这里 现在seata的运行jar包,在运行jar包之前还需要做一些配置操作。
- 首先持久化模式
这里我们使用的持久化模式是MySQL,修改如下:
## transaction log store, only used in server side
store {## store mode: file、db## 【改动点01】 - 替换成db类型mode = "db"
- 接着对应的修改数据库连接信息
store {mode = "db"## 【改动点02】 - 更改参数## database store propertydb {## the implement of javax.sql.DataSource, such as DruidDataSource(druid)/BasicDataSource(dbcp) etc.datasource = "druid"## mysql/oracle/postgresql/h2/oceanbase etc.dbType = "mysql"driverClassName = "com.mysql.jdbc.Driver"## if using mysql to store the data, recommend add rewriteBatchedStatements=true in jdbc connection paramurl = "jdbc:mysql://127.0.0.1:3306/seata?rewriteBatchedStatements=true"user = "root"password = ""minConn = 5maxConn = 30globalTable = "global_table"branchTable = "branch_table"lockTable = "lock_table"queryLimit = 100}
}
- 创建seata数据库和表
注意创建在seata数据库中。
-- -------------------------------- The script used when storeMode is 'db' --------------------------------
-- the table to store GlobalSession data
CREATE TABLE IF NOT EXISTS `global_table`
(`xid` VARCHAR(128) NOT NULL,`transaction_id` BIGINT,`status` TINYINT NOT NULL,`application_id` VARCHAR(32),`transaction_service_group` VARCHAR(32),`transaction_name` VARCHAR(128),`timeout` INT,`begin_time` BIGINT,`application_data` VARCHAR(2000),`gmt_create` DATETIME,`gmt_modified` DATETIME,PRIMARY KEY (`xid`),KEY `idx_gmt_modified_status` (`gmt_modified`, `status`),KEY `idx_transaction_id` (`transaction_id`)
) ENGINE = InnoDBDEFAULT CHARSET = utf8;-- the table to store BranchSession data
CREATE TABLE IF NOT EXISTS `branch_table`
(`branch_id` BIGINT NOT NULL,`xid` VARCHAR(128) NOT NULL,`transaction_id` BIGINT,`resource_group_id` VARCHAR(32),`resource_id` VARCHAR(256),`branch_type` VARCHAR(8),`status` TINYINT,`client_id` VARCHAR(64),`application_data` VARCHAR(2000),`gmt_create` DATETIME,`gmt_modified` DATETIME,PRIMARY KEY (`branch_id`),KEY `idx_xid` (`xid`)
) ENGINE = InnoDBDEFAULT CHARSET = utf8;-- the table to store lock data
CREATE TABLE IF NOT EXISTS `lock_table`
(`row_key` VARCHAR(128) NOT NULL,`xid` VARCHAR(96),`transaction_id` BIGINT,`branch_id` BIGINT NOT NULL,`resource_id` VARCHAR(256),`table_name` VARCHAR(32),`pk` VARCHAR(36),`gmt_create` DATETIME,`gmt_modified` DATETIME,PRIMARY KEY (`row_key`),KEY `idx_branch_id` (`branch_id`)
) ENGINE = InnoDBDEFAULT CHARSET = utf8;
- 创建undo_log
注意该表创建在应用对应的数据库中,咱们这里就是geekbang_coupon_db
,用来存储分支事务已经提交的事务的回滚操作信息:
CREATE TABLE IF NOT EXISTS `undo_log`
(`id` BIGINT(20) NOT NULL AUTO_INCREMENT COMMENT 'increment id',`branch_id` BIGINT(20) NOT NULL COMMENT 'branch transaction id',`xid` VARCHAR(100) NOT NULL COMMENT 'global transaction id',`context` VARCHAR(128) NOT NULL COMMENT 'undo_log context,such as serialization',`rollback_info` LONGBLOB NOT NULL COMMENT 'rollback info',`log_status` INT(11) NOT NULL COMMENT '0:normal status,1:defense status',`log_created` DATETIME NOT NULL COMMENT 'create datetime',`log_modified` DATETIME NOT NULL COMMENT 'modify datetime',PRIMARY KEY (`id`),UNIQUE KEY `ux_undo_log` (`xid`, `branch_id`)
) ENGINE = InnoDBAUTO_INCREMENT = 1DEFAULT CHARSET = utf8 COMMENT ='AT transaction mode undo table';
- 开启服务发现
配置注册自身信息到nacos中:
registry {# 【改动点01】 - type变成nacostype = "nacos"# 【改动点02】 - 更换nacos {application = "seata-server-dyq"serverAddr = "192.168.10.62:8858"group = "myGroup"namespace = "dev"cluster = "default"username = ""password = ""}}
注意这里的group和namespace和咱们的应用保持一致,最后我们就可以通过bin目录下的启动脚本来启动应用了:
D:\programs\seata\seata\seata-server-1.4.2\bin>seata-server.bat
[0.003s][warning][gc] -Xloggc is deprecated. Will use -Xlog:gc:D:\programs\seata\seata\seata-server-1.4.2\bin\\../logs/seata_gc.log instead.
[0.069s][info ][gc] Using G1
如果一切正常的话就可以在nacos中看到服务信息了:
2.2:seata AT模式
AT模式的角色如下:
TC:transaction coordinator,即seata server,扮演协调者的角色,负责协调全局事务的提交和回滚,维护全局和分支事务的状态。
TM:transaction manager,发一起一个全部事务,并对全局事务的提交和回滚进行决议,在AT方案中,TM由发起事务的微服务扮演。
RM:resource manager,资源管理器,向TC上报分支事务的状态,负责分支事务的提交和回滚,由参与的微服务扮演。
以custom模块调用template模块为例看下具体处理过程,可参考下图:
主要分为两个阶段:
1:各个微服务模块注册分支事务信息到seata,执行CRUD操作,seata会根据具体的操作生成对应的分支事务的回滚操作,并将信息存储到表undo_log中
2:如果是TM最终决议commit,则seata通知所有的rm提交事务,并删除undo_log表中的回滚数据,如果是TM最终决议rollabck,则所有的RM执行undo_log的回滚操作,回滚修改,最终也会删除undo_log的回滚数据。
接着我们来改造微服务,首先在custom和template模块中引入依赖:
<dependency><groupId>com.alibaba.cloud</groupId><artifactId>spring-cloud-starter-alibaba-seata</artifactId>
</dependency>
和添加配置:
spring:...cloud:alibaba:seata:tx-service-group: seata-server-group
seata:application-id: coupon-template-serv-streamregistry:type: nacosnacos:application: seata-server-dyqserver-addr: 192.168.10.62:8868namespace: devgroup: myGroupcluster: defaultservice:vgroup-mapping:seata-server-group: default
接着在custom添加测试的接口,具体代码参看源码,用来调用template服务删除服务,做如下的事情:
1:custom调用template将coupon_template表对应templateId的信息标记为删除
2:cusotm服务本身标记coupon表tempalteId对应的信息标记为无效
首先在接口添加注解@GlobalTransactional(name = "coupon-customer-serv", rollbackFor = Exception.class)
,如下:
@DeleteMapping("templateCouponTemplate")
@GlobalTransactional(name = "coupon-customer-serv", rollbackFor = Exception.class)
public void templateCouponTemplate(@RequestParam("templateId") Long templateId) {customerService.deleteCouponTemplate(templateId);
}
来手动抛个异常:
@Override
@Transactional
public void deleteCouponTemplate(Long templateId) {templateService.deleteTemplate(templateId);couponDao.deleteCouponInBatch(templateId, CouponStatus.INACTIVE);// 模拟分布式异常throw new RuntimeException("AT分布式事务挂球了");
}
为了查看相关表生成的记录,我们可以在异常这行打个debug,然后看下数据,此时相当于custom调用template返回,custom的本地事务还没有提交时:
其中undo_log存储了回滚需要的信息,以json格式存储:
{"@class": "io.seata.rm.datasource.undo.BranchUndoLog","xid": "10.77.0.33:8091:7638587481204744198","branchId": 7638587481204744199,"sqlUndoLogs": ["java.util.ArrayList",[{"@class": "io.seata.rm.datasource.undo.SQLUndoLog","sqlType": "UPDATE","tableName": "coupon_template","beforeImage": {"@class": "io.seata.rm.datasource.sql.struct.TableRecords","tableName": "coupon_template","rows": ["java.util.ArrayList",[{"@class": "io.seata.rm.datasource.sql.struct.Row","fields": ["java.util.ArrayList",[{"@class": "io.seata.rm.datasource.sql.struct.Field","name": "id","keyType": "PRIMARY_KEY","type": 4,"value": 2},{"@class": "io.seata.rm.datasource.sql.struct.Field","name": "available","keyType": "NULL","type": -7,"value": true}]]}]]},"afterImage": {"@class": "io.seata.rm.datasource.sql.struct.TableRecords","tableName": "coupon_template","rows": ["java.util.ArrayList",[{"@class": "io.seata.rm.datasource.sql.struct.Row","fields": ["java.util.ArrayList",[{"@class": "io.seata.rm.datasource.sql.struct.Field","name": "id","keyType": "PRIMARY_KEY","type": 4,"value": 2},{"@class": "io.seata.rm.datasource.sql.struct.Field","name": "available","keyType": "NULL","type": -7,"value": false}]]}]]}}]]
}
2.3:seata TCC模式
包含try,confirm,cancel,三个阶段:
其中各个阶段所作的事情如下:
try:锁定资源,相当于prepare
confirm:执行具体的操作,相当于commit
cancel:解锁try锁定的资源,相当于rollback
为了实现TCC,我们首先需要注册TCC接口:
@LocalTCC
public interface CouponTemplateServiceTCC extends CouponTemplateService {@TwoPhaseBusinessAction(name = "deleteTemplateTCC",commitMethod = "deleteTemplateCommit",rollbackMethod = "deleteTemplateCancel")void deleteTemplateTCC(@BusinessActionContextParameter(paramName = "id") Long id);void deleteTemplateCommit(BusinessActionContext context);void deleteTemplateCancel(BusinessActionContext context);
}
为了支持try操作,在表中增加lock字段,如下:
alter table coupon_templateadd locked tinyint(1) default 0 null;
对应的实体也需要修改:
@Column(name = "locked", nullable = false)
private Boolean locked;
接着tcc三个阶段对应的方法如下:
// TCC 的T 通过修改lock字段值,完成锁定
@Override
@Transactional
public void deleteTemplateTCC(Long id) {CouponTemplate filter = CouponTemplate.builder().available(true).locked(false).id(id).build();CouponTemplate template = templateDao.findAll(Example.of(filter)).stream().findFirst().orElseThrow(() -> new RuntimeException("Template Not Found"));template.setLocked(true);templateDao.save(template);
}// TCC 的第一个C
@Override
@Transactional
public void deleteTemplateCommit(BusinessActionContext context) {Long id = Long.parseLong(context.getActionContext("id").toString());CouponTemplate template = templateDao.findById(id).get();template.setLocked(false);template.setAvailable(false);templateDao.save(template);log.info("TCC committed");
}// TCC 的第二个C
@Override
@Transactional
public void deleteTemplateCancel(BusinessActionContext context) {Long id = Long.parseLong(context.getActionContext("id").toString());Optional<CouponTemplate> templateOption = templateDao.findById(id);// 空回滚if (templateOption.isPresent()) {CouponTemplate template = templateOption.get();// 通过修改lock值解锁template.setLocked(false);templateDao.save(template);}log.info("TCC cancel");
}
done!!!
写在后面
参考文章列表
MySQL之分布式事务 。