rabbit和mysql事务_分布式事务原理及SpringBoot整合RabbitMQ实现可靠事件,TCC事务模型及接口幂等性...

分布式事务

我们知道在单数据库系统中,实现数据的一致性,通过数据库的事务来处理比较简单。在微服务或分布式系统中,各个独立的服务都会有自己的数据库,而不是在同一个数据库中,所以当一组事务(如商品交易中,商品的库存、用户的账户资金和交易记录等)的处理是分布在不同数据库中的,分布式事务就是为了解决在多个数据库节点中保证这些数据的一致性。

分布式事务里有个BASE理论,在分布式数据库中,存在强一致性和弱一致性。

强一致性的好处是,对于开发者来说比较友好,数据始终可以读取到最新值,但这种方式需要复杂的协议,并且需要牺牲很多的性能。

弱一致性,对于开发者来说相对没有那么友好,无法保证读取的值是最新的,但是不需要引入复杂的协议,也不需要牺牲很多的性能。

弱一致性是当今企业采用的主流方案,它并不能保证所有数据的实时一致性,所以有时候实时读取数据是不可信的。它只是在正常的流程中,加入了提供修复数据的可能性,从而减少数据不一致的可能性,大大降低数据不一致的可能性。

什么时候使用分布式事务

对于像电商中用户隐私信息、商品信息、交易记录以及资金等数据,这些具备价值的核心数据,关系到用户隐私和财产的内容,应该考虑使用分布式事务来保证一致性。

但对于用户评价、自身装饰和其他一些非重要的个性化信息,可以采用非事务的处理。因为一个正常的系统出现不一致的情况是小概率事件,而非大概率事件,对于一些小概率的数据丢失,一般来说是允许的。之所以这样选择,主要基于两点,一个是开发者的开发难度;另一个是用户的体验,过多的分布式事务会造成性能的不断丢失

弱一致性分布式事务解决方案有如下几种:

状态表

RabbitMQ可靠事件

最大尝试

TCC模式

幂等性

在分布式事务中,各个访问操作的接口,都需要保证幂等性。

所谓幂等性,是指在HTTP协议中,一次和多次请求某一个资源,对于资源本身应该具有同样的结果,也就是其执行任意多次时,对资源本身所产生的影响,与执行一次时的相同。

实现方式有以下几种:

唯一索引 -- 防止新增脏数据

token机制 -- 防止页面重复提交

悲观锁 -- 获取数据的时候加锁(锁表或锁行)

乐观锁 -- 基于版本号version实现, 在更新数据那一刻校验数据

分布式锁 -- redis(jedis、redisson)或zookeeper实现

状态机 -- 状态变更, 更新数据时判断状态

※说明:如何实现接口的幂等性,可以分篇在接口的幂等性文章里解说。

状态表实现分布式事务

这里拿电商的商品交易为例,讲述下思路:

需要商品数据库:商品表、商品交易明细表;资金数据库:用户账户表、账户交易明细表

主要流程包括:

商品表减商品库存、

商品交易明细表中添加新的交易记录、

用户账户表中扣减用户账户表的资金、

资金交易明细表中记录账户交易明细表

需要准备一个状态表,用redis的Hset数据类型比较合适

这里假设相关的明细记录表中,有4个状态:

1--准备交易,

2--交易成功,

3--被冲正,

4--冲正记录

e2aa75306c65

交易流程

流程说明

在商品服务中,商品减库存后,记录商品交易明细,如果没有异常,就将商品交易记录的状态位设置为“1—准备提交”,并且记录在Redis的状态表中。

商品服务通过RESTFUL调用资金服务,如果成功,就将账户交易明细表的记录的状态位设置为“1—准备提交”,并且记录在Redis的状态表中。

最后,读取Redis相关的所有状态位,确定是否所有的操作都为“1—准备提交”状态,如果是,则更新产品服务的记录状态为“2—提交成功”,然后发起资金服务调用,将对应的记录(可通过业务流水号关联)的状态也更新为“2—提交成功”,这样就完成了整个交易。

