rabbit和mysql事务_分布式事务原理及SpringBoot整合RabbitMQ实现可靠事件,TCC事务模型及接口幂等性...

分布式事务

我们知道在单数据库系统中,实现数据的一致性,通过数据库的事务来处理比较简单。在微服务或分布式系统中,各个独立的服务都会有自己的数据库,而不是在同一个数据库中,所以当一组事务(如商品交易中,商品的库存、用户的账户资金和交易记录等)的处理是分布在不同数据库中的,分布式事务就是为了解决在多个数据库节点中保证这些数据的一致性。

分布式事务里有个BASE理论,在分布式数据库中,存在强一致性和弱一致性。

强一致性的好处是,对于开发者来说比较友好,数据始终可以读取到最新值,但这种方式需要复杂的协议,并且需要牺牲很多的性能。

弱一致性,对于开发者来说相对没有那么友好,无法保证读取的值是最新的,但是不需要引入复杂的协议,也不需要牺牲很多的性能。

弱一致性是当今企业采用的主流方案,它并不能保证所有数据的实时一致性,所以有时候实时读取数据是不可信的。它只是在正常的流程中,加入了提供修复数据的可能性,从而减少数据不一致的可能性,大大降低数据不一致的可能性。

什么时候使用分布式事务

对于像电商中用户隐私信息、商品信息、交易记录以及资金等数据,这些具备价值的核心数据,关系到用户隐私和财产的内容,应该考虑使用分布式事务来保证一致性。

但对于用户评价、自身装饰和其他一些非重要的个性化信息,可以采用非事务的处理。因为一个正常的系统出现不一致的情况是小概率事件,而非大概率事件,对于一些小概率的数据丢失,一般来说是允许的。之所以这样选择,主要基于两点,一个是开发者的开发难度;另一个是用户的体验,过多的分布式事务会造成性能的不断丢失

弱一致性分布式事务解决方案有如下几种:

状态表

RabbitMQ可靠事件

最大尝试

TCC模式

幂等性

在分布式事务中,各个访问操作的接口,都需要保证幂等性。

所谓幂等性,是指在HTTP协议中,一次和多次请求某一个资源,对于资源本身应该具有同样的结果,也就是其执行任意多次时,对资源本身所产生的影响,与执行一次时的相同。

实现方式有以下几种:

唯一索引 -- 防止新增脏数据

token机制 -- 防止页面重复提交

悲观锁 -- 获取数据的时候加锁(锁表或锁行)

乐观锁 -- 基于版本号version实现, 在更新数据那一刻校验数据

分布式锁 -- redis(jedis、redisson)或zookeeper实现

状态机 -- 状态变更, 更新数据时判断状态

※说明:如何实现接口的幂等性,可以分篇在接口的幂等性文章里解说。

状态表实现分布式事务

这里拿电商的商品交易为例,讲述下思路:

需要商品数据库:商品表、商品交易明细表;资金数据库:用户账户表、账户交易明细表

主要流程包括:

商品表减商品库存、

商品交易明细表中添加新的交易记录、

用户账户表中扣减用户账户表的资金、

资金交易明细表中记录账户交易明细表

需要准备一个状态表,用redis的Hset数据类型比较合适

这里假设相关的明细记录表中,有4个状态:

1--准备交易,

2--交易成功,

3--被冲正,

4--冲正记录

e2aa75306c65

交易流程

流程说明

在商品服务中,商品减库存后,记录商品交易明细,如果没有异常,就将商品交易记录的状态位设置为“1—准备提交”,并且记录在Redis的状态表中。

商品服务通过RESTFUL调用资金服务,如果成功,就将账户交易明细表的记录的状态位设置为“1—准备提交”,并且记录在Redis的状态表中。

最后,读取Redis相关的所有状态位,确定是否所有的操作都为“1—准备提交”状态,如果是,则更新产品服务的记录状态为“2—提交成功”,然后发起资金服务调用,将对应的记录(可通过业务流水号关联)的状态也更新为“2—提交成功”,这样就完成了整个交易。

