✅技术社区—通过Canal框架实现MySQL与ElasticSearch的数据同步

Canal 是一个由阿里巴巴开源的,基于 Java 的数据库变更日志解析的中间件,其原理是基于Binlog订阅的方式实现,模拟一个MySQL Slave 订阅Binlog日志,从而实现CDC,主要用于实现 MySQL 数据库的增量数据同步。它主要的使用场景包括数据库备份、实时数据同步、以及构建数据湖等。Canal 通过模拟 MySQL Slave 的行为,连接到 MySQL Master,实时地解析 Master 节点的 Binlog 日志,然后提取出数据变更信息,支持将数据变更同步到多种类型的下游系统,如 Kafka、ElasticSearch、HBase 等。

为什么要选Canal来进行数据同步

MySQL向ES(elasticsearch)做数据同步其实同步数据有很多方式,有双写同步数据,异步同步数据:前者双写同步数据我们肯定不用的,它实现原理是同时向MVSQL和ES中写入数据,这种性能慢不说,还存在二者还涉及到了分布式事务了,无法保证数据一致性问题,而且还将业务深深耦合起来了,无法做扩展,因此pass。后者异步同步数据方案比较多,比如目前市面上比较火的阿里的Canal和Debezium工具等等,他们都是利用的CDC(数据抓取变更),监听binlog日志做的同步。由于后者Debezium需要集成Kafka,而且需要手写Kafka消费者代码去同步,使得系统更加复杂,实现起来相对Canal比较复杂,因此采用了阿里Canal去做数据同步。

主从复制原理

MySQL的主从复制是依赖于 binlog,也就是记录 MySQL 上的所有变化并以二进制形式保存在磁盘上二进制日志文件。

主从复制就是将 binlog 中的数据从主库传输到从库上,一般这个过程是异步的,即主库上的操作不会等待 binlog 同步地完成。

详细流程如下:

1.主库写 binlog:主库的更新 SQL(update、insert、delete) 被写到 binlog;

2.主库发送 binlog:主库创建一个 log dump 线程来发送 binlog 给从库;

3.从库写 relay log:从库在连接到主节点时会创建一个 IO 线程,以请求主库更新的 binlog,并且把接收到的 binlog 信息写入一个叫做 relay log 的日志文件;

4.从库回放:从库还会创建一个 SQL 线程读取 relay log 中的内容,并且在从库中做回放,最终实现主从的一致性。

Canal工作原理

  1. 模拟 Slave:Canal 服务端模拟 MySQL 的 Slave,通过 MySQL 提供的dump协议连接到 MySQL 的 Master 节点。
  2. 读取和解析Binlog:MySQL 的 master 节点接收到 dump 请求后推送 Binlog 日志给 Canal 服务端,解析 Binlog 对象(原始为byte 流)转成 Json 格式;
  3. 数据同步:Canal 客户端通过 TCP 协议或 MQ 形式监听 Canal 服务端,将解析后的数据变更信息推送到配置的下游系统或应用(Kafka、ElasticSearch、HBase),如通过 Canal Client API 拉取数据变更,或者配置 Canal Adapter 自动同步到特定的数据存储系统。

工作流程

  1. 配置 MySQL:开启 MySQL 的 Binlog 日志记录,并配置 Canal 连接 MySQL 的权限,确保 Canal 可以作为 Slave 连接到 MySQL Server。
  2. 启动 Canal Server:部署并启动 Canal Server,Canal Server 会连接到 MySQL Server,开始监听 Binlog 日志的变更。
  3. 数据解析:Canal Server 解析 Binlog 日志文件,识别数据变更事件,并将这些事件转换为内部数据格式。
  4. 数据同步:通过 Canal Client API 或者配置 Canal Adapter,将解析后的数据变更同步到 ElasticSearch。这一步可以根据实际业务需求定制数据同步的逻辑,例如根据数据变更类型(插入、更新、删除)更新 ElasticSearch 的索引。
  5. 实时搜索:随着 ElasticSearch 索引的实时更新,搜索服务能够提供基于最新数据的搜索结果,保证了搜索的准确性和高效性。

术语补充解释:

Canal Server:Canal的服务端组件,负责连接到MySQL服务器,实时读取并解析MySQL的Binlog日志,然后将解析后的数据变更信息提供给Canal Client或同步到其他中间件。

