阿里mysql 二进制_Mysql binlog 之阿里canal

1、What is Canal?

canal [kə'næl],中文翻译为 水道/管道/沟渠/运河,主要用途是用于 MySQL 数据库增量日志数据的订阅、消费和解析,是阿里巴巴开发并开源的,采用Java语言开发;

历史背景是早期阿里巴巴因为杭州和美国双机房部署,存在跨机房数据同步的业务需求,实现方式主要是基于业务 trigger(触发器) 获取增量变更。从2010年开始,阿里巴巴逐步尝试采用解析数据库日志获取增量变更进行同步,由此衍生出了canal项目;

Github:https://github.com/alibaba/canal

2、工作原理

传统MySQL主从复制工作原理

91fb1300912bbf496eb37da5b5ef0f5f.png

从上层来看,复制分成三步:

MySQL的主从复制将经过如下步骤:

1、当 master 主服务器上的数据发生改变时,则将其改变写入二进制事件日志文件中;

2、salve 从服务器会在一定时间间隔内对 master 主服务器上的二进制日志进行探测,探测其是否发生过改变,如果探测到 master 主服务器的二进制事件日志发生了改变,则开始一个 I/O Thread 请求 master 二进制事件日志;

3、同时 master 主服务器为每个 I/O Thread 启动一个dump  Thread,用于向其发送二进制事件日志;

4、slave 从服务器将接收到的二进制事件日志保存至自己本地的中继日志文件中;

5、salve 从服务器将启动 SQL Thread 从中继日志中读取二进制日志,在本地重放,使得其数据和主服务器保持一致;

6、最后 I/O Thread 和 SQL Thread 将进入睡眠状态,等待下一次被唤醒;

canal 工作原理

1、canal 模拟 MySQL slave 的交互协议,把自己伪装为 MySQL slave,向 MySQL master 发送dump 协议;

2、MySQL master 收到 dump 请求,开始推送 binary log 给 slave (即canal );

3、canal 解析 binary log 对象 (原始数据为byte流)

ba526d41757c9074fed7da2979960732.png

8027f67e66334b6718283810a6a6e22f.png

3、Canal使用场景

Canal是基于MySQL变更日志增量订阅和消费的组件,可以使用在如下一些一些应用场景:

数据库实时备份

业务cache刷新

search build

价格变化等重要业务消息

带业务逻辑的增量数据处理

跨数据库的数据备份(异构数据同步),

例如mysql => oracle,mysql=>mongo,mysql =>redis,

mysql => elasticsearch等;

当前canal 主要是支持源端 MySQL(也支持mariaDB),版本包括 5.1.x , 5.5.x , 5.6.x , 5.7.x , 8.0.x;Canal搭建环境

1、准备好MySQL运行环境;

2、开启 MySQL的binlog写入功能,配置 binlog-format 为 ROW 模式,my.cnf中配置如下:

[mysqld]

log-bin=mysql-bin #开启 binlog

binlog-format=ROW #选择 ROW 模式

server_id=1 #配置MySQL replaction需要定义,不要和canal的 slaveId重复

3、授权canal连接MySQL账号具有作为MySQL slave的权限, 如果已有账户可直接 grant授权:

启动MySQL服务器;

登录mysql:./mysql -uroot -p -h127.0.0.1 -P3306

CREATE USER canal IDENTIFIED BY 'canal';

GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';

FLUSH PRIVILEGES;

4、下载 canal部署程序

Wget https://github.com/alibaba/canal/releases/download/canal-1.1.4/canal.deployer-1.1.4.tar.gz

tar -zxvf canal.deployer-1.1.4.tar.gz -C /usr/local/canal.deployer-1.1.4

5、配置修改

vim conf/example/instance.properties

主要是修改配置文件中与自己的数据库配置相关的信息;

6、启动Canal

./startup.sh

7、查看进程:

ps -ef | grep canal

8、查看 server 日志

cat logs/canal/canal.log

9、查看 instance 的日志

vi logs/example/example.log

10、关闭Canal

./stop.sh

canal server的默认端口号为:11111,如果需要调整的话,可以去到\conf目录底下的canal.properties文件中进行修改;

相关命令:

#是否启用了日志

show variables like 'log_bin';

#怎样知道当前的日志

show master status;

#查看mysql binlog模式

show variables like 'binlog_format';

#获取binlog文件列表

show binary logs;

#查看当前正在写入的binlog文件

show master status\G

#查看指定binlog文件的内容

show binlog events in 'mysql-bin.000002';

注意binlog日志格式要求为row格式;

Binlog的三种基本类型分别为:

ROW模式 除了记录sql语句之外,还会记录每个字段的变化情况,能够清楚的记录每行数据的变化历史,但是会占用较多的空间,需要使用mysqlbinlog工具进行查看;

STATEMENT模式只记录了sql语句,但是没有记录上下文信息,在进行数据恢复的时候可能会导致数据的丢失情况;

MIX模式比较灵活的记录,例如说当遇到了表结构变更的时候,就会记录为statement模式。当遇到了数据更新或者删除情况下就会变为row模式;

启动了canal的server之后,便是基于java的客户端搭建了;

代码集成方式:

com.alibaba.otter

canal.client

1.1.4

package com.unwulian.search.engine.suggestion.service;

import com.alibaba.otter.canal.client.CanalConnector;

import com.alibaba.otter.canal.client.CanalConnectors;

import com.alibaba.otter.canal.protocol.CanalEntry.*;

import com.alibaba.otter.canal.protocol.Message;

import java.net.InetSocketAddress;

import java.util.List;

/**

* canal测试

*

* @author shiye

* @create 2020-11-30 14:22

*/

public class CanalTest {

public static void main(String[] args) {

String ip = "192.168.2.165";

int port = 11111;

String destination = "example";

String username = "";

String password = "";

CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress(ip, port), destination, username, password);

try {

connector.connect();

connector.subscribe(".*\\..*");

//跳转到上次进行读取日志的地方

connector.rollback();

while (true) {

//获取指定数量的数据

Message message = connector.getWithoutAck(1);

long id = message.getId();

int size = message.getEntries().size();

if (id == -1 || size == 0) {

//如果没有获取到数据就睡眠疫苗

Thread.sleep(1000);

} else {

System.out.println("messge id:" + id);

printEntry(message.getEntries());

}

//提交确认

connector.ack(id);

// connector.rollback(batchId); // 处理失败, 回滚数据

}

} catch (Exception e) {

e.printStackTrace();

} finally {

connector.disconnect();

}

}

private static void printEntry(List entrys) {

for (Entry entry : entrys) {

if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN || entry.getEntryType() == EntryType.TRANSACTIONEND) {

continue;

}

RowChange rowChage = null;

try {

rowChage = RowChange.parseFrom(entry.getStoreValue());

} catch (Exception e) {

throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(), e);

}

EventType eventType = rowChage.getEventType();

System.out.println(String.format("================> binlog日志偏移量[%s:%s] , 库名,表名[%s,%s] , 操作类型 : %s",

entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),

entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),

eventType));