如果不全部为“1—准备提交”状态,则发起各库的冲正交易,冲掉原有的记录,并且归还商品库存和账户金额。发起冲正交易,把原明细记录状态更新为3--被冲正,并往明细表中添加对应的新记录,状态为4--冲正记录

RabbitMQ可靠事件

使用RabbitMQ等消息队列中间件的可靠事件,来实现分布式事务,这里结合SpringBoot

前面有介绍过SpringBoot整合多数据库的文章,这里可以用到,具体参考《Spring Boot学习:MyBatis配置Druid多数据源》,切换数据源使用@DataSource注解,如下

@DataSource(value = DataSourceType.MASTER) //切换到商品数据库

@DataSource(value = DataSourceType.SLAVE) //切换到账户数据库

在此基础上我们加入RabbitMQ实现分布式事务功能

在pom.xml文件中加入依赖

org.springframework.boot

spring-boot-starter-amqp

yml配置文件中,关于RabbitMQ的配置如下:

# Spring 配置

spring:

rabbitmq:

host: localhost

port: 5672

username: admin

password: 123456

#使用发布者确认模式,发布消息者会得到一个“消息是否被服务提供者接收”的确认消息

publisher-confirms: true

#RabbitMQ 队列名称配置

rabbitmq:

queue:

fund: fund

3.创建RabbitMQ配置文件RabbitConfig.java

package com.zhlab.demo.config;

import org.springframework.amqp.core.Queue;

import org.springframework.beans.factory.annotation.Value;

import org.springframework.context.annotation.Bean;

import org.springframework.context.annotation.Configuration;

/**

* @ClassName RabbitConfig

* @Description //RabbitMQ消息队列配置

* @Author singleZhang

* @Email 405780096@qq.com

* @Date 2020/12/11 0011 上午 11:10

**/

@Configuration

public class RabbitConfig {

// 读取配置属性

@Value("${rabbitmq.queue.fund}")

private String fundQueueName = null;

// 创建RabbitMQ消息队列

@Bean(name="fundQueue")

public Queue createFundQueue() {

return new Queue(fundQueueName);

}

}

创建数据传输对象FundParams.java

package com.zhlab.demo.model;

import java.io.Serializable;

/**

* @ClassName FundParams

* @Description //FundParams

* @Author singleZhang

* @Email 405780096@qq.com

* @Date 2020/12/11 0011 上午 11:30

**/

public class FundParams implements Serializable {

// 序列化版本号

public static final long serialVersionUID = 989878441231256478L;

private Long xid; // 业务流水号

private Long userId; // 用户编号

private Double amount; // 交易金额

public FundParams() {

}

public FundParams(Long xid, Long userId, Double amount) {

this.xid = xid;

this.userId = userId;

this.amount = amount;

}

public Long getXid() {

return xid;

}

public void setXid(Long xid) {

this.xid = xid;

}

public Long getUserId() {

return userId;

}

public void setUserId(Long userId) {

this.userId = userId;

}

public Double getAmount() {

return amount;

}

public void setAmount(Double amount) {

this.amount = amount;

}

}

创建商品服务 业务逻辑PurchaseService.java

package com.zhlab.demo.service.goods;

import com.zhlab.demo.db.DataSourceType;

import com.zhlab.demo.db.annotation.DataSource;

import com.zhlab.demo.model.FundParams;

import com.zhlab.demo.utils.SnowFlakeUtil;

import org.springframework.amqp.rabbit.connection.CorrelationData;

import org.springframework.amqp.rabbit.core.RabbitTemplate;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.beans.factory.annotation.Value;

import org.springframework.stereotype.Service;

/**

* @ClassName PurchaseService

* @Description //商品 业务逻辑

* @Author singleZhang

* @Email 405780096@qq.com

* @Date 2020/12/11 0011 上午 11:24

**/

@Service

