实时同步:使用 Canal 和 Kafka 解决 MySQL 与缓存的数据一致性问题

目录

1. 准备工作

2. 将需要缓存的数据存储 Redis

3. 监听 canal 存储在 Kafka Topic 中数据


1. 准备工作

1. 开启并配置MySQL的 BinLog(MySQL 8.0 默认开启)

修改配置:C:\ProgramData\MySQL\MySQL Server 8.0\my.ini

log-bin="HELONG-bin"
binlog_format=ROW     # 只能配置行模式, 因为 Cannal 不具备将SQL转化成数据的能力
binlog-do-db=aicloud    # 监控 AI Cloud 项目

如果要同步多个项目:

binlog-do-db=aicloud
binlog-do-db=aicloud2
binlog-do-db=aicloud3

2. 重启MySQL服务

3. 赋值数据同步权限

CREATE USER canal IDENTIFIED BY 'canal';  
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
FLUSH PRIVILEGES;

4. 安装并配置 Canal

下载地址:https://github.com/alibaba/canal/releases

① 修改canal.properties

canal.serverMode=kafka
canal.mq.servers=127.0.0.1:9092

canal 监控 binlog 日志,binlog 日志的传输默认使用 MySQL 的复制协议(基于 TCP/IP),

可以使用写代码的方式直接从 MySQL 服务器读取数据,此处使用本地 kafka 进行存储。

② 修改instance.properties

canal.instance.mysql.slaveId=100   # 大于 1 即可
canal.instance.master.address=127.0.0.1:3306
canal.mq.topic=ai-cloud-canal-to-kafka

slaveId 表示从节点 id,canal 的执行原理就是伪装成一个从库去主库同步数据

(主节点的 slaveId = 1)

address 配置连接本地的 MySQL

topic 配置数据发送到 Kafka 的某个主题下

5. 拷贝 Jar 包到 lib

将 canal 下 plugin 下的所有 jar 包拷贝到 lib 目录下。

6. 删除 bin 目录下 startup.bat 里的参数

如果启动时报错:

Unrecognized VM option 'PermSize=128m'

Error: Could not create the Java Virtual Machine.

Error: A fatal exception has occurred. Program will exit.

删除 -XX:PermSize=128m 参数即可。

7. 启动 canal

打开 cmd ,cd 到 bin 目录下,输入 startup.bat 回车

2. 将需要缓存的数据存储 Redis

此时我将这个查询列表接口的数据,存储在 Redis 中:

