mysql kafka binlog_为什么使用kafka处理mysql binlog?

在开发 Spark Streaming 的公共组件过程中,需要将 binlog 的数据(Array[Byte])转换为 Json 格式,供用户使用,本文提供一种转换的思路。另外我们会用到几个辅助类,为了行文流畅,我们将辅助类的定义放在文章的最后面。如果如果本文有讲述不详细,或者错误指出,肯请指出,谢谢对于 binlog 数据,每一次操作(INSERT/UPDATE/DELETE 等)都会作为一条记录写入 binlog 文件,但是同一条记录可能包含数据库中的几行数据(这里比较绕,可以看一个具体的例子)在数据库中,有 id, name 两个字段,其中 id 为主键,name 随意, age 随意。有两行数据如下idnameage

1john30

2john40

那么你进行操作

update table set age = 50 where name = john的时候,就会将两行的数据都进行更改,这两行更改的数据会在同一个 binlog 记录中,这一点会在后面的实现中有体现。

下面,我们给出具体的代码,然后对代码进行分析def desirializeByte(b: (String, Array[Byte])) : (String, String) = {val binlogEntry = BinlogEntryUtil.serializeToBean(b._2) //将 Array[Byte] 数据转换成 com.meituan.data.binlog.BinlogEntry 类,相关类定义参考附录val pkeys = binlogEntry.getPrimaryKeys.asScala //获取主键,这里的 asScala 将 Java 的 List 转换为 Scala 的 Listval rowDatas : List[BinlogRow] = binlogEntry.getRowDatas.asScala.toList //获取具体的信息val strRowDatas = rowDatas.map(a => { //将获取到的具体信息进行转换,这里主要是将没一条信息的内容,转换 [(K1:V1,K2:V2...Kn:Vn)] 的形式,方面后面进行 Json 化val b = a.getBeforeColumns.asScala //获取 beforColumnsval c = a.getAfterColumns.asScala //获取 afterColumnsval mb = b.map(d => (d._1, d._2.getValue)) //去掉所有不需要的信息,只保留每个字段的值val mc = c.map(c => (c._1, c._2.getValue)) //去掉所有不需要的信息,只保留每个字段的值(mb, mc) //返回转换后的 beforeColumns 和 afterColumns})

//下面利用 json4s 进行 Json 化

(binlogEntry.getEventType, compact("rowdata" -> strRowDatas.map{w => List("row_data" -> ("before" -> w._1.toMap) ~ ("after" -> w._2.toMap)) //这里的两个 toMap 是必要的,不然里层会变成 List,这个地方比较疑惑的是,//w._1 按理是 Map类型,为什么还需要强制转换成 Map//而且用 strRowDatas.foreach(x => println(s"${x._1} ${x._2}")打印的结果表名是 Map}))

desirializeByte 函数传入 topic 中的一条记录,返回参数自己确定,我这里为了测试,返回一个 (String, String) 的 Tuple,第一个字段表示该条记录的 EventType(Insert/Update/Delete 等),第二个字段为 Json 化后的数据。

BinlogEntryUtil.serilizeToBean 是一个辅助类,将 binlog 数据转化为一个 Java bean 类。

第 4 行,我们得到表对应的主键,第 5 行获得具体的数据第 6 行到第 12 行是 Json 化之前的辅助工作,将所有不需要的东西给剔除掉,只留下字段,以及字段对应的值。

第 14, 15 行就是具体的 Json 工作了(使用了 json4s 包进行 Json 化)这个过程中有一点需要注意的是,在 Json 化的时候,记得为 w._1 和 w._2 加 toMap 操作,不然会变成 List(很奇怪,我将 w._1 和 w._2 打印出来看,都是 Map 类型)或者你可以在第 7,8 行的末尾加上 .toMap 操作。这个我查了 API,进行了实验,暂时怀疑是在和 json4s 组合的时候,出现了问题,有待验证。

利用上述代码,我们可以得到下面这样 Json 化之后的字符串(我进行了排版,程序返回的 Json 串是不换行的){"rowdata":

[{"row_data":

{"before":{"param_name":"creator","param_value":"chenqiang05","horigindb_etl_id":"2532","utime":"2016-07-26 15:07:16","id":"15122","status":"0","ctime":"2016-07-25 17:06:01"},"after":{"param_name":"creator","param_value":"chendayao","horigindb_etl_id":"2532","utime":"2016-08-01 10:32:01","id":"15122","status":"0","ctime":"2016-07-25 17:06:01"}

}

}]

}"

到这里,基本就完成了一种将 binlog 数据 Json 化的代码。

附录代码,由于这些代码是从其他工程里面抠出来的,可能读起来会不顺畅,还请见谅。

public static BinlogEntryserializeToBean(byte[] input) {BinlogEntrybinlogEntry = null;

Entryentry = deserializeFromProtoBuf(input);//从 protobuf 反序列化if(entry != null) {

binlogEntry = serializeToBean(entry);

}

return binlogEntry;

}

public static EntrydeserializeFromProtoBuf(byte[] input) {Entryentry = null;

try {

entry = Entry.parseFrom(input);

//com.alibaba.otter.canal.protocol.CanalEntry#Entry 类的方法,由 protobuf 生成} catch (InvalidProtocolBufferExceptionvar3) {logger.error("Exception:" + var3);

}

return entry;

}

//将 Entry 解析为一个 bean 类

public static BinlogEntryserializeToBean(Entryentry) {RowChangerowChange = null;

try {

rowChange = RowChange.parseFrom(entry.getStoreValue());} catch (Exceptionvar8) {

throw new RuntimeException("parse event has an error , data:" + entry.toString(), var8);}

BinlogEntrybinlogEntry = new BinlogEntry();String[] logFileNames = entry.getHeader().getLogfileName().split("\\.");String logFileNo = "000000";

if(logFileNames.length > 1) {

logFileNo = logFileNames[1];

}

binlogEntry.setBinlogFileName(logFileNo);binlogEntry.setBinlogOffset(entry.getHeader().getLogfileOffset());binlogEntry.setExecuteTime(entry.getHeader().getExecuteTime());binlogEntry.setTableName(entry.getHeader().getTableName());binlogEntry.setEventType(entry.getHeader().getEventType().toString());IteratorprimaryKeysList = rowChange.getRowDatasList().iterator();while(primaryKeysList.hasNext()) {

RowDatarowData = (RowData)primaryKeysList.next();BinlogRowrow = new BinlogRow(binlogEntry.getEventType());row.setBeforeColumns(getColumnInfo(rowData.getBeforeColumnsList()));row.setAfterColumns(getColumnInfo(rowData.getAfterColumnsList()));binlogEntry.addRowData(row);

}

if(binlogEntry.getRowDatas().size() >= 1) {BinlogRowprimaryKeysList1 = (BinlogRow)binlogEntry.getRowDatas().get(0);binlogEntry.setPrimaryKeys(getPrimaryKeys(primaryKeysList1));} else {

ArrayListprimaryKeysList2 = new ArrayList();binlogEntry.setPrimaryKeys(primaryKeysList2);}

return binlogEntry;

}

public class BinlogEntry implements Serializable {private String binlogFileName;

private long binlogOffset;

private long executeTime;

private String tableName;

private String eventType;

private List primaryKeys;

private List rowDatas = new ArrayList();}

public class BinlogRow implements Serializable {public static final String EVENT_TYPE_INSERT = "INSERT";public static final String EVENT_TYPE_UPDATE = "UPDATE";public static final String EVENT_TYPE_DELETE = "DELETE";private String eventType;

private Map beforeColumns;private Map afterColumns;}

public class BinlogColumn implements Serializable {private int index;

private String mysqlType;

private String name;

private boolean isKey;

private boolean updated;

private boolean isNull;

private String value;

}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/467624.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Fantasia (Tarjan+树形DP)

Time Limit: 1000 ms Memory Limit: 256 MB Description 给定一张N个点、M条边的无向图 $G$ 。每个点有个权值Wi。 我们定义 $G_i$ 为图 $G$ 中删除第 $i$ 号顶点后的图。我们想计算 $G_1, G_2, ..., G_n$ 这N张图的权值。 对于任意一张图 $G$ ,它的权值是这样定义…

买书这件事

知识这种东西,你只有不断的补充才不会觉得匮乏,我每年都会买点书,我喜欢买书,但是却不看书,很多书籍我都是当成工具书来用。我记得在2015年的时候,我需要自己写专利,但是我对写专利这个事情一窍…

python logging mysql_Python 操作 MySQL 的正确姿势

欢迎大家关注腾讯云技术社区-博客园官方主页,我们将持续在博客园为大家推荐技术精品文章哦~作者:邵建永使用Python进行MySQL的库主要有三个,Python-MySQL(更熟悉的名字可能是MySQLdb),PyMySQL和SQLAlchemy。Python-MySQL资格最老&…

Linus Torvalds的最新电脑配置

大家好,祝大家6.1节日快乐最近Linus Torvalds 公布了他的电脑配置,有了这个配置清单之后,每个人都可以拥有一台和Linux之父一样的电脑,当你拥有了一台之后,你可以发个朋友圈,「我今天用Linus 的电脑解了一个…

马上就校招了,是要去实习还是复习?

昨天晚上,遇到一个特别纠结的同学,他现在收到一份实习的通知,他犹豫是要去实习呢还是继续在学校复习学习技术。实习的话可以增加自己校招的筹码,比如在和面试官侃大山的时候,可以把实习这件事情拿出来说,这…

mysql order by 索引名字_MySQL如何利用索引优化ORDER BY排序语句

MySQL索引通常是被用于提高WHERE条件的数据行匹配或者执行联结操作时匹配其它表的数据行的搜索速度。MySQL也能利用索引来快速地执行ORDER BY和GROUP BY语句的排序和分组操作。通过索引优化来实现MySQL的ORDER BY语句优化:1、ORDER BY的索引优化。如果一个SQL语句形…

YAML/Properties配置文件与Spring Boot(转)

多年来,Java开发人员依赖于属性文件或xml文件来指定应用程序配置。在企业应用程序中,人们可以为每个环境(如开发,分段和生产)创建单独的文件,以定义相应环境的属性。但是,通过Spring引导&#x…

嵌入式杂谈之makefile补充

我看了下自己的文章库存,好像还没有一篇关于Makefile的文章,所以这篇刚好可以弥补自己的缺失。makefile预定义变量预定义变量即系统自带的变量预定义变量作用AR库文件维护程序的名称,默认为arAS汇编程序的名称,默认为asCCc编译器的…

Weex 解决Print: Entry, :CFBundleIdentifier, Does Not Exist 错误方法

一、原因 导致这个错误的原因是我们的/user/你的用户名/.rncache文件中 boost_1_63_0.tar.gz文件内容不完整导致 如果你也玩过React-native框架,在0.45版本之后也会出现这个错误。 解决方法: 1、重新下载一个boost_1_63_0.tar.gz文件,替换它&…

mysql表单查询_表单查询实例

[TOC]### 1.查找部门30中员工的详细信息。select * from emp where deptno 30;### 2.找出从事clerk工作的员工的编号、姓名、部门号。select empno,ename,deptno from emp where job clerk;### 3.检索出奖金多于基本工资的员工信息。select * from emp where comm > sal;#…

安卓9.0马达框架分析

前言最近需要将之前的一些驱动接口转为安卓标准接口,方便上层应用或者第三方应用去适配。这篇文章先从简单的马达框架入手进行讲解。正文整个马达框架比较简单,安卓官方已经帮我们实现了framework到HAL层,我们需要实现的就只有驱动层。这篇文…

PYQT4 Python GUI 编写与 打包.exe程序

工作中需要开发一个小工具,简单的UI界面可以很好的提高工具的实用性,由此开启了我的第一次GUI开发之旅,下面将自己学习的心得记录一下,也做为学习笔记吧!!! 参考:http://www.qaulau.…

在MySQL中以下属于ddl语句的_ddl语言(以下哪些命令是ddl语句)

DDL一般指数据定义语言。数据库模式定义语言DDL(Data Definition Language),是用于描述数据库中要存储的现实世界实体的语言。常见的DDL语句:创建数据库.dml触发器 就是普通的 insert / update / delete 触发器。ddl触发器 就是一些特有的 ddl 语句的触发…

你知道嵌入式,那你看过这个吗?

大家好,因为最近各种原因,我身边的很多同事都转行摆地摊了,可能因为那是一份自由的职业,摆地摊可以从事的范围很广,也不用起早贪黑了,而且收入并不低。也是因为这样,很多嵌入式方面的岗位越来越…

mysql把select结果存到变量中_mysql实例 select into保存到变量的例子

本节继续分享mysql中使用select into保存查询结果到变量中的实例。代码:mysql>mysql> CREATE TABLE Employee( //创建雇员表-> id int,-> first_name VARCHAR(15),-> last_name VARCHAR(15),-> start_date DATE,-> end_date DATE,-> salary…

mvc一对多模型表单的快速构建

功能需求描述 Q:在实际的开发中,经常会遇到一个模型中包含有多个条目的表单。如何将数据提交到后台? A: 以数组的形式提交到后台就Ok了(真的那么简单么,如果再嵌套一层呢?) A2:拆分多个模型,映射就没啥问题…

asp删除mysql_asp php 清空access mysql mssql数据库的代码

1、ASP清空ACCESSDim Rs,ConnSet ConnServer.CreateObject("Adodb.Connection")Conn.Open "ProviderMicrosoft.Jet.OLEDB.4.0;Data Source"&server.mappath("readlog.mdb")Set RsConn.OpenSchema(20)Do While not Rs.EofIf Ucase(Rs("T…

c语言中 if(x) 、if(0) 、if(1)

解释if 语句里面包含真和非真&#xff0c;但是如果我们没有写清楚真和非真的话&#xff0c;会如何呢&#xff1f;if(x)相当于if(x ! 0)如果是指针的话&#xff0c;相当于if(x ! NULL)而if(1)相当于if(1 ! 0)还有if(0)相当于if(0 ! 0)举个例子#include<stdio.h> int main(…

接口总结

1.接口定义&#xff1a; ①Java接口是一系列方法的声明&#xff0c;是一些抽象的集合。 ②一个接口只有抽象方法没有方法的实现&#xff0c;因此这些方法可以在不同的地方被不同的类实现&#xff0c;而这些实现可以具有不同的行为&#xff08;功能&#xff09;。 ③简单地说&am…

mysql concat $_mysql concat 的诡异问题

在存储过程中&#xff0c;mysql 中的 concat 函数&#xff0c;总是出现问题&#xff0c;不知道是毛&#xff1f;DROP PROCEDURE IF EXISTS TestPro;CREATE PROCEDURE TestPro()BEGIN-- SET tableName "_aaaa,_bbbb,_cccc,_dddd";SET tableName "_aaaaa,_bbbb,…