public class PurchaseService implements RabbitTemplate.ConfirmCallback {

//实现RabbitTemplate.ConfirmCallback接口

//需要实现它定义的confirm方法,这样它便可以作为一个发布者检测消息是否被消费者所接收的确认类

// SnowFlake算法生成ID

SnowFlakeUtil worker = new SnowFlakeUtil(003);

// RabbitMQ模板

@Autowired

private RabbitTemplate rabbitTemplate;

// 读取配置属性

@Value("${rabbitmq.queue.fund}")

private String fundQueueName;

// 购买业务方法

@DataSource(value = DataSourceType.MASTER) //切换到商品数据库

public Long purchase(Long productId, Long userId, Double amount) {

rabbitTemplate.setConfirmCallback(this);//设置了回调类为当前类

// SnowFlake算法生成序列号,用户跨服务的关联,这里用本地自定义方法,可以借助Leaf TinyID等分布式ID生成服务中间件

Long xid = worker.nextId();

// 传递给消费者的参数

FundParams params = new FundParams(xid, userId, amount);

// 发送消息给资金服务做扣款

this.rabbitTemplate.convertAndSend(fundQueueName, params); // ④

System.out.println("执行产品服务逻辑");

return xid;

}

/**

* 确认回调,会异步执行

* @param correlationData --相关数据

* @param ack -- 是否被消费

* @param cause -- 失败原因

*/

@Override

public void confirm(CorrelationData correlationData, boolean ack, String cause) {

/*

* ack代表是否成功。

* 如果投递消息失败,就会先停滞1秒,然后尝试进行冲正交易,冲掉原有交易,这样就可以使得数据平整

*/

if (ack){ // 消息投递成功

System.out.println("执行交易成功");

} else { // 消息投递失败

try {

// 停滞1秒(稍微等待可能没有完成的正常流程),然后发起冲正交易

Thread.sleep(1000);

} catch (Exception ex) {

ex.printStackTrace();

}

System.out.println("尝试产品减库存冲正交易。");

System.out.println("尝试账户扣减冲正交易。");

//在confirm方法中,如果参数ack为false,则说明消息传递失败,就要尝试执行冲正交易,把数据还原回来

System.out.println(cause); // 打印消息投递失败的原因

}

}

}

创建账户服务业务逻辑AccountService.java

package com.zhlab.demo.service.fund;

import com.zhlab.demo.db.DataSourceType;

import com.zhlab.demo.db.annotation.DataSource;

import com.zhlab.demo.model.FundParams;

import org.springframework.amqp.rabbit.annotation.RabbitListener;

import org.springframework.stereotype.Service;

/**

* @ClassName AccountService

* @Description //账户 业务逻辑

* @Author singleZhang

* @Email 405780096@qq.com

* @Date 2020/12/11 0011 上午 11:25

**/

@Service

public class AccountService {

/* 消息监听,取YAML文件配置的队列名

*因为消息被消费,所以触发PurchaseService类的confirm方法

*spring.rabbitmq.listener.simple.acknowledge-mode = manual

*如果配置为手动,这里就需要手动确认消息,默认为自动的

*自动确认:这种模式下,当发送者发送完消息之后,它会自动认为消费者已经成功接收到该条消息。

*这种方式效率较高,当时如果在发送过程中,如果网络中断或者连接断开,将会导致消息丢失

*手动确认:消费者成功消费完消息之后,会显式发回一个应答(ack信号),

*RabbitMQ只有成功接收到这个应答消息,才将消息从内存或磁盘中移除消息。

*这种方式效率较低点,但是能保证绝大部分的消息不会丢失,当然肯定还有一些小概率会发生消息丢失的情况

*主要方法:basicAck、basicNack、basicReject根据具体业务情况使用,配合redis做幂等检验

*/

@RabbitListener(queues = "${rabbitmq.queue.fund}")

@DataSource(value = DataSourceType.SLAVE) //切换到账户数据库

public void dealAccount(FundParams params) {

//TODO具体业务逻辑需自己实现

System.out.println("扣减账户金额逻辑......");

}

}

7.写个测试接口来测试一下,创建MqController.java

package com.zhlab.demo.controller;

import com.zhlab.demo.service.goods.PurchaseService;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.web.bind.annotation.GetMapping;

import org.springframework.web.bind.annotation.RequestMapping;

import org.springframework.web.bind.annotation.RestController;

/**

* @ClassName MqController

* @Description //RabbitMQ可靠消息 接口测试

* @Author singleZhang

* @Email 405780096@qq.com

* @Date 2020/12/11 0011 下午 2:25

**/

