SpringBoot结合Canal 实现数据同步

1、Canal介绍

     Canal 指的是阿里巴巴开源的数据同步工具,用于数据库的实时增量数据订阅和消费。它可以针对 MySQL、MariaDB、Percona、阿里云RDS、Gtid模式下的异构数据同步等情况进行实时增量数据同步。

    当前的 canal 支持源端 MySQL 版本包括 5.1.x , 5.5.x , 5.6.x , 5.7.x , 8.0.x,Canal通过伪装成mysql从服务向主服务拉取数据。所以首先我们要了解mysql 主从数据同步。

2、Mysql 主从数据同步

1、从库(slave)会生成两个线程,I/O线程(IOthread),SQL线程(SQLthread)。

2、当slave的I/O线程连接到master后,会去请求master的二进制日志(binlog), 此时master会通过logdump(将主库的二进制日志文件内容传输给从库的过程) 给从库传输binlog。

3、 然后slave将拿到的binlog日志依次写入Relaylog(中继日志)的最末端,同时将读取到的Master 的bin-log的文件名和位置记录到master- info文件中,作用为了让slave知道它需要从哪个位置和哪 个日志文件开始同步数据,以保证数据的一致性,并且能够及时获取到master的新的更新操作, 开始数据同步过程。slave不仅在启动时读取 master-info 文件,而且会定期更新该文件中的记 录,以确保记录都是最新的。

4、最后SQL线程会读取Relaylog,并解析为具体操作(比如DDL这种),来实现主从库的操作一致。

3、Canal 原理

1、Canal Server与MySQL建立连接后,会通过模拟mysql slave的交互协议,伪装自己为mysql slave,向mysql master发送dump协议获取数据库的 binlog(二进制日志)文件。

2、Canal Server解析binlog文件,通过网络将解析后的事件传输给消息中间件(Kafka,RabbitMQ、Rocket等),实现数据的实时同步。

4、实现过程

       下载Canal,地址为https://github.com/alibaba/canal/releases,然后解压

  修改Canal的配置文件(conf/canal.properties)

指定模式canal.serverMode = rabbitMQ# 指定实例,多个实例使用逗号分隔: canal.destinations = example1,example2canal.destinations = example# rabbitmq 服务端 iprabbitmq.host = 你的ip(注意不要加端口号哦)# rabbitmq 虚拟主机rabbitmq.virtual.host = /# rabbitmq 交换机rabbitmq.exchange = canal.exchange  (这是本例子用的交换机)# rabbitmq 用户名rabbitmq.username = 你的用户名# rabbitmq 密码rabbitmq.password = 你的密码rabbitmq.deliveryMode =

修改实例配置文件 conf/example/instance.properties。

#配置 slaveId,自定义,不等于 mysql 的 server Id 即可canal.instance.mysql.slaveId=10# 数据库地址:配置自己的ip和端口canal.instance.master.address=你的IP:端口号# 数据库用户名和密码canal.instance.dbUsername=用户名canal.instance.dbPassword=密码# 指定库和表canal.instance.filter.regex=.*\..*    # 这里的 .* 表示 canal.instance.master.address 下面的所有数据库# mq config# rabbitmq 的 routing keycanal.mq.topic=canal.routing.key(这是本例子用的key)

然后重启 canal 服务。

2 、搭建RabbitMq并且 配置Exchange、Routing key。

3、Spring  Boot 实现代码

   引入依赖

<!--引入rabbitmq集成的依赖-->       <dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency>

编辑配置文件

spring:#rabbitmqrabbitmq:#    host: 192.168.31.189host: 192.168.1.12port: 5672virtual-host: /username: adminpassword: adminlistener:simple:         #手动确认消息 ackacknowledge-mode: manualretry:enabled: truemax-attempts: 3max-interval: 1000ms
server:port: 8888

   定义队列的消息接受的数据对象

@Data
public class CanalMessage<T> {private String type;private String table;private List<T> data;private String database;private Long es;private Integer id;private Boolean isDdl;private List<T> old;private List<String> pkNames;private String sql;private Long ts;
}

定义rabbitmq的配置,代码如下