for (RowData rowData : rowChage.getRowDatasList()) {

if (eventType == EventType.DELETE) {

printColumn(rowData.getBeforeColumnsList());

} else if (eventType == EventType.INSERT) {

printColumn(rowData.getAfterColumnsList());

} else {

System.out.println("-------> before");

printColumn(rowData.getBeforeColumnsList());

System.out.println("-------> after");

printColumn(rowData.getAfterColumnsList());

}

}

}

}

private static void printColumn(List columns) {

for (Column column : columns) {

System.out.println(column.getName() + " : " + column.getValue() + "    update=" + column.getUpdated());

}

}

}

springboo集成canal# 阿里binlog canal配置

canal:

ip: 192.168.2.13   #192.168.2.165

subscribe: undev.t_bas_xxx1,undev.t_bas_xxx2#配置你要监听的表

port: 11111

destination: dev

username:

password:

package com.unwulian.search.engine.suggestion.config;

import org.springframework.boot.context.properties.ConfigurationProperties;

import org.springframework.context.annotation.Configuration;

import java.io.Serializable;

/**

* binlog canal的配置

*

* @author shiye

* @create 2020-07-17 19:30

*/

@Configuration

@ConfigurationProperties(prefix = "canal")

public class CanalConfig implements Serializable {

/**

* ip

*/

private String ip;

/**

* mq监听表

*/

private String subscribe;

/**

* 端口

*/

private int port;

/**

* 目的地

*/

private String destination;

/**

* 用户名

*/

private String username = "";

/**

* 密码

*/

private String password;

public String getSubscribe() {

return subscribe;

}

public void setSubscribe(String subscribe) {

this.subscribe = subscribe;

}

public String getIp() {

return ip;

}

public void setIp(String ip) {

this.ip = ip;

}

public int getPort() {

return port;

}

public void setPort(int port) {

this.port = port;

}

public String getDestination() {

return destination;

}

public void setDestination(String destination) {

this.destination = destination;

}

public String getUsername() {

return username;

}

public void setUsername(String username) {

this.username = username;

}

public String getPassword() {

return password;

}

public void setPassword(String password) {

this.password = password;

}

}

