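This article shows how to use the Seata API to implement distributed transactions in AT and TCC modes.
AT mode example
Start the seata-server service
See (02)_Starting the seata-server service from source.
Create the undo log table
The script/client/at/db/mysql.sql file: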
-- for AT mode you must to init this sql for you business database.
-- the seata server not need it.
CREATE TABLE IF NOT EXISTS `undo_log`
(
    `branch_id`     BIGINT       NOT NULL COMMENT 'branch transaction id',
    `xid`           VARCHAR(128) NOT NULL COMMENT 'global transaction id',
    `context`       VARCHAR(128) NOT NULL COMMENT 'undo_log context,such as serialization',
    `rollback_info` LONGBLOB     NOT NULL COMMENT 'rollback info',
    `log_status`    INT(11)      NOT NULL COMMENT '0:normal status,1:defense status',
    `log_created`   DATETIME(6)  NOT NULL COMMENT 'create datetime',
    `log_modified`  DATETIME(6)  NOT NULL COMMENT 'modify datetime',
    UNIQUE KEY `ux_undo_log` (`xid`, `branch_id`)
) ENGINE = InnoDB AUTO_INCREMENT = 1 DEFAULT CHARSET = utf8mb4 COMMENT ='AT mode undo table';
ALTER TABLE `undo_log` ADD INDEX `ix_log_created` (`log_created`);
Add the Nacos configuration
This example uses Nacos 1.2.1. Create the seata-client.properties configuration file:
transport.type=TCP
transport.server=NIO
transport.heartbeat=true
transport.enableTmClientBatchSendRequest=false
transport.enableRmClientBatchSendRequest=true
transport.enableTcServerBatchSendResponse=false
transport.rpcRmRequestTimeout=30000
transport.rpcTmRequestTimeout=30000
transport.rpcTcRequestTimeout=30000
transport.threadFactory.bossThreadPrefix=NettyBoss
transport.threadFactory.workerThreadPrefix=NettyServerNIOWorker
transport.threadFactory.serverExecutorThreadPrefix=NettyServerBizHandler
transport.threadFactory.shareBossWorker=false
transport.threadFactory.clientSelectorThreadPrefix=NettyClientSelector
transport.threadFactory.clientSelectorThreadSize=1
transport.threadFactory.clientWorkerThreadPrefix=NettyClientWorkerThread
transport.threadFactory.bossThreadSize=1
transport.threadFactory.workerThreadSize=default
transport.shutdown.wait=3
transport.serialization=seata
transport.compressor=none
service.vgroupMapping.default_tx_group=default
service.default.grouplist=127.0.0.1:8091
service.enableDegrade=false
service.disableGlobalTransaction=false
client.metadataMaxAgeMs=30000
client.rm.asyncCommitBufferLimit=10000
client.rm.lock.retryInterval=10
client.rm.lock.retryTimes=30
client.rm.lock.retryPolicyBranchRollbackOnConflict=true
client.rm.reportRetryCount=5
client.rm.tableMetaCheckEnable=true
client.rm.tableMetaCheckerInterval=60000
client.rm.sqlParserType=druid
client.rm.reportSuccessEnable=false
client.rm.sagaBranchRegisterEnable=false
client.rm.sagaJsonParser=fastjson
client.rm.tccActionInterceptorOrder=-2147482648
client.tm.commitRetryCount=5
client.tm.rollbackRetryCount=5
client.tm.defaultGlobalTransactionTimeout=60000
client.tm.degradeCheck=false
client.tm.degradeCheckAllowTimes=10
client.tm.degradeCheckPeriod=2000
client.tm.interceptorOrder=-2147482648
client.undo.dataValidation=true
client.undo.logSerialization=jackson
client.undo.onlyCareUpdateColumns=true
client.undo.logTable=undo_log
client.undo.compress.enable=true
client.undo.compress.type=zip
client.undo.compress.threshold=64k
tcc.fence.logTableName=tcc_fence_log
tcc.fence.cleanPeriod=1h
tcc.contextJsonParserType=fastjson
log.exceptionRate=100
metrics.enabled=false
metrics.registryType=compact
metrics.exporterList=prometheus
metrics.exporterPrometheusPort=9898
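The two service.* entries above work as a pair: the client looks up its transaction service group under service.vgroupMapping.<group> to get a cluster name, and service.<cluster>.grouplist lists server addresses for that cluster. As far as I understand, grouplist only matters with a file-based registry; with a Nacos registry the cluster name is used to look up server instances instead. The sketch below illustrates only the key lookup with java.util.Properties. The class and method names are hypothetical, and this is not Seata's own code.

```java
import java.util.Properties;

// Hypothetical helper for illustration only; not part of Seata.
class VgroupMappingSketch {

    // Step 1: transaction service group -> cluster name.
    static String resolveCluster(Properties props, String txServiceGroup) {
        String cluster = props.getProperty("service.vgroupMapping." + txServiceGroup);
        if (cluster == null) {
            throw new IllegalStateException("no vgroupMapping entry for " + txServiceGroup);
        }
        return cluster;
    }

    // Step 2: cluster name -> server address list (file-based registry case).
    static String resolveGroupList(Properties props, String txServiceGroup) {
        String cluster = resolveCluster(props, txServiceGroup);
        return props.getProperty("service." + cluster + ".grouplist");
    }

    // The two entries copied from seata-client.properties above.
    static Properties sample() {
        Properties props = new Properties();
        props.setProperty("service.vgroupMapping.default_tx_group", "default");
        props.setProperty("service.default.grouplist", "127.0.0.1:8091");
        return props;
    }

    public static void main(String[] args) {
        Properties props = sample();
        System.out.println(resolveCluster(props, "default_tx_group"));
        System.out.println(resolveGroupList(props, "default_tx_group"));
    }
}
```

If the transaction service group passed to the client has no matching vgroupMapping key, the lookup has nothing to resolve, so the group name used in code has to match the key suffix exactly.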
Create the registry.yml configuration file
Create registry.yml on the classpath:
config:
  type: nacos
  nacos:
    serverAddr: localhost:8848
    namespace: seata_example
    group: SEATA_GROUP
    dataId: seata-client.properties
    username: nacos
    password: nacos
registry:
  type: nacos
  nacos:
    serverAddr: localhost:8848
    namespace: seata_example
    group: SEATA_GROUP
    username: nacos
    password: nacos
    application: seata-server
    cluster: default
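Approach
- Write a business class with one business method
- Annotate the business method with @GlobalTransactional
- Inside the business method, run updates in two different threads, each obtaining its connection from a different data source
- Initialize the TM and RM clients with TMClient and RMClient
- Wrap each original DataSource in a DataSourceProxy
- Create the business object and use Seata's ProxyUtil to create a proxy for it
- Call the business method through the proxy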
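The last two steps depend on the call going through a proxy: the annotation has an effect only when something intercepts the method, starts the global transaction, and then commits or rolls back depending on whether the method throws. The sketch below shows that general pattern with a JDK dynamic proxy. It is a simplified stand-in and not Seata's implementation: `DemoGlobalTx`, `DemoBusiness`, and `TxProxySketch` are hypothetical names, the transaction steps are recorded as strings instead of being sent to a coordinator, and a JDK proxy needs an interface, which the business class in this article does not have.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Proxy;
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for @GlobalTransactional.
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
@interface DemoGlobalTx {}

// Hypothetical business interface; run(true) simulates a failing method.
interface DemoBusiness {
    @DemoGlobalTx
    void run(boolean fail);
}

class TxProxySketch {

    // Records the transaction steps the proxy would trigger.
    static final List<String> EVENTS = new ArrayList<>();

    @SuppressWarnings("unchecked")
    static <T> T wrap(T target, Class<T> iface) {
        return (T) Proxy.newProxyInstance(iface.getClassLoader(), new Class<?>[]{iface},
            (proxy, method, args) -> {
                boolean transactional = method.isAnnotationPresent(DemoGlobalTx.class);
                if (transactional) {
                    EVENTS.add("begin");
                }
                try {
                    Object result = method.invoke(target, args);
                    if (transactional) {
                        EVENTS.add("commit");
                    }
                    return result;
                } catch (InvocationTargetException e) {
                    if (transactional) {
                        EVENTS.add("rollback");
                    }
                    // Rethrow the business exception itself, not the reflection wrapper.
                    throw e.getCause();
                }
            });
    }

    // Builds a proxied business object whose method fails on demand.
    static DemoBusiness sample() {
        DemoBusiness real = fail -> {
            if (fail) {
                throw new IllegalStateException("test failure");
            }
        };
        return wrap(real, DemoBusiness.class);
    }

    public static void main(String[] args) {
        DemoBusiness business = sample();
        business.run(false);
        try {
            business.run(true);
        } catch (IllegalStateException expected) {
            // The proxy has already recorded the rollback at this point.
        }
        System.out.println(EVENTS);
    }
}
```

Calling the method on the unwrapped object would skip the begin/commit/rollback steps entirely, which is why the example in this article invokes the business method through the proxy object.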
Implementation
Business class and business method
public class AtGlobalTransactionSimpleTest {

    // Enable a global transaction for the business method
    @GlobalTransactional
    public void globalTransactionTest() throws SQLException {
        changeAccountBalance();
        changeProductRemain();
        // Throw a custom runtime exception here; the global transaction is then rolled back
        throw new TransactionException("test exception");
    }

    // The following two methods access the database through JDBC
    protected void changeAccountBalance() throws SQLException {
        System.out.printf("[%s] %s\n", Thread.currentThread().getName(), "changeAccountBalance");
        String sql = "update account set balance=? where id=?";
        try (Connection connection = accountDs.getConnection();
             PreparedStatement ps = connection.prepareStatement(sql)) {
            ps.setDouble(1, 1000);
            ps.setInt(2, 3);
            int rows = ps.executeUpdate();
            if (rows != 1) {
                throw new TransactionException("failed to update account balance");
            }
        }
    }

    protected void changeProductRemain() throws SQLException {
        System.out.printf("[%s] %s\n", Thread.currentThread().getName(), "changeProductRemain");
        String sql = "update product set remaining_count=? where id=?";
        try (Connection connection = productDs.getConnection();
             PreparedStatement ps = connection.prepareStatement(sql)) {
            ps.setInt(1, 1000);
            ps.setInt(2, 2);
            int rows = ps.executeUpdate();
            if (rows != 1) {
                throw new TransactionException("failed to update product stock");
            }
        }
    }
}

// Simulates running the updates in different threads
public class AtGlobalTransactionMultiThreadTest extends AtGlobalTransactionSimpleTest {

    private static final ExecutorService CH_ACCOUNT_EXECUTOR;
    private static final ExecutorService CH_PRODUCT_EXECUTOR;

    static {
        CH_ACCOUNT_EXECUTOR = new ThreadPoolExecutor(2, 2, 60, TimeUnit.SECONDS,
            new LinkedBlockingQueue<>(100), new MyThreadFactory("ch-account"));
        CH_PRODUCT_EXECUTOR = new ThreadPoolExecutor(2, 2, 60, TimeUnit.SECONDS,
            new LinkedBlockingQueue<>(100), new MyThreadFactory("ch-product"));
    }

    @Override
    @GlobalTransactional
    public void globalTransactionTest()