SpringCloud 整合 Canal+RabbitMQ+Redis 实现数据监听

1Canal介绍

Canal 指的是阿里巴巴开源的数据同步工具,用于数据库的实时增量数据订阅和消费。它可以针对 MySQL、MariaDB、Percona、阿里云RDS、Gtid模式下的异构数据同步等情况进行实时增量数据同步。

当前的 canal 支持源端 MySQL 版本包括 5.1.x , 5.5.x , 5.6.x , 5.7.x , 8.0.x

Canal是如何同步数据库数据的呢?

Canal通过伪装成mysql从服务向主服务拉取数据,所以先来了解一下MySQL的主从复制吧

2MySQL主从复制原理

1、从库(slave)会生成两个线程,I/O线程(IOthread),SQL线程(SQLthread)。

2、当slave的I/O线程连接到master后,会去请求master的二进制日志(binlog), 此时master会通过logdump(将主库的二进制日志文件内容传输给从库的过程) 给从库传输binlog。

3、 然后slave将拿到的binlog日志依次写入Relaylog(中继日志)的最末端,同时将读取到的Master 的bin-log的文件名和位置记录到master- info文件中,作用为了让slave知道它需要从哪个位置和哪 个日志文件开始同步数据,以保证数据的一致性,并且能够及时获取到master的新的更新操作, 开始数据同步过程。slave不仅在启动时读取 master-info 文件,而且会定期更新该文件中的记 录,以确保记录都是最新的。

4、最后SQL线程会读取Relaylog,并解析为具体操作(比如DDL这种),来实现主从库的操作一致, 最终实现数据一致;

大致了解完了MySQL的主从复制,接着我们看Canal就简单啦。

3Canal工作原理

1、Canal Server与MySQL建立连接后,会通过模拟mysql slave的交互协议,伪装自己为mysql slave,向mysql master发送dump协议获取数据库的 binlog(二进制日志)文件。

2、Canal Server解析binlog文件,通过网络将解析后的事件传输给消息中间件(Kafka,RabbitMQ等),实现数据的实时同步。

了解完canal的原理后,我们就正式开始RabbitMQ+Canal+Redis实现缓存和数据库数据一致的功能。

4RabbitMQ+Canal+redis工作原理

通过上图很好理解:

  • APP向数据库进行写操作(比如我们更新商品信息啥的)

  • Canal监听到数据库发生变化,便会向rabbitMQ传递数据库发生变化的消息。

  • 消费者就可以从rabbitMQ获取这些消息,然后进行删除缓存操作。

下面通过实战让我们更好地理解是如何实现缓存和数据库数据一致性的。

5实战配置

Canal 配置

修改 conf/canal.properties 配置

# 指定模式
canal.serverMode = rabbitMQ
# 指定实例,多个实例使用逗号分隔: canal.destinations = example1,example2
canal.destinations = example # rabbitmq 服务端 ip
rabbitmq.host = 你的ip(注意不要加端口号哦)
# rabbitmq 虚拟主机 
rabbitmq.virtual.host = / 
# rabbitmq 交换机  
rabbitmq.exchange = canal.exchange  (这是本例子用的交换机)
# rabbitmq 用户名
rabbitmq.username = 你的用户名
# rabbitmq 密码
rabbitmq.password = 你的密码
rabbitmq.deliveryMode =

修改实例配置文件 conf/example/instance.properties

#配置 slaveId,自定义,不等于 mysql 的 server Id 即可
canal.instance.mysql.slaveId=10 # 数据库地址:配置自己的ip和端口
canal.instance.master.address=你的IP:端口号# 数据库用户名和密码 
canal.instance.dbUsername=用户名
canal.instance.dbPassword=密码# 指定库和表
canal.instance.filter.regex=.*\..*    # 这里的 .* 表示 canal.instance.master.address 下面的所有数据库# mq config
# rabbitmq 的 routing key
canal.mq.topic=canal.routing.key(这是本例子用的key)

然后重启 canal 服务。

RabbitMQ 配置

这样rabbitMQ就配置完啦,下面就是实战代码啦。

6实战代码

CanalMessage: Canal传来的消息

@NoArgsConstructor
@Data
public class CanalMessage<T> {private String type;private String table;private List<T> data;private String database;private Long es;private Integer id;private Boolean isDdl;private List<T> old;private List<String> pkNames;private String sql;private Long ts;
}

RabbitMQ配置类