/*** 获取历史聊天记录(对话/绘图)** @param type* @return {@link ResponseEntity }*/
@RequestMapping("/list")
public ResponseEntity getHistoryList(Integer type, Integer model) {String listCacheKey = RedisUtil.getListCacheKey(SecurityUtil.getCurrentUser().getUid(), model, type);Object list = redisTemplate.opsForValue().get(listCacheKey);if (ObjectUtil.isNull(list)) {LambdaQueryWrapper<Answer> queryWrapper = new LambdaQueryWrapper<>();queryWrapper.eq(Answer::getUid, SecurityUtil.getCurrentUser().getUid());queryWrapper.eq(Answer::getType, type);queryWrapper.eq(Answer::getModel, model);queryWrapper.orderByDesc(Answer::getAid);List<Answer> answerList = answerService.list(queryWrapper);List<Long> userIds = answerList.stream().map(Answer::getUid).collect(Collectors.toList());Map<Long, User> userIdMap = userService.selectByIds(userIds).stream().collect(Collectors.toMap(User::getUid, Function.identity()));List<AnswerVo> answerVoList = answerList.stream().map(answer -> AnswerVoUtil.getListAnswerVo(answer, userIdMap)).collect(Collectors.toList());// 缓存 1 天redisTemplate.opsForValue().set(listCacheKey, answerVoList, 1, TimeUnit.DAYS);return ResponseEntity.success(answerVoList);} else {return ResponseEntity.success(list);}
}
/*** 查询列表存储 Redis 缓存** @param uid* @param model* @param type* @return {@link String }*/
public static String getListCacheKey(Long uid, Integer model, Integer type) {return "LIST_CACHE_KEY_" + uid + "_" + model + "_" + type;
}

3. 监听 Kafka Topic 中数据并删除 Redis 缓存

首先对数据库中需要缓存的数据进行一些修改操作:

此时,使用 kafka ui(下载地址划到最底下),刷新 kafka 对应 topic 下的 message,就可以看到当前所作出的修改:

执行修改操作:将 “如何学习Spring???”修改成 “如何学习Spring??”

执行删除操作:

由此可见,对数据库的每一个修改操作,都是对应固定格式的一个数据,所以可以监听对应的  topic 并针对 data 中的数据进行一个提取,得到一个  cacheKey,然后删除对应的缓存,使得下一次的查询去访问数据库,并同步缓存。

【代码示例】

/*** canal 监控 binlog 日志,将修改的数据存储 kafka topic 中* 监听 kafka topic 中的数据** @param data* @param ack* @throws JsonProcessingException*/
@KafkaListener(topics = {KafkaConstant.CANAL_TOPIC})
public void canalListen(String data, Acknowledgment ack) throws JsonProcessingException {HashMap<String, Object> map = objectMapper.readValue(data, HashMap.class);if (map.isEmpty()) {ack.acknowledge();return;}// 匹配上对应的数据库和数据表if (KafkaConstant.TARGET_DATABASE.equals(map.get(KafkaConstant.DATABASE_KEY).toString()) &&KafkaConstant.TARGET_TABLE.equals(map.get(KafkaConstant.TABLE_KEY).toString())) {// 更新缓存 List<Map<String, Object>> list = (List<Map<String, Object>>) map.get(KafkaConstant.DATA_KEY);if (!CollectionUtils.isEmpty(list)) {for (Map<String, Object> answerMap : list) {String answerListCacheKey = RedisUtil.getListCacheKey(Long.valueOf(answerMap.get("uid").toString()),Integer.parseInt(answerMap.get("model").toString()),Integer.parseInt(answerMap.get("type").toString()));// 删除缓存,让下一次查询走数据库,并同步缓存redisTemplate.delete(answerListCacheKey);}}}//  手动确认应答ack.acknowledge();
}
/*** canal 同步数据到 kafka*/
public static final String CANAL_TOPIC = "ai-cloud-canal-to-kafka";/*** 数据库,缓存数据一致性的*/public static final String DATABASE_KEY = "database";public static final String TABLE_KEY = "table";public static final String DATA_KEY = "data";public static final String TARGET_DATABASE = "aicloud";public static final String TARGET_TABLE = "answer";

【补充】

kafka ui 下载地址:​​​​​​https://github.com/provectus/kafka-ui/tags

修改配置

kafka:clusters:- name: kafka3_clusterbootstrapServers: 127.0.0.1:9092

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/diannao/50406.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【Git】merge合并分支

两个分支未修改同一个文件的同一处位置: Git自动合并 两个分支修改了同一个文件的同一处位置:产生冲突 例&#xff1a; 在master分支修改了main同时&#xff0c;feat分支也修改了相同的文件 合并的时候就会产生冲突 解决方法: Step1- 手工修改冲突文件&#xff0c;合并冲突内容…

立仪光谱共焦传感器应用测量之:汽车连接器高度差测量

01 检测要求&#xff0c;要求测量汽车连接器的高度差 02 检测方式 根据观察&#xff0c;我们采用立仪科技光谱共焦H4UC控制器搭配D65A52系列镜头&#xff0c;角度最大&#xff0c;外径最大&#xff0c;量程大&#xff0c;可以有效应用于测量弧面&#xff0c;大角度面等零件。 0…

会员信息管理系统-计算机毕业设计源码38258

目 录 摘要 1 绪论 1.1 研究背景 1.2 研究意义 1.3开发技术 1.3.1 Spring Boot框架 1.3.2 Java语言 1.3.3 MySQL数据库 1.4论文结构与章节安排 2系统分析 2.1 可行性分析 2.2 系统流程分析 2.2.1 登录流程 2.2.2数据删除流程 2.3 系统功能分析 2.4 系统用例分析…

视频去水印免费电脑版 pdf压缩在线免费网页版 pdf压缩在线免费 简单工具软件详细方法步骤分享

消除视频中的恼人水印&#xff0c;是许多视频编辑爱好者的常见需求。在这篇文章中&#xff0c;我们将探讨几种视频去水印的技巧&#xff0c;在数字化时代&#xff0c;视频和图片的传播越来越方便&#xff0c;但随之而来的水印问题也让人头疼。本文将为您详细介绍视频剪辑去水印…

Web开发:ASP.NET CORE中前端使用Ajax定时获取后端数据

一、低难度&#xff08;刷新a标签&#xff09; 1、需求 给a标签每15s刷新一次&#xff0c;显示最新的时间&#xff08;时间必须由后端获取&#xff09; 应该如何操作呢 2、代码 后端 using Microsoft.AspNetCore.Mvc; using Microsoft.AspNetCore.Mvc.RazorPages; using Mi…

医疗器械维修行业发展及趋势

医疗器械维修的前景是广阔的。‌ 随着医疗技术的不断发展和进步&#xff0c;‌医疗器械的种类和数量持续增加&#xff0c;‌对专业维修人员的需求也在不断上升。‌无论是医院、‌诊所等医疗机构&#xff0c;‌还是医疗器械生产企业、‌销售企业等&#xff0c;‌都需要专业的维修…

System.identityHashCode(Object obj) 和 obj.hashCode() 的区别

System.identityHashCode(Object obj) 和 obj.hashCode() 都用于获取对象的哈希码&#xff0c;但它们有显著的区别&#xff1a; System.identityHashCode(Object obj): 返回对象的默认哈希码&#xff0c;这个哈希码是基于对象的内存地址生成的&#xff0c;而不受对象的 hashC…

快速入门了解Ajax

博客主页&#xff1a;音符犹如代码系列专栏&#xff1a;JavaWeb关注博主&#xff0c;后期持续更新系列文章如果有错误感谢请大家批评指出&#xff0c;及时修改感谢大家点赞&#x1f44d;收藏⭐评论✍ Ajax的初识 意义&#xff1a;AJAX&#xff08;Asynchronous JavaScript and…

leetcode-79. 单词搜索

题目描述 给定一个 m x n 二维字符网格 board 和一个字符串单词 word 。如果 word 存在于网格中&#xff0c;返回 true &#xff1b;否则&#xff0c;返回 false 。 单词必须按照字母顺序&#xff0c;通过相邻的单元格内的字母构成&#xff0c;其中“相邻”单元格是那些水平相…

Linux安装TrueNAS(网络附加存储)教程 –第1部分

TrueNAS CORE&#xff08;原名FreeNAS&#xff09;是一款流行的存储系统&#xff0c;可帮助您构建自己的高质量存储设置&#xff0c;而无需支付软件费用。您可以将其安装在计算机硬件或虚拟机 (VM) 上&#xff0c;以获得开源存储的好处。 您可以在家中、办公室或数据中心使用T…

个性化音频生成GPT-SoVits部署使用和API调用

一、训练自己的音色模型步骤 1、准备好要训练的数据&#xff0c;放在Data文件夹中&#xff0c;按照文件模板中的结构进行存放数据 2、双击打开go-webui.bat文件&#xff0c;等待页面跳转 3、页面打开后&#xff0c;开始训练自己的模型 &#xff08;1&#xff09;、人声伴奏分…

RV1126 Linux 系统,接外设,时好时坏(一)应该从哪些方面排查问题

在 Linux 系统中接外设时,遇到“时好时坏”的问题,可能是由多种因素引起的。以下是一些排查问题的建议。 1. 硬件方面的排查 1.1 连接检查 物理连接: 确保外设与主板之间的连接良好,检查插头、插座及线缆是否牢固。引脚配置: 确认设备树中引脚的配置是否正确,尤其是引脚…

shopee虾皮 java后端 一面面经 整体感觉不难

面试总结&#xff1a;总体不难&#xff0c;算法题脑抽了只过了一半&#xff0c;面试官点出了问题说时间到了&#xff0c;反问一点点&#xff0c;感觉五五开&#xff0c;许愿一个二面 1.Java中的锁机制&#xff0c;什么是可重入锁 Java中的机制主要包括 synchronized关键字 Loc…

Profinet 转 EtherCAT 主站网关

一、功能概述 1.1 设备简介 本产品是 PN(Profinet)和 ECAT(EtherCAT)网关&#xff0c;通过数据映射方式工作。 本产品在 PN 侧作为 PN IO 从站&#xff0c;接西门子 PLC 的 Profinet 口&#xff1b;在 ECAT 侧 做为 ECAT 主站&#xff0c;接 ECAT 从站&#xff0c;如伺服驱…

CTF-Web习题:[GXYCTF2019]Ping Ping Ping

题目链接&#xff1a;[GXYCTF2019]Ping Ping Ping 解题思路 访问靶机&#xff0c;得到如下页面&#xff0c;类似于URL参数 尝试用HackBar构造url传输过去看看 发现返回了ping命令的执行结果&#xff0c;可以猜测php脚本命令是ping -c 4 $ip&#xff0c;暂时不知道执行的函数…

IMU用于肌骨相关职业病风险评估

肌肉骨骼疾病&#xff08;WMSDs&#xff09;是职场中常见的健康问题&#xff0c;会导致员工疼痛和工作效率降低。为了更好地评估和管理这些风险&#xff0c;科研人员开发了一种基于惯性测量单元&#xff08;IMU&#xff09;的新型系统。 这个创新系统通过监测员工在工作时的身体…

软件测试中的压力测试和性能测试区别

压力测试和性能测试是软件测试中两种重要的测试类型&#xff0c;它们都旨在评估软件在不同条件下的表现&#xff0c;但侧重点和目的有所不同。 压力测试&#xff08;Stress Testing&#xff09;定义&#xff1a; 压力测试是一种测试方法&#xff0c;用于确定软件在极端条件下…

安卓开机启动性能优化之-bootchart相关工具使用及查看

背景&#xff1a; 开机启动相关的详细信息&#xff0c;一般都是可以通过logcat中查看boot_progress相关查看&#xff0c;这种方式查看相对不那么方便&#xff0c;毕竟开机过程中涉及的进程较多&#xff0c;要查看也较多&#xff0c;而且还经常需要查看代码才可以对应起来&…

Linux系统上安装zookeeper

百度网盘 通过网盘分享的文件&#xff1a;zookeeper_linux 链接: https://pan.baidu.com/s/1_hybXZVwTRkotz0VbwbSMw?pwd8888 提取码: 8888 1.将压缩包拖进虚拟机 2.解压压缩包 cd /ruanjian/zookeeper/ tar -zxvf apache-ZooKeeper-3.7.2-bin.tar.gz3. 进入到conf目录 cd …

《python程序语言设计》第6章12题 显示字符,使用下面的函数头,编写一个打印字符的函数

def printChars(ch1, ch2, numberPerLine):a ord(ch1)b ord(ch2)count 0for i in range(a, b 1):count 1print(chr(i), end" ")if count % numberPerLine 0:print()printChars("1", "Z", 10)