package com.unwulian.search.engine.suggestion.schedule;

import com.alibaba.otter.canal.client.CanalConnector;

import com.alibaba.otter.canal.client.CanalConnectors;

import com.alibaba.otter.canal.protocol.CanalEntry;

import com.alibaba.otter.canal.protocol.CanalEntry.EntryType;

import com.alibaba.otter.canal.protocol.CanalEntry.RowChange;

import com.alibaba.otter.canal.protocol.Message;

import com.github.structlog4j.ILogger;

import com.github.structlog4j.SLoggerFactory;

import com.unwulian.search.engine.suggestion.config.CanalConfig;

import com.unwulian.search.engine.suggestion.service.CardService;

import com.unwulian.search.engine.suggestion.service.CommunityStructService;

import com.unwulian.search.engine.suggestion.service.HouseService;

import com.unwulian.search.engine.suggestion.service.RoomService;

import org.springframework.beans.factory.InitializingBean;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.stereotype.Component;

import java.net.InetSocketAddress;

import java.util.List;

/**

* 项目启动的时候就初始化canal启动一个线程去监听canal server

*

* @author shiye

* @create 2020-11-30 15:11

*/

@Component

public class CanalTask implements InitializingBean {

private static final ILogger logger = SLoggerFactory.getLogger(CanalTask.class);

@Autowired

private CanalConfig canalConfig;

@Override

public void afterPropertiesSet() throws Exception {

/**

* 启动一下线程一直监听canal server

*/

new Thread(() -> {

logger.info("start Thread to listent canal success....");

CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress(canalConfig.getIp(), canalConfig.getPort()),

canalConfig.getDestination(),

canalConfig.getUsername(),

canalConfig.getPassword());

connector.connect();

connector.subscribe(canalConfig.getSubscribe());

//跳转到上次进行读取日志的地方

connector.rollback();

try {

while (true) {

//获取指定数量的数据

Message message = connector.getWithoutAck(1);

long id = message.getId();

int size = message.getEntries().size();

if (id == -1 || size == 0) {

//如果没有获取到数据就睡眠1s

try {

Thread.sleep(1000);

} catch (InterruptedException e) {

logger.error("sleep 1000ms error...." + e.getMessage());

}

} else {

//处理消息

//logger.info("messge id:" + id);

handlerEntry(message.getEntries());

}

//提交确认

connector.ack(id);

// connector.rollback(batchId); // 处理失败, 回滚数据

}

} finally {

//关闭

connector.disconnect();

}

}).start();

logger.info("start Thread to listent canal ....");

}

/**

* 处理消息

*

* @param entrys

*/