如果不全部为“1—准备提交”状态,则发起各库的冲正交易,冲掉原有的记录,并且归还商品库存和账户金额。发起冲正交易,把原明细记录状态更新为3--被冲正,并往明细表中添加对应的新记录,状态为4--冲正记录

RabbitMQ可靠事件

使用RabbitMQ等消息队列中间件的可靠事件,来实现分布式事务,这里结合SpringBoot

前面有介绍过SpringBoot整合多数据库的文章,这里可以用到,具体参考《Spring Boot学习:MyBatis配置Druid多数据源》,切换数据源使用@DataSource注解,如下

@DataSource(value = DataSourceType.MASTER) //切换到商品数据库

@DataSource(value = DataSourceType.SLAVE) //切换到账户数据库

在此基础上我们加入RabbitMQ实现分布式事务功能

在pom.xml文件中加入依赖

org.springframework.boot

spring-boot-starter-amqp

yml配置文件中,关于RabbitMQ的配置如下:

# Spring 配置

spring:

rabbitmq:

host: localhost

port: 5672

username: admin

password: 123456

#使用发布者确认模式,发布消息者会得到一个“消息是否被服务提供者接收”的确认消息

publisher-confirms: true

#RabbitMQ 队列名称配置

rabbitmq:

queue:

fund: fund

3.创建RabbitMQ配置文件RabbitConfig.java

package com.zhlab.demo.config;

import org.springframework.amqp.core.Queue;

import org.springframework.beans.factory.annotation.Value;

import org.springframework.context.annotation.Bean;

import org.springframework.context.annotation.Configuration;

/**

* @ClassName RabbitConfig

* @Description //RabbitMQ消息队列配置

* @Author singleZhang

* @Email 405780096@qq.com

* @Date 2020/12/11 0011 上午 11:10

**/

@Configuration

public class RabbitConfig {

// 读取配置属性

@Value("${rabbitmq.queue.fund}")

private String fundQueueName = null;

// 创建RabbitMQ消息队列

@Bean(name="fundQueue")

public Queue createFundQueue() {

return new Queue(fundQueueName);

}

}

创建数据传输对象FundParams.java

package com.zhlab.demo.model;

import java.io.Serializable;

/**

* @ClassName FundParams

* @Description //FundParams

* @Author singleZhang

* @Email 405780096@qq.com

* @Date 2020/12/11 0011 上午 11:30

**/

public class FundParams implements Serializable {

// 序列化版本号

public static final long serialVersionUID = 989878441231256478L;

private Long xid; // 业务流水号

private Long userId; // 用户编号

private Double amount; // 交易金额

public FundParams() {

}

public FundParams(Long xid, Long userId, Double amount) {

this.xid = xid;

this.userId = userId;

this.amount = amount;

}

public Long getXid() {

return xid;

}

public void setXid(Long xid) {

this.xid = xid;

}

public Long getUserId() {

return userId;

}

public void setUserId(Long userId) {

this.userId = userId;

}

public Double getAmount() {

return amount;

}

public void setAmount(Double amount) {

this.amount = amount;

}

}

创建商品服务 业务逻辑PurchaseService.java

package com.zhlab.demo.service.goods;

import com.zhlab.demo.db.DataSourceType;

import com.zhlab.demo.db.annotation.DataSource;

import com.zhlab.demo.model.FundParams;

import com.zhlab.demo.utils.SnowFlakeUtil;

import org.springframework.amqp.rabbit.connection.CorrelationData;

import org.springframework.amqp.rabbit.core.RabbitTemplate;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.beans.factory.annotation.Value;

import org.springframework.stereotype.Service;

/**

* @ClassName PurchaseService

* @Description //商品 业务逻辑

* @Author singleZhang

* @Email 405780096@qq.com

* @Date 2020/12/11 0011 上午 11:24

**/

@Service

