canal1.1.7实战

1.环境搭建

canal可以用来监听mysql数据库的变化,用来同步数据

先下载最新的部署版本,release地址:Releases · alibaba/canal · GitHub

包下载地址: https://github.com/alibaba/canal/releases/download/canal-1.1.7/canal.deployer-1.1.7.tar.gz

 下载完后,在linux上新建一个canal文件夹,放入tar包解压: tar -zxvf canal.xxx.tar.gz

解压完后修改配置文件

查看conf/canal.properties,其中canal.port是客户端连接的端口,需要放开,canal.admin.user和canal.admin.passwd是客户端连接的账号

再打开conf/example/ instance.properties, master.address填数据库地址,dbUsername和dbPassword是数据库账号,flter.regex可以用来过滤数据库,默认是监听所有数据库,如果想监听db_开头的数据可以这么写db_.*\\..*,多个用逗号分隔

 修改完成后,进入bin目录,执行./startup.sh是启动,./stop.sh是关闭

进入logs/example,执行tail -f -n 300 example.log,看到以下输出说明搭建成功了

 2.客户端代码

引入依赖

  <dependencies><dependency><groupId>com.alibaba.otter</groupId><artifactId>canal.client</artifactId><version>1.1.7</version></dependency><dependency><groupId>com.alibaba.otter</groupId><artifactId>canal.protocol</artifactId><version>1.1.7</version></dependency></dependencies>

代码实现:

