MySQL 和 Redis 如何保证数据一致性,通过MySQL的binlog实现

1、简介

        MySQL 和 Redis 如何保证数据一致性,目前大多讨论的是先更新Redis后更新MySQL,还是先更新MySQL 后更新Redis,这两种方式在实际的应用场景中都不能确保数据的完全一致性,在某些情况下会出现问题,本文介绍使用 Canal 工具,通过将自己伪装成MySQL的从节点,读取binlog 日志实现。

        Canal是用 Java开发的基于数据库增量日志解析,提供增量数据订阅 &消费的中间件。
目前 。 Canal主要支持了 MySQL的 Binlog解析,解析完成后才利用 Canal Client来处理获得
的相关数据。与此类似的组件还有 flink CDC 等。

2、canal 实现 MySQL 和 Redis 数据一致原理

        Canal 通过伪装成MySQL 的从节点,获取binlog 得到MySQL 修改内容,从而更新 Redis 缓存,保证MySQL 和 Redis 数据一致性。

3、canal 下载安装配置

注:实现主从复制,要保证 Master 节点的 binlog 功能要开启。

修改主节点 MySQL 的配置文件(MySQL8.0默认开启,且binlog_format默认为row):

# vim /etc/my.cnf
[mysqld]
server-id=1
log-bin=test   # 自定义二进制日志文件名
binlog_format=row
3.1、canal 下载 

        下载地址:Releases · alibaba/canal · GitHub,本文以 canal.deployer-1.1.7.tar.gz 为例搭建环境。

3.2、安装

          Canal 安装很简单,只要解压到指定的文件夹下就行。

# 解压(需要提前创建好canal文件夹)
tar -zxvf canal.deployer-1.1.7.tar.gz -C /opt/canal
3.3、配置文件
3.3.1、 Canal 的配置文件
######### 		common argument		#############
# tcp bind ip
canal.ip =
# 默认端口为11111
canal.port = 11111
canal.metrics.pull.port = 11112
# canal admin config
#canal.admin.manager = 127.0.0.1:8089
canal.admin.port = 11110
canal.admin.user = admin
canal.admin.passwd = 4ACFE3202A5FF5CF467898FC58AAB1D615029441canal.zkServers =
# flush data to zk
canal.zookeeper.flush.period = 1000
canal.withoutNetty = false
# tcp, kafka, rocketMQ, rabbitMQ, pulsarMQ
# 默认为 tcp,支持以上几种mq的数据传输
canal.serverMode = tcp
# flush meta cursor/parse position to file
canal.file.data.dir = ${canal.conf.dir}
canal.file.flush.period = 1000# binlog format/image check
canal.instance.binlog.format = ROW,STATEMENT,MIXED 
canal.instance.binlog.image = FULL,MINIMAL,NOBLOB# binlog ddl isolation
canal.instance.get.ddl.isolation = false
canal.instance.parser.parallelBufferSize = 256######### 		destinations(canal 获取数据发送的目标地址)		#############
canal.destinations = example
# conf root dir
canal.conf.dir = ../conf
# auto scan instance dir add/remove and start/stop instance
canal.auto.scan = true
canal.auto.scan.interval = 5
# set this value to 'true' means that when binlog pos not found, skip to latest.
# WARN: pls keep 'false' in production env, or if you know what you want.
canal.auto.reset.latest.pos.mode = falsecanal.instance.tsdb.spring.xml = classpath:spring/tsdb/h2-tsdb.xml
#canal.instance.tsdb.spring.xml = classpath:spring/tsdb/mysql-tsdb.xmlcanal.instance.global.mode = spring
canal.instance.global.lazy = false
canal.instance.global.manager.address = ${canal.admin.manager}
#canal.instance.global.spring.xml = classpath:spring/memory-instance.xml
canal.instance.global.spring.xml = classpath:spring/file-instance.xml
#canal.instance.global.spring.xml = classpath:spring/default-instance.xml######### 		     Kafka 		     #############
kafka.bootstrap.servers = 127.0.0.1:9092
kafka.acks = all
kafka.compression.type = none
kafka.batch.size = 16384
kafka.linger.ms = 1
kafka.max.request.size = 1048576
kafka.buffer.memory = 33554432
kafka.max.in.flight.requests.per.connection = 1
kafka.retries = 0kafka.kerberos.enable = false
kafka.kerberos.krb5.file = ../conf/kerberos/krb5.conf
kafka.kerberos.jaas.file = ../conf/kerberos/jaas.conf# sasl demo
# kafka.sasl.jaas.config = org.apache.kafka.common.security.scram.ScramLoginModule required \\n username=\"alice\" \\npassword="alice-secret\";
# kafka.sasl.mechanism = SCRAM-SHA-512
# kafka.security.protocol = SASL_PLAINTEXT######### 		    RocketMQ	     #############
rocketmq.producer.group = test
rocketmq.enable.message.trace = false
rocketmq.customized.trace.topic =
rocketmq.namespace =
rocketmq.namesrv.addr = 127.0.0.1:9876
rocketmq.retry.times.when.send.failed = 0
rocketmq.vip.channel.enabled = false
rocketmq.tag = ######### 		    RabbitMQ	     #############
rabbitmq.host =
rabbitmq.virtual.host =
rabbitmq.exchange =
rabbitmq.username =
rabbitmq.password =
rabbitmq.deliveryMode =

        说明:这个文件是 canal的基本通用配置, canal端口号默认就是 11111 修改 canal的输出 model,默认 tcp。