public class PurchaseService implements RabbitTemplate.ConfirmCallback {

//实现RabbitTemplate.ConfirmCallback接口

//需要实现它定义的confirm方法,这样它便可以作为一个发布者检测消息是否被消费者所接收的确认类

// SnowFlake算法生成ID

SnowFlakeUtil worker = new SnowFlakeUtil(003);

// RabbitMQ模板

@Autowired

private RabbitTemplate rabbitTemplate;

// 读取配置属性

@Value("${rabbitmq.queue.fund}")

private String fundQueueName;

// 购买业务方法

@DataSource(value = DataSourceType.MASTER) //切换到商品数据库

public Long purchase(Long productId, Long userId, Double amount) {

rabbitTemplate.setConfirmCallback(this);//设置了回调类为当前类

// SnowFlake算法生成序列号,用户跨服务的关联,这里用本地自定义方法,可以借助Leaf TinyID等分布式ID生成服务中间件

Long xid = worker.nextId();

// 传递给消费者的参数

FundParams params = new FundParams(xid, userId, amount);

// 发送消息给资金服务做扣款

this.rabbitTemplate.convertAndSend(fundQueueName, params); // ④

System.out.println("执行产品服务逻辑");

return xid;

}

/**

* 确认回调,会异步执行

* @param correlationData --相关数据

* @param ack -- 是否被消费

* @param cause -- 失败原因

*/

@Override

public void confirm(CorrelationData correlationData, boolean ack, String cause) {

/*

* ack代表是否成功。

* 如果投递消息失败,就会先停滞1秒,然后尝试进行冲正交易,冲掉原有交易,这样就可以使得数据平整

*/

if (ack){ // 消息投递成功

System.out.println("执行交易成功");

} else { // 消息投递失败

try {

// 停滞1秒(稍微等待可能没有完成的正常流程),然后发起冲正交易

Thread.sleep(1000);

} catch (Exception ex) {

ex.printStackTrace();

}

System.out.println("尝试产品减库存冲正交易。");

System.out.println("尝试账户扣减冲正交易。");

//在confirm方法中,如果参数ack为false,则说明消息传递失败,就要尝试执行冲正交易,把数据还原回来

System.out.println(cause); // 打印消息投递失败的原因

}

}

}

创建账户服务业务逻辑AccountService.java

package com.zhlab.demo.service.fund;

import com.zhlab.demo.db.DataSourceType;

import com.zhlab.demo.db.annotation.DataSource;

import com.zhlab.demo.model.FundParams;

import org.springframework.amqp.rabbit.annotation.RabbitListener;

import org.springframework.stereotype.Service;

/**

* @ClassName AccountService

* @Description //账户 业务逻辑

* @Author singleZhang

* @Email 405780096@qq.com

* @Date 2020/12/11 0011 上午 11:25

**/

@Service

public class AccountService {

/* 消息监听,取YAML文件配置的队列名

*因为消息被消费,所以触发PurchaseService类的confirm方法

*spring.rabbitmq.listener.simple.acknowledge-mode = manual

*如果配置为手动,这里就需要手动确认消息,默认为自动的

*自动确认:这种模式下,当发送者发送完消息之后,它会自动认为消费者已经成功接收到该条消息。

*这种方式效率较高,当时如果在发送过程中,如果网络中断或者连接断开,将会导致消息丢失

*手动确认:消费者成功消费完消息之后,会显式发回一个应答(ack信号),

*RabbitMQ只有成功接收到这个应答消息,才将消息从内存或磁盘中移除消息。

*这种方式效率较低点,但是能保证绝大部分的消息不会丢失,当然肯定还有一些小概率会发生消息丢失的情况

*主要方法:basicAck、basicNack、basicReject根据具体业务情况使用,配合redis做幂等检验

*/

@RabbitListener(queues = "${rabbitmq.queue.fund}")

@DataSource(value = DataSourceType.SLAVE) //切换到账户数据库

public void dealAccount(FundParams params) {

//TODO具体业务逻辑需自己实现

System.out.println("扣减账户金额逻辑......");

}

}

7.写个测试接口来测试一下,创建MqController.java

package com.zhlab.demo.controller;

import com.zhlab.demo.service.goods.PurchaseService;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.web.bind.annotation.GetMapping;

import org.springframework.web.bind.annotation.RequestMapping;

import org.springframework.web.bind.annotation.RestController;

/**

* @ClassName MqController

* @Description //RabbitMQ可靠消息 接口测试

* @Author singleZhang

* @Email 405780096@qq.com

* @Date 2020/12/11 0011 下午 2:25

**/

