canal1.1.7实战

1.环境搭建

canal可以用来监听mysql数据库的变化,用来同步数据

先下载最新的部署版本,release地址:Releases · alibaba/canal · GitHub

包下载地址: https://github.com/alibaba/canal/releases/download/canal-1.1.7/canal.deployer-1.1.7.tar.gz

 下载完后,在linux上新建一个canal文件夹,放入tar包解压: tar -zxvf canal.xxx.tar.gz

解压完后修改配置文件

查看conf/canal.properties,其中canal.port是客户端连接的端口,需要放开,canal.admin.user和canal.admin.passwd是客户端连接的账号

再打开conf/example/ instance.properties, master.address填数据库地址,dbUsername和dbPassword是数据库账号,flter.regex可以用来过滤数据库,默认是监听所有数据库,如果想监听db_开头的数据可以这么写db_.*\\..*,多个用逗号分隔

 修改完成后,进入bin目录,执行./startup.sh是启动,./stop.sh是关闭

进入logs/example,执行tail -f -n 300 example.log,看到以下输出说明搭建成功了

 2.客户端代码

引入依赖

  <dependencies><dependency><groupId>com.alibaba.otter</groupId><artifactId>canal.client</artifactId><version>1.1.7</version></dependency><dependency><groupId>com.alibaba.otter</groupId><artifactId>canal.protocol</artifactId><version>1.1.7</version></dependency></dependencies>

代码实现:

package cn.hollycloud.iplatform;import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import com.google.protobuf.ByteString;
import com.google.protobuf.InvalidProtocolBufferException;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.junit.Test;import java.net.InetSocketAddress;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;/*** Unit test for simple App.*/
@Slf4j
public class CanalTest {private Map<String, String> errorMap = new HashMap<>();@Testpublic void testCanal() {initThread();}private void initThread() {new Thread(new Runnable() {@Overridepublic void run() {while (true) {try {initConnect();} catch (Exception e) {String key = "canal_connection_error";if (!hasSameError(key, e.getMessage())) {log.error("canal连接出错: {}", e);}}try {Thread.sleep(10000);} catch (InterruptedException e) {}}}}).start();}private void initConnect() {String canalIp = "localhost";int canalPort = 11111;String canalDestination = "example";String canalUsername = "admin";String canalPassword = "123456";CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress(canalIp,canalPort), canalDestination, canalUsername, canalPassword);int batchSize = 200;try {connector.connect(); // 连接到canal serverconnector.subscribe("db_.*\\..*"); // 订阅指定的消息connector.rollback(); // 回滚到未进行ack 的地方log.info("canal连接成功");while (true) {Message message = connector.getWithoutAck(batchSize); // 获取指定数量的数据long batchId = message.getId();int size = message.getEntries().size();if (batchId == -1 || size == 0) {try {//未获取到消息则睡眠Thread.sleep(2000);} catch (InterruptedException e) {}} else {try {//处理消息log.info("从canal接收到: {} 条消息,消息批次: {},开始处理", size, message.getId());handleMessage(message.getEntries());} catch (Exception e) {connector.rollback(batchId); // 处理失败, 回滚数据String key = "canal_sync_data_error";String errMsg = e.getMessage();if (StringUtils.isEmpty(errMsg)) errMsg = e.toString();if (!hasSameError(key, errMsg)) {log.error("同步数据出错: {}", e);}//休眠一段时间继续获取数据try {Thread.sleep(10000);} catch (InterruptedException ex) {ex.printStackTrace();}continue;}}connector.ack(batchId); // 提交确认}} finally {connector.disconnect();}}private boolean hasSameError(String key, String error) {String lastError = errorMap.get(key);if (Objects.equals(lastError, error)) {return true;}errorMap.put(key, error);return false;}private void handleMessage(List<CanalEntry.Entry> entrys) throws InvalidProtocolBufferException {for (CanalEntry.Entry entry : entrys) {if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN || entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) {continue;}//根据数据库名获取租户名String databaseName = entry.getHeader().getSchemaName();String tableName = entry.getHeader().getTableName();log.info("数据库: {}, 表名: {}", databaseName, tableName);// 获取类型CanalEntry.EntryType entryType = entry.getEntryType();// 获取序列化后的数据ByteString storeValue = entry.getStoreValue();if (CanalEntry.EntryType.ROWDATA.equals(entryType)) {// 反序列化数据CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(storeValue);// 获取当前事件的操作类型CanalEntry.EventType eventType = rowChange.getEventType();if (eventType == CanalEntry.EventType.INSERT || eventType == CanalEntry.EventType.UPDATE|| eventType == CanalEntry.EventType.DELETE) {// 获取数据集List<CanalEntry.RowData> rowDataList = rowChange.getRowDatasList();// 遍历rowDataList,并打印数据集for (CanalEntry.RowData rowData : rowDataList) {List<CanalEntry.Column> afterColumnsList = rowData.getAfterColumnsList();List<CanalEntry.Column> beforeColumnsList = rowData.getBeforeColumnsList();// 变更前数据for (CanalEntry.Column column : beforeColumnsList) {log.info("变更前数据: name: {}, value: {}", column.getName(), column.getValue());}// 变更后数据for (CanalEntry.Column column : afterColumnsList) {log.info("变更后数据: name: {}, value: {}", column.getName(), column.getValue());}}}}}}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/149321.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

前端uniapp列表下拉到底部加载下一页列表【下拉加载页面/带源码/实战】

目录 一. 图片1.2. 二.list.vue三.uni-load-more.vue最后 一. 图片 1. 2. 二.list.vue <template><view><!--列表--><scroll-view scroll-y"true" class"scroll-Y" :style"height: scrollviewHigh px;" lower-threshol…

设计模式(二)-创建者模式(3)-抽象工厂模式

一、为什么需要抽象工厂模式&#xff1f; 在工厂模式中&#xff0c;我们需要定义多个继承于共同工厂抽象基类的工厂子类&#xff0c;这些子类负责创建一个对应的对象。工厂模式存在一个缺点就是&#xff1a;每次扩展新的工厂子类&#xff0c;就会增加系统的复杂度。 如果我们…

TP_Link WR886N 硬改闪存16M内存64M,刷入openwrt

一、换内存&#xff0c;拆闪存&#xff1a; 1、先原机开机试试是否功能正常&#xff1b; 2、拆机&#xff0c;比较难拆&#xff0c;容易坏外壳&#xff1b; 3、找到内存和闪存&#xff0c;用胶带把边上的小元件&#xff0c;电阻都贴好&#xff1b; 4、加助焊油&#xff0c;用风…

PC3329L DC-DC降压 10V-100V输入3A大流输出带EN功能实现零功耗只需极少元器件

1. PC3392L特性  通过使能脚关断实现零功耗  宽电压输入范围 10V 至 100V  最大输出电流 3A  集成功率 MOS 管  外围器件少  输出短路保护  温度保护  逐周期限流  输出电压灵活可靠  ESOP8 2. 描述 PC3392L 一款宽电压范围降压型 DC-DC 电源管…

C# 实现腾讯云多路直播流的云端混合录制

目录 应用场景 腾讯云直播和云点播 产品架构 混流显示示例 关键代码 API实现 小结 应用场景 在云考试或视频面试中&#xff0c;除了对考生、考官的实时音视频监控以防止作弊行为的发生以外&#xff0c;对直播流的音视频录制也尤为重要&#xff0c;可做为后期证据材料进…

斯坦福机器学习 Lecture2 (假设函数、参数、样本等等术语,还有批量梯度下降法、随机梯度下降法 SGD 以及它们的相关推导,还有正态方程)

假设函数定义 假设函数&#xff0c;猜一个 x->y 的类型&#xff0c;比如 y ax b&#xff0c;随后监督学习的任务就是找到误差最低的 a 和 b 参数 有时候我们可以定义 x0 1&#xff0c;来让假设函数的整个表达式一致统一 如上图是机器学习中的一些术语 额外的符号&#xf…

spring boot加mybatis puls实现,在新增/修改时,对某些字段进行处理,使用的@TableField()

1.先说场景&#xff0c;在对mysql数据库表数据插入或者更新时都得记录时间和用户id 传统实现有点繁琐&#xff0c;这里还可以封装一下公共方法。 2.解决方法&#xff1a; 2.1&#xff1a;使用aop切面编程&#xff08;记录一下&#xff0c;有时间再攻克&#xff09;。 2.2&…

宝塔站点配置

我这里使用的thinkphp 框架部署的

SaleSmartly新增AI意图识别触发器!让客户享受更精准的自动化服务

AI意图识别技术是对话式AI中很重要的组成部分&#xff0c;通俗点来说就是一种可以识别用户在对话中表达的意图的技术。通过对大量数据的分析和学习&#xff0c;AI可以理解用户想要获得的信息&#xff0c;并根据这些信息来采取相应的行动或提供相应的响应。而在对话式AI中&#…

手机LiDAR-based激光雷达标定板提高无人汽车智能化程度

手机LiDAR-based 3D扫描和建模测试系统是一种利用激光雷达&#xff08;LiDAR&#xff09;技术进行三维扫描和模型创建的工具&#xff0c;它可以在手机上运行。这种测试系统可以用于各种应用&#xff0c;如地形测绘、建筑物建模、机器人视觉、无人驾驶汽车导航等。 手机LiDAR-ba…

云端援手:智能枢纽应对数字资产挑战 ——华为云11.11应用集成管理与创新专区优惠限时购

现新客3.96元起&#xff0c;下单有机会抽HUAWEI P60 Art 福利仅限双十一 机会唾手可得&#xff0c;立即行动&#xff01; 「有效管理保护应用与数据的同时实现高效互通」——华为云全力满足企业需求&#xff0c;推出全套「应用集成管理与创新」智能解决方案&#xff1a;华为云…

家电电器展示预约小程序的作用是什么

电器产品已经成为人们生活的必备品&#xff0c;如冰箱、电视机、洗衣机等&#xff0c;而这些产品的购买方式也很多&#xff0c;可以到线下门店购买&#xff0c;也可以到线上多个电商平台购买&#xff0c;如今互联网高速发展以及民众享受线上服务带来的便捷性&#xff0c;同时商…

SSL证书对网站SEO的好处

随着网络安全意识的提高&#xff0c;越来越多的网站开始采用SSL证书来保护自己的数据传输过程。那么&#xff0c;SSL证书真的能为网站SEO带来好处吗&#xff1f;下面将为您分析这个问题。 加强用户体验和信任度 SSL证书不仅能确保数据传输的安全性&#xff0c;还能让客户感受…

『亚马逊云科技产品测评』活动征文|基于next.js搭建一个企业官网

『亚马逊云科技产品测评』活动征文&#xff5c;基于next.js搭建一个企业官网 授权声明&#xff1a;本篇文章授权活动官方亚马逊云科技文章转发、改写权&#xff0c;包括不限于在 Developer Centre, 知乎&#xff0c;自媒体平台&#xff0c;第三方开发者媒体等亚马逊云科技官方…

MySQL数据库入门到大牛_基础_09_子查询(子查询分类方法;单行子查询,多行子查询;相关子查询)

前面的第三章到第八章中&#xff0c;我们讲的是查询&#xff0c;查询的基本结构已经进行了介绍&#xff0c;聚合函数学习完后已经介绍了查询语句的完整的执行流程。 子查询指一个查询语句嵌套在另一个查询语句内部的查询&#xff0c;这个特性从MySQL 4.1开始引入。本章也是查询…

3D建模基础教程:编辑多边形功能命令快捷方式

一、打开3D软件并创建新模型 首先&#xff0c;打开你的3D建模软件&#xff0c;比如Blender、Maya或3ds Max。然后&#xff0c;创建一个新的3D模型。你可以使用基本几何体来创建模型&#xff0c;也可以导入现有的模型。 二、进入编辑多边形模式 在主工具栏中&#xff0c;找到并…

使用JDK自带java.util.logging.Logger引起的冲突问题

现象&#xff1a; 应用代码如下&#xff1a; import javax.script.ScriptEngineManager;ScriptEngineManager manager new ScriptEngineManager(); manager.getEngineByName("JavaScript"); 在TongWeb8上运行出错&#xff0c;日志如下&#xff1a; Servlet.servi…

WinForms C# 导入和导出 CSV 文件 Spread.NET

使用 WinForms C# 和 VB.NET 导入和导出 CSV 文件 2023 年 11 月 17 日 使用 Spread.NET 直接在 .NET WinForms 应用程序中处理 CSV 文件。 Spread.NET可帮助您创建电子表格、网格、仪表板和表单。它包括一个强大的计算引擎&#xff0c;具有 450 多个函数以及导入和导出 Micros…

安全测试工具分为 SAST、DAST和IAST 您知道吗?

相信刚刚步入安全测试领域的同学都会发现&#xff0c;安全测试领域工具甚多&#xff0c;不知如何选择&#xff01;其实安全测试工具大致分为三类&#xff1a;SAST、DAST和IAST。本文就带大家快速的了解这三者的本质区别&#xff01; SAST &#xff08;Static Application Secu…

浪涌防护器件要选对,布局布线更重要!|深圳比创达电子EMC(下)

浪涌测试&#xff0c;作为最常见的EMC抗干扰测试项目之一&#xff0c;基本上是家用消费电子必测的项目&#xff1b;其测试目的是为了验证产品在承受外部的浪涌冲击时能否正常工作。 一、比创达整改案例 1) 背景&#xff1a; 某智能插座产品在浪涌测试&#xff0c;需要过2kV差…