@Configuration
@Slf4j
public class RabbitConfig {/*** 消息序列化配置*/@Beanpublic RabbitListenerContainerFactory<?> rabbitListenerContainerFactory(ConnectionFactory connectionFactory) {// SimpleRabbitListenerContainerFactory 是 RabbitMQ 提供的一个实现了 RabbitListenerContainerFactory 接口的简单消息监听器容器工厂。// 它的作用是创建和配置 RabbitMQ 消息监听器容器,用于监听和处理消息。SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();//ConnectionFactory 是 RabbitMQ 提供的一个接口,用于创建 RabbitMQ 的连接factory.setConnectionFactory(connectionFactory);//使用了 Jackson2JsonMessageConverter 将消息转换为 JSON 格式进行序列化和反序列化factory.setMessageConverter(  new Jackson2JsonMessageConverter());return factory;}
}

将消息转换为JSON格式,才能映射到CanalMessage上。

RabbitMQ+Canal监听处理类

@Component
@Slf4j
@RequiredArgsConstructor
public class CanalListener {private final SysMenuService menuService;//@RabbitListener(queues = "canal.queue")public void handleDataChange(@Payload CanalMessage message) {String tableName = message.getTable();log.info("Canal 监听 {} 发生变化;明细:{}", tableName, message);if (Arrays.asList("sys_menu", "sys_role", "sys_role_menu").contains(tableName)) {log.info("======== 清理菜单路由缓存 ========");menuService.cleanCache();}}
}

menuService的cleanCache()是把登录时的路由列表缓存清除掉,

具体可去源码查看,在最底下。

这样我们实现缓存和数据库数据一致性的功能就完成啦,接下来测试一下。

7测试

我们直接通过手动修改数据库来完成测试。

图片

我们在菜单表修改菜单管理的内容改成菜单管理1,点击保存

图片

可以看到更新操作已经被监听到啦。接着就完成清理缓存操作咯,然后就可以防止缓存和数据库数据不一致的问题啦。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/579549.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

CAN协议FPGA篇

一.引言 CAN&#xff08;Controller Area Network&#xff09;总线&#xff0c;即控制器局域网总线&#xff0c;是一种功能丰富的车用总线标准。该协议最初是由德国博世&#xff08;Bosch&#xff09;公司在1983年制定的&#xff0c;之后在美国密歇根州底特律举行的汽车工程师协…

Socks5代理IP在跨境电商与游戏中的应用

随着互联网的迅猛发展&#xff0c;网络已经成为人们日常生活不可或缺的一部分。在这个数字化时代&#xff0c;跨境电商和网络游戏产业蓬勃发展&#xff0c;但伴随而来的是网络安全的威胁与挑战。本文将介绍Socks5代理IP技术&#xff0c;探讨它在网络安全、跨境电商以及游戏中的…

Hive03_数据类型

数据类型 1 案例实操 &#xff08;1&#xff09;假设某表有如下一行&#xff0c;我们用 JSON 格式来表示其数据结构。在 Hive 下访问的格式为 {"name": "wukong","friends": ["bajie" , "lili"] , //列表 Array, "c…

jupyter notebook打开其他盘的文件

jupyter notebook打开其他盘文件 打开jupyter notebook打开terminal输入&#xff1a;jupyter-notebook 路径打开你想打开的工程的文件 打开jupyter notebook 打开terminal 输入&#xff1a;jupyter-notebook 路径 打开你想打开的工程的文件

odoo17核心概念view7——listview总体框架分析

这是view系列的第七篇文章&#xff0c;今天主要介绍我们最常用的list视图。 1、先看list_view,这是主文件 /** odoo-module */import { registry } from "web/core/registry"; import { RelationalModel } from "web/model/relational_model/relational_mode…

Typora使用PicGo+Gitee上传图片报错403 Forbidden

Typora使用PicGoGitee上传图片报错403 Forbidden Typora使用PicGoGitee上传图片&#xff0c;上传失败了&#xff0c;错误信息如下 打开PicGo的日志文件查看&#xff0c;可以看到错误详情如下 换了一个插件github-plus重新配置&#xff0c;解决了这个问题 再打开日志查看&…

GO基础进阶篇 (九)、临界资源安全问题(锁、channel)

临界资源安全问题 在并发编程中对临界资源的处理不当&#xff0c;往往会导致数据的不一致问题 package mainimport ("fmt""time" )func main() {a : 1go func() {a 2fmt.Println("goroutine", a)}()a 3fmt.Println("a", a)time.Sl…

Swift学习笔记第三节:Set类型

1、代码 import Foundationvar set1: Set<Int> [1, 2, 3, 4, 3] print("定义1: \(set1)") var set2 Set(1...4) print("定义2: \(set2)") print("长度: \(set2.count)") print("是否为空: \(set2.isEmpty)") set1.insert(99)…

ElasticSearch的RestClient结合Sniffer提高可用性

一、背景 由于要安装分词器插件&#xff0c;所以需要重启ElasticSearch集群以使得新安装的插件生效 但是在重启集群的过程中&#xff0c;服务端代码却出现了大量错误&#xff0c;如下所示 java.net.ConnectException: Connection refused    at org.elasticsearch.client.R…

利用策略模式与Spring Boot实现灵活的文件上传功能:多策略选择与动态实现

当涉及文件上传功能时&#xff0c;使用策略模式是一个明智的选择。在Spring Boot中&#xff0c;您可以利用策略模式来实现文件的动态上传功能。这种模式允许您定义一系列的算法&#xff0c;将它们封装成独立的类&#xff0c;使得这些算法可以相互替换&#xff0c;而不影响客户端…

使用 AnyGo 修改 iPhone 手机定位

在当今数字化时代&#xff0c;我们的手机已经成为我们日常生活中不可或缺的一部分。然而&#xff0c;有时我们可能会遇到一些情况&#xff0c;需要修改手机的定位信息。这个需求可能来自于各种不同的原因&#xff0c;包括但不限于保护个人隐私、测试应用程序的地理位置相关功能…

【kafka消息里会有乱序消费的情况吗?如果有,是怎么解决的?】

文章目录 什么是消息乱序消费了&#xff1f;顺序生产&#xff0c;顺序存储&#xff0c;顺序消费如何解决乱序数据库乐观锁是怎么解决这个乱序问题吗 保证消息顺序消费两种方案固定分区方案乐观锁实现方案 前几天刷着视频看见评论区有大佬问了这个问题&#xff1a;你们的kafka消…

前端-nginx.conf文件中proxy_pass变量值的结尾有无斜杠的区别

server {listen 8080;server_name localhost;location ^~/mgrcontrol/{proxy_pass $MGR_SERVICE;}} 在Nginx配置文件中&#xff0c;proxy_pass 指令用于将请求代理到指定的后端服务。在配置中&#xff0c;proxy_pass 后面使用了变量 $MGR_SERVICE&#xff0c;而这个变量的值是…

Linux - 记录问题:怎么通过安装包的方式安装gRPC

适用场景 当docker 构建环境不能链接到github 的时候&#xff0c;就可以使用本地构建的方式 完成对应服务的构建需求。 参考案例 使用本地安装包的方式安装 gRPC 注意&#xff1a; 在Docker构建过程中&#xff0c;某些软件包可能会尝试配置时区&#xff0c;这通常需要交互式…

TVS二极管(瞬变抑制)

TVS二极管(瞬变抑制) 常用电子元器件 TVS二极管/ESD抑制器SZESD7451N2T5G X-DFN-2 代码EE \TVS二极管/ESD抑制器ESD7451N2T5G X-DFN-2 代码EE 文章目录 TVS二极管(瞬变抑制)前言一、TVS二极管(瞬变抑制)是什么二、ESD抑制器SZESD7451N2T5G X-DFN-2 代码EE三、ESD抑制器…

WordPress主题大前端DUX v8.3源码下载

DUX主题8.3版本更新内容&#xff1a; 新增&#xff1a;Cloudflare Turnstile 免费验证功能 新增&#xff1a;子菜单页面模版&#xff0c;支持多级页面 新增&#xff1a;手机端文章内表格自动出现横向滚动条&#xff0c;可集体或单独设置滚动宽度 新增&#xff1a;标签云页面模版…

【MATLAB第86期】基于matlab的Catboost多输入单输出分类预测模型 catboost-1.1.1版本

基于matlab的Catboost多输入单输出分类预测模型 catboost-1.1.1版本 运行环境 windows10 matlab2020a catboost版本&#xff1a;catboost-1.1.1 往期&#xff1a; 【MATLAB第20期】基于matlab的Catboost多输入单输出回归预测模型 catboost-1.1.1版本 一、导入数据 采用12输…

Spark与Hadoop的关系和区别

在大数据领域&#xff0c;Spark和Hadoop是两个备受欢迎的分布式数据处理框架&#xff0c;它们在处理大规模数据时都具有重要作用。本文将深入探讨Spark与Hadoop之间的关系和区别&#xff0c;以帮助大家的功能和用途。 Spark和Hadoop简介 1 Hadoop Hadoop是一个由Apache基金会…

Linux和Win 共享文件夹 搭建使用方法【超简单】+ 共享后无法出现文件夹的解决方式

win和Linux 共享文件夹 超简单的搭建使用方法 一、编辑虚拟机设置二、在Linux下访问共享文件夹三、共享后无法出现文件夹的解决方式 很多时候我们需要在Linux环境下使用一些安装包。 一般都是在win下进行下载&#xff0c;然后通过共享文件夹的方式&#xff0c;共享到虚拟机环境…

Xmake v2.8.6 发布,新的打包插件:XPack

Xmake 是一个基于 Lua 的轻量级跨平台构建工具。 它非常的轻量&#xff0c;没有任何依赖&#xff0c;因为它内置了 Lua 运行时。 它使用 xmake.lua 维护项目构建&#xff0c;相比 makefile/CMakeLists.txt&#xff0c;配置语法更加简洁直观&#xff0c;对新手非常友好&#x…