@RestController

@RequestMapping("/mq")

public class MqController {

@Autowired

private PurchaseService purchaseService;

@GetMapping("/test")

public String testMq() {

return purchaseService.purchase(1L, 1L, 200.0) + "";

}

}

以上就是基于RabbitMQ可靠消息 实现的分布式事务处理,逻辑和说明都在注释里了。

※说明:这样的确认方式,只是保证了事件的有效传递,但是不能保证消费类能够没有异常或者错误发生,当消费类有异常或错误发生时,数据依旧会存在不一致的情况。这样的方式,只是保证了消息传递的有效性,降低了不一致的可能性,从而大大降低了后续需要运维和业务人员处理的不一致数据的数量

TCC补偿事务

TCC代表的是

try(尝试)

confirm(确认)

cancel(取消)

在TCC事务中,要求任何一个服务逻辑都有3个接口,它们对应的就是尝试(try)方法、确认(confirm)方法和取消(cancel)方法。

e2aa75306c65

TCC事务模型

TCC事务的一致性可达99.99%,是一种较为成熟的方案,因此在目前有着较为广泛的应用。

继续通过上面的商品交易流程来解析这个模型:

一阶段

商品表减库存,商品交易明细表记录商品交易明细,并且将对应记录状态设置为“1—准备提交”。

调用账户服务,用户账户表扣减账户资金,账户交易明细表记录交易明细,并且将对应记录状态设置为“1—准备提交”

在一阶段的调用中,如果没有发生异常,就可以执行正常二阶段进行提交了

正常二阶段

商品服务 更新对应记录的状态为“2—提交成功”,使得数据生效

调用账户服务,使得对应的记录状态也为“2—提交成功”,这样正常的提交就完成了

如果在一阶段发生异常,需要取消操作,可以执行异常二阶段

异常二阶段

商品服务执行冲正交易,冲掉原有的产品交易,将库存归还给商品表

调用账户服务,发起冲正交易,冲掉原有的资金交易,将资金归还到账户里

注意,这些提交和退出机制在TCC中,都需要开发者对接口作幂等性处理

TCC事务机制,也并不能保证所有的数据都是完全一致的,它只是提供了一个可以修复的机制,来降低不一致的情况,从而大大降低后续维护数据的代价。TCC事务也会带来两个较大的麻烦:第一个是,原本的一个方法实现,现在需要拆分为3个方法,代价较大;第二个是,需要开发者自已实现提交和取消方法的幂等性

总结

使用分布式事务,并不是很容易的事情,甚至有些方法还相当复杂。

在互联网中,并不是所有的数据都需要使用分布式事务,所以首先要考虑的是:在什么时候使用分布式事务。即使需要使用分布式事务,有时候也并非需要实时实现数据的一致性,因为可以在后续通过一定的手段来完成。例如电商网站,对买家来说,需要的是快速响应,但对商家来说,就未必需要得到实时数据了,过段时间得到数据也是可以的,而这段时间就可以考虑进行数据补偿了。无论我们如何使用分布式事务,也无法使数据完全达到百分之百的一致性,因此一般金融和电商企业会通过对账等形式来完成最终一致性的操作。

在分布式事务的选择中,都会采用弱一致性代替强一致性,相对来说,弱一致性更加灵活,更方便我们开发。从网站的角度来说,弱一致性可以获得更佳的性能,提升用户的体验,这是互联网应用需要首先考虑的要素。

拓展---电商中的高并发和分布式事务

电商网站中高并发是常见的,高并发是针对用户而言的,比如抢购中,用户只希望短时间内快速抢到商品,而商家对于交易信息可以延迟处理得到。

这就是意味着,对于用户交易部分,要尽可能通过分布式事务进行保证,但而对于商户数据部分,实时性要求相对不是那么高,可以过段时间通过后续手段来补偿修复,从而缩小分布式事务的范围。

e2aa75306c65

确定需要分布式事务的范围