private void handlerEntry(List entrys) {

for (CanalEntry.Entry entry : entrys) {

if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN || entry.getEntryType() == EntryType.TRANSACTIONEND) {

//类型是事务开始事务结束不做处理

continue;

}

//库名

//String databaseName = entry.getHeader().getSchemaName();

//表名

String tableName = entry.getHeader().getTableName();

RowChange rowChage = null;

try {

rowChage = RowChange.parseFrom(entry.getStoreValue());

} catch (Exception e) {

logger.error("ERROR 数据转换异常, data:" + entry.toString(), e);

}

switch (tableName) {

case "t_bas_xxx1":

//进行你的业务处理

break;

case "t_bas_xxx2":

//进行你的业务处理

break;

default:

return;

}

}

}

private static void printColumn(List columns) {

for (CanalEntry.Column column : columns) {

System.out.println(column.getName() + " : " + column.getValue() + "    不做处理=" + column.getUpdated());

}

}

}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/442436.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

java 基本类型 引用类型_Java中的基本类型和引用类型变量的区别

基本类型:基本类型自然不用说了,它的值就是一个数字,一个字符或一个布尔值。引用类型:是一个对象类型,值是什么呢?它的值是指向内存空间的引用,就是地址,所指向的内存中保存着变量所…

mysql 学生成绩等级_JSP+SSM+Mysql实现的学生成绩管理系统

项目简介本系统是基于JSPSSMMysql实现的学生成绩管理系统。主要实现的功能有教师管理、学生管理、课程管理、学生成绩管理。难度等级:中等技术栈编辑器Eclipse Version: 2020-03 (4.15.0)前端技术基础:htmlcssJavaScript框架:JQueryH-ui后端技…

java ajax 导出excel文件_springMVC(4)---生成excel文件并导出

springMVC(4)---生成excel文件并导出在开发过程中,需要将数据库中的数据以excel表格的方式导出。首先说明。我这里用的是Apache的POI项目,它是目前比较成熟的HSSF接口,用来处理Excel对象。其实POI不仅仅只能处理excel,它还可以处理…

java swing 模拟发牌_用java设计一个发牌程序

展开全部// 发牌程序。import java.awt.*;import java.awt.event.*;import javax.swing.*;public class CardBuffer //加互斥锁的缓冲区{private int value;private boolean isEmpty true; //value是否为空的信号量private int order0; //信号量,e68a8462616964757…

Java写文件导致io过高_161108、Java IO流读写文件的几个注意点

平时写IO相关代码机会挺少的,但却都知道使用BufferedXXXX来读写效率高,没想到里面还有这么多陷阱,这两天突然被其中一个陷阱折腾一下:读一个文件,然后写到另外一个文件,前后两个文件居然不一样?…

java接口测试框架搭建_接口自动化测试框架搭建

一、原理及特点参数放在XML文件中进行管理用httpClient简单封装一个httpUtils工具类测试用例管理使用了testNg管理,使用了TestNG参数化测试,通过xml文件来执行case。测试报告这里用到第三方的包ReportNG 项目组织用Maven二、准备使用工具:ecl…

java工厂模式 uml_深入浅出设计模式-简单工厂模式

模式定义简单工厂模式是属于创建型模式,又叫做静态工厂方法(Static Factory Method)模式,但不属于23种GOF设计模式之一。简单工厂模式定义了一个创建对象的类,由这个类来封装实例化对象的行为。设计原则遵循的原则:依赖倒置原则迪…

java技术难点_Java核心技术第四章----对象与类重难点总结

一、类之间的关系类和类之间的关系,耦合度从高到低:is -a。继承、实现has-a。组合、聚合、关联user-a。依赖。要求是:高内聚、低耦合。继承(“is-a”)继承(Inheritance),即“is-a”关系,是一种用于表示特殊与一般关系的…

java日志级别的作用_Java系统日志级别对性能的影响性

先介绍下java系统的日志日志框架:是一种日志接口,不负责具体的日志输出形式(有点类似于JDBC),可以灵活的切换日志输出形式。常见的日志框架有slf4j、jcl,只提供Logger、LoggerFactory等接口日志系统:是应用实际使用的日…

java用链表做学生系统_C语言链表实现学生管理系统

本文实例为大家分享了C语言链表实现学生管理系统的具体代码,供大家参考,具体内容如下#include#include#include#include#include#includeusing namespace std;typedef struct ndoe{char id[10];char name[10];char sex[3];char num[10];struct node *nex…