Canal Adapter:Canal的适配器组件,用于将Canal Server解析出的数据变更信息同步到各种类型的下游系统或中间件中,如Elasticsearch、Kafka等。

instance:实例,在这里通常指Canal的一个运行实例,对应于MySQL中的一个数据库或一组数据库。每个instance独立工作,可以有自己的配置和同步逻辑。

Relay Log:在 MySQL 的主从复制架构中,中继日志(Relay Log)是从服务器(Slave)上的一个关键组件。中继日志用于存储从主服务器(Master)复制过来的二进制日志(Binary Log)事件。这些日志文件在从服务器上被重放(执行),以此来确保从服务器的数据与主服务器保持一致。

操作流程

要在本地使用 Canal 实现 MySQL 数据库和 Elasticsearch 的同步,需要先部署 Canal 和配置 Elasticsearch,然后通过 Canal Adapter 实现数据的同步。

增量同步指的是仅同步自上次同步以来在数据库中发生变更的数据,而不是每次都同步全部数据。

步骤概述

  1. 部署 Canal Server:首先需要在本地安装并启动 Canal Server,使其连接到你的 MySQL 数据库,并开始监听 Binlog 日志。
  2. 配置 Elasticsearch:确保本地已经安装并启动 Elasticsearch。
  3. 使用 Canal Adapter:Canal 提供了官方的 Adapter,用于将数据同步到 Elasticsearch。需要配置 Adapter 以连接到你的 Elasticsearch 实例。

示例配置

1. Canal Server 配置

在 Canal 的配置文件 instance.properties 中,配置 MySQL 数据源信息,以及开启的 Binlog 文件和位置:

canal.instance.master.address=127.0.0.1:3306
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal
canal.instance.connectionCharset=UTF-8
canal.instance.tsdb.enable=true
canal.instance.gtidon=false

2. Canal Adapter 配置

application.yml 中配置 Elasticsearch 的连接信息:

spring:jackson:date-format: yyyy-MM-dd HH:mm:sstime-zone: GMT+8
server:port: 8081
logging:level:com.alibaba.otter: DEBUG
canal.conf:canalServerHost: 127.0.0.1:11111flatMessage: truecanalInstances:- instance: example # Canal instance 名称groups:- outAdapters:- key: eshosts: 127.0.0.1:9200 # Elasticsearch 地址properties:cluster.name: elasticsearch

还需要在 src/main/resources/es/mapping 目录下配置同步的表和索引的映射关系。例如,如果你想同步 mydb.user 表到 Elasticsearch,你需要创建一个对应的映射文件 user.yml

dataSourceKey: defaultDS
destination: example
groupId: g1
esMapping:_index: user_index_type: _doc_id: _idsql: "SELECT id as _id, name, age FROM user"commitBatch: 3000

3. 启动 Canal Adapter

配置好之后,启动 Canal Adapter。它会自动连接到 Canal Server 和 Elasticsearch,根据配置的映射关系同步数据。

增量同步说明

Canal 通过监听 MySQL 的 Binlog 来实现增量数据同步。当在 MySQL 中对数据进行 INSERT、UPDATE、DELETE 操作时,这些变更会被记录在 Binlog 中。Canal Server 解析 Binlog,获取这些变更,然后 Canal Adapter 根据配置将变更的数据同步到 Elasticsearch。这个过程只同步变更的数据,而不是数据库中的全部数据,因此被称为增量同步。

注意:本示例的配置和代码只是一个基本的指导,具体细节(如版本兼容性、安全设置等)需要根据你的实际环境和需求进行调整。

使用场景

  • 数据库同步:实现从一个数据库实时同步数据到另一个数据库,常见于主从复制、读写分离等场景。
  • 数据迁移与备份:在不影响源数据库性能的前提下,实时备份数据,用于灾备或者数据迁移。
  • 数据仓库构建:将业务数据库的增量数据实时同步到数据仓库中,用于后续的数据分析和挖掘。
  • 搜索引擎索引更新:实时将数据库中的变更同步到搜索引擎(如 ElasticSearch),保持搜索数据的实时性和准确性。

