Canal笔记:安装与整合Springboot模式Mysql同步Redis

官方文档

https://github.com/alibaba/canal

使用场景

学习一件东西前,要知道为什么使用它。

1、同步mysql数据到redis

常规情况下,产生数据的方法可能有很多地方,那么就需要在多个地方中,都去做mysql数据同步到redis的处理,相对麻烦很多。
可以使用canal,对mysql进行集中,统一的处理。

概述

canal [kə’næl],译意为水道/管道/沟渠,主要用途是基于 MySQL 数据库增量日志解析,提供增量数据订阅和消费
当前的 canal 支持源端 MySQL 版本包括 5.1.x , 5.5.x , 5.6.x , 5.7.x , 8.0.x

原理

MySQL主备复制原理
  • MySQL master 将数据变更写入二进制日志( binary log, 其中记录叫做二进制日志事件binary log events,可以通过 show binlog events 进行查看)
  • MySQL slave 将 master 的 binary log events 拷贝到它的中继日志(relay log)
  • MySQL slave 重放 relaylog 中事件,将数据变更反映它自己的数据

image.png

canal 工作原理
  • canal 模拟 MySQL slave 的交互协议,伪装自己为 MySQL slave ,向 MySQL master 发送dump 协议
  • MySQL master 收到 dump 请求,开始推送 binary log 给 slave (即 canal )
  • canal 解析 binary log 对象(原始为 byte 流)

架构

  • eventParser (数据源接入,模拟slave协议和master进行交互,协议解析)
  • eventSink (Parser和Store链接器,进行数据过滤,加工,分发的工作)
  • eventStore (数据存储)
  • metaManager (增量订阅&消费信息管理器)
  • server代表一个canal运行实例,对应于一个jvm
  • instance对应于一个数据队列 (1个server对应1…n个instance)

image.png

安装和准备

Centos7安装Canal

1、Mysql配置

开启binlog日志

如果是使用Linux安装的话,则直接找my.cnf,直接修改内容即可

[mysqld]
log-bin=mysql-bin # 开启 binlog
binlog-format=ROW # 选择 ROW 模式
server_id=1 # 配置 MySQL replaction 需要定义,不要和 canal 的 slaveId 重复

docker安装

1、安装my.cnf文件

[mysqld]
log-bin=mysql-bin # 开启 binlog
binlog-format=ROW # 选择 ROW 模式
server_id=1 # 配置 MySQL replaction 需要定义,不要和 canal 的 slaveId 重复

2、修改docker-compose.yaml内容
配置挂载卷 前面路径为my.cnf的路径,/etc/mysql/conf.d的路径
image.png

3、查询是否成功

show variables like "%server_id%";

image.png

show variables like 'log_bin';

image.png

获取bin_log当前位置

show master status;

image.png
获取后,记录下来,然后不要动数据库了

创建canal数据库用户
这里可以使用
mysql -uroot -p登录进入设置,
或者直接可视化页面

CREATE USER canal IDENTIFIED BY 'canal';
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
FLUSH PRIVILEGES;
2、Canal下载

下载
方式一:关注公众号 I am Walker 回复canal

方式二:https://github.com/alibaba/canal/releases?page=2
image.png

# 创建文件夹
mkdir /opt/env/canal
# 解压
tar -zxvf canal.deployer-1.1.4.tar.gz -C /opt/env/canal
3、配置文件修改

进入canal/conf/example/instance.properties
主要修改下列相关参数

# 数据库
canal.instance.master.address=127.0.0.1:3306
# bin log日志
canal.instance.master.journal.name=mysql-bin.000001
# bin log写入位置
canal.instance.master.position=157#数据库账号密码
canal.instance.dbUsername=canal
canal.instance.dbPassword=123456

之后进入 canal/bin 执行 ./startup.sh
查看是否启动
image.png
有CanalLauncher则代表ok,或者看日志也ok

场景

springboot整合

简单整合

1、依赖
<dependency><groupId>com.alibaba.otter</groupId><artifactId>canal.client</artifactId><version>1.1.4</version></dependency>
2、配置文件
canal:# 服务地址serverAddress: localhost# 端口serverPort: 11111# 订阅 库 表subscrie: ".*\\..*"# batchSize: 100# 实例instance:- example

subscrie配置


