SpringBoot 集成 Canal 基于 MySQL 做数据同步

一、canal 组件关系

下载地址:https://github.com/alibaba/canal/releases/download/canal-1.1.7/

这里面主要的有两个 canal.deployer-1.1.7.tar.gz 和 canal.adapter-1.1.7.tar.gz,canal.admin-1.1.7.tar.gz 是一个监控服务,可选;

这里说一下 deployer 和 adapter 的关系,deployer 主要是监控源数据的数据变更,也是就所有的 insert、update、delete,

只要数据有变化就通知 adapter ,所以真正负责往目标库写数据的是 adapter 。

二、canal-deployer 配置说明

建议先新建一个文件夹 deployer ,然后把上面下载的压缩包拷进去在解压;

修改 /conf/example/instance.properties,这里只贴出了要修改的地方

canal.instance.master.address=127.0.0.1:3306
canal.instance.dbUsername=root
canal.instance.dbPassword=123456

这个服务个人理解是一个服务端,程序的客户端会连接他,他监听到数据变化再转发给 adapter

三、canal-adapter 配置说明

建议先新建一个文件夹 adapter ,然后把上面下载的压缩包拷进去在解压;

这里有两个地方要修改,以 mysql 数据同步为例

/conf/application.yml

server:port: 8081
spring:jackson:date-format: yyyy-MM-dd HH:mm:sstime-zone: GMT+8default-property-inclusion: non_null
​
canal.conf:mode: tcp #tcp kafka rocketMQ rabbitMQflatMessage: truezookeeperHosts:syncBatchSize: 1000retries: -1timeout:accessKey:secretKey:consumerProperties:# canal tcp consumercanal.tcp.server.host: 127.0.0.1:11111canal.tcp.zookeeper.hosts:canal.tcp.batch.size: 500canal.tcp.username:canal.tcp.password:# kafka consumerkafka.bootstrap.servers: 127.0.0.1:9092kafka.enable.auto.commit: falsekafka.auto.commit.interval.ms: 1000kafka.auto.offset.reset: latestkafka.request.timeout.ms: 40000kafka.session.timeout.ms: 30000kafka.isolation.level: read_committedkafka.max.poll.records: 1000# rocketMQ consumerrocketmq.namespace:rocketmq.namesrv.addr: 127.0.0.1:9876rocketmq.batch.size: 1000rocketmq.enable.message.trace: falserocketmq.customized.trace.topic:rocketmq.access.channel:rocketmq.subscribe.filter:# rabbitMQ consumerrabbitmq.host:rabbitmq.virtual.host:rabbitmq.username:rabbitmq.password:rabbitmq.resource.ownerId:
​srcDataSources:defaultDS:url: jdbc:mysql://localhost:3306/hebeiqx?useUnicode=true&characterEncoding=UTF-8&serverTimezone=Asia/Shanghai&allowPublicKeyRetrieval=true&verifyServerCertificate=false&useSSL=falseusername: rootpassword: 123456canalAdapters:- instance: example # canal instance Name or mq topic namegroups:- groupId: g1outerAdapters:- name: logger- name: rdbkey: mysql1properties:jdbc.driverClassName: com.mysql.jdbc.Driverjdbc.url: jdbc:mysql://localhost:3306/weather?useUnicode=true&characterEncoding=UTF-8&serverTimezone=Asia/Shanghai&allowPublicKeyRetrieval=true&verifyServerCertificate=false&useSSL=falsejdbc.username: rootjdbc.password: 123456

其实这里也没什么改的,srcDataSources 源数据库连接信息,canalAdapters 下面的目标数据库的连接信息,canalAdapters 下面一个实例就是一个 topic

/conf/rdb/mytest_user.yml 这个文件的配置比较奇葩,大概有三种场景

1、单表同步,targetTable 后面直接写目标库表名,这个版本不需要写目标库的名称
dataSourceKey: defaultDS
destination: example
groupId: g1
outerAdapterKey: mysql1
concurrent: true
dbMapping:database: testtable: table1targetTable: table1targetPk:id: idmapAll: truecommitBatch: 7000
2、整个数据库同步,但是有个要求是两个数据库的名字要一致,而且是必须(有疑问?看看3就解决了)
dataSourceKey: defaultDS
destination: example
groupId: g1
outerAdapterKey: mysql1
concurrent: true
dbMapping:mirrorDb: truedatabase: mytest
3、多表同步,网上的案例都是单表的demo,目前还没有看到我这种方式

上面的1里面同步了 table1 这张表,那现在还要同步 table2 这种表怎么办,你是不是以为是这样:

dataSourceKey: defaultDS
destination: example
groupId: g1
outerAdapterKey: mysql1
concurrent: true
dbMapping:- database: testtable: table1targetTable: table1targetPk:id: idmapAll: truecommitBatch: 7000- database: testtable: table2targetTable: table2targetPk:id: idmapAll: truecommitBatch: 7000

上面这种方式启动就直接报错了,网上找了一天也没看到相关资料......

重点:把 mytest_user.yml 复制一份,再里面再配置另一张表就可以了,很脑残但是真管用;

注意这里所有文件的名字都是 xxx_user.yml 这种格式,内容就跟 1 里面的一样,把表名改一下就行;

四、SpringBoot 集成

添加 maven 依赖

<!--canal-->
<dependency><groupId>com.alibaba.otter</groupId><artifactId>canal.client</artifactId><version>1.1.5</version><!-- 去掉否则启动报错 --><exclusions><exclusion><groupId>ch.qos.logback</groupId><artifactId>logback-classic</artifactId></exclusion></exclusions>
</dependency>
<!-- Message、CanalEntry.Entry等来自此安装包 -->
<dependency><groupId>com.alibaba.otter</groupId><artifactId>canal.protocol</artifactId><version>1.1.5</version>
</dependency>
<dependency><groupId>com.google.protobuf</groupId><artifactId>protobuf-java</artifactId><version>3.17.3</version>
</dependency>

客户端连接代码,都是模板代码之间用就行,printEnity 和 printColumn 这俩方法没有也行

