canal 数据同步组件

canal 数据异构组件

为啥要使用这个组件?
在更新DB的时候不同步更新到redis,es等数据库中,时间太久,而且可能会存在同步失败的问题,因此引入canal去拉取DB的数据,再去更新到redis,es等数据库中,有失败重试和回滚等功能。
canal原理?
canal 伪装成salve向mysql发送dump协议,拿到备份数据binlog,去更新数据到redis,es等数据库中或者通过组装数据之后更新。canal可以拿到更新前的所有数据,更新后的所有数据,更新了哪些数据

canal 组件的使用

1.下载canal组件

下载地址canal组件下载地址
在我的资源中也有canal组件包
在这里插入图片描述
解压启动(我是windows版,双击startup.bat)

在这里插入图片描述

2.数据库配置

1.开启MySQL , 需要先开启 Binlog 写入功能

[mysqld]
log-bin=mysql-bin # 开启 binlog
binlog-format=ROW # 选择 ROW 模式
server_id=1 # 配置 MySQL replaction 需要定义,不要和 canal 的 slaveId 重复

2.授权 canal 作为mysql 的slave 的权限

CREATE USER canal IDENTIFIED BY 'canal';  
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
-- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ;
FLUSH PRIVILEGES;
3.项目引入jar包
<dependency><groupId>com.alibaba.otter</groupId><artifactId>canal.client</artifactId><version>1.1.4</version>
</dependency>
4.写canal监听数据工具类
package com.next.canal;import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.common.utils.AddressUtils;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;import java.net.InetSocketAddress;
import java.util.List;public class SimpleCanalClientExample {public static void main(String args[]) {// 创建链接CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress(AddressUtils.getHostIp(),11111), "example", "", "");int batchSize = 1000;int emptyCount = 0;try {connector.connect();connector.subscribe(".*\\..*");connector.rollback();int totalEmptyCount = 120;while (emptyCount < totalEmptyCount) {Message message = connector.getWithoutAck(batchSize); // 获取指定数量的数据long batchId = message.getId();int size = message.getEntries().size();if (batchId == -1 || size == 0) {emptyCount++;System.out.println("empty count : " + emptyCount);try {Thread.sleep(1000);} catch (InterruptedException e) {}} else {emptyCount = 0;// System.out.printf("message[batchId=%s,size=%s] \n", batchId, size);printEntry(message.getEntries());}connector.ack(batchId); // 提交确认// connector.rollback(batchId); // 处理失败, 回滚数据}System.out.println("empty too many times, exit");} finally {connector.disconnect();}}private static void printEntry(List<CanalEntry.Entry> entrys) {for (CanalEntry.Entry entry : entrys) {if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN || entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) {continue;}CanalEntry.RowChange rowChage = null;try {rowChage = CanalEntry.RowChange.parseFrom(entry.getStoreValue());} catch (Exception e) {throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(),e);}CanalEntry.EventType eventType = rowChage.getEventType();System.out.println(String.format("================&gt; binlog[%s:%s] , name[%s,%s] , eventType : %s",entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),eventType));for (CanalEntry.RowData rowData : rowChage.getRowDatasList()) {if (eventType == CanalEntry.EventType.DELETE) {printColumn(rowData.getBeforeColumnsList());} else if (eventType == CanalEntry.EventType.INSERT) {printColumn(rowData.getAfterColumnsList());} else {System.out.println("-------&gt; before");printColumn(rowData.getBeforeColumnsList());System.out.println("-------&gt; after");printColumn(rowData.getAfterColumnsList());}}}}private static void printColumn(List<CanalEntry.Column> columns) {for (CanalEntry.Column column : columns) {System.out.println(column.getName() + " : " + column.getValue() + "    update=" + column.getUpdated());}}
}
5.简单例子使用测试

1.数据库更改user_id从0改为1,再从1改为0
2.查看canal监测的数据(canal可以拿到更新前的所有数据,更新后的所有数据,更新了哪些数据)

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

6.进一步完善canal监听数据工具类,用于应用例子

1.加入监听器,项目启动时启动
2.使用线程去监听数据
3.替换掉system.out.print(),里面有锁,会阻塞,使用日志打印
4.处理canal监测到的数据

package com.next.canal;import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.common.utils.AddressUtils;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import com.next.dao.TrainNumberDetailMapper;
import com.next.service.TrainNumberService;
import com.next.service.TrainSeatService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.ApplicationListener;
import org.springframework.context.event.ContextRefreshedEvent;
import org.springframework.stereotype.Service;import javax.annotation.Resource;
import java.net.InetSocketAddress;
import java.util.List;/*** @desc 不要用system.out.print()里面有锁,会阻塞,用日志打印*/
@Service
@Slf4j
public class CanalSubscribe implements ApplicationListener<ContextRefreshedEvent> {@Resourceprivate TrainSeatService trainSeatService;@Resourceprivate TrainNumberService trainNumberService;//监听,启动的时候就开始调用此监听方法@Overridepublic void onApplicationEvent(ContextRefreshedEvent contextRefreshedEvent) {canalSubscribe();}private void canalSubscribe() {// 创建链接CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress(AddressUtils.getHostIp(),11111), "example", "", "");int batchSize = 1000;//使用线程new Thread(() -> {try {log.info("canal subscribe");connector.connect();connector.subscribe(".*\\..*");connector.rollback();while (true) {Message message = connector.getWithoutAck(batchSize); // 获取指定数量的数据long batchId = message.getId();int size = message.getEntries().size();if (batchId == -1 || size == 0) {//没有取到数据继续safeSleep(100);continue;}try {log.info("new message,batchIds:{},size:{}", batchId, batchSize);//打印日志printEntry(message.getEntries());// 提交确认connector.ack(batchId);} catch (Exception e2) {log.error("canal data exception,batchIds:{}", batchId, e2);// 处理失败, 回滚数据connector.rollback(batchId);}}} catch (Exception e3) {log.error("canal subscribe exception", e3);safeSleep(1000);canalSubscribe();}}).start();}private void printEntry(List<CanalEntry.Entry> entrys) throws Exception{for (CanalEntry.Entry entry : entrys) {if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN || entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) {continue;}CanalEntry.RowChange rowChage = null;try {rowChage = CanalEntry.RowChange.parseFrom(entry.getStoreValue());} catch (Exception e) {throw new RuntimeException("RowChange.parse Exception , data:" + entry, e);}//更新类型-更新,删除,新增CanalEntry.EventType eventType = rowChage.getEventType();//数据库名String schemaName = entry.getHeader().getSchemaName();//表名String tableName = entry.getHeader().getTableName();log.info("name:[{},{}],eventType:{}",schemaName,tableName,eventType);for (CanalEntry.RowData rowData : rowChage.getRowDatasList()) {if (eventType == CanalEntry.EventType.DELETE) {handleColumn(rowData.getBeforeColumnsList(), eventType, schemaName, tableName);} else {handleColumn(rowData.getAfterColumnsList(), eventType, schemaName, tableName);}}}}//处理canal监测到的数据private void handleColumn(List<CanalEntry.Column> columnsList, CanalEntry.EventType eventType, String schemaName, String tableName) throws Exception{if(schemaName.contains("12306_seat_")){//处理座位变更trainSeatService.handle(columnsList,eventType);}else if(tableName.equals("train_number")){//车次详情处理(实际上是车次信息变更之后才批量处理车次详情)trainNumberService.handle(columnsList,eventType);}else{log.info("drop data,no need care");}}private void safeSleep(int millis) {try {Thread.sleep(100);} catch (Exception e1) {}}}

处理canal监测到的数据(拿到改变的数据,放到实体类中,存到redis中)

package com.next.service;import com.alibaba.otter.canal.protocol.CanalEntry;
import com.next.dao.TrainNumberMapper;
import com.next.model.TrainNumber;
import com.next.model.TrainSeat;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;import javax.annotation.Resource;
import java.util.List;@Service
@Slf4j
public class TrainSeatService {@Resourceprivate TrainNumberMapper trainNumberMapper;@Resourceprivate TrainCacheService trainCacheService;//处理座位,canal通过监听座位库,拿到改变的数据,放到实体类中public void handle(List<CanalEntry.Column> columns, CanalEntry.EventType eventType) {if (eventType != CanalEntry.EventType.UPDATE) {log.info("not update,no need care");return;}TrainSeat trainSeat = new TrainSeat();boolean isStatusUpdated = false;for (CanalEntry.Column column : columns) {//票的状态改变了才做下面的操作if (column.getName().equals("status")) {trainSeat.setStatus(Integer.parseInt(column.getValue()));if (column.getUpdated()) {isStatusUpdated = true;} else {break;}} else if (column.getName().equals("id")) {trainSeat.setId(Long.parseLong(column.getValue()));} else if (column.getName().equals("carriage_number")) {trainSeat.setCarriageNumber(Integer.parseInt(column.getValue()));} else if (column.getName().equals("row_number")) {trainSeat.setRowNumber(Integer.parseInt(column.getValue()));} else if (column.getName().equals("seat_number")) {trainSeat.setSeatNumber(Integer.parseInt(column.getValue()));} else if (column.getName().equals("train_number_id")) {trainSeat.setTrainNumberId(Integer.parseInt(column.getValue()));} else if (column.getName().equals("ticket")) {trainSeat.setTicket(column.getValue());} else if (column.getName().equals("from_station_id")) {trainSeat.setFromStationId(Integer.parseInt(column.getValue()));} else if (column.getName().equals("to_station_id")) {trainSeat.setToStationId(Integer.parseInt(column.getValue()));}}if (!isStatusUpdated) {log.info("status not update,no need care");}log.info("train seat update,trainSeat:{}", trainSeat);/*** 数据存到redis* 1.指定座位被占:hash* cacheKey:车次_日期  D386_20231001* field: carriage_row_seat_fromStationId_toStationId* value: 0-空闲 1-占座** 2.每个座位详情剩余的座位数* cacheKey: 车次_日期_count D386_20231001_count* field: fromStationId_toStationId* value: 实际座位数**/TrainNumber trainNumber = trainNumberMapper.selectByPrimaryKey(trainSeat.getTrainNumberId());//放票if (trainSeat.getStatus() == 1) {trainCacheService.hset(trainNumber.getName() + "_" + trainSeat.getTicket(),trainSeat.getCarriageNumber() + "_" + trainSeat.getRowNumber() + "_" + trainSeat.getSeatNumber()+ "_" + trainSeat.getFromStationId() + "_" + trainSeat.getToStationId(),"0");trainCacheService.hincr(trainNumber.getName() + "_" + trainSeat.getTicket() + "_count",trainSeat.getFromStationId() + "_" + trainSeat.getToStationId(),1l);log.info("seat+1,trainNumber:{},trainSeat:{}", trainNumber, trainSeat);//占票} else if (trainSeat.getStatus() == 2) {trainCacheService.hset(trainNumber.getName() + "_" + trainSeat.getTicket(),trainSeat.getCarriageNumber() + "_" + trainSeat.getRowNumber() + "_" + trainSeat.getSeatNumber()+ "_" + trainSeat.getFromStationId() + "_" + trainSeat.getToStationId(),"1");trainCacheService.hincr(trainNumber.getName() + "_" + trainSeat.getTicket() + "_count",trainSeat.getFromStationId() + "_" + trainSeat.getToStationId(),-1l);log.info("seat-1,trainNumber:{},trainSeat:{}", trainNumber, trainSeat);} else {log.info("status update not 1 or 2,no need care");}}}

在这里插入图片描述

参考文档:canal使用说明文档

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/582720.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

LeetCode刷题笔记之字符串

一、反转字符串 1. 344【反转字符串】 **题目&#xff1a;**编写一个函数&#xff0c;其作用是将输入的字符串反转过来。输入字符串以字符数组 s 的形式给出。不要给另外的数组分配额外的空间&#xff0c;你必须原地修改输入数组、使用 O(1) 的额外空间解决这一问题。代码&am…

python初试六

之前了解了&#xff1a; 创建Django项目 数据库 模板 表格提交 admin管理页面 上面的功能模块允许我们做出一个具有互动性的站点&#xff0c;但无法验证用户的身份。我们这次了解用户验证部分。通过用户验证&#xff0c;我们可以根据用户的身份&#xff0c;提供不同的服务。 …

TiDB SQL调优案例TiFlash

背景 早上收到某系统的告警tidb节点挂掉无法访问&#xff0c;情况十万火急。登录中控机查了一下display信息&#xff0c;4个TiDB、Prometheus、Grafana全挂了&#xff0c;某台机器hang死无法连接&#xff0c;经过快速重启后集群恢复&#xff0c;经排查后是昨天上线的某个SQL导…

openwrt源码编译

下载openwrt源码 git clone https://github.com/openwrt/chaos_calmer.git // 官方下载地址 当前我们基于15.05版本开发&#xff0c;如果开发者想用最新的OpenWRT系统&#xff0c;可以下载 https://github.com/openwrt/openwrt.git git clone https://github.com/Ying-Yun/o…

OpenGL 绘制Mesh数据(Qt)

文章目录 一、简介二、实现代码三、实现效果一、简介 Mesh数据的结构主要就是点与三角面片,因此本质上仍然是对三角面片进行绘制。这里我们借助VCG这个库实现对Mesh数据的读取,这个库相对简单轻巧,很方便使用。 二、实现代码 由于修改的部分很多,我们逐一进行解释一下: --…

Seata 中封装了四种分布式事务模式,分别是: AT 模式, TCC 模式, Saga 模式, XA 模式,

文章目录 seata概述Seata 中封装了四种分布式事务模式&#xff0c;分别是&#xff1a;AT 模式&#xff0c;TCC 模式&#xff0c;Saga 模式&#xff0c;XA 模式&#xff0c; 今天我们来聊聊seata seata 概述 在微服务架构下&#xff0c;由于数据库和应用服务的拆分&#xff0c…

计算机专业校招常见面试题目总结

博主面试岗位包括&#xff1a;java开发、软件测试、测试开发等岗位&#xff0c;基于之前经历的面试总结出的一些常见题目。仅供参考&#xff0c;互相学习&#xff01;&#xff01; 八股&#xff1a;java开发、测试、测开岗位 Java技术栈&#xff1a;Java基础、JVM、数据结构、…

【SAM系列】Auto-Prompting SAM for Mobile Friendly 3D Medical Image Segmentation

论文链接&#xff1a;https://arxiv.org/pdf/2308.14936.pdf 核心&#xff1a; finetune SAM,为了不依赖外部prompt&#xff0c;通过将深层的特征经过一个编-解码器来得到prompt embedding&#xff1b;finetune完之后做蒸馏

PTA-感染人数

设某住宿区域是一个nn的方阵&#xff0c;方阵中的每个小方格为一个房间&#xff0c;房间里可能住一个人&#xff0c;也可能空着。第一天&#xff0c;某些房间中住着的人得了一种高传染性的流感&#xff0c;以后每一天&#xff0c;得流感的人会使其邻居&#xff08;住在其上、下…

76 Python开发-内外网收集Socket子域名DNS

目录 Python开发相关知识点本篇文章涉及知识点演示案例:IP&Whois&系统指纹获取代码段-外网CDN&子域名&端口扫描&交互代码段-外网IP&计算机名&存活主机&端口扫描代码段-内网Py格式解析环境与可执行程序格式转换-Pyinstaller 涉及资源&#xff1…

git 学习 之一个规范的 commit 如何写

最好的话做一件完整的事情就提交一次

WPF 显示gif动态图

WPF显示gif动态图有以下几种方式&#xff1a; 使用Storyboard使用WpfAnimatedGif(NuGet包管理器安装WpfAnimatedGif)使用ImageAnimator使用Winform控件PictureBox使用MediaElement通过GifBitmapDecoder解析GIF图片&#xff0c;获取gif帧数和每一帧数据&#xff0c;然后通过时间…

一种删除 KubeSphere 中一直卡在 Terminating 的 Namespace--KubeSphere Logging System的简单方法

文章目录 一、问题提出二、删除方法1&#xff0c;获取kubesphere-logging-syste的详细信息json文件2&#xff0c;编辑kubesphere-logging-system.json3&#xff0c;执行清理命令 三、检查结果 一、问题提出 在使用 KubeSphere 的时候发现有一个日志服务KubeSphere Logging Sys…

ARM CCA机密计算软件架构之设备分配(Device Assignment)

这个指南的前几节展示了领域提供的执行环境,它与正常世界的Rich OS、Hypervisor和TrustZone完全隔离。领域可以在初始化时完全通过认证,以确保其初始内容,并确保它在基于RME的平台上运行。 在大多数操作情况下,任何领域软件执行都需要访问系统中可用的设备。默认情况下,系…

MySQL-长事务详解

您好&#xff0c;我是码农飞哥&#xff08;wei158556&#xff09;&#xff0c;感谢您阅读本文&#xff0c;欢迎一键三连哦。 &#x1f4aa;&#x1f3fb; 1. Python基础专栏&#xff0c;基础知识一网打尽&#xff0c;9.9元买不了吃亏&#xff0c;买不了上当。 Python从入门到精…

YoloV8改进策略:基于自研的图注意力机制改进| 独家改进方法|图卷积和注意力融合模块

摘要 SE注意力机制是一种通过显式建模卷积特征的信道之间相互依赖性的方法,旨在提高网络产生的表示的质量。SE注意力机制包括两个步骤:Squeeze和Excitation。在Squeeze步骤中,通过全局平均池化操作将输入特征图压缩成一个向量,然后通过一个全连接层将其映射到一个较小的向…

HTML的学习记录

<br /> 标签在 HTML 页面中创换行符。 <hr /> 标签在 HTML 页面中创建水平线。 段落是通过 <p> 标签定义的。 浏览器会自动地在段落的前后添加空行。&#xff08;<p> 是块级元素&#xff09; 文本格式 <b>This text is bold</b>字体加粗 …

Mybatis 动态 SQL - if

MyBatis的一个最强大的特性一直都是其动态SQL能力。如果你有使用JDBC或任何类似框架的经验&#xff0c;你就会明白在条件下连接SQL字符串是多么痛苦&#xff0c;需要确保不忘记添加空格或在列名列表的末尾遗漏逗号。动态SQL处理起来非常痛苦。 尽管使用动态SQL可能不会很轻松&…

2024年最新软件测试必问面试题,面试前一天刷效果更佳

1.你为什么选择软件测试行业 因为之前有了解软件测试这个行业&#xff0c;觉得他的发展前景很好。 2.根据你以前的工作经验描述一下软件开发、测试过程&#xff0c;由那些角色负责&#xff0c;你做什么 要有架构师、开发经理、测试经理、程序员、测试员。我在里面主要是负责所…

HOJ 项目部署-前端定制 默认勾选显示标签、 在线编辑器主题和字号大小修改、增加一言功能 题目AC后礼花绽放

# 项目拉取地址&#xff1a; https://gitee.com/himitzh0730/hoj.git # 切换到hoj-vue目录执行以下命令 #安装依赖 npm install #运行服务 npm run serve #修改代码后构建项目到dist文件夹&#xff0c;到服务器docker-compose.yml中修改hoj-frontend文件映射即可 npm run build…