在springboot中Redis数据与MySQL数据的一致性方案思考和案例

文章目录

  • 前言
  • 一、双写一致性模式(同步)
    • Redis->MySQL
    • MySQL->Redis
  • 二、数据监听模式(异步)
    • Redis->MySQL
    • MySQL -> Redis
  • 总结


前言

Redis和MySQL之间保持数据一致性是个复杂的问题,搜索资料发现大部分也只做了理论的说明。主流的方案大概是两种,一种是同步,一种是异步。下面我们来分析这两种模式。


一、双写一致性模式(同步)

双写就是在插入Redis数据的同时再向MySQL写入,或者在写入MySQL的同时再向Redis写入。这种方式的优点是数据高度一致,而且实时同步。但缺点也很明显,侵入性太强,需要时刻编码,同时还需要考虑各自的事务控制。具体实现方案如下:

Redis->MySQL

这种方式需要Redis来显示控制事务,当然数据库事务也必须要有

package com.test.spring;import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.dao.DataAccessException;
import org.springframework.data.redis.connection.RedisConnection;
import org.springframework.data.redis.core.RedisCallback;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.transaction.annotation.EnableTransactionManagement;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
@SpringBootApplication
@RestController
@EnableTransactionManagement
public class TestRedisToMysqlApplication {@Autowiredprivate StringRedisTemplate stringRedisTemplate;@Autowiredprivate JdbcTemplate jdbcTemplate;@RequestMapping("/test")@Transactionalpublic String test1(){stringRedisTemplate.execute(new RedisCallback<Boolean>() {@Overridepublic Boolean doInRedis(RedisConnection connection) throws DataAccessException {connection.multi();connection.commands().set("k1".getBytes(),"1".getBytes());connection.commands().set("k2".getBytes(),"2".getBytes());jdbcTemplate.update("insert into t_user (k1,k2) values (?,?)","1","2");connection.exec();return true;}});return "success";}public static void main(String[] args) {SpringApplication.run(TestRedisToMysqlApplication.class, args);}}

MySQL->Redis

这种方式,只需要控制jdbc事务即可:

package com.test.spring;import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.transaction.annotation.EnableTransactionManagement;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;@SpringBootApplication
@RestController
@EnableTransactionManagement
public class TestMysqlToRedisApplication {@Autowiredprivate StringRedisTemplate stringRedisTemplate;@Autowiredprivate JdbcTemplate jdbcTemplate;@RequestMapping("/test")@Transactionalpublic String test1(){jdbcTemplate.update("insert into t_user (k1,k2) values (?,?)","1","2");stringRedisTemplate.opsForValue().set("k1","1");stringRedisTemplate.opsForValue().set("k2","2");return "success";}public static void main(String[] args) {SpringApplication.run(TestMysqlToRedisApplication.class, args);}}

二、数据监听模式(异步)

异步模式是通过对Redis的监听或者对MySQL的监听来实现,这种方式具有一定延迟,但是对原有代码无侵入性,可以单独开发程序来独立执行,并且无需关心各自的事务操作。在不需要绝对实时性的情况下,是不错的选择。

Redis->MySQL

这种模式需要在Redis的配置文件redis.conf中修改:

notify-keyspace-events "KEA"
package com.test.spring;import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.context.event.ApplicationReadyEvent;
import org.springframework.context.annotation.Bean;
import org.springframework.context.event.EventListener;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.listener.PatternTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.data.redis.listener.Topic;
import org.springframework.jdbc.core.JdbcTemplate;import java.nio.charset.StandardCharsets;
import java.util.Objects;@SpringBootApplication
public class TestRedisApplication {@Autowiredprivate StringRedisTemplate stringRedisTemplate;@Autowiredprivate JdbcTemplate jdbcTemplate;@Beanpublic MessageListener redisMessageListener() {return (Message message, byte[] pattern)->{String key = new String(message.getBody(), StandardCharsets.UTF_8);String value=stringRedisTemplate.opsForValue().get(key);System.out.println("key:" + key+"  发生变化。变化的值:"+value);//下面进行数据库操作,具体的逻辑需要根据你的设计来编写jdbcTemplate.update("insert into t_user ("+key+") values (?)",key,value);};}@Beanpublic RedisMessageListenerContainer redisMessageListenerContainer() {final RedisMessageListenerContainer container = new RedisMessageListenerContainer();container.setConnectionFactory(Objects.requireNonNull(stringRedisTemplate.getConnectionFactory()));return container;}@EventListenervoid listener(ApplicationReadyEvent event) {Topic topic = new PatternTopic("__keyevent@*");// 监听 整个redis数据库 的所有事件;RedisMessageListenerContainer redisMessageListenerContainer = event.getApplicationContext().getBean(RedisMessageListenerContainer.class);MessageListener redisMessageListener = event.getApplicationContext().getBean(MessageListener.class);redisMessageListenerContainer.addMessageListener(redisMessageListener, topic);}public static void main(String[] args) {SpringApplication.run(TestRedisApplication.class, args);}}

MySQL -> Redis

监听MySQL最方便的方式是监听MySQL的二进制文件,这种方式对原有数据无侵入。关于二进制文件的监听方案有很多,比如:Canal ,但是Canal再和Java集成上稍显复杂,这里给大家介绍另外一款工具:Debezium,在集成上很方便,具体操作如下:
加入maven依赖:

<dependency><groupId>io.debezium</groupId><artifactId>debezium-api</artifactId><version>1.6.0.Final</version>
</dependency>
<dependency><groupId>io.debezium</groupId><artifactId>debezium-embedded</artifactId><version>1.6.0.Final</version>
</dependency>
<dependency><groupId>io.debezium</groupId><artifactId>debezium-connector-mysql</artifactId><version>1.6.0.Final</version>
</dependency>

编写DebeziumServerBootstrap用作启动Debezium

package com.test.spring;import io.debezium.engine.DebeziumEngine;
import lombok.Data;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.context.SmartLifecycle;
import org.springframework.util.Assert;@Data
@Slf4j
public class DebeziumServerBootstrap implements InitializingBean, SmartLifecycle {private final Thread.UncaughtExceptionHandler handler = new Thread.UncaughtExceptionHandler() {public void uncaughtException(Thread t, Throwable e) {log.error("解析事件有一个错误 ", e);}};private Thread thread = null;private boolean running = false;private DebeziumEngine<?> debeziumEngine;@Overridepublic void start() {thread=new Thread(debeziumEngine);thread.setName("debezium-server-thread");thread.setUncaughtExceptionHandler(handler);thread.start();running = true;}@SneakyThrows@Overridepublic void stop() {debeziumEngine.close();this.running=false;thread.join();log.info("DebeziumServerBootstrap stop ");}@Overridepublic boolean isRunning() {return running;}@Overridepublic void afterPropertiesSet() throws Exception {Assert.notNull(debeziumEngine, "debeziumEngine must not be null");}
}

编写DebeziumConfiguration配置

package com.test.spring;import io.debezium.connector.mysql.MySqlConnector;
import io.debezium.data.Envelope;
import io.debezium.embedded.Connect;
import io.debezium.engine.DebeziumEngine;
import io.debezium.engine.RecordChangeEvent;
import io.debezium.engine.format.ChangeEventFormat;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.data.Field;
import org.springframework.context.annotation.Bean;
import org.springframework.core.env.Environment;
import org.apache.commons.lang3.tuple.Pair;import java.util.List;
import java.util.Map;import static io.debezium.data.Envelope.FieldName.AFTER;
import static io.debezium.data.Envelope.FieldName.BEFORE;
import static io.debezium.data.Envelope.FieldName.OPERATION;
import static java.util.stream.Collectors.toMap;@Slf4j
public class DebeziumConfiguration {private static final String serverName="debecontrol";/*** Debezium 配置.** @return configuration*/@Beanpublic io.debezium.config.Configuration debeziumConfig(Environment environment) {String username=environment.getProperty("spring.datasource.username");String password=environment.getProperty("spring.datasource.password");String dir=environment.getProperty("canal.conf.dir");String defaultDatabaseName=environment.getProperty("canal.defaultDatabaseName");String slaveId=environment.getProperty("canal.slaveId");String url=environment.getProperty("canal.address");String[] urls=url.split("[:]");return io.debezium.config.Configuration.create()
//            连接器的Java类名称.with("connector.class", MySqlConnector.class.getName())
//            偏移量持久化,用来容错 默认值.with("offset.storage", "org.apache.kafka.connect.storage.FileOffsetBackingStore")
//                捕获偏移量的周期.with("offset.flush.interval.ms", "6000")
//               连接器的唯一名称.with("name", "mysql-connector")
//                数据库的hostname.with("database.hostname", urls[0])
//                端口.with("database.port", urls[1])
//                用户名.with("database.user", username)
//                密码.with("database.password", password)
//                 包含的数据库列表.with("database.include.list", defaultDatabaseName)
//                是否包含数据库表结构层面的变更,建议使用默认值true.with("include.schema.changes", "false")
//                mysql.cnf 配置的 server-id.with("database.server.id", slaveId)
//                	MySQL 服务器或集群的逻辑名称.with("database.server.name", serverName)
//                历史变更记录.with("database.history", "io.debezium.relational.history.FileDatabaseHistory").build();}/*** Debezium server bootstrap debezium server bootstrap.** @param configuration the configuration* @return the debezium server bootstrap*/@Beanpublic DebeziumServerBootstrap debeziumServerBootstrap(io.debezium.config.Configuration configuration) {DebeziumServerBootstrap debeziumServerBootstrap = new DebeziumServerBootstrap();DebeziumEngine<RecordChangeEvent<SourceRecord>> debeziumEngine = DebeziumEngine.create(ChangeEventFormat.of(Connect.class)).using(configuration.asProperties()).notifying(this::handlePayload).build();debeziumServerBootstrap.setDebeziumEngine(debeziumEngine);return debeziumServerBootstrap;}private void handlePayload(List<RecordChangeEvent<SourceRecord>> recordChangeEvents, DebeziumEngine.RecordCommitter<RecordChangeEvent<SourceRecord>> recordCommitter) {recordChangeEvents.forEach(r -> {SourceRecord sourceRecord = r.record();Struct sourceRecordChangeValue = (Struct) sourceRecord.value();if(sourceRecordChangeValue==null) return;this.handlePayload1(sourceRecordChangeValue);});}private void handlePayload1(Struct sourceRecordChangeValue){try{// 判断操作的类型 过滤掉读 只处理增删改   这个其实可以在配置中设置Envelope.Operation operation = Envelope.Operation.forCode((String) sourceRecordChangeValue.get(OPERATION));if(operation==Envelope.Operation.READ) return;//customer_mysql_db_server.control.t_dic.Envelope//debecontrol.control.t_dic.EnvelopeString name = sourceRecordChangeValue.schema().name();String[] names=name.split("[.]");String talbe=names[2];// 获取增删改对应的结构体数据Struct before_struct = (Struct) sourceRecordChangeValue.get(BEFORE);// 将变更的行封装为MapMap<String, Object> before_payload =null;if(before_struct!=null){before_payload = before_struct.schema().fields().stream().map(Field::name).filter(fieldName -> before_struct.get(fieldName) != null).map(fieldName -> Pair.of(fieldName, before_struct.get(fieldName))).collect(toMap(Pair::getKey, Pair::getValue));}// 获取增删改对应的结构体数据Struct after_struct = (Struct) sourceRecordChangeValue.get(AFTER);Map<String, Object> after_payload =null;if(after_struct!=null){// 将变更的行封装为Mapafter_payload = after_struct.schema().fields().stream().map(Field::name).filter(fieldName -> after_struct.get(fieldName) != null).map(fieldName -> Pair.of(fieldName, after_struct.get(fieldName))).collect(toMap(Pair::getKey, Pair::getValue));}//在这里进行Redis操作if(operation==Envelope.Operation.CREATE){//数据库插入}else if(operation==Envelope.Operation.UPDATE){//数据库更新}else if(operation==Envelope.Operation.DELETE){//数据库删除}}catch (Exception e){log.warn("解析数据错误:"+e.getMessage());}}}

入口类

package com.test.spring;import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Import;
@SpringBootApplication
@Import(DebeziumConfiguration.class)
public class TestMysqlApplication {public static void main(String[] args) {SpringApplication.run(TestMysqlApplication.class, args);}}

这里我们需要开启MySQL的二进制日志,需要修改my.cnf文件,增加如下配置:

log-bin=mysql-bin
binlog_format=row
server-id=1
log_bin_trust_function_creators=1

总结

关于Redis与MySQL数据一致性,我觉得还需要考虑各自的数据结构如何设计,因为这两种存储方式完全不一样。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/743275.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

鸿蒙API9+axios封装一个通用工具类

使用方式&#xff1a; 打开Harmony第三方工具仓&#xff0c;找到axios&#xff0c;如图&#xff1a; 第三方工具仓网址&#xff1a;https://ohpm.openharmony.cn/#/cn/home 在你的项目执行命令&#xff1a;ohpm install ohos/axios 前提是你已经装好了ohpm &#xff0c;如果没…

【MySQL 系列】MySQL 函数篇

MySQL 提供了广泛的函数&#xff0c;用于处理各种数据类型&#xff0c;包括数值、字符串、日期和时间等。这些函数可以帮助你进行复杂的数据操作&#xff0c;改善数据查询的效率和灵活性。 文章目录 1、MySQL 函数介绍1.1、MySQL 函数简介2.1、MySQL 函数分类 2、MySQL 数学函数…

最适合Oracle数据库的Linux操作系统?

先声明&#xff1a;以下观点仅供参考。我只引用网上的一些观点&#xff0c;包括官网等。 Oracle数据库认证的Linux操作系统 最近老被问到Oracle Linux免费吗&#xff1f;因为用户需要安装Oracle数据库&#xff0c;面临选择操作系统的问题。 首先&#xff0c;Oracle数据库 19…

一、图的基本概念

文章目录 1、无向图和有向图2、图的表示2.1 图解表示2.2 图的邻接矩阵表示2.2 图的邻接表表示 3、子图4、度5、正则图6、同构7、路&#xff0c;圈和连通图7.1 连通图的判定条件7.2 圈的判定条件 8、补图和双图8.1 补图8.2 双图 9、欧拉图10、哈密顿图10.1 哈密顿图判定的充分条…

考研C语言复习进阶(2)

目录 1. 字符指针 2. 指针数组 3. 数组指针 3.1 数组指针的定义 3.2 &数组名VS数组名 4. 函数指针 5. 函数指针数组 6. 指向函数指针数组的指针 7. 回调函数 8.三步辗转法 9. 指针和数组笔试题解析 10. 指针笔试题 指针的主题&#xff0c;我们在初级阶段的《指…

工程师日常:六大茶类--乌龙茶

工程师日常&#xff1a;六大茶类–乌龙茶 乌龙茶的产区主要分布在福建、广东和台湾。 福建是乌龙茶的发源地和最大产区。所产乌龙茶按地域分为闽北乌龙、闽南乌龙。那么主要的代表性的产品分别为武夷岩茶和安溪铁观音。 广东作为乌龙茶另一重要产区&#xff0c;那么主要地域…

【小沐学C#】C#文件读写方式汇总

文章目录 1、简介2、相关类介绍3、代码示例3.1 FileStream&#xff08;流文件&#xff09;3.2 StreamReader / StreamWriter &#xff08;文本文件&#xff09;3.2.1 StreamReader3.2.2 StreamWriter 3.3 BinaryReader / BinaryWriter &#xff08;二进制文件&#xff09;3.3.1…

地理数据 vs. 3D数据

在表示我们周围的物理世界时&#xff0c;地理空间数据和 3D 建筑数据是两个最常见的选择。 他们在各个行业和项目中发挥着至关重要的作用。 从构建数字孪生到可视化城市景观和创建沉浸式应用程序。 尽管地理空间和 3D 建筑数据有相似之处&#xff0c;但它们不可互换。 虽然地…

国投用什么档案管理系统好

国投适合使用综合档案管理系统。这是因为国投通常规模较大&#xff0c;涉及的业务范围也比较广泛&#xff0c;包括行政管理、财务管理、人力资源管理等。 玖拓智能综合档案管理系统能够整合这些不同部门的档案管理需求&#xff0c;提供统一的档案管理平台&#xff0c;方便国投内…

蓝桥杯 填空 卡片

蓝桥杯 填空题 卡片 解题思路&#xff1a; 我们只需要消耗完卡片的个数即可。 代码示例&#xff1a; #include<bits/stdc.h> using namespace std; int a[10]; bool isEnd(){for(int i0;i<10;i){if(a[i]-1)return false;}return true; } bool getN(int x){while(x){i…

react简单实现获取dom元素

家人们 今天从老杨那里又学到新东西了&#xff01; 赶紧来看看吧 第一个方法&#xff1a; // 使用ref声明一个变量const tel useRef(null)const pass useRef(null) 再给input 内标签属性里面绑定 ref{变量名} <inputclassName{style.inputs}type"text"placeh…

SQLiteC/C++接口详细介绍之sqlite3类(五)

快速跳转文章列表&#xff1a;SQLite—系列文章目录 上一篇&#xff1a;SQLiteC/C接口详细介绍之sqlite3类&#xff08;四&#xff09; 下一篇&#xff1a;SQLiteC/C接口详细介绍之sqlite3类&#xff08;六&#xff09;&#xff08;未发表&#xff09; 14.sqlite3_busy_handle…

4、ETAS INCA标定系统基本操作

目录 一、标定开始操作 二、Working Page & Reference Page 三、Memory Pages存储页 一、标定开始操作 使用实验环境进行上传/下载标定数据,并观察和编辑标定数据 1. 上传/下载标定数据和代码 2. 添加测量变量 3. 添加标定变量 4. 配置测量变量 5. 建立存储触发

2024年云仓酒庄:店中店增项新模式,开启葡萄酒文化新篇章

2024云仓酒庄&#xff1a;店中店增项新模式&#xff0c;开启葡萄酒文化新篇章 在葡萄酒行业蓬勃发展的今天&#xff0c;云仓酒庄以其独特的经营模式和创新思维&#xff0c;在市场中脱颖而出。2024年&#xff0c;云仓酒庄继续深化其战略布局&#xff0c;不仅在多地开设酒庄实体…

docker实战(1)

docker搭建mysql 一,搭建docker环境 二,Docker安装mysql 1,查看mysql版本:docker search mysql 2,安装指定mysql版本或安装最新版本 docker pull mysql:5.8 或 docker pull mysql:latest 3,查看本地镜像中是否已安装好mysql镜像:docker images 4,运行容器启动my…

运维专题.Docker+Nginx服务器的SSL证书安装

运维专题 DockerNginx服务器的SSL证书安装 - 文章信息 - Author: 李俊才 (jcLee95) Visit me at CSDN: https://jclee95.blog.csdn.netMy WebSite&#xff1a;http://thispage.tech/Email: 291148484163.com. Shenzhen ChinaAddress of this article:https://blog.csdn.net/q…

【C++】—— 代理模式

目录 &#xff08;一&#xff09;什么是代理模式 &#xff08;二&#xff09;为什么使用代理模式 &#xff08;三&#xff09;代理模式实现步奏 &#xff08;四&#xff09;代码示例 &#xff08;五&#xff09;代理模式优缺点 &#xff08;一&#xff09;什么是代理模式 …

备考2025年AMC8竞赛:吃透2000-2024年600道真题(免费赠送真题)

我们继续来随机看五道AMC8的真题和解析&#xff0c;根据实践经验&#xff0c;对于想了解或者加AMC8美国数学竞赛的孩子来说&#xff0c;吃透AMC8历年真题是备考最科学、最有效的方法之一。 即使不参加AMC8竞赛&#xff0c;吃透了历年真题600道和背后的知识体系&#xff0c;那么…

用 C 语言模拟 Rust 的 Box 类型

在 Rust 中&#xff0c;Box<T> 是一个堆上分配的指针类型&#xff0c;用于在堆上存储值而不是栈上。在 C 语言中&#xff0c;没有直接对应于 Rust 的 Box<T> 的类型&#xff0c;但我们可以使用指针和动态内存分配来模拟这种行为。 下面是一个简单的示例&#xff0…

76. 最小覆盖子串-力扣hot100(C++)

76. 最小覆盖子串s 初始化和特判 //本题做题思想 //从头开始&#xff0c;首先找到一个包含所有字母的字串&#xff0c;将i移动到包含字串字母的位置&#xff0c;然后更新长度和字符串ans后&#xff0c; //i的位置加1&#xff0c;j的位置也加1&#xff0c;从新开始上面的流程&…