优势

  • 实时性:Canal 基于 Binlog 的增量数据同步机制,能够实现接近实时的数据同步。
  • 低侵入性:Canal 通过模仿 MySQL Slave 的方式进行数据同步,无需修改 MySQL Server 的任何配置(只需开启 Binlog)。
  • 灵活性:Canal 支持多种数据源和数据目的地的同步,用户可以根据需要灵活配置同步任务。
  • 高可用性:Canal 支持集群部署,通过负载均衡和故障转移机制,提高数据同步的稳定性和可靠性。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/751343.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

模块化项目Eclipse测试网零撸教程

简介:Eclipse 是一个基于 Solana 区块链的初创项目,致力于构建基于 Solana 虚拟机的通用 Layer2 解决方案,为以太坊提供更快速、更通用的 Rollup 技术。其主要用途是为开发者提供构建基于 Solana 虚拟机的 Rollup 应用的平台,解决…

Vue3-响应式基础:单文件和组合式文件

单文件&#xff1a;html <!DOCTYPE html> <html> <head><title>响应式基础</title> </head> <body><div id"app" ><!-- dynamic parameter:同样在指令参数上也可以使用一个 JavaScript 表达式&#xff0c;需要包…

企业微信H5文件下载。

废话不多说&#xff0c;直接上代码。 1.判断是不是企业微信打开的 const ua navigator.userAgent.toLowerCase() if (/micromessenger/.test(ua)) {} 2.复制功能 navigator.clipboard.writeText(newsUrl).then(() > {this.$message({message: 您已复制文件链接&#xff…

leetcode513找树左下角的值

解法1&#xff1a;BFS 思路就是层序遍历 用队列记住每层的元素&#xff0c;如果每次记住每层的第一个元素 ---->https://programmercarl.com/0102.%E4%BA%8C%E5%8F%89%E6%A0%91%E7%9A%84%E5%B1%82%E5%BA%8F%E9%81%8D%E5%8E%86.html#_102-%E4%BA%8C%E5%8F%89%E6%A0%91%E7%9A…

微调大型语言模型(LLM):应用案例示例

微调大型语言模型&#xff08;LLM&#xff09;&#xff1a;应用案例示例 摘要&#xff1a; 本文讨论了大型语言模型&#xff08;LLM&#xff09;的微调&#xff0c;这是一种通过少量数据训练已经预训练好的模型以执行特定任务的过程。微调可以让LLM在翻译、文本分类、文本生成…

SpringBoot(整合MyBatis + MyBatis-Plus + MyBatisX插件使用)

文章目录 1.整合MyBatis1.需求分析2.数据库表设计3.数据库环境配置1.新建maven项目2.pom.xml 引入依赖3.application.yml 配置数据源4.Application.java 编写启动类5.测试6.配置类切换druid数据源7.测试数据源是否成功切换 4.Mybatis基础配置1.编写映射表的bean2.MonsterMapper…

从零到一构建短链接系统(五)