@Configuration
@Slf4j
public class RabbitConfig {/*** 消息序列化配置*/@Beanpublic RabbitListenerContainerFactory<?> rabbitListenerContainerFactory(ConnectionFactory connectionFactory) {// SimpleRabbitListenerContainerFactory 是 RabbitMQ 提供的一个实现了 RabbitListenerContainerFactory 接口的简单消息监听器容器工厂。// 它的作用是创建和配置 RabbitMQ 消息监听器容器,用于监听和处理消息。SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();//ConnectionFactory 是 RabbitMQ 提供的一个接口,用于创建 RabbitMQ 的连接factory.setConnectionFactory(connectionFactory);//使用了 Jackson2JsonMessageConverter 将消息转换为 JSON 格式进行序列化和反序列化factory.setMessageConverter(  new Jackson2JsonMessageConverter());return factory;}
}

消费的代码

@Component
@RabbitListener(queues = "data")
public class MqNotifyReceive {@RabbitHandlerpublic void receive(CanalMessage CanalMessage) {//进行同步业务处理。。。 }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/pingmian/11048.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

计算机网络技术主要学什么内容,有哪些课程

计算机网络技术专业是一个涉及理论与实践紧密结合的学科&#xff0c;主要学习内容有计算机网络基础、网络设备技术、网络编程等内容&#xff0c;以下是上大学网&#xff08;www.sdaxue.com&#xff09;整理的计算机网络技术主要学什么内容&#xff0c;供大家参考&#xff01; 基…

20.接口自动化-Git

1、Git和SVN–版本控制系统 远程服务出问题后&#xff0c;可以先提交commit到本地仓库&#xff0c;之后再提交push远程仓库 git有clone Git环境组成部分 常用Git代码仓库服务-远程仓库 GitHub-服务器在国外&#xff0c;慢 GitLab-开源&#xff0c;可以在自己服务器搭建&…

根据docker部署nginx并且实现https

目录 一、Docker中启用HTTPS有几个重要的原因 二、https介绍 三、https过程 四、安装docker-20.10.18 五、如何获取证书 通过阿里云获取证书 六、docker部署nginx并且实现https 6.1准备证书 6.2准备nginx.conf 和 index.html文件 6.3生成容器 6.4浏览器验证证书 一、…

PyTorch的基础用法简介

PyTorch是一个基于Python的开源机器学习库&#xff0c;它提供了灵活的神经网络构建和训练工具。下面是PyTorch的基础用法介绍&#xff1a; 张量&#xff08;Tensors&#xff09;&#xff1a;PyTorch中的基本数据结构是张量&#xff0c;它类似于多维数组。可以通过torch.Tensor…

ssm120基于SSM框架的金鱼销售平台的开发和实现+jsp

金鱼销售平台 摘 要 随着科学技术的飞速发展&#xff0c;各行各业都在努力与现代先进技术接轨&#xff0c;通过科技手段提高自身的优势&#xff1b;对于金鱼销售平台当然也不能排除在外&#xff0c;随着网络技术的不断成熟&#xff0c;带动了金鱼销售平台&#xff0c;它彻底改…

Oracle一键安装脚本安装教程合集

目前 Oracle 一键安装脚本已经更新到第四代&#xff0c;经作者测试以下版本均可成功安装&#xff01; RedHat/Centos/OracleLinux 6.10 ✅ Oracle 11GR2&#xff08;231017&#xff09;单机Oracle 11GR2&#xff08;231017&#xff09;单机 ASMOracle 11GR2&#xff08;23101…

栈与队列的实现

前言 本次博客将要实现一下栈和队列&#xff0c;好吧 他们两个既可以使用动态数组也可以使用链表来实现 本次会有详细的讲解 栈的实现 栈的基础知识 什么是栈呢&#xff1f; 栈的性质是后进先出 来画个图来理解 当然可不可以出一个进一个呢&#xff0c;当然可以了 比如…

【面试题】音视频流媒体高级开发(2)

面试题6 衡量图像重建好坏的标准有哪些&#xff1f;怎样计算&#xff1f; 参考答案 SNR&#xff08;信噪比&#xff09; PSNR10*log10((2n-1)2/MSE) &#xff08;MSE是原图像与处理图像之间均方误差&#xff0c;所以计算PSNR需要2幅图像的数据&#xff01;&#xff09; SSIM…

Vue路由开启步骤

1.在控制台输入命令 //控制台下载安装npm add vue-router3.6.5 2.在main.js下导入并注册组件 import Vue from vue import App from ./App.vue//控制台下载安装npm add vue-router3.6.5 //导入 import VueRouter from "vue-router";//注册 Vue.use(VueRouter) con…

IntelliJ的Maven编译返回找不到有效证书

文章目录 小结问题及解决找不到有效证书找不到org.springframework.stereotype.Service问题IntelliJ: Cannot resolve symbol springframework 参考 小结 将IntelliJ工程拷贝到新的机器中&#xff0c;返回Maven编译返回找不到有效证书的问题&#xff0c;进行了解决。 问题及解…

实现stract(字符串拼接)函数(C语言)

一、N-S流程图&#xff1b; 二、运行结果&#xff1b; 三、源代码&#xff1b; # define _CRT_SECURE_NO_WARNINGS # include <stdio.h>int main() {//初始化变量值&#xff1b;char a[80], b[80];int i, n1, n2;//填充字符串&#xff1b;printf("请输入字符串a的内…

Python图形界面(GUI)Tkinter笔记(一):根窗口的创建

Tkinter库是Python的内置关于图形界面编程&#xff08;GUI全称为Graphical User Interface&#xff0c;中文意思为“图形用户界面”&#xff09;的一个库。直接导入Tkinter使用即可。 其余笔记&#xff1a;【Python图形界面&#xff08;GUI&#xff09;Tkinter笔记&#xff08;…

蓝桥杯-错误票据(两种写法stringstream和扣字符)

某涉密单位下发了某种票据&#xff0c;并要在年终全部收回。 每张票据有唯一的ID号。 全年所有票据的ID号是连续的&#xff0c;但ID的开始数码是随机选定的。 因为工作人员疏忽&#xff0c;在录入ID号的时候发生了一处错误&#xff0c;造成了某个ID断号&#xff0c;另外一个…

rt-thread 挂载romfs与ramfs

参考&#xff1a; https://www.rt-thread.org/document/site/#/rt-thread-version/rt-thread-standard/tutorial/qemu-network/filesystems/filesystems?id%e4%bd%bf%e7%94%a8-romfs https://www.rt-thread.org/document/site/#/rt-thread-version/rt-thread-standard/tutor…

React 之 useCallback(缓存函数)(十八)

useCallback 是一个允许你在多次渲染中缓存函数的 React Hook。 useCallback 是一个 Hook&#xff0c;所以应该在 组件的顶层 或自定义 Hook 中调用。你不应在循环或者条件语句中调用它。如果你需要这样做&#xff0c;请新建一个组件&#xff0c;并将 state 移入其中。 //fn&am…

计算机毕业设计 | vue+springboot线上考试 在线测试系统(附源码)

1&#xff0c;项目介绍 项目背景 在线考试借助于网络来进行&#xff0c;传统考试所必备的考场和监考对于在线考试来说并不是必要项目&#xff0c;因此可以有效减少组织考试做需要的成本以及设施。同时&#xff0c;由于在线考试系统本身具有智能阅卷的功能&#xff0c;也大大减…

Sping @Autowired依赖注入原理

Spring 依赖注入发生在bean的实例化之后初始化之前的阶段&#xff0c;可以查看&#xff1a;bean的创建过程 Autowired Autowired由AutowiredAnnotationBeanPostProcessor实现依赖注入。 寻找注入点&#xff1a; AutowiredAnnotationBeanPostProcessor实现了MergedBeanDefin…

Python中的compile()函数,动态编译代码的艺术

关注公众号【一点sir】&#xff0c;领取编程资料。 简介 在Python编程中&#xff0c;compile()函数是一个强大的工具&#xff0c;它允许开发者将字符串形式的Python代码动态编译成字节码。这为执行动态生成或从外部源接收的代码提供了极大的灵活性。这些字节码随后可以被Pytho…

TensorFlow基于anaconda3快速构建

基于python构建太累 Installing Packages - Python Packaging User Guide 使用 pip 安装 TensorFlow 有兴趣自己学&#xff0c;我放弃了 -------------------------------------------------------- 下面基于anaconda 1、下载 Index of /anaconda/archive/ | 清华大学开…

电商核心技术揭秘56: 社群营销的未来趋势与挑战

相关系列文章 电商技术揭秘相关系列文章合集&#xff08;1&#xff09; 电商技术揭秘相关系列文章合集&#xff08;2&#xff09; 电商技术揭秘相关系列文章合集&#xff08;3&#xff09; 电商技术揭秘四十一&#xff1a;电商平台的营销系统浅析 电商技术揭秘四十二&#…