全库全表	
connector.subscribe(".*\\..*")
指定库全表	
connector.subscribe("test\\..*")
单表
connector.subscribe("test.user")
多规则组合使用
connector.subscribe("test\\..*,test2.user1,test3.user2")
3、properties类
package com.walker.mybatisplus.canal;import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;import java.util.List;@Data
@Component
@ConfigurationProperties(value = "canal")
public class CanalProperties {private String serverAddress;private Integer serverPort;private String subcribe;private Integer batchSize;private List<String> instance;}
4、监听类编写
package com.walker.mybatisplus.canal;import cn.hutool.core.collection.CollUtil;
import cn.hutool.json.JSONUtil;
import com.alibaba.fastjson.JSON;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import com.google.protobuf.InvalidProtocolBufferException;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;import javax.annotation.PostConstruct;
import java.net.InetSocketAddress;
import java.util.HashMap;
import java.util.List;
import java.util.Map;@Slf4j
@Component
public class CanalListener {@Autowiredprivate CanalProperties canalProperties;public static Map<String,Integer> NUM_MAP=new HashMap<>();/*** 可能会有多业务,不同的业务,应该有多个处理类,不要使用if等*/@PostConstructpublic void run() throws InterruptedException, InvalidProtocolBufferException {//创建Canal连接对象CanalConnector conn = CanalConnectors.newSingleConnector(new InetSocketAddress(canalProperties.getServerAddress(),canalProperties.getServerPort()),canalProperties.getInstance().get(0),null, null);while(true){//连接conn.connect();
//            监听的数据库和表conn.subscribe(canalProperties.getSubcribe());//回滚操作conn.rollback();//获取信息Message message = conn.getWithoutAck(canalProperties.getBatchSize());long id = message.getId();List<CanalEntry.Entry> entries = message.getEntries();if(id!=-1&&entries.size()>0){//处理数据messageProcess(entries);}else{//防止重复链接数据库Thread.sleep(1000);}//确认消费信息conn.ack(id);//释放连接conn.disconnect();}}private void messageProcess(List<CanalEntry.Entry> entries) throws InvalidProtocolBufferException {for (CanalEntry.Entry entry : entries) {log.info("接收Entry:{}", entry);CanalEntry.Header header = entry.getHeader();//数据库String schemaName = header.getSchemaName();//表名String tableName = header.getTableName();//事件类型CanalEntry.EventType eventType = header.getEventType();//这里可以对数据库和表进行一个重新判断 虽然在subscribe已经定义,但是一般可以配置一个库,然后表的可能可以是全部表
//            对库进行判断if(!"walker_share".equals(schemaName)){continue;}//对表进行判断//这里只是一个案例,如果是实际场景,可以使用工厂模式去编写,不然会有很多的ifif("dish".equals(tableName)){//获取修改数据List<CanalEntry.RowData> rowDataList = getRowDataList(entry);//新增if(eventType.getNumber()==CanalEntry.EventType.INSERT_VALUE){log.info("新增事件");if(CollUtil.isEmpty(rowDataList)) continue;//模拟场景:获取新增的数据,并存储到redis中,这里是直接存储到Map中for (CanalEntry.RowData rowData : rowDataList) {List<CanalEntry.Column> afterColumnsList = rowData.getAfterColumnsList();for (CanalEntry.Column column : afterColumnsList) {//获取name的类型if("type".equals(column.getName())){//模拟redis  根据类型进行分类String key = column.getValue();NUM_MAP.put(key,NUM_MAP.getOrDefault(key,0)+1);log.info("NUM_MAP {}",NUM_MAP);continue;}}}}if(eventType.getNumber()==CanalEntry.EventType.UPDATE_VALUE){log.info("修改事件");}if(eventType.getNumber()==CanalEntry.EventType.DELETE_VALUE){log.info("删除事件");}}}}//获取row数据private List<CanalEntry.RowData> getRowDataList(CanalEntry.Entry entry) {CanalEntry.RowChange rowChange=null;try {//解析数据rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());} catch (InvalidProtocolBufferException e) {throw new RuntimeException("解析出现异常 data:" + entry.toString(), e);}List<CanalEntry.RowData> rowDatasList = rowChange.getRowDatasList();return rowDatasList;}
}

相关类和配置

CanalConnector

image.png

CanalEntry

image.png

EntryType

image.png

Header

image.png

EventType

事件类型,可以根据事件类型去做不一样的操作
image.png

RowChange

image.png

获取数据

CanalEntry.RowChange rowChange=null;
try {//解析数据rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
} catch (InvalidProtocolBufferException e) {throw new RuntimeException("解析出现异常 data:" + entry.toString(), e);
}
List<CanalEntry.RowData> rowDatasList = rowChange.getRowDatasList();
log.info("rowDatasList:{}",rowDatasList);
RowData

image.png

CanalEntry.Column

属性
image.png

问题

IOException: caching_sha2_password Auth failed

因为mysql8.0.3后身份检验方式为caching_sha2_password,但canal使用的是mysql_native_password,因此需要设置检验方式(如果该版本之前的可跳过),否则会报错IOException: caching_sha2_password Auth failed

参考文档

Java开发 - Canal的基本用法_canal java-CSDN博客
15分钟学会Canal安装与部署-CSDN博客
SpringBoot整合Canal1.1.6并同步数据到Redis(超详细和很多踩坑点)_canal同步数据到redis-CSDN博客

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/200477.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

在微信小程序中如何改变默认打开的页面

在微信小程序中&#xff0c;在我们编写页面的时候&#xff0c;可能会在重新渲染的时候导致页面跳转到默认打开的页面上&#xff0c;为了提升用户的一个体验&#xff0c;我们可以设置一些内容来修改小程序默认打开的页面&#xff0c;提升开发者的开发体验。 当我们打开一个微信…

nodejs微信小程序+python+PHP在线购票系统的设计与实现-计算机毕业设计推荐

目 录 摘 要 I ABSTRACT II 目 录 II 第1章 绪论 1 1.1背景及意义 1 1.2 国内外研究概况 1 1.3 研究的内容 1 第2章 相关技术 3 2.1 nodejs简介 4 2.2 express框架介绍 6 2.4 MySQL数据库 4 第3章 系统分析 5 3.1 需求分析 5 3.2 系统可行性分析 5 3.2.1技术可行性&#xff1a;…

某60区块链安全之JOP实战二学习记录

区块链安全 文章目录 区块链安全Jump Oriented Programming实战二实验目的掌握对EVM逆向能力实验环境实验工具实验原理实验内容Jump Oriented Programming实战二 实验步骤Jump Oriented Programming实战二 实验目的 学会使用python3的web3模块 学会分析以太坊智能合约中中Jum…

MyBatis 常见面试题

目录 1.MyBatis——概述1.1.什么是 ORM 框架&#xff1f;1.2.✨谈谈对 MyBatis 的理解。1.3.使用 MyBatis 相对于直接使用 SQL 有哪些优点&#xff1f;1.4.MyBatis 有什么优缺点&#xff1f;1.5.✨MyBatis 的分层结构是什么样的&#xff1f;1.6.✨MyBatis 的执行流程是什么样的…

机器学习实验一:线性回归

系列文章目录 机器学习实验一&#xff1a;线性回归机器学习实验二&#xff1a;决策树模型机器学习实验三&#xff1a;支持向量机模型机器学习实验四&#xff1a;贝叶斯分类器机器学习实验五&#xff1a;集成学习机器学习实验六&#xff1a;聚类 文章目录 系列文章目录一、实验…

12、pytest上下文友好的输出

官方实例 # content of test_assert2.py import pytestdef test_set_comparison():set1 set("1308")set2 set("8035")assert set1 set2def test_dict_comparison():dict_1 {name:陈畅,sex:男}dict_2 {name:赵宁,sex:女}assert dict_1 dict_2def tes…

Flask项目Day1,Flask常见第三方拓展包

拉项目 git clone https://gitee.com/hahaguai007/python-flask-mysql.git git clone 项目地址运行后即可获取项目 2.创建数据库 在MySQL中创建一个数据库&#xff0c;名字自己定&#xff0c;然后修改RealProject\settings.py里的SQLALCHEMY_DATABASE_URI&#xff0c;格式为 …

我在USC南加大学游戏:真实经历/录取作品集_RoSSo艺术留学

近日&#xff0c;美国Common App最新早申统计数据&#xff1a;早申人数与疫情前相比增加了41%&#xff01;专注于国际艺术教育的RoSSo也发现&#xff0c;2022-2023申请季提交早申的学生中&#xff0c;各类热门院校以及艺术留学专业申请人数均是“涨”声一片&#xff01; 图源官…

SpringBoot3-快速体验

1、pom.xml <?xml version"1.0" encoding"UTF-8"?> <project xmlns"http://maven.apache.org/POM/4.0.0"xmlns:xsi"http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation"http://maven.apache.org/POM/4.0.…

计算机寄存器是如何实现的

冯诺依曼体系 冯诺依曼体系为现代计算机的设计和发展奠定了基础&#xff0c;它的核心思想和原则在当今计算机体系结构中仍然被广泛采用和应用。所以只要谈论计算机的组成就离不开冯诺依曼体系 作为核心组成部分的CPU除了由运算器和控制器组成之外&#xff0c;还有一些寄存器…

阿里云环境下的配置DNS和SLB的几种实践示例

一、背景 对于大多中小型公司来说&#xff0c;生产环境大多是购买阿里云或者腾讯云等等&#xff0c;也就存在以下需求&#xff1a; 外网域名内网域名SLB容器化部署 特别是前两项&#xff0c;一定是跳不过的。容器化部署&#xff0c;现在非K8S莫属了。 既然是购买阿里云&…

NSS [NSSCTF 2022 Spring Recruit]babyphp

NSS [NSSCTF 2022 Spring Recruit]babyphp 考点&#xff1a;PHP特性 开局源码直接裸奔 <?php highlight_file(__FILE__); include_once(flag.php);if(isset($_POST[a])&&!preg_match(/[0-9]/,$_POST[a])&&intval($_POST[a])){if(isset($_POST[b1])&&…

Doris 集成 ElasticSearch

Doris-On-ES将Doris的分布式查询规划能力和ES(Elasticsearch)的全文检索能力相结合,提供更完善的OLAP分析场景解决方案: (1)ES中的多index分布式Join查询 (2)Doris和ES中的表联合查询,更复杂的全文检索过滤 1 原理 (1)创建ES外表后,FE会请求建表指定的主机,获取所有…

Qt 软件调试——windbg初篇(一)

在上一篇《Qt 软件调试&#xff08;二&#xff09;使用dump捕获崩溃信息》中我们结尾处提示大家先准备好windbg&#xff0c;windbg是非常强大的调试工具&#xff0c;对于我们进行代码调试和分析异常有着非常重要的意义。 在Qt软件调试这个系列的首篇&#xff0c;我们介绍了《Qt…

RPG项目01_层级设置

基于“RPG项目01_UI面板Game”&#xff0c; 找到狼人 添加组件&#xff0c;让狼人一定区域自动跟随主角进行攻击 解释&#xff1a;【烘培蓝色】因为如果什么都不做就会被烘培成蓝色对应的功能就是 可修改区域功能 当将区域设置成不可行走状态&#xff0c;则不为蓝色 烘培&…

手机备忘录在哪里找出来?

谈及手机备忘录&#xff0c;每一个品牌的手机大家都能找到很多&#xff0c;现在各大手机品牌都开发的有自带的手机备忘录&#xff0c;所以说&#xff1a;手机备忘录在哪里找出来并不难&#xff0c;即便是手机自带的没有备忘录工具&#xff0c;大家也是可以通过手机应用市场搜索…

在AWS EC2中部署和使用Apache Superset的方案

大纲 1 Superset部署1.1 启动AWS EC21.2 下载Superset Docker文件1.3 修改Dockerfile1.4 配置管理员1.5 结果展示1.6 检查数据库驱动1.7 常见错误处理 2 Glue&#xff08;可选参考&#xff09;3 IAM与安全组3.1 使用AWS Athena3.2 使用AWS RedShift或AWS RDS3.2.1 查看AWS Reds…

【电子取证篇】汽车取证数据提取与汽车取证实例浅析(附标准下载)

【电子取证篇】汽车取证数据提取与汽车取证实例浅析&#xff08;附标准下载&#xff09; 关键词&#xff1a;汽车取证&#xff0c;车速鉴定、声像资料鉴定、汽车EDR提取分析 汽车EDR一般记录车辆碰撞前后的数秒&#xff08;5s左右&#xff09;相关数据&#xff0c;包括车辆速…

基于openEuler20.03安装openGauss5.0.0及安装DBMind

基于openEuler20.03安装openGauss5.0.0及安装DBMind 一、环境说明二、安装部署三、问题及解决 一、环境说明 虚拟机&#xff1a;VirtualBox操作系统&#xff1a;openEuler20.3LTS &#xff08;x86&#xff09;数据库&#xff1a;openGauss5.0.0 (x86)DBMind&#xff1a;dbmind…

Pytest自动化测试数据驱动yaml/excel/csv/json

数据驱动 数据的改变从而驱动自动化测试用例的执行&#xff0c;最终引起测试结果的改变。简单说就是参数化的应用。 测试驱动在自动化测试中的应用场景&#xff1a; 测试步骤的数据驱动&#xff1b;测试数据的数据驱动&#xff1b;配置的数据驱动&#xff1b; 1、pytest结合…