3.3.2、实例的配置文件

        多实例配置,如果创建多个实例 通过前面 Canal 架构,我们可以知道,一个 Canal 服务中可以有多个 instance conf/下的每一个 example即是一个实例,每个实例下面都有独立的配置文件。默认只有一个实例 example,如果需要多个实例处理不同的 MySQL数据的话,直接拷贝出多个 example ,并对其重新命名,命名和配置文件中指定的名称一致,然后修改canal.properties中的 canal.destinations = 实例 1,实例 2,实例 3。

        到每个实例下面修改 instance.properties 文件。

#################################################
## mysql serverId , v1.0.26+ will autoGen
canal.instance.mysql.slaveId=12  # 集群中唯一就行# enable gtid use true/false
canal.instance.gtidon=false# position info
canal.instance.master.address=127.0.0.1:3306  # 主节点的ip和端口
canal.instance.master.journal.name=
canal.instance.master.position=
canal.instance.master.timestamp=
canal.instance.master.gtid=# rds oss binlog
canal.instance.rds.accesskey=
canal.instance.rds.secretkey=
canal.instance.rds.instanceId=# table meta tsdb info
canal.instance.tsdb.enable=true
#canal.instance.tsdb.url=jdbc:mysql://127.0.0.1:3306/canal_tsdb
#canal.instance.tsdb.dbUsername=canal
#canal.instance.tsdb.dbPassword=canal#canal.instance.standby.address =
#canal.instance.standby.journal.name =
#canal.instance.standby.position =
#canal.instance.standby.timestamp =
#canal.instance.standby.gtid=# username/password
canal.instance.dbUsername=root  # Master 数据库用户名
canal.instance.dbPassword=123456 # Master 数据库密码
canal.instance.connectionCharset = UTF-8
# enable druid Decrypt database password
canal.instance.enableDruid=false
#canal.instance.pwdPublicKey=MFwwDQYJKoZIhvcNAQEBBQADSwAwSAJBALK4BUxdDltRRE5/zXpVEVPUgunvscYFtEip3pmLlhrWpacX7y7GCMo2/JM6LeHmiiNdH1FWgGCpUfircSwlWKUCAwEAAQ==# mq config
canal.mq.topic=example
# dynamic topic route by schema or table regex
#canal.mq.dynamicTopic=mytest1.user,topic2:mytest2\\..*,.*\\..*
canal.mq.partition=0
# hash partition config
#canal.mq.enableDynamicQueuePartition=false
#canal.mq.partitionsNum=3
#canal.mq.dynamicTopicPartitionNum=test.*:4,mycanal:6
#canal.mq.partitionHash=test.table:id^name,.*\\..*
#
# multi stream for polardbx
canal.instance.multi.stream.on=false
#################################################
3.4、canal 启动