mysql 全文本检索的列_Mysql 全文本检索

mysql 全文索引注意 并非所有的引擎都支持 全文检索mysql最常用的引擎 INnodb 和 myisam 后者支持全文检索 前者不支持创建表的时候指定要检索列 CREATE TABLE TEST_FULLTEXT(note_id int not null auto_increment,note_text text null, primaty key(note_id),FULLTEXT(note_te…

java 的对象类用_java基础(第零篇)对象与类

前言:本文讲述java中对象与类的一些概念。包括对象与类的有关概念,类间五种关系,类的访问权限等。在java中,一切都可以用对象来描述,操作对象的标识符只不过是对象的一个引用,一个对象可以有多个引用&#…

buffer java nio_Java NIO深入理解Buffer(缓冲区)

前言Github:https://github.com/yihonglei/java-allProject:java-nio一 Buffer概述Java NIO中的Buffer用于和NIO通道进行交互。数据是从通道读入缓冲区,从缓冲区写入到通道中的。缓冲区本质上是一块可以写入数据,然后可以从中读取…

java robot键值_Java:使用Robot类模拟键盘, 以Alt码方式输出汉字

java.awt.Robot类Java提供java.awt.Robot类来模拟操作键盘和鼠标, 下面是一个简单的demopublic static void keyPressByInt(Robot r,int key, int time){r.keyPress(key);r.keyRelease(key);if (time > 0) {r.delay(time);}}public static void main(String[] args) throws …

java调用 火眼臻睛,火眼臻睛车牌识别SDK评测

【CPS中安网 cps.com.cn】CPS LAB总评:用专业角度解读产品--CPS评测中心对火眼臻睛车牌识别SDK进行了全面评测,火眼臻睛车牌识别SDK在综合识别率、车牌定位成功率、大角度下的识别率、夜间环境下的识别率、极端环境下的识别率、支持的最小车牌像素宽度等测试表现,都位于行业前列…

java各层级限流对比,面试官说:来谈谈限流-从概念到实现,一问你就懵逼了?...

后端服务的接口都是有访问上限的,如果外部qps或并发量超过了访问上限会导致应用瘫痪。所以一般都会对接口调用加上限流保护,防止超出预期的请求导致系统故障。从限流类型来说一般来说分为两种:并发数限流和qps限流,并发数限流就是限制同一时刻…

mysql and 和where,关于mysql:连接sql查询中where和and子句的区别

本问题已经有最佳答案,请猛点这里访问。下面两个SQL查询有什么区别和号根据以下两个测试结果速度更快(237比460)。据我所知,这是一个标准。。氧化镁不,有细微的差别,你不能说没有差别除了语法之外没有别的区别。虽然只有一个简短的…

matlab里输出恒压的逆变器,基于IGBT逆变器的异步电机变频调速系统的MATLAB仿真...

异步电机变频调速系统电路仿真模型如图(4)所示。直流电压不621V,逆变器为IGBT 的三相半桥逆变器,电机为异步电机模块,其主电路由直流电压源、逆变器和电机依次相连。图(4)变频调速系统控制部分,利用“Step”模块设定频率指令f1*&a…

php 获取京东交易账号,PHP爬虫爬取京东列表

这里使用到了一个php插件下面是源码simple_html_dom.phpdefined(IN_ECS);define(HDOM_TYPE_ELEMENT, 1);define(HDOM_TYPE_COMMENT, 2);define(HDOM_TYPE_TEXT, 3);define(HDOM_TYPE_ENDTAG, 4);define(HDOM_TYPE_ROOT, 5);define(HDOM_TYPE_UNKNOWN, 6);define(HDOM_QUOTE_DOU…

php dns刷新,Windows DNS缓存自动刷新

Windows DNS缓存自动刷新admin • 2018 年 09 月 04 日DNS(域名服务器)DNS(Domain Name Server)是进行域名和与之相对应的ip地址转换的服务器。DNS中保存了一张域名和与之相应的ip地址的表,以解析消息的域名。DNS轮训在统一主机添加多条A记录,这就是DNS轮…