package cn.hollycloud.iplatform;import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import com.google.protobuf.ByteString;
import com.google.protobuf.InvalidProtocolBufferException;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.junit.Test;import java.net.InetSocketAddress;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;/*** Unit test for simple App.*/
@Slf4j
public class CanalTest {private Map<String, String> errorMap = new HashMap<>();@Testpublic void testCanal() {initThread();}private void initThread() {new Thread(new Runnable() {@Overridepublic void run() {while (true) {try {initConnect();} catch (Exception e) {String key = "canal_connection_error";if (!hasSameError(key, e.getMessage())) {log.error("canal连接出错: {}", e);}}try {Thread.sleep(10000);} catch (InterruptedException e) {}}}}).start();}private void initConnect() {String canalIp = "localhost";int canalPort = 11111;String canalDestination = "example";String canalUsername = "admin";String canalPassword = "123456";CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress(canalIp,canalPort), canalDestination, canalUsername, canalPassword);int batchSize = 200;try {connector.connect(); // 连接到canal serverconnector.subscribe("db_.*\\..*"); // 订阅指定的消息connector.rollback(); // 回滚到未进行ack 的地方log.info("canal连接成功");while (true) {Message message = connector.getWithoutAck(batchSize); // 获取指定数量的数据long batchId = message.getId();int size = message.getEntries().size();if (batchId == -1 || size == 0) {try {//未获取到消息则睡眠Thread.sleep(2000);} catch (InterruptedException e) {}} else {try {//处理消息log.info("从canal接收到: {} 条消息,消息批次: {},开始处理", size, message.getId());handleMessage(message.getEntries());} catch (Exception e) {connector.rollback(batchId); // 处理失败, 回滚数据String key = "canal_sync_data_error";String errMsg = e.getMessage();if (StringUtils.isEmpty(errMsg)) errMsg = e.toString();if (!hasSameError(key, errMsg)) {log.error("同步数据出错: {}", e);}//休眠一段时间继续获取数据try {Thread.sleep(10000);} catch (InterruptedException ex) {ex.printStackTrace();}continue;}}connector.ack(batchId); // 提交确认}} finally {connector.disconnect();}}private boolean hasSameError(String key, String error) {String lastError = errorMap.get(key);if (Objects.equals(lastError, error)) {return true;}errorMap.put(key, error);return false;}private void handleMessage(List<CanalEntry.Entry> entrys) throws InvalidProtocolBufferException {for (CanalEntry.Entry entry : entrys) {if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN || entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) {continue;}//根据数据库名获取租户名String databaseName = entry.getHeader().getSchemaName();String tableName = entry.getHeader().getTableName();log.info("数据库: {}, 表名: {}", databaseName, tableName);// 获取类型CanalEntry.EntryType entryType = entry.getEntryType();// 获取序列化后的数据ByteString storeValue = entry.getStoreValue();if (CanalEntry.EntryType.ROWDATA.equals(entryType)) {// 反序列化数据CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(storeValue);// 获取当前事件的操作类型CanalEntry.EventType eventType = rowChange.getEventType();if (eventType == CanalEntry.EventType.INSERT || eventType == CanalEntry.EventType.UPDATE|| eventType == CanalEntry.EventType.DELETE) {// 获取数据集List<CanalEntry.RowData> rowDataList = rowChange.getRowDatasList();// 遍历rowDataList,并打印数据集for (CanalEntry.RowData rowData : rowDataList) {List<CanalEntry.Column> afterColumnsList = rowData.getAfterColumnsList();List<CanalEntry.Column> beforeColumnsList = rowData.getBeforeColumnsList();// 变更前数据for (CanalEntry.Column column : beforeColumnsList) {log.info("变更前数据: name: {}, value: {}", column.getName(), column.getValue());}// 变更后数据for (CanalEntry.Column column : afterColumnsList) {log.info("变更后数据: name: {}, value: {}", column.getName(), column.getValue());}}}}}}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/149321.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

虚拟路由冗余协议_VRRP

#初次知晓_2023-11-16 #中职在读 #网络_交换机配置部分 虚拟路由冗余协议_VRRP VRRP_概念VRRP_作用VRRP_概述VRRP_两种状态VRRP_配置配置命令&#xff1a; VRRP_概念 VRRP是一种选择协议_LAN接入设备备份协议 它可以把一个虚拟路由器的责任动态分配到局域网上的 VRRP 路由器中…

前端uniapp列表下拉到底部加载下一页列表【下拉加载页面/带源码/实战】

目录 一. 图片1.2. 二.list.vue三.uni-load-more.vue最后 一. 图片 1. 2. 二.list.vue <template><view><!--列表--><scroll-view scroll-y"true" class"scroll-Y" :style"height: scrollviewHigh px;" lower-threshol…

设计模式(二)-创建者模式(3)-抽象工厂模式

一、为什么需要抽象工厂模式&#xff1f; 在工厂模式中&#xff0c;我们需要定义多个继承于共同工厂抽象基类的工厂子类&#xff0c;这些子类负责创建一个对应的对象。工厂模式存在一个缺点就是&#xff1a;每次扩展新的工厂子类&#xff0c;就会增加系统的复杂度。 如果我们…

TP_Link WR886N 硬改闪存16M内存64M,刷入openwrt

一、换内存&#xff0c;拆闪存&#xff1a; 1、先原机开机试试是否功能正常&#xff1b; 2、拆机&#xff0c;比较难拆&#xff0c;容易坏外壳&#xff1b; 3、找到内存和闪存&#xff0c;用胶带把边上的小元件&#xff0c;电阻都贴好&#xff1b; 4、加助焊油&#xff0c;用风…

PC3329L DC-DC降压 10V-100V输入3A大流输出带EN功能实现零功耗只需极少元器件

1. PC3392L特性  通过使能脚关断实现零功耗  宽电压输入范围 10V 至 100V  最大输出电流 3A  集成功率 MOS 管  外围器件少  输出短路保护  温度保护  逐周期限流  输出电压灵活可靠  ESOP8 2. 描述 PC3392L 一款宽电压范围降压型 DC-DC 电源管…

C# 实现腾讯云多路直播流的云端混合录制

目录 应用场景 腾讯云直播和云点播 产品架构 混流显示示例 关键代码 API实现 小结 应用场景 在云考试或视频面试中&#xff0c;除了对考生、考官的实时音视频监控以防止作弊行为的发生以外&#xff0c;对直播流的音视频录制也尤为重要&#xff0c;可做为后期证据材料进…

斯坦福机器学习 Lecture2 (假设函数、参数、样本等等术语,还有批量梯度下降法、随机梯度下降法 SGD 以及它们的相关推导,还有正态方程)

假设函数定义 假设函数&#xff0c;猜一个 x->y 的类型&#xff0c;比如 y ax b&#xff0c;随后监督学习的任务就是找到误差最低的 a 和 b 参数 有时候我们可以定义 x0 1&#xff0c;来让假设函数的整个表达式一致统一 如上图是机器学习中的一些术语 额外的符号&#xf…

C# GC机制

在C#中&#xff0c;垃圾回收&#xff08;Garbage Collection&#xff0c;简称GC&#xff09;是CLR&#xff08;公共语言运行时&#xff09;的一个重要部分&#xff0c;用于自动管理内存。它会自动释放不再使用的对象所占用的内存&#xff0c;避免内存泄漏&#xff0c;减少程序员…

Vue 3 和 Spring Boot 3 的操作流程和执行步骤详解

1.介绍 在本篇博客中&#xff0c;我们将详细介绍Vue 3 和 Spring Boot 3 的操作流程和执行步骤。Vue 3 是一款流行的前端框架&#xff0c;而Spring Boot 3 是一款广泛应用于后端开发的框架。通过结合使用这两个框架&#xff0c;我们可以构建出功能强大的全栈应用。 2.Vue 3 的操…

spring boot加mybatis puls实现,在新增/修改时,对某些字段进行处理,使用的@TableField()

1.先说场景&#xff0c;在对mysql数据库表数据插入或者更新时都得记录时间和用户id 传统实现有点繁琐&#xff0c;这里还可以封装一下公共方法。 2.解决方法&#xff1a; 2.1&#xff1a;使用aop切面编程&#xff08;记录一下&#xff0c;有时间再攻克&#xff09;。 2.2&…

宝塔站点配置

我这里使用的thinkphp 框架部署的

Leetcode合集】1. 两数之和

1. 两数之和 1. 两数之和 代码仓库地址&#xff1a; https://github.com/slience-me/Leetcode 个人博客 &#xff1a;https://slienceme.xyz 给定一个整数数组 nums 和一个整数目标值 target&#xff0c;请你在该数组中找出 和为目标值 target 的那 两个 整数&#xff0c;并…

确保软件供应链安全,不容忽略的三大步骤

作者&#xff1a;JFrog大中华区总经理董任远 业务成功的关键驱动力在于企业开发和交付软件的速度。团队任务是不断寻找尽可能高效的工作新方法&#xff0c;通常会借助开源库和组件来加快交付速度。事实上&#xff0c;有研究表明市场上多达 97%的应用程序都使用开源软件。 虽然…

vue3中祖孙组件之间的通信provide和inject

一、在vue3中新增的祖孙之间通信的方式 provide和inject是Vue中的两个相关功能&#xff0c;它们一起提供了一种祖孙组件之间共享数据的方式。父组件可以使用provide来提供数据&#xff0c;而子孙组件可以使用inject来接收这些数据。 二、使用 父组件中部分代码 <script&g…

SaleSmartly新增AI意图识别触发器!让客户享受更精准的自动化服务

AI意图识别技术是对话式AI中很重要的组成部分&#xff0c;通俗点来说就是一种可以识别用户在对话中表达的意图的技术。通过对大量数据的分析和学习&#xff0c;AI可以理解用户想要获得的信息&#xff0c;并根据这些信息来采取相应的行动或提供相应的响应。而在对话式AI中&#…

手机LiDAR-based激光雷达标定板提高无人汽车智能化程度

手机LiDAR-based 3D扫描和建模测试系统是一种利用激光雷达&#xff08;LiDAR&#xff09;技术进行三维扫描和模型创建的工具&#xff0c;它可以在手机上运行。这种测试系统可以用于各种应用&#xff0c;如地形测绘、建筑物建模、机器人视觉、无人驾驶汽车导航等。 手机LiDAR-ba…

云端援手:智能枢纽应对数字资产挑战 ——华为云11.11应用集成管理与创新专区优惠限时购

现新客3.96元起&#xff0c;下单有机会抽HUAWEI P60 Art 福利仅限双十一 机会唾手可得&#xff0c;立即行动&#xff01; 「有效管理保护应用与数据的同时实现高效互通」——华为云全力满足企业需求&#xff0c;推出全套「应用集成管理与创新」智能解决方案&#xff1a;华为云…

计蒜客T1654 数列分段(C语言实现)

【题目描述】对于给定的一个长度为n的正整数数列ai&#xff0c;现要将其分成连续的若干段&#xff0c;并且每段和不超过m&#xff08;可以等于m&#xff09;&#xff0c;问最少能将其分成多少段使得满足要求。 【输入格式】第一行包含两个正整数n&#xff0c;m&#xff0c;表示…

家电电器展示预约小程序的作用是什么

电器产品已经成为人们生活的必备品&#xff0c;如冰箱、电视机、洗衣机等&#xff0c;而这些产品的购买方式也很多&#xff0c;可以到线下门店购买&#xff0c;也可以到线上多个电商平台购买&#xff0c;如今互联网高速发展以及民众享受线上服务带来的便捷性&#xff0c;同时商…

修改docker默认数据目录

前言&#xff1a; docker默认数据目录是/var/lib/docker,根目录的存储空间有限&#xff0c;我们往往不能使用默认配置&#xff0c;需要创建空间相对较大的数据data目录 停止docker服务 systemctl stop docker 编辑配置文件 vi /etc/docker/daemon.json 增加选项 “graph”…