import java.net.InetSocketAddress;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
​
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.concurrent.CustomizableThreadFactory;
import org.springframework.stereotype.Component;
​
import javax.annotation.PostConstruct;
​
@Component
public class CanalClient {
​private Logger logger = LoggerFactory.getLogger(CanalClient.class);
​private static final ThreadFactory springThreadFactory = new CustomizableThreadFactory("canal-pool-");
​private static final ExecutorService executors = Executors.newFixedThreadPool(1, springThreadFactory);
​@Autowiredprivate CanalInstanceProperties canalInstanceProperties;
​@PostConstructprivate void startListening() {canalInstanceProperties.getInstance().forEach(instanceName -> {executors.submit(() -> {connector(instanceName);});});}
​/*** 消费canal的线程池*/public void connector(String instanceName) {CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress("127.0.0.1", 11111), instanceName, "", "");
​try {// 打开连接connector.connect();// 订阅所有消息//connector.subscribe(".*\\..*");// 只订阅test1数据库下的所有表connector.subscribe("hebeiqx.*");// 恢复到之前同步的那个位置connector.rollback();
​int batchSize = 1000;for (; ; ) {// 获取指定数量的数据,但是不做确认标记,下一次取还会取到这些信息。 注:不会阻塞,若不够100,则有多少返回多少Message message = connector.getWithoutAck(batchSize);// 获取消息idlong batchId = message.getId();// 获取批量的数量int size = message.getEntries().size();if (size == 0 || batchId == -1) {//logger.info("暂无数据......");try {// 没有数据就等待1秒Thread.sleep(1000);} catch (InterruptedException ignored) {}}if (batchId != -1) {logger.info("数据同步监听中......");logger.info("instance -> {}, msgId -> {}", instanceName, batchId);// 数据处理//printEnity(message.getEntries());// 提交确认connector.ack(batchId);// 处理失败,回滚数据connector.rollback(batchId);}}} catch (Exception e) {e.printStackTrace();} finally {connector.disconnect();}}
​private void printEnity(List<CanalEntry.Entry> entries) {for (CanalEntry.Entry entry : entries) {// 开启/关闭事务的实体类型,跳过if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN || entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) {continue;}
​// RowChange对象,包含了一行数据变化的所有特征// 比如isDdl 是否是ddl变更操作 sql 具体的ddl sql beforeColumns afterColumns 变更前后的数据字段等等CanalEntry.RowChange rowChange = null;
​try {// 序列化数据rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());} catch (Exception e) {e.printStackTrace();}
​assert rowChange != null;
​// 获取操作类型:insert/update/delete类型CanalEntry.EventType eventType = rowChange.getEventType();
​// 打印Header信息logger.info(String.format("================>; binlog[%s:%s] , name[%s,%s] , eventType : %s",entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),eventType));
​// 判断是否是DDL语句if (rowChange.getEventType() == CanalEntry.EventType.QUERY || rowChange.getIsDdl()) {logger.info("sql ------------>{}", rowChange.getSql());}
​// 获取RowChange对象里的每一行数据,打印出来for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {switch (rowChange.getEventType()) {// 如果希望监听多种事件,可以手动增加casecase UPDATE:printColumn(rowData.getAfterColumnsList());printColumn(rowData.getBeforeColumnsList());break;case INSERT:printColumn(rowData.getAfterColumnsList());break;case DELETE:printColumn(rowData.getBeforeColumnsList());break;default:}}}}
​private void printColumn(List<CanalEntry.Column> columns) {StringBuilder sb = new StringBuilder();for (CanalEntry.Column column : columns) {sb.append("[");sb.append(column.getName()).append(" : ").append(column.getValue()).append("update=").append(column.getUpdated());sb.append("]");sb.append("    ");}logger.info(sb.toString());}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/632765.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

ROS学习笔记8——实现ROS通信时的常用命令

机器人系统中启动的节点少则几个&#xff0c;多则十几个、几十个&#xff0c;不同的节点名称各异&#xff0c;通信时使用话题、服务、消息、参数等等都各不相同&#xff0c;一个显而易见的问题是: 当需要自定义节点和其他某个已经存在的节点通信时&#xff0c;如何获取对方的话…

gitgud.io+Sapphire注册账号教程

gitgud.io是一个仓库&#xff0c;地址 https://gitgud.io/&#xff0c;点进去之后会看到注册页面。 意思是需要通过注册这个Sapphire账户来登录。点击右边的Sapphire&#xff0c;就跳转到Sapphire的登陆页面&#xff0c;点击创建新账号&#xff0c;就进入注册页面。&#xff0…

SpiderFlow爬虫平台漏洞利用分析(CVE-2024-0195)

1. 漏洞介绍 SpiderFlow爬虫平台项目中spider-flow-web\src\main\java\org\spiderflow\controller\FunctionController.java文件的FunctionService.saveFunction函数调用了saveFunction函数&#xff0c;该调用了自定义函数validScript&#xff0c;该函数中用户能够控制 functi…

Spring | Spring中的Bean--下

Spring中的Bean: 4.Bean的生命周期5.Bean的配装配式 ( 添加Bean到IOC容器的方式 依赖注入的方式 )5.1 基于XML的配置5.2 基于Annotation (注解) 的装配 (更常用&#xff09;5.3 自动装配 4.Bean的生命周期 Spring容器可以管理 singleton作用域的Bean的生命周期&#xff0c;在此…

go语言(七)----slice的声明方式

1、声明方式一 //声明一个slice1是一个切片&#xff0c;但是并没有给slice分配空间var slice1 []intslice1 make([]int,3)2、声明方式二 声明一个slice切片&#xff0c;同时给slice分配空间&#xff0c;3个空间&#xff0c;初始化值是0var slice1 []int make([]int,3)3、声…

ICCV2023 | PTUnifier+:通过Soft Prompts(软提示)统一医学视觉语言预训练

论文标题&#xff1a;Towards Unifying Medical Vision-and-Language Pre-training via Soft Prompts 代码&#xff1a;https://github.com/zhjohnchan/ptunifier Fusion-encoder type和Dual-encoder type。前者在多模态任务中具有优势&#xff0c;因为模态之间有充分的相互…

从临床和科研场景分析ChatGPT在医疗健康领域的应用可行性

2023年4月发表在Journal Medical Systems的文献《Evaluating the Feasibility of ChatGPT in Healthcare: An Analysis of Multiple Clinical and Research Scenarios》&#xff08;评估 ChatGPT 在医疗健康领域的可行性&#xff1a;对多种临床和研究场景的分析&#xff09;介绍…

IPv6自动隧道---6to4中继

6to4中继 普通IPv6网络需要与6to4网络通过IPv4网络互通,这可以通过6to4中继路由器方式实现。所谓6to4中继,就是通过6to4隧道转发的IPv6报文的目的地址不是6to4地址,但转发的下一跳是6to4地址,该下一跳为路由器我们称之为6to4中继。隧道的IPv4目的地址依然从下一跳的6to4地…

kafka消费相关问题(GPT回答版本)

kafka消费相关问题&#xff08;GPT回答版本&#xff09; 在Java中&#xff0c;要避免重复消费Kafka消息&#xff0c;可以使用以下方法 1. 使用消费者组&#xff1a; 在设置Kafka消费者时&#xff0c;可以指定一个消费者组。一个消费者组中可以有多个消费者实例&#xff0c;每…

PPT 编辑模式滚动页面不居中

PPT 编辑模式滚动页面不居中 目标&#xff1a;编辑模式下适应窗口大小、切换页面居中显示 调整视图大小&#xff0c;编辑模式通过Ctrl 鼠标滚轮 或 在视图菜单中点击适应窗口大小。 2. 翻页异常&#xff0c;调整视图大小后&#xff0c;PPT翻页但内容不居中或滚动&#xff0c…

从C到B:消费金融促消费的良性进阶

来源 | 镭射财经&#xff08;leishecaijing&#xff09; 盘点2023年&#xff0c;消费金融在促消费上有诸多尝试&#xff0c;提高消费者的金融服务供给&#xff0c;利率下调&#xff0c;贴息分期等。兼顾效果与可持续发展&#xff0c;将促消费的发力点放在商家端&#xff08;B端…

『MySQL快速上手』-⑩-索引特性

文章目录 1.索引的作用2.索引的理解建立测试表插入多条记录查看结果 2.1 MySQL与磁盘交互的基本单位2.1 为何IO交互要是 Page2.3 理解单个Page2.4 理解多个Page2.5 页目录2.6 单页情况2.7 多页情况2.8 B vs B2.9 聚簇索引 vs 非聚簇索引非聚簇索引聚簇索引 3.索引操作3.1 创建主…

pytest + allure(windows)安装

背景 软硬件环境&#xff1a; windows11&#xff0c;已安装anaconda&#xff0c;python&#xff0c;pycharm用途&#xff1a;使用pytest allure 生成报告allure 依赖java&#xff0c;点击查看java安装教程 allure 下载与安装 从 allure下载网址下载最新版本.zip文件 放在自…

【MySql】MySQL 如何创建新用户

具体代码与实现方法 登录 MySQL&#xff1a; 使用 root 用户或具有相应权限的用户登录到 MySQL。可以使用以下命令&#xff1a; mysql -u root -p这里 -u 后面跟的是用户名&#xff0c;-p 表示提示输入密码。 创建新用户&#xff1a; 使用以下 SQL 命令创建新用户&#xff1a;…

基于YOLOv8深度学习的葡萄簇目标检测系统【python源码+Pyqt5界面+数据集+训练代码】目标检测、深度学习实战

《博主简介》 小伙伴们好&#xff0c;我是阿旭。专注于人工智能、AIGC、python、计算机视觉相关分享研究。 ✌更多学习资源&#xff0c;可关注公-仲-hao:【阿旭算法与机器学习】&#xff0c;共同学习交流~ &#x1f44d;感谢小伙伴们点赞、关注&#xff01; 《------往期经典推…

【llm 微调code-llama 训练自己的数据集 一个小案例】

这也是一个通用的方案&#xff0c;使用peft微调LLM。 准备自己的数据集 根据情况改就行了&#xff0c;jsonl格式&#xff0c;三个字段&#xff1a;context, answer, question import pandas as pd import random import jsondata pd.read_csv(dataset.csv) train_data data…

pyspark 笔记:窗口函数window

窗口函数相关的概念和基本规范可以见&#xff1a;pyspark笔记&#xff1a;over-CSDN博客 1 创建Pyspark dataFrame from pyspark.sql.window import Window import pyspark.sql.functions as F employee_salary [("Ali", "Sales", 8000),("Bob&qu…

USACO介绍 报名流程 成绩查询方式详解(文末有备赛资料)

USACO美国计算机奥林匹克活动 2023-2024新赛季的时间线安排是怎么样的&#xff1f; 2023-2024USACO竞赛时间 一般来说&#xff0c;USACO竞赛时间在12月-3月期间&#xff0c;每月都有一场比赛每次3-5小时&#xff0c;并在规定时间内完成3-4道题。23-24年USACO竞赛时间安排如下&a…

uniapp h5 生成 ubuntu桌面程序 并运行方法

uniapp h5 生成 ubuntu桌面程序 并运行方法,在window环境下开发&#xff0c;发布到ubuntu桌面&#xff0c;并运行 1、安装Nodejs 安装包官方下载地址&#xff1a;https://www.nodejs.com.cn/ 安装完后cmd&#xff0c;如图&#xff0c;即安装成功 2、通过Nodejs安装 electron…

[flutter]GIF速度极快问题的两种解决方法

原因&#xff1a; 当GIF图没有设置播放间隔时间时&#xff0c;电脑上会默认间隔0.1s&#xff0c;而flutter默认0s。 解决方法一&#xff1a; 将图片改为webp格式。 解决方法二&#xff1a; 为图片设置帧频率&#xff0c;添加播放间隔。例如可以使用GIF依赖组件设置每秒运行…