这里可以看出使用分布式事务的主要是请求数据,保证这个过程可以提高数据可靠性。对于商户数据,不需要使用分布式事务,这样可以提升性能,使抢购进行得更快,满足买家的需求,但是这也会引发数据的丢失。为了解决这个问题,后续可以通过和请求数据进行对比来修复数据,使数据达到一致,这个过程可以在高并发过后(一般高并发都是时间段性的,如性价比高的产品发布点、购物节开始时间段)进行,这样商户最终也可以得到可靠的数据,只是不是实时的,但是这并不影响商户和用户的业务。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/572825.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

python pandas 数据库_Python中pandas函数操作数据库

一:创建链接数据库引擎Pythonfrom sqlalchemy import create_engineengine create_engine(postgresql://user58.251.157.179:port/database,echo True)echo True ,会显示在加载数据库所执行的SQL语句。12345fromsqlalchemyimportcreate_engineenginecr…

mysql引擎模式_mysql引擎,完整的见表语句,数据库模式, 常用数据类型,约束条件...

引擎show engines : 查看引擎innodb(默认引擎):支持事务,行级锁,外键myisam:查询效率由于innodb,不需要支持事务,行级锁,外键,可以选用myisam来优化数据库mysql> create table t1(id int)engineinnodb;m…

testng连接MySQL_Selenium+TestNG实战-8-连接数据库方法去验证文章是否发布

原标题:SeleniumTestNG实战-8-连接数据库方法去验证文章是否发布记得之前群里,有人说举例一下连接数据库在Selenium自动化测试中的应用。本篇刚好来举例一个,前面我们都是通过发布后文章的详情页的标题来判断文章是否已经发布成功&#xff0c…

武汉mysql ocp考点_MySQL OCP考试复习系列–开篇:了解MySQL考试

MySQL OCP考试复习系列–开篇:了解MySQL考试嗯,那个决定去考MySQL OCP了,事实上最近工作一直围绕着DB2,MySQL要去考的话需要好好的复习的啊。150分钟,100道多选,答对60道题可以通过,费用1077。L…

java文件服务器_JavaWeb项目架构之NFS文件服务器

NFS简介NFS(Network File System)即网络文件系统。主要功能:通过网络(局域网)让不同的主机系统之间可以共享文件或目录。主要用途:NFS网络文件系统一般被用来存储共享视频,图片,附件等静态资源文件。NFS存储服务无NFS文件共享存储…

table 样式详解

1.table 中css样式控制border 只能控制外边框&#xff0c;内边框需要写<table border"1"> 2.table 会自动撑大&#xff0c;即使td 设置了 width和height这与div 是不同的 3.只有一个table的时候 &#xff0c;高度自适应全屏 <style type"text/css"…

spring整合

spring整合hibernate&#xff0c;整合什么&#xff1f; 1. Spring 整合 Hibernate 整合什么 ?1). 有 IOC 容器来管理 Hibernate 的 SessionFactory 2). 让 Hibernate 使用上 Spring 的声明式事务2. 整合步骤:1). 加入 hibernate ①. jar 包 ②. 添加 hibernate 的配置文件: hi…

看看大货车到底有多少盲区,肯定用得到!救命的!

上路的司机都知道&#xff0c;一旦看到大货车就要离它远远的&#xff0c;因为大货车的盲区大。可是又有多少轿车司机懂得盲区在哪里呢?不仅是轿车司机&#xff0c;许多行人和非机动车辆对于大货车的盲区也是一知半解&#xff0c;常常有人因此丧命。 行人篇 先给大家看一张最直…

msgpack java lua_使用lua-cmsgpack序列化和反序列化lua对象

原文在简书首发&#xff1a;http://www.jianshu.com/p/badf412db4e7lua-cmsgpack是一个开源的MessagePack实现方式、纯C的库&#xff0c;没有任何其它依赖&#xff0c;编译后可以直接被lua调用&#xff0c;目前主要支持Lua5.1/5.2/5.3 版本。1、什么是MessagePack&#xff1f;-…

全国250米DEM数据

全国250米DEM数据 DEM是数字高程模型的英文简称(Digital Elevation Model)&#xff0c;是研究分析地形、流域、地物识别的重要原始资料。由于DEM 数据能够反映一定分辨率的局部地形特征&#xff0c;因此通过DEM 可提取大量的地表形态信息&#xff0c;可用于绘制等高线、坡度图、…

redis集群连接 java_Redis分布式集群和直连的Java客户端调用方式详解

jedis是一个著名的key-value存储系统&#xff0c;而作为其官方推荐的java版客户端jedis也非常强大和稳定&#xff0c;支持事务、管道及有jedis自身实现的分布式。在这里对jedis关于事务、管道和分布式的调用方式做一个简单的介绍和对比&#xff1a;一、普通同步方式最简单和基础…

java爬虫新浪微博_java爬虫(爬新浪新闻) 如何从零开始

爬虫通常搜索引擎处理的对象是互联网网页。首先面临的问题是&#xff1a;如何能够设计出高效的下载系统&#xff0c;以将如此海量的网页数据传送到本地&#xff0c;在本地形成互联网网页的镜像备份。网络爬虫即起此作用&#xff0c;它是搜索引擎系统中很关键也很基础的构件。爬…

CodeVS 1068-乌龟棋

原题 题目描述 Description 小明过生日的时候&#xff0c;爸爸送给他一副乌龟棋当作礼物。 乌龟棋的棋盘是一行N个格子&#xff0c;每个格子上一个分数&#xff08;非负整数&#xff09;。棋盘第1格是唯一 的起点&#xff0c;第N格是终点&#xff0c;游戏要求玩家控制一个乌龟棋…

乔布斯传

资料参考 http://www.ruanyifeng.com/blog/2013/03/apple_inc_and_division_of_labor.html 苹果公司与分工原理 1.乔布斯 "乔布斯买了一间不错的房子&#xff0c;但家里只有一幅帕黎思&#xff08;Maxfield Parrish&#xff09;的画作、一部百灵牌咖啡机和几把双人牌的刀子…

ie11浏览器可以下载java吗_解析:WindowsXP系统能否安装IE11浏览器

现在&#xff0c;IE浏览器可以称得上是市场占有率最高的一款网页浏览器。因为windowsxp是一款比较久的操作系统&#xff0c;所以很多用户都会疑惑在xp上是否能够安装最新版的ie11浏览器。下面&#xff0c;小编就给大家详细解答下该问题。很遗憾的告诉大家&#xff0c;Windows X…

java ajax传输图片_Java使用Ajax实现跨域上传图片功能

说明 &#xff1a;图片服务器是用Nginx搭建的&#xff0c;用的是PHP语言这个功能 需要 用到两个js文件&#xff1a;jquery.js和jQuery.form.jsfunction submitImgSize1Upload() {var postData function( form , callback){var form document.getElementById("upload-for…

Java并发——线程中断学习

1. 使用interrupt()中断线程当一个线程运行时&#xff0c;另一个线程可以调用对应的Thread对象的interrupt()方法来中断它&#xff0c;该方法只是在目标线程中设置一个标志&#xff0c;表示它已经被中断&#xff0c;并立即返回。这里需要注意的是&#xff0c;如果只是单纯的调用…

分布式服务框架原理(一)设计和实现

分布式服务框架设计 分布式服务框架一般可以分为以下几个部分&#xff0c; &#xff08;1&#xff09;RPC基础层&#xff1a; 包括底层通信框架&#xff0c;如NIO框架、通信协议&#xff0c;序列化和反序列化协议&#xff0c;以及在这几部分上的封装&#xff0c;屏蔽底层通信细…

saltstack之混合匹配

需要-C参数: salt -C ## 使用grains属性来匹配 [roothadoop0 pillar]# salt -C Gos:Ubuntu test.ping uadoop1:True ## 使用Minion ID的正则表达式来匹配 [roothadoop0 pillar]# salt -C Euadoop\d test.ping uadoop2:True uadoop3:True uadoop1:True ## 使用gr…

java自定义 filter,HBase自定义Filter

必需要提前说明下&#xff1a;不建议使用自定义的Filter。所有的Filter都是在服务端生效&#xff1a;就是说需要将自定义的Filter封装为jar&#xff0c;上传到HBase的类路径下&#xff0c;并重启HBase使之生效。对于生产环境的HBase来说&#xff0c;重启通常是不能接受的。Filt…