canal 服务安装

简介:Canal 是阿里巴巴开源的一个基于 MySQL 数据库增量日志解析的中间件,用于提供准实时的数据同步功能。

准备工作

1.修改配置文件 

,需要先开启 Binlog 写入功能,配置 binlog-format 为 ROW 模式,my.cnf 中配置如下:

[mysqld]
log-bin=mysql-bin # 开启 binlog
binlog-format=ROW # 选择 ROW 模式
server_id=1 # 配置 MySQL replaction 需要定义,不要和 canal 的 slaveId 重复
 2.授权

canal 链接 MySQL 账号具有作为 MySQL slave 的权限, 如果已有账户可直接 grant

CREATE USER canal IDENTIFIED BY 'canal';  
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
-- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ;
FLUSH PRIVILEGES;

1.下载与安装

首先从 GitHub 上下载 Canal ,解压后根据官方文档配置 Canal。

我这里下载的快照版,

直接在windows上解压,

在lib目录下添加如下jar

2.配置 Canal

修改 conf/example/instance.properties 文件,主要配置项包括 MySQL 的地址、端口、用户名、密码以及要监听的数据库和表等。

这里是本地部署,可不做修改


3.启动 Canal Server

通过命令行启动 Canal Server,如 startup.bat example,这里的 example 是实例名称,对应配置文件夹下的配置文件名。

执行 startup.bat example 命令后,logs/canal/canal.log 打印如下日志证明启动成功;

 4.测试

1.导入依赖:
        <dependency><groupId>com.alibaba.otter</groupId><artifactId>canal.client</artifactId><version>1.1.4</version></dependency>
 2.复制示例代码启动