@RestController

@RequestMapping("/mq")

public class MqController {

@Autowired

private PurchaseService purchaseService;

@GetMapping("/test")

public String testMq() {

return purchaseService.purchase(1L, 1L, 200.0) + "";

}

}

以上就是基于RabbitMQ可靠消息 实现的分布式事务处理,逻辑和说明都在注释里了。

※说明:这样的确认方式,只是保证了事件的有效传递,但是不能保证消费类能够没有异常或者错误发生,当消费类有异常或错误发生时,数据依旧会存在不一致的情况。这样的方式,只是保证了消息传递的有效性,降低了不一致的可能性,从而大大降低了后续需要运维和业务人员处理的不一致数据的数量

TCC补偿事务

TCC代表的是

try(尝试)

confirm(确认)

cancel(取消)

在TCC事务中,要求任何一个服务逻辑都有3个接口,它们对应的就是尝试(try)方法、确认(confirm)方法和取消(cancel)方法。

e2aa75306c65

TCC事务模型

TCC事务的一致性可达99.99%,是一种较为成熟的方案,因此在目前有着较为广泛的应用。

继续通过上面的商品交易流程来解析这个模型:

一阶段

商品表减库存,商品交易明细表记录商品交易明细,并且将对应记录状态设置为“1—准备提交”。

调用账户服务,用户账户表扣减账户资金,账户交易明细表记录交易明细,并且将对应记录状态设置为“1—准备提交”

在一阶段的调用中,如果没有发生异常,就可以执行正常二阶段进行提交了

正常二阶段

商品服务 更新对应记录的状态为“2—提交成功”,使得数据生效

调用账户服务,使得对应的记录状态也为“2—提交成功”,这样正常的提交就完成了

如果在一阶段发生异常,需要取消操作,可以执行异常二阶段

异常二阶段

商品服务执行冲正交易,冲掉原有的产品交易,将库存归还给商品表

调用账户服务,发起冲正交易,冲掉原有的资金交易,将资金归还到账户里

注意,这些提交和退出机制在TCC中,都需要开发者对接口作幂等性处理

TCC事务机制,也并不能保证所有的数据都是完全一致的,它只是提供了一个可以修复的机制,来降低不一致的情况,从而大大降低后续维护数据的代价。TCC事务也会带来两个较大的麻烦:第一个是,原本的一个方法实现,现在需要拆分为3个方法,代价较大;第二个是,需要开发者自已实现提交和取消方法的幂等性

总结

使用分布式事务,并不是很容易的事情,甚至有些方法还相当复杂。

在互联网中,并不是所有的数据都需要使用分布式事务,所以首先要考虑的是:在什么时候使用分布式事务。即使需要使用分布式事务,有时候也并非需要实时实现数据的一致性,因为可以在后续通过一定的手段来完成。例如电商网站,对买家来说,需要的是快速响应,但对商家来说,就未必需要得到实时数据了,过段时间得到数据也是可以的,而这段时间就可以考虑进行数据补偿了。无论我们如何使用分布式事务,也无法使数据完全达到百分之百的一致性,因此一般金融和电商企业会通过对账等形式来完成最终一致性的操作。

在分布式事务的选择中,都会采用弱一致性代替强一致性,相对来说,弱一致性更加灵活,更方便我们开发。从网站的角度来说,弱一致性可以获得更佳的性能,提升用户的体验,这是互联网应用需要首先考虑的要素。

拓展---电商中的高并发和分布式事务

电商网站中高并发是常见的,高并发是针对用户而言的,比如抢购中,用户只希望短时间内快速抢到商品,而商家对于交易信息可以延迟处理得到。

这就是意味着,对于用户交易部分,要尽可能通过分布式事务进行保证,但而对于商户数据部分,实时性要求相对不是那么高,可以过段时间通过后续手段来补偿修复,从而缩小分布式事务的范围。

e2aa75306c65

确定需要分布式事务的范围