1.修改UserService Service public class UserServiceImpl extends ServiceImpl<UserMapper, UserDO> implements UserService {public UserRespDTO getUserByUsername(String username) {LambdaQueryWrapper<UserDO> queryWrapper Wrappers.lambdaQuery(UserDO.c…

MySQL实战:监控

监控指标 性能类指标 名称说明QPS数据库每秒处理的请求数量TPS数据库每秒处理的事务数量并发数数据库实例当前并行处理的会话数量连接数连接到数据库会话的数量缓存命中率Innodb的缓存命中率 功能类指标 名称说明可用性数据库是否正常对外提供服务阻塞当前是否有阻塞的会话…

HarmonyOS-鸿蒙系统概述

你了解鸿蒙系统吗&#xff1f; 你看好鸿蒙系统吗&#xff1f; 今年秋季即将推出的HarmonyOS Next 星河版热度空前&#xff0c;一起来了解一下吧。本文将从HarmonyOS 的应用场景、发展历程、架构、开发语言、开发工具、生态建设六个角度聊一聊个人的理解。 1、应用场景 鸿蒙…

Sora提示词与视频创作的融合(一):创意启发:利用提示词激发创作灵感

在Sora模型的创作中&#xff0c;利用提示词激发创作灵感是一个至关重要的环节。提示词作为引导模型生成视频内容的关键因素&#xff0c;不仅能够指导模型按照特定的主题和风格生成内容&#xff0c;还能够激发创作者的灵感&#xff0c;推动创意的产生。下面将详细探讨如何利用提…

深度学习pytorch——拼接与拆分(持续更新)

cat拼接 使用条件&#xff1a;合并的dim的size可以不同&#xff0c;但是其它的dim的size必须相同。 语法&#xff1a;cat([tensor1,tensor2],dim n) # 将tensor1和tensor2的第n个维度合并 代码演示&#xff1a; # 拼接与拆分 a torch.rand(4,32,8) b torch.rand(…

多线程JUC 第2季 wait和notify唤醒机制

一 wait和notify的区别与相同 1.1 wait和notify的作用 1) 使用wait()、notify()和notifyAII()时需要先对调用对象加锁。否则直接调用的话会抛出 IllegalMonitorStateExceptiona。 2) 调用wait()方法后&#xff0c;线程状态。由RUNNING变为WAITING&#xff0c;并将当前线程放置…

无人机是如何进行开发的

无人机开发涉及多个方面&#xff0c;包括硬件设计、软件开发、飞行控制算法等。以下是一个简化的无人机方案开发流程&#xff1a; 需求分析&#xff1a;明确无人机的功能、性能要求和应用场景。硬件选择&#xff1a;选择合适的飞控系统、传感器&#xff08;如陀螺仪、加速度计…

Runnable 和 Callable 的区别?什么是 Callable 和 Future?什么是 FutureTask?

该文章专注于面试,面试只要回答关键点即可,不需要对框架有非常深入的回答,如果你想应付面试,是足够了,抓住关键点 Runnable 和 Callable 的区别 Runnable接口适用于那些不需要返回结果的任务,而Callable接口适用于需要返回结果的任务,并且可以抛出受检异常。Runnable接…

【LabVIEW FPGA入门】流水线

LabVIEW中流水线 在当今多核处理器和多线程应用程序的世界中&#xff0c;程序员在开发应用程序时需要不断思考如何最好地利用尖端 CPU 的强大功能。尽管用传统的基于文本的语言构建并行代码可能难以编程和可视化&#xff0c;但 NI LabVIEW 等图形开发环境越来越多地允许工程师和…

【Docker】一文趣谈Docker

&#x1f3e1;浩泽学编程&#xff1a;个人主页 &#x1f525; 推荐专栏&#xff1a;《深入浅出SpringBoot》《java对AI的调用开发》 《RabbitMQ》《Spring》《SpringMVC》《项目实战》 &#x1f6f8;学无止境&#xff0c;不骄不躁&#xff0c;知行合一 文章目录 …

ELK日志管理实现的3种常见方法

ELK日志管理实现的3种常见方法 1. 日志收集方法 1.1 使用DaemonSet方式日志收集 通过将node节点的/var/log/pods目录挂载给以DaemonSet方式部署的logstash来读取容器日志,并将日志吐给kafka并分布写入Zookeeper数据库.再使用logstash将Zookeeper中的数据写入ES,并通过kibana…

UGUI界面性能优化2-最大程度降低UI的DrawCall和重绘

降低UI的DrawCall和重绘是优化UI性能的重要手段&#xff0c;可以提升应用的流畅度和响应速度。以下是一些降低UI DrawCall和重绘的方法&#xff1a; 合批绘制&#xff1a;将多个UI元素合并为一个DrawCall&#xff0c;减少绘制调用次数。可以通过将相邻的UI元素合并为一个大的纹…

第七节:Vben Admin权限-后端获取路由和菜单

系列文章目录 第一节:Vben Admin介绍和初次运行 第二节:Vben Admin 登录逻辑梳理和对接后端准备 第三节:Vben Admin登录对接后端login接口 第四节:Vben Admin登录对接后端getUserInfo接口 第五节:Vben Admin权限-前端控制方式 第六节:Vben Admin权限-后端控制方式 第七节…

PHP魔术方法详解

php魔术方法是一些特殊的方法&#xff0c;由特定的环境来进行触发。 这些魔术方法让开发者能够更好地控制对象的行为&#xff0c;特别是在处理不常见的操作或者需要自动化处理某些任务时非常有用。 1、_construct()构造函数&#xff1a; <?php highlight_file(__FILE__);…