public static void main(String args[]) {// 创建链接CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress(AddressUtils.getHostIp(),11111), "example", "canal", "canal");int batchSize = 1000;int emptyCount = 0;try {connector.connect();connector.subscribe(".*\\..*");connector.rollback();int totalEmptyCount = 120;while (emptyCount < totalEmptyCount) {Message message = connector.getWithoutAck(batchSize); // 获取指定数量的数据long batchId = message.getId();int size = message.getEntries().size();if (batchId == -1 || size == 0) {emptyCount++;System.out.println("empty count : " + emptyCount);try {Thread.sleep(1000);} catch (InterruptedException e) {}} else {emptyCount = 0;// System.out.printf("message[batchId=%s,size=%s] \n", batchId, size);printEntry(message.getEntries());}connector.ack(batchId); // 提交确认// connector.rollback(batchId); // 处理失败, 回滚数据}System.out.println("empty too many times, exit");} finally {connector.disconnect();}}private static void printEntry(List<CanalEntry.Entry> entrys) {for (CanalEntry.Entry entry : entrys) {if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN || entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) {continue;}CanalEntry.RowChange rowChage = null;try {rowChage = CanalEntry.RowChange.parseFrom(entry.getStoreValue());} catch (Exception e) {throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(),e);}CanalEntry.EventType eventType = rowChage.getEventType();System.out.println(String.format("================&gt; binlog[%s:%s] , name[%s,%s] , eventType : %s",entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),eventType));for (CanalEntry.RowData rowData : rowChage.getRowDatasList()) {if (eventType == CanalEntry.EventType.DELETE) {printColumn(rowData.getBeforeColumnsList());} else if (eventType == CanalEntry.EventType.INSERT) {printColumn(rowData.getAfterColumnsList());} else {System.out.println("-------&gt;修改前");printColumn(rowData.getBeforeColumnsList());System.out.println("-------&gt; 修改后");printColumn(rowData.getAfterColumnsList());}}}}private static void printColumn(List<CanalEntry.Column> columns) {for (CanalEntry.Column column : columns) {System.out.println(column.getName() + " : " + column.getValue() + "    update=" + column.getUpdated());}}
3.在源数据库的表中修改数据

在控制台会出现如下效果:

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/bicheng/31463.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

五种实用方法!手把手教你系统盘瘦身

随着电脑的使用时间变长&#xff0c;电脑硬盘会逐渐被各种类型的数据占满&#xff0c;其中系统盘的可用空间也在慢慢变小。这是因为系统在运行过程中会产生大量临时文件和缓存文件&#xff0c;同时&#xff0c;系统的每一次更新升级也都会生成相关的文件夹存放在系统盘中&#…

LeetCode题练习与总结:克隆图--133

一、题目描述 给你无向 连通 图中一个节点的引用&#xff0c;请你返回该图的 深拷贝&#xff08;克隆&#xff09;。 图中的每个节点都包含它的值 val&#xff08;int&#xff09; 和其邻居的列表&#xff08;list[Node]&#xff09;。 class Node {public int val;public L…

阐述一下Golang中defer的原理

基本用法 在Go语言中&#xff0c;defer关键字用于在函数返回前执行一段代码或调用一个清理函数。这对于处理文件关闭、解锁或者返回一些资源到资源池等操作非常有用。 其基本用法如下所示&#xff1a; package mainimport "fmt"func main() {example() }func exam…

如何使用Rekono结合多种工具自动完成渗透测试

关于Rekono Rekono是一款功能强大的自动化渗透测试工具&#xff0c;该工具能够结合其他多种网络安全工具并以自动化的形式完成整个渗透测试过程。在工具运行的过程中所收集到的数据将通过电子邮件或Telegram同时发送给用户&#xff0c;如果需要更加高级的漏洞管理功能&#xf…

浏览器(Browser):轻量级浏览器,高效浏览新体验

在可的哥桌面&#xff08;Codigger Desktop&#xff09;&#xff0c;我们始终秉持创新精神&#xff0c;致力于提供卓越的用户体验。如今&#xff0c;我们激动地宣布一项全新功能的发布——轻量级浏览器Browser。这款浏览器的推出&#xff0c;正是我们对用户体验追求的再次体现&…

设施布置之车间布局优化SLP分析

一 物流分析&#xff08;Flow Analysis&#xff09; 的基本方法 1、当物料移动是工艺过程的主要部分时&#xff0c;物流分析就是工厂布置设计的核心工作&#xff0c;也是物料搬运分析的开始。 2、零部件物流是该部件在工厂内移动时所走过的路线&#xff0c; 物流分析不仅要考虑…

智慧水务十大预测

智慧水务是指利用现代信息技术&#xff0c;如物联网(IoT)、大数据、云计算、人工智能(AI)等&#xff0c;对水务系统进行智能化改造和管理&#xff0c;以提高水资源的利用效率、保障供水安全、降低运营成本和环境影响。以下是对智慧水务未来发展的十大预测&#xff1a; 数字化转…

免费ai写作?这三款软件是你的好帮手!

在信息爆炸的今天&#xff0c;自媒体已成为越来越多人展现自我、分享知识的平台。然而&#xff0c;对于许多自媒体创作者来说&#xff0c;写作过程中的灵感枯竭、文笔不畅等问题常常困扰着他们。幸运的是&#xff0c;随着人工智能技术的飞速发展&#xff0c;免费AI写作软件应运…

小程序onLoad 和 onShow

onLoad 和 onShow 是小程序页面的生命周期函数&#xff0c;它们在不同的时机触发&#xff0c;具有不同的用途和执行顺序 1.onLoad: &#xff08;1&#xff09;onLoad 在页面加载时触发&#xff0c;仅执行一次。 &#xff08;2&#xff09;用于页面的初始化操作&#xff0c;例如…

2024全国高校名单发布,电子版下载!

今天&#xff0c;教育部网站发布了《全国高等学校名单》。截至2024年6月20日&#xff0c;全国高等学校共计3117所&#xff0c;其中&#xff1a;普通高等学校2868所&#xff0c;含本科学校1308所、高职&#xff08;专科&#xff09;学校1560所&#xff1b;成人高等学校249所。本…

java试卷练习1

试卷练习1 一、单项选择题。 在每小题列出的四个备选项中只有一个选项是符合目的要求的&#xff0c;请将其代码填写在 题后的括号内。 1、Java 语言中&#xff0c;byte 类型占用的二进制位数是&#xff1a;&#xff08;D&#xff09; A、1 位 B、2 位 C、4 位 D、8 位 解释…

任务4.8.3 利用SparkSQL统计每日新增用户

实战概述&#xff1a;利用SparkSQL统计每日新增用户 任务背景 在大数据时代&#xff0c;快速准确地统计每日新增用户是数据分析和业务决策的重要部分。本任务旨在使用Apache SparkSQL处理用户访问历史数据&#xff0c;以统计每日新增用户数量。 任务目标 处理用户访问历史数…

简单了解雪花算法

雪花算法是什么 不多解释。看一看 具体是怎么 生产 唯一ID 的。 ID 由多个数据组合拼接成64位&#xff0c;分别是 时间戳 服务器节点ID 序列号&#xff0c;每个数据项占的位数不固定&#xff0c;可以根据实际需求设置。首位 1 个二进制位 是 符号位。 public long allocate(l…

数字样机:飞行器状态控制系统仿真

引言&#xff1a;数字样机起源于20世纪90年代&#xff0c;是一种用数字化模型代替实际物理样机进行仿真分析的技术。 传统的飞行器研发流程往往遵循一套特定的循环结构&#xff1a;在设计初期&#xff0c;工程人员需要对飞行器提供一个综合的设计思路&#xff08;初期蓝图&…

“河南省勘察设计资质整合趋势与企业应对“

"河南省勘察设计资质整合趋势与企业应对" 河南省勘察设计资质的整合趋势与企业应对策略可以从以下几个方面来分析&#xff1a; 整合趋势&#xff1a; 资质标准简化与合并&#xff1a;随着国家和地方政府深化“放管服”改革&#xff0c;勘察设计资质的管理趋向简化&…

Linux【实操篇-文件目录类命令】

05【实操篇-文件目录类命令】 1.pwd 显示当前工作目录的绝对路径 pwd:print working directory 打印工作目录 到现在为止&#xff0c;我们还不知道自己在系统的什么地方。在浏览器上&#xff0c;我们能够通过导航栏上的url&#xff0c;了解到自己在互联网上的具体坐标。相似的…

Linux权限理解

目录 一.权限的概念 二.Linux权限管理 1.文件访问者的分类&#xff08;人&#xff09; 2.文件类型和访问权限&#xff08;事物属性&#xff09; 3.文件权限值的表示方法 4.文件访问权限的相关设置方法 5.修改文件的拥有者 6.修改文件所属组 7.查看或修改文件权限掩码 …

轻松解决Android复杂数据结构序列化

问题描述 当我编写quickupload库时&#xff0c;因为需要在 Service中进行上传任务&#xff0c;向Service传递时我发现需要传递的数据很多并且结构复杂&#xff0c;如果处理不好就会导致以下几个问题 耗时: 需要更多时间进行开发和测试以确保正确的数据处理。容易出错: 由于手…

C语言:指针笔试题

// 输入某一年的第几天&#xff0c;计算并输出它是这一年的第几月第几日。 /* 函数功能: 对给定的某一年的第几天&#xff0c;计算它是这一年的第几月第几日。 函数入口参数: 整形变量year,存储年&#xff1b; 整形变量yearDay,存储某一年的第几天&am…

【ubuntu】用户添加root权限

添加root用户添加新用户并赋予权限 文件只读&#xff0c;无法更改 rootubuntu-server:/home/ubuntu# vi /etc/sudoers rootubuntu-server:/home/ubuntu# vi /etc/sudoers rootubuntu-server:/home/ubuntu# chmod -R 777 /etc/sudoers rootubuntu-server:/home/ubuntu# vi /et…