这里可以看出使用分布式事务的主要是请求数据,保证这个过程可以提高数据可靠性。对于商户数据,不需要使用分布式事务,这样可以提升性能,使抢购进行得更快,满足买家的需求,但是这也会引发数据的丢失。为了解决这个问题,后续可以通过和请求数据进行对比来修复数据,使数据达到一致,这个过程可以在高并发过后(一般高并发都是时间段性的,如性价比高的产品发布点、购物节开始时间段)进行,这样商户最终也可以得到可靠的数据,只是不是实时的,但是这并不影响商户和用户的业务。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/572825.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

rhel Linux 网络配置

--网络配置 vi /etc/sysconfig/network-scripts/ifcfg-eth0 1)DHCPDEVICEeth0BOOTPROTOdhcpONBOOTyes2)静态IP DEVICEeth0 BOOTPROTOnone IPADDR192.168.0.22 NETMASK255.255.255.0GATEWAY192.168.0.254DNS11…

如何下载php-5.5.38.tar.gz_搭建PHP服务器php-5.3.28.tar.gz

构建PHP运行环境PHP所用的软件 :php-5.3.28.tar.gz优化模块 :ZendGuardLoader-php-5.3-linux-glibc23-x86_64.tar.gz(两者之间版本一样)准备工作:卸载RPM安装的PHP[rootlocalhost php]# rpm -e php php-cli php-ldap php-common php-mysql --…

004-全局应用程序类Global.asax

服务器对象:Request、Response、Server、Session、Application、Cookie //功能1:为服务器对象注册Start、End处理 protected void Application_Start(object sender, EventArgs e){} protected void Application_End(object sender, EventArgs e){} 管道…

mysql创建表的时候日期给个默认值_mysql 创建表时 日期字段默认值为当前时间...

mysql 创建表时 日期字段默认值为当前时间mysql version 5.1在mysql创建表的时候经常会遇到创建日期字段需要设置当前时间为默认值的时候,就如sqlserver2000一样,把默认值设为getdate()即可,我在网上查了N久都没有查到使用有效的方法&#xf…

Android onKeyDown、onKeyUp、dispatchKeyEvent的区别

1. onKeyDown、onKeyUp、dispatchKeyEvent的区别和使用场景 区别:   1.1 onKeyDown、onKeyUp是按键事件的回调接口(冒泡式调用),dispatchKeyEvent是按键分发(隧道式分发) 1.2 onKeyDown接口的回调只…

python pandas 数据库_Python中pandas函数操作数据库

