Canal的学习

Canal

  • 基本概念
  • 整合SpringBoot

基本概念

Canal是一个基于MySQL数据库增量日志解析,提供增量数据订阅和消费,支持将增量数据投递到下游消费者(如Kafka、RocketMQ等)或者存储(如 Elasticsearch、HBase 等)的组件。也就是说Canal可以感知到MySQL数据变动,然后解析变动数据,将变动数据发送到MQ或者同步到其他数据库,等待进一步业务逻辑处理。

原理
Canal将自己伪装为MySQL slave,向MySQL master发送dump协议。MySQL master收到 dump 请求,推送binary log给slave(这里指Canal)Canal接收并解析Binlog 日志,得到变更的数据,执行操作。
MySQL Binlog的格式有三种,分别是 STATEMENT,MIXED,ROW。在配置文件中可以选择配置 binlog_format= statement|mixed|row

分类介绍优点缺点
STATEMENT语句级别,记录每一次执行写操作的语句,相对于ROW模式节省了空间,但是可能产生数据不一致如update tt set create_date=now(),由于执行时间不同产生饿得数据就不同节省空间可能造成数据不一致
ROW行级,记录每次操作后每行记录的变化。假如一个update的sql执行结果是1万行statement只存一条,如果是row的话会把这个1万行的结果存这。持数据的绝对一致性。因为不管sql是什么,引用了什么函数,他只记录执行后的效果占用较大空间
MIXED是对statement的升级,如当函数中包含 UUID() 时,包含 AUTO_INCREMENT 字段的表被更新时,执行 INSERT DELAYED 语句时,用 UDF 时,会按照 ROW的方式进行处理节省空间,同时兼顾了一定的一致性还有些极个别情况依旧会造成不一致,另外statement和mixed对于需要对binlog的监控的情况都不方便

整合SpringBoot

配置MySQL
修改 my.cnf 中配置

[mysqld]
log-bin=mysql-bin # 开启 binlog
binlog-format=ROW # 选择 ROW 模式
server_id=1       # 配置 MySQL replaction 需要定义,不要和 canal 的 slaveId 重复

配置Canal服务端:
配置修改conf/example/instance.properties

## mysql serverId
canal.instance.mysql.slaveId = 100
#position info,需要改成自己的数据库信息
canal.instance.master.address = 127.0.0.1:3306 
#username/password,需要改成自己的数据库信息
canal.instance.dbUsername = xxx 
canal.instance.dbPassword = xxx

依赖导入

<dependency>  <groupId>com.alibaba.otter</groupId>  <artifactId>canal.client</artifactId>  <version>${canal.version}</version>  
</dependency>  

新增组件:

@Component  
public class CanalClient {  private final static int BATCH_SIZE = 1000;  public void run() {  // 创建链接  CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress("localhost", 11111), "canal-exchange", "canal", "canal");  try {  //打开连接  connector.connect();  //订阅数据库表,全部表  connector.subscribe(".*\..*");  //回滚到未进行ack的地方,下次fetch的时候,可以从最后一个没有ack的地方开始拿  connector.rollback();  while (true) {  // 获取指定数量的数据  Message message = connector.getWithoutAck(BATCH_SIZE);  //获取批量ID  long batchId = message.getId();  //获取批量的数量  int size = message.getEntries().size();  //如果没有数据  if (batchId == -1 || size == 0) {  try {  //线程休眠2秒  Thread.sleep(2000);  } catch (InterruptedException e) {  e.printStackTrace();  }  } else {  //如果有数据,处理数据  printEntry(message.getEntries());  }  //进行 batch id 的确认。确认之后,小于等于此 batchId 的 Message 都会被确认。  connector.ack(batchId);  }  } catch (Exception e) {  e.printStackTrace();  } finally {  connector.disconnect();  }  }   /**  * 打印canal server解析binlog获得的实体类信息  */  private static void printEntry(List<CanalEntry.Entry> entrys) {  for (CanalEntry.Entry entry : entrys) {  if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN || entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) {  //开启/关闭事务的实体类型,跳过  continue;  }  //RowChange对象,包含了一行数据变化的所有特征  //比如isDdl 是否是ddl变更操作 sql 具体的ddl sql beforeColumns afterColumns 变更前后的数据字段等等  CanalEntry.RowChange rowChage;  try {  rowChage = CanalEntry.RowChange.parseFrom(entry.getStoreValue());  } catch (Exception e) {  throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(), e);  }  //获取操作类型:insert/update/delete类型  CanalEntry.EventType eventType = rowChage.getEventType();  //打印Header信息  System.out.println(String.format("================》; binlog[%s:%s] , name[%s,%s] , eventType : %s",  entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),  entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),  eventType));  //判断是否是DDL语句  if (rowChage.getIsDdl()) {  System.out.println("================》;isDdl: true,sql:" + rowChage.getSql());  }  //获取RowChange对象里的每一行数据,打印出来  for (CanalEntry.RowData rowData : rowChage.getRowDatasList()) {  //如果是删除语句  if (eventType == CanalEntry.EventType.DELETE) {  printColumn(rowData.getBeforeColumnsList());  //如果是新增语句  } else if (eventType == CanalEntry.EventType.INSERT) {  printColumn(rowData.getAfterColumnsList());  //如果是更新的语句  } else {  printColumn(rowData.getBeforeColumnsList());  //变更后的数据   printColumn(rowData.getAfterColumnsList());  }  }  }  }  private static void printColumn(List<CanalEntry.Column> columns) {  for (CanalEntry.Column column : columns) {  System.out.println(column.getName() + " : " + column.getValue() + "    update=" + column.getUpdated());  }  }  
}  

一般建议整合消息中间件使用,例如Rabbitmq。下面就是整合Rabbitmq的步骤。
第一步:修改canal.properties中的serverMode

canal.serverMode = rabbitMQ  

第二步:修改instance.properties中的topic:

canal.mq.topic=canal-routing-key  

之后就是配置RabbitMQ。修改MySQL中的一条消息,Canal就会发送信息到RabbitMQ,就能从监听的RabbitMQ队列中得到该条消息。