运行 bin/start.sh

注:如果jdk版本过高,本文用的是 jdk17 版本,canal 目前基于 jdk 11,因此会出现如下错误:

产生原因:

1)、-XX:+AggressiveOpts,这个参数在11的版本已经被废弃了,并在之后的版本会删除;

2)、-XX:+UseBiasedLocking,这个参数在15的版本被废弃了,这个偏向锁已经被废弃了。

解决方法:

修改启动文件(vim  bin/start.sh),将下图中标注的两个参数删除,先 bin/stop.sh,然后再 bin/start.sh 即可:

4、canal 在tcp 模式下的测试
4.1、引入依赖
<dependency><groupId>com.alibaba.otter</groupId><artifactId>canal.client</artifactId><version>1.1.6</version>
</dependency>
<dependency><groupId>com.alibaba.otter</groupId><artifactId>canal.protocol</artifactId><version>1.1.6</version>
</dependency>
4.2、编写客户端代码
import com.alibaba.fastjson2.JSONObject;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import com.google.protobuf.ByteString;
import com.google.protobuf.InvalidProtocolBufferException;
import java.net.InetSocketAddress;
import java.util.List;public class CanalClient {public static void main(String[] args) throws InvalidProtocolBufferException {//1.获取canal 连接对象CanalConnector canalConnector = CanalConnectors.newSingleConnector(new InetSocketAddress("192.168.30.89", 11111), "example", "", "");while (true) {//2.获取连接canalConnector.connect();// 指定要监控的数据库canalConnector.subscribe("test.*");// 获取 MessageMessage message = canalConnector.get(100);List<CanalEntry.Entry> entries = message.getEntries();if (entries.size() <= 0) {System.out.println(" 没有数据,休息一会");try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}} else {for (CanalEntry.Entry entry : entries) {// 获取表名String tableName = entry.getHeader().getTableName();CanalEntry.EntryType entryType = entry.getEntryType();// 判断 entryType 是否为 ROWDATAif (CanalEntry.EntryType.ROWDATA.equals(entryType)) {// 序列化数据ByteString storeValue = entry.getStoreValue();//反序列化CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(storeValue);//获取事件类型CanalEntry.EventType eventType = rowChange.getEventType();// 获取具体的数据List<CanalEntry.RowData> rowDatasList =rowChange.getRowDatasList();// 遍历并打印数据for (CanalEntry.RowData rowData : rowDatasList) {List<CanalEntry.Column> beforeColumnsList = rowData.getBeforeColumnsList();JSONObject beforeData = new JSONObject();for (CanalEntry.Column column : beforeColumnsList) {beforeData.put(column.getName(), column.getValue());}JSONObject afterData = new JSONObject();List<CanalEntry.Column> afterColumnsList = rowData.getAfterColumnsList();for (CanalEntry.Column column : afterColumnsList) {afterData.put(column.getName(), column.getValue());}System.out.println("TableName:" + tableName + ",EventType:" + eventType + ",After:" + beforeData + ",After:" + afterData);}}}}}}
}
4.3、测试结果

插入一条数据:

 监听结果:

 5、总结

         本文介绍一种解决数据一致性的方案,通过MySQL 的主从复制实现,数据库和缓存一致性。canal 可以实现将结果发送到 mq操作,还支持高可用集群部署,这部分内容将在公众号中分享,关注下面公众号,学到更多知识。

        本人是一个从小白自学计算机技术,对运维、后端、各种中间件技术、大数据等有一定的学习心得,想获取自学总结资料(pdf版本)或者希望共同学习,关注微信公众号:it自学社团。后台回复相应技术名称/技术点即可获得。(本人学习宗旨:学会了就要免费分享)

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/619706.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

遥感影像-语义分割数据集:高分卫星-云数据集详细介绍及训练样本处理流程

原始数据集详情 简介&#xff1a;该云数据集包括RGB三通道的高分辨率图像&#xff0c;包含高分一、高分二及宽幅数据集。 KeyValue卫星类型高分系列覆盖区域未知场景未知分辨率1m、2m、8m数量12000单张尺寸1024*1024原始影像位深8位标签图片位深8位原始影像通道数三通道标签图…

vivado 使用源文件

使用源文件 概述 源文件包括从AMD IP添加的设计源、知识产权&#xff08;IP&#xff09;源目录、RTL设计源、从系统添加的数字信号处理&#xff08;DSP&#xff09;源生成器工具和IP子系统&#xff0c;也称为块设计&#xff0c;由IP集成商创建AMD Vivado的功能™ 设计套件。源…

C++11 14 17内存管理

智能指针 unique_ptr 初始化 访问和移动赋值 重置和移动内存资源 自定义删除器 shared_ptr 原理 自定义删除器 分配器allocator和new重载 new表达式原理 operator new delete placement new new (buf) 是一种 "placement new" 的使用方式&#xff0c;它允许在已…

Qt/QML编程学习之心得:Grid、GridLayout、GridView、Repeater(33)

GRID网格用处非常大,不仅在excel中,在GUI中,也是非常重要的一种控件。 Grid 网格是一种以网格形式定位其子项的类型。网格创建一个足够大的单元格网格,以容纳其所有子项,并将这些项从左到右、从上到下放置在单元格中。每个项目都位于其单元格的左上角,位置为(0,0)。…

uniapp 开发小程序的时候使用自定义 tabbar 时出现切换页面闪烁的情况

问题&#xff1a;在使用自定义组件的时候可以看到页面切换明显的闪烁, 这种体验是很不好的, 当然最好的方式就是使用原生导航栏, 不要搞花里胡哨的东西。 来看下体验不好的效果 优化调整 先说思路&#xff0c;就是仍然设置原生 tabbar, 在应用启动的时候主动隐藏原生 tabba…

VS QT 创建新的QT类后,编译报错无法解析的外部符号 “public: virtual struct QMetaObject const *

问题描述&#xff1a; 新建QT的 Widgets 类&#xff0c;创建新的窗口 在编译的时候出现以下报错信息&#xff1a; 1>vfhclassifydialog.obj : error LNK2001: 无法解析的外部符号 "public: virtual struct QMetaObject const * __cdecl VfhClassifyDialog::metaObject…

vivado 指定顶部模块和重新排序源

指定顶部模块和重新排序源 文件夹默认情况下&#xff0c;Vivado Design Suite会自动确定设计的顶层添加到的源文件的层次结构和细化、合成和模拟的顺序项目这可以通过右键单击中的“层次更新”设置进行控制“源”窗口的菜单。请参阅中的“源”窗口中的“层次更新”命令Vivado …

Ceph入门到精通-通过 CloudBerry Explorer 管理对象bucket

简介 CloudBerry Explorer 是一款可用于管理对象存储&#xff08;Cloud Object Storage&#xff0c;COS&#xff09;的客户端工具。通过 CloudBerry Explorer 可实现将 COS 挂载在 Windows 等操作系统上&#xff0c;方便用户访问、移动和管理 COS 文件。 支持系统 支持 Wind…

uniapp微信小程序投票系统实战 (SpringBoot2+vue3.2+element plus ) -投票帖子管理实现

锋哥原创的uniapp微信小程序投票系统实战&#xff1a; uniapp微信小程序投票系统实战课程 (SpringBoot2vue3.2element plus ) ( 火爆连载更新中... )_哔哩哔哩_bilibiliuniapp微信小程序投票系统实战课程 (SpringBoot2vue3.2element plus ) ( 火爆连载更新中... )共计21条视频…

Java项目:05 停车管理系统

作者主页&#xff1a;舒克日记 简介&#xff1a;Java领域优质创作者、Java项目、学习资料、技术互助 文中获取源码 项目介绍 课题意义&#xff1a; 随着时代和科技的进步&#xff0c;人们的生活水平越来越高&#xff0c;私家车的数量不断上涨&#xff0c;随之产生了一些问题&…

POI-tl 知识整理:整理1 -> 利用模板向word中写入数据

1 文本传值 Testpublic void testText() throws Exception {XWPFTemplate template XWPFTemplate.compile("D:\\Idea-projects\\POI_word\\templates.docx");Map<String, Object> map new HashMap<>();map.put("title", "Hi, girl"…

PyCharm连接服务器 - 1

文章目录 利用PyCharm实现远程开发使用认证代理连接服务器 利用PyCharm实现远程开发 【注】该连接服务器的方法适用于代码在服务器&#xff0c;我们是通过 GateWay 打开远程服务器的代码进行操作。 该功能只有在PyCharm专业版下才可以使用&#xff0c;并且必须是官方的正版许…

谷歌最新医学领域LLM大模型:AMIE

2024年1月11日Google 研究院发布最新医疗大模型AMIE&#xff1a;用于诊断医学推理和对话的研究人工智能系统。 文章链接&#xff1a;Articulate Medical Intelligence Explorer (AMIE) giuthub&#xff1a;目前代码未开源 关于大模型之前有过一篇总结&#xff1a;大语言模型(L…

Jmeter接口测试必会技能:jmeter_HTTP Cookie管理器

HTTP Cookie管理器 HTTP Cookie管理器可以像浏览器一样自动存储和发送cookie&#xff0c;以这种自 动收集的方式收集到的cookie不会在cookie manager中进行展示&#xff0c;但是运行后&#xff0c; 可以通过 查看结果树&#xff08;监听器&#xff09;可以查看到cookie信息 除…

详解矩阵变换:伸缩,旋转,反射和投影

目录 一. 矩阵子空间 二. 矩阵变换 2.1 伸缩矩阵 2.2 旋转矩阵 2.3 反射矩阵 2.4 投影矩阵 2.5 小结 三. 矩阵变换与函数 3.1 原点 3.2 常数倍性质 3.3 加法性质 3.4 小结 四. 空间变换 五. 小结 一. 矩阵子空间 矩阵与向量相乘Ax可以看成子空间的变换。 零空间…

Linux 系统编程:文件系统的底层逻辑 - inode

《Linux 程序设计》的第三章讲文件操作。在提到 目录 时有这么一段文字&#xff1a; 文件&#xff0c;除了本身包含的 内容 以外&#xff0c;它还会有一个 名字 和一些 属性&#xff0c;即“管理信息”&#xff0c;包括文件的创建 / 修改日期和它的访问权限。这些属性被保存在文…

ActiveMQ反序列化RCE漏洞复现(CVE-2023-46604)

漏洞名称 Apache ActiveMQ OpenWire 协议反序列化命令执行漏洞 漏洞描述 Apache ActiveMQ 是美国阿帕奇&#xff08;Apache&#xff09;软件基金会所研发的一套开源的消息中间件&#xff0c;它支持Java消息服务、集群、Spring Framework等。 OpenWire协议在ActiveMQ中被用于…

在虚拟机中安装OpenEuler操作系统

目录 OpenEuler操作系统安装步骤&#xff08;详细&#xff09; 一、首先要做好安装前的准备工作&#xff1a; 二、进行虚拟机的创建&#xff1a; 三、OpenEuler 23.09操作系统的安装部署&#xff1a; OpenEuler操作系统安装步骤&#xff08;详细&#xff09; 一、首先要做好…

C#基于ScottPlot进行可视化

前言 上一篇文章跟大家分享了用NumSharp实现简单的线性回归&#xff0c;但是没有进行可视化&#xff0c;可能对拟合的过程没有直观的感受&#xff0c;因此今天跟大家介绍一下使用C#基于Scottplot进行可视化&#xff0c;当然Python的代码&#xff0c;我也会同步进行可视化。 P…

从AAAI 2024看人工智能研究的最新热点

图 1 由AAAI 2024论文列表生成的词云 AAAI会议作为全球AI领域的顶级学术盛会&#xff0c;被中国计算机学会&#xff08;CCF&#xff09;评为A类会议。AAAI2024的会议论文投稿量达到了历史新高&#xff0c;主赛道收到了12100篇投稿论文&#xff0c;9862篇论文经过严格评审后共有…