一:创建链接数据库引擎Pythonfrom sqlalchemy import create_engineengine create_engine(postgresql://user58.251.157.179:port/database,echo True)echo True ,会显示在加载数据库所执行的SQL语句。12345fromsqlalchemyimportcreate_engineenginecr…

策略模式,工厂模式,单例模式编写身份证的验证算法

策略模式&#xff1a;它定义算法家族&#xff0c;分别封装起来&#xff0c;让他们之间互相替换&#xff0c;此模式让算法的变化&#xff0c;不会影响使用算法的客户。 1 /// <summary>2 /// 策略模式3 /// </summary>4 public interface IidCheck5 …

mysql引擎模式_mysql引擎,完整的见表语句,数据库模式, 常用数据类型,约束条件...

引擎show engines : 查看引擎innodb(默认引擎)&#xff1a;支持事务&#xff0c;行级锁&#xff0c;外键myisam:查询效率由于innodb,不需要支持事务&#xff0c;行级锁&#xff0c;外键&#xff0c;可以选用myisam来优化数据库mysql> create table t1(id int)engineinnodb;m…

testng连接MySQL_Selenium+TestNG实战-8-连接数据库方法去验证文章是否发布

原标题&#xff1a;SeleniumTestNG实战-8-连接数据库方法去验证文章是否发布记得之前群里&#xff0c;有人说举例一下连接数据库在Selenium自动化测试中的应用。本篇刚好来举例一个&#xff0c;前面我们都是通过发布后文章的详情页的标题来判断文章是否已经发布成功&#xff0c…

凸包 poj 1113

求一个多边形 拐弯的地方用圆弧补上 距离>l 求他的周长 求一个凸包的周长 加2*pi*l #include<stdio.h> #include<string.h> #include<algorithm> #include<vector> #include<stack> #include<math.h> using namespace std;#define MA…

武汉mysql ocp考点_MySQL OCP考试复习系列–开篇:了解MySQL考试

MySQL OCP考试复习系列–开篇&#xff1a;了解MySQL考试嗯&#xff0c;那个决定去考MySQL OCP了&#xff0c;事实上最近工作一直围绕着DB2&#xff0c;MySQL要去考的话需要好好的复习的啊。150分钟&#xff0c;100道多选&#xff0c;答对60道题可以通过&#xff0c;费用1077。L…

java文件服务器_JavaWeb项目架构之NFS文件服务器

NFS简介NFS(Network File System)即网络文件系统。主要功能&#xff1a;通过网络(局域网)让不同的主机系统之间可以共享文件或目录。主要用途&#xff1a;NFS网络文件系统一般被用来存储共享视频&#xff0c;图片&#xff0c;附件等静态资源文件。NFS存储服务无NFS文件共享存储…

【bzoj】3224: Tyvj 1728 普通平衡树

3224: Tyvj 1728 普通平衡树 Time Limit: 10 Sec Memory Limit: 128 MBSubmit: 10097 Solved: 4302[Submit][Status][Discuss]Description 您需要写一种数据结构&#xff08;可参考题目标题&#xff09;&#xff0c;来维护一些数&#xff0c;其中需要提供以下操作&#xff1a…

java 修改 referer_看好你的门-客户端传数据-用java修改referer

1、简单说明Referer、origin用来表明&#xff0c;浏览器向WEB服务器表明自己来自哪里。但是就它本身而言&#xff0c;并非完全安全。写一个例子&#xff0c;可以任意修改http信息头中的referer、origin2、准备&#xff1a;用httpClient4.0来具体实现3、Java修改http信息头refer…

table 样式详解

1.table 中css样式控制border 只能控制外边框&#xff0c;内边框需要写<table border"1"> 2.table 会自动撑大&#xff0c;即使td 设置了 width和height这与div 是不同的 3.只有一个table的时候 &#xff0c;高度自适应全屏 <style type"text/css"…

libsvm java 实例_LibSVM Java API调用示例程序

【实例简介】LibSVM Java API调用示例程序Eclipse 完整工程可以运行相关详情见http://blog.csdn.net/yangliuy/article/details/8041343#comments3行程序搞定SVM分类-用JAVA程序调用LibSVM API 最简单的示例欢迎关注我的博客blog.csdn.net/yangliuy【实例截图】【核心代码】326…

关于H3 BPM数据库如何实现排序取数据的问题

问题&#xff1a; 在打印模板中获取子表内容&#xff0c;可从数据库中取数据&#xff0c;而当前项目数据库里面数据按年度录入&#xff0c;但是只需要显示近3年的数据&#xff0c;插件如何实现排序取数据&#xff1f; 解决方法&#xff1a; 1、先直接写一个SQL语句&#xff0c;…

java 将pdf转换成word_java如何实现pdf转word?

Atitit pdf转文本 pdfutiljava -jar C:\Users\attilax\Pictures\pdfbox-app-2.0.9.jar ExtractText "C:\atibeks517\l4 doc v3 r7a ori exted\_0index\一种简单的基于字符形状的验证码识别技术.pdf" c:\logs\识别技术.pdf.txt转html-consolefalseSend text to consol…

spring整合

spring整合hibernate&#xff0c;整合什么&#xff1f; 1. Spring 整合 Hibernate 整合什么 ?1). 有 IOC 容器来管理 Hibernate 的 SessionFactory 2). 让 Hibernate 使用上 Spring 的声明式事务2. 整合步骤:1). 加入 hibernate ①. jar 包 ②. 添加 hibernate 的配置文件: hi…

java 进程不关闭_java运行程序关不了窗口

该楼层疑似违规已被系统折叠 隐藏此楼查看此楼package office菜单;import java.awt.Frame;import java.awt.Menu; //菜单组件import java.awt.MenuBar; //菜单条组件import java.awt.MenuItem; //菜单项组件public class frameTest {public static void main(String[] args) {f…