	@RabbitHandler  public void process(Map<String, Object> msg) {    ......}  

来源:
SpringBoot整合 Canal、RabbitMQ 监听数据变更
Java:SpringBoot整合Canal+RabbitMQ组合实现MySQL数据监听
B站课程:一小时让你快速上手Canal数据同步神技

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/586876.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

nodejs+vue+微信小程序+python+PHP特困救助供养信息管理系统-计算机毕业设计推荐

通过走访某特困救助供养机构实际情况&#xff0c;整理特困救助供养机构管理的业务流程&#xff0c;分析当前特困救助供养机构管理存在的各种问题&#xff0c;利用软件开发思想对特困救助供养机构特困救助供养机构管理进行系统设计分析。通过服务端程序框架进行设计&#xff0c;…

Python基础语法笔记 tkinter的简单使用

语法 物质 动态类型语言,不需要声明类型 数字 类型int float bool 操作 //整除 **幂 字符串 str1 "Hello python" str2 "world" print(str1 * 3) # 重复输出 print(str1[1]) # 索引访问 print(str1 " " str2) # 拼接 print(str1[2…

MFC - 给系统菜单(About Dialog)发消息

文章目录 MFC - 给系统菜单(About Dialog)发消息概述笔记resource.h菜单的建立菜单项的处理MSDN上关于系统菜单项值的说法END MFC - 给系统菜单(About Dialog)发消息 概述 做了一个对话框程序, 在系统菜单(在程序上面的标题栏右击)中有"关于"的菜单. 这个是程序框架…

java常见面试题:请解释一下Java中的常用分布式框架,如Spring Boot、Dubbo等。

下面我将详细介绍Java中的两个常用分布式框架&#xff1a;Spring Boot和Dubbo。 1. Spring Boot Spring Boot是一个用于创建独立、可运行的、生产级别的Spring应用程序的框架。它简化了Spring应用程序的创建和部署&#xff0c;使得开发人员能够专注于编写业务逻辑&#xff0c…

【MySQL】事务Transaction

1. 事务的概念 事务是什么 在业务逻辑中使用sql&#xff0c;面对一些较复杂的场景&#xff0c;是需要多个sql语句组合起来实现的。如&#xff1a;银行的转账业务&#xff0c;若客户A要转账100元给客户B&#xff0c;就要两条sql&#xff1a;A余额减100&#xff0c;B余额加100&a…

ES6语法(五)封装模块化公共工具函数、引入npm包 ,并上传到npm中进行下载

1. 模块化 模块化是指将一个大的程序文件&#xff0c;拆分为许多小的文件&#xff08;模块&#xff09;&#xff0c;然后将小的文件组合起来。 1.1. 优点 &#xff08;1&#xff09;防止命名冲突 &#xff08;2&#xff09;代码复用 &#xff08;3&#xff09;高维护性 &…

【CFP-专栏2】计算机类SCI优质期刊汇总(含IEEE/Top)

一、计算机区块链类SCI-IEEE 【期刊概况】IF:4.0-5.0, JCR2区&#xff0c;中科院2区&#xff1b; 【大类学科】计算机科学&#xff1b; 【检索情况】SCI在检&#xff1b; 【录用周期】3-5个月左右录用&#xff1b; 【截稿时间】12.31截稿&#xff1b; 【接收领域】区块链…

利用idea+ jclasslib插件查看和分析 Java 类文件的字节码

jclasslib介绍 jclasslib 插件是一个用于 IntelliJ IDEA 的工具&#xff0c;它允许开发者在集成开发环境&#xff08;IDE&#xff09;内直接查看和分析 Java 类文件的字节码。这个插件尤其对于想要深入了解 Java 字节码、类加载机制、以及 Java 虚拟机&#xff08;JVM&#xf…

MongoDB聚合:$replaceRoot

定义 $replaceRoot使用指定的文档替换输入文档。该操作可替换输入文档的所有字段&#xff0c;包括_id字段。可以将内嵌文档提升到顶层&#xff0c;可以为提升文档创建新文档。 注意&#xff1a; 从MongoDB4.2开始增加了$replaceWith&#xff0c;执行与$replaceRoot类似的动作…

网络基础操作练习

知识改变命运&#xff0c;技术就是要分享&#xff0c;有问题随时联系&#xff0c;免费答疑&#xff0c;欢迎联系&#xff01; 手把手教你操作华为设备&#xff0c;新手必看。 实验拓扑图 关于命令行视图 1&#xff09;用户视图 <Huawei> 2&#xff09;系统视图 [Hu…

C++初阶(类中的默认成员函数)

呀哈喽&#xff0c;我是结衣 今天给大家带来的是类里面的默认成员函数&#xff0c;一共有六个默认的成员函数哦&#xff0c;包括构造函数&#xff0c;析构函数&#xff0c;拷贝构造函数&#xff0c;运算符重载函数&#xff0c;const成员函数&#xff0c;那么正篇开始。 文章目…

java常见面试题:请解释一下Java中的常用网络协议,如HTTP、TCP/IP等。

Java是一种广泛使用的编程语言&#xff0c;它提供了丰富的库来支持各种网络协议&#xff0c;如HTTP、TCP/IP等。以下是这些协议的详细解释&#xff1a; HTTP (Hypertext Transfer Protocol)&#xff1a; 用途&#xff1a;HTTP是用于从服务器请求和发送网页的协议。当我们浏览网…

Go语言中的性能考虑和优化

优化您的Go代码以达到最佳性能 性能优化是软件开发的关键方面&#xff0c;无论您使用哪种编程语言。在这篇文章中&#xff0c;我们将探讨Go语言中的性能考虑和优化&#xff0c;Go是一种以其效率而著称的静态类型和编译语言。我们将深入探讨三个关键领域&#xff1a;分析并发代…

pytorch01:概念、张量操作、线性回归与逻辑回归

目录 一、pytorch介绍1.1pytorch简介1.2发展历史1.3pytorch优点 二、张量简介与创建2.1什么是张量&#xff1f;2.2Tensor与Variable2.3张量的创建2.3.1 直接创建torch.tensor()2.3.2 从numpy创建tensor 2.4根据数值创建2.4.1 torch.zeros()2.4.2 torch.zeros_like()2.4.3 torch…

AutoSAR(基础入门篇)4.6-Autosar_BSW的Watchdog功能

Autosar_Watchdog功能 一、Autosar_BSW的Watchdog功能 1、Watchdog组件图 2、架构与术语解释 二、看门狗规范

开源可观测性平台Signoz(四)【链路监控及数据库中间件监控篇】

转载说明&#xff1a;如果您喜欢这篇文章并打算转载它&#xff0c;请私信作者取得授权。感谢您喜爱本文&#xff0c;请文明转载&#xff0c;谢谢。 前文链接&#xff1a; ​​开源可观测性平台Signoz系列&#xff08;一&#xff09;【开篇】​​ ​​开源可观测性平台Signoz&…

CSS之元素转换

我想大家在写代码时有一个疑问&#xff0c;块级元素可以转换成其他元素吗&#xff1f; 让我为大家介绍一下元素转换 1.display:block(转换成块元素) display&#xff1a;block可以把我们的行内元素或者行内块元素转换成块元素 接下来让我为大家演示一下&#xff1a; <!DO…

tcpdump出现permission denied

在使用tcpdump -i eth0 src host 192.168.0.184 and ip and port 22 -nn -w ping.pacp命令抓包并把抓到的数据保存到ping.pacp时&#xff0c;出现了权限错误的报错。但实际上我这里用的是root用户执行的命令。 查阅man手册发现: 在tcpdump中&#xff0c;-Z选项用于在启动数据…

CSS 动态提示框

​​ <template> <div class"terminal-loader"><div class"terminal-header"><div class"terminal-title">提示框</div><div class"terminal-controls"><div class"control close"…

【Matlab】BP 神经网络时序预测算法

资源下载&#xff1a; https://download.csdn.net/download/vvoennvv/88681507 一&#xff0c;概述 BP 神经网络是一种常见的人工神经网络&#xff0c;也是一种有监督学习的神经网络。其全称为“Back Propagation”&#xff0c;即反向传播算法。BP 神经网络主要由输入层、隐藏层…