DB 数据同步到数据仓库的架构与实践

背景

 

在数据仓库建模中,未经任何加工处理的原始业务层数据,我们称之为ODS(Operational Data Store)数据。在互联网企业中,常见的ODS数据有业务日志数据(Log)和业务DB数据(DB)两类。对于业务DB数据来说,从MySQL等关系型数据库的业务数据进行采集,然后导入到Hive中,是进行数据仓库生产的重要环节。

 

如何准确、高效地把MySQL数据同步到Hive中?一般常用的解决方案是批量取数并Load:直连MySQL去Select表中的数据,然后存到本地文件作为中间存储,最后把文件Load到Hive表中。这种方案的优点是实现简单,但是随着业务的发展,缺点也逐渐暴露出来:

 

  • 性能瓶颈:随着业务规模的增长,Select From MySQL -> Save to Localfile -> Load to Hive这种数据流花费的时间越来越长,无法满足下游数仓生产的时间要求。

  • 直接从MySQL中Select大量数据,对MySQL的影响非常大,容易造成慢查询,影响业务线上的正常服务。

  • 由于Hive本身的语法不支持更新、删除等SQL原语,对于MySQL中发生Update/Delete的数据无法很好地进行支持。

 

为了彻底解决这些问题,我们逐步转向CDC(Change Data Capture)+ Merge的技术方案,即实时Binlog采集 + 离线处理Binlog还原业务数据这样一套解决方案。Binlog是MySQL的二进制日志,记录了MySQL中发生的所有数据变更,MySQL集群自身的主从同步就是基于Binlog做的。

 

本文主要从Binlog实时采集和离线处理Binlog还原业务数据两个方面,来介绍如何实现DB数据准确、高效地进入数仓。

 

 

整体架构

 

 

整体的架构如上图所示。在Binlog实时采集方面,我们采用了阿里巴巴的开源项目Canal,负责从MySQL实时拉取Binlog并完成适当解析。Binlog采集后会暂存到Kafka上供下游消费。整体实时采集部分如图中红色箭头所示。

 

离线处理Binlog的部分,如图中黑色箭头所示,通过下面的步骤在Hive上还原一张MySQL表:

 

  1. 采用Linkedin的开源项目Camus,负责每小时把Kafka上的Binlog数据拉取到Hive上。

  2. 对每张ODS表,首先需要一次性制作快照(Snapshot),把MySQL里的存量数据读取到Hive上,这一过程底层采用直连MySQL去Select数据的方式。

  3. 对每张ODS表,每天基于存量数据和当天增量产生的Binlog做Merge,从而还原出业务数据。

 

我们回过头来看看,背景中介绍的批量取数并Load方案遇到的各种问题,为什么用这种方案能解决上面的问题呢?

 

  • 首先,Binlog是流式产生的,通过对Binlog的实时采集,把部分数据处理需求由每天一次的批处理分摊到实时流上。无论从性能上还是对MySQL的访问压力上,都会有明显地改善。

  • 第二,Binlog本身记录了数据变更的类型(Insert/Update/Delete),通过一些语义方面的处理,完全能够做到精准的数据还原。

 

 

Binlog实时采集

 

对Binlog的实时采集包含两个主要模块:一是CanalManager,主要负责采集任务的分配、监控报警、元数据管理以及和外部依赖系统的对接;二是真正执行采集任务的Canal和CanalClient。

 

 

当用户提交某个DB的Binlog采集请求时,CanalManager首先会调用DBA平台的相关接口,获取这一DB所在MySQL实例的相关信息,目的是从中选出最适合Binlog采集的机器。然后把采集实例(Canal Instance)分发到合适的Canal服务器上,即CanalServer上。在选择具体的CanalServer时,CanalManager会考虑负载均衡、跨机房传输等因素,优先选择负载较低且同地域传输的机器。

 

CanalServer收到采集请求后,会在ZooKeeper上对收集信息进行注册。注册的内容包括:

 

  • 以Instance名称命名的永久节点。

  • 在该永久节点下注册以自身ip:port命名的临时节点。

 

这样做的目的有两个:

 

  • 高可用:CanalManager对Instance进行分发时,会选择两台CanalServer,一台是Running节点,另一台作为Standby节点。Standby节点会对该Instance进行监听,当Running节点出现故障后,临时节点消失,然后Standby节点进行抢占。这样就达到了容灾的目的。

  • 与CanalClient交互:CanalClient检测到自己负责的Instance所在的Running CanalServer后,便会进行连接,从而接收到CanalServer发来的Binlog数据。

 

对Binlog的订阅以MySQL的DB为粒度,一个DB的Binlog对应了一个Kafka Topic。底层实现时,一个MySQL实例下所有订阅的DB,都由同一个Canal Instance进行处理。这是因为Binlog的产生是以MySQL实例为粒度的。CanalServer会抛弃掉未订阅的Binlog数据,然后CanalClient将接收到的Binlog按DB粒度分发到Kafka上。

 

 

离线还原MySQL数据

 

完成Binlog采集后,下一步就是利用Binlog来还原业务数据。首先要解决的第一个问题是把Binlog从Kafka同步到Hive上。

 

 

Kafka2Hive

 

整个Kafka2Hive任务的管理,在美团数据平台的ETL框架下进行,包括任务原语的表达和调度机制等,都同其他ETL类似。而底层采用LinkedIn的开源项目Camus,并进行了有针对性的二次开发,来完成真正的Kafka2Hive数据传输工作。

 

 

对Camus的二次开发

 

Kafka上存储的Binlog未带Schema,而Hive表必须有Schema,并且其分区、字段等的设计,都要便于下游的高效消费。对Camus做的第一个改造,便是将Kafka上的Binlog解析成符合目标Schema的格式。

 

对Camus做的第二个改造,由美团的ETL框架所决定。在我们的任务调度系统中,目前只对同调度队列的任务做上下游依赖关系的解析,跨调度队列是不能建立依赖关系的。而在MySQL2Hive的整个流程中,Kafka2Hive的任务需要每小时执行一次(小时队列),Merge任务每天执行一次(天队列)。而Merge任务的启动必须要严格依赖小时Kafka2Hive任务的完成。

 

为了解决这一问题,我们引入了Checkdone任务。Checkdone任务是天任务,主要负责检测前一天的Kafka2Hive是否成功完成。如果成功完成了,则Checkdone任务执行成功,这样下游的Merge任务就可以正确启动了。

 

 

Checkdone的检测逻辑

 

Checkdone是怎样检测的呢?每个Kafka2Hive任务成功完成数据传输后,由Camus负责在相应的HDFS目录下记录该任务的启动时间。Checkdone会扫描前一天的所有时间戳,如果最大的时间戳已经超过了0点,就说明前一天的Kafka2Hive任务都成功完成了,这样Checkdone就完成了检测。

 

此外,由于Camus本身只是完成了读Kafka然后写HDFS文件的过程,还必须完成对Hive分区的加载才能使下游查询到。因此,整个Kafka2Hive任务的最后一步是加载Hive分区。这样,整个任务才算成功执行。

 

每个Kafka2Hive任务负责读取一个特定的Topic,把Binlog数据写入original_binlog库下的一张表中,即前面图中的original_binlog.db,其中存储的是对应到一个MySQL DB的全部Binlog。

 

 

上图说明了一个Kafka2Hive完成后,文件在HDFS上的目录结构。假如一个MySQL DB叫做user,对应的Binlog存储在original_binlog.user表中。ready目录中,按天存储了当天所有成功执行的Kafka2Hive任务的启动时间,供Checkdone使用。每张表的Binlog,被组织到一个分区中,例如userinfo表的Binlog,存储在table_name=userinfo这一分区中。每个table_name一级分区下,按dt组织二级分区。图中的xxx.lzo和xxx.lzo.index文件,存储的是经过lzo压缩的Binlog数据。

 

Merge

 

Binlog成功入仓后,下一步要做的就是基于Binlog对MySQL数据进行还原。Merge流程做了两件事,首先把当天生成的Binlog数据存放到Delta表中,然后和已有的存量数据做一个基于主键的Merge。Delta表中的数据是当天的最新数据,当一条数据在一天内发生多次变更时,Delta表中只存储最后一次变更后的数据。

 

把Delta数据和存量数据进行Merge的过程中,需要有唯一键来判定是否是同一条数据。如果同一条数据既出现在存量表中,又出现在Delta表中,说明这一条数据发生了更新,则选取Delta表的数据作为最终结果;否则说明没有发生任何变动,保留原来存量表中的数据作为最终结果。Merge的结果数据会Insert Overwrite到原表中,即图中的origindb.table。

 

 

Merge流程举例

 

下面用一个例子来具体说明Merge的流程。

 

 

数据表共id、value两列,其中id是主键。在提取Delta数据时,对同一条数据的多次更新,只选择最后更新的一条。所以对id=1的数据,Delta表中记录最后一条更新后的值value=120。Delta数据和存量数据做Merge后,最终结果中,新插入一条数据(id=4),两条数据发生了更新(id=1和id=2),一条数据未变(id=3)。

 

默认情况下,我们采用MySQL表的主键作为这一判重的唯一键,业务也可以根据实际情况配置不同于MySQL的唯一键。

 

上面介绍了基于Binlog的数据采集和ODS数据还原的整体架构。下面主要从两个方面介绍我们解决的实际业务问题。

 

 

实践一:分库分表的支持

 

随着业务规模的扩大,MySQL的分库分表情况越来越多,很多业务的分表数目都在几千个这样的量级。而一般数据开发同学需要把这些数据聚合到一起进行分析。如果对每个分表都进行手动同步,再在Hive上进行聚合,这个成本很难被我们接受。因此,我们需要在ODS层就完成分表的聚合。

 

 

首先,在Binlog实时采集时,我们支持把不同DB的Binlog写入到同一个Kafka Topic。用户可以在申请Binlog采集时,同时勾选同一个业务逻辑下的多个物理DB。通过在Binlog采集层的汇集,所有分库的Binlog会写入到同一张Hive表中,这样下游在进行Merge时,依然只需要读取一张Hive表。

 

第二,Merge任务的配置支持正则匹配。通过配置符合业务分表命名规则的正则表达式,Merge任务就能了解自己需要聚合哪些MySQL表的Binlog,从而选取相应分区的数据来执行。

 

这样通过两个层面的工作,就完成了分库分表在ODS层的合并。

 

这里面有一个技术上的优化,在进行Kafka2Hive时,我们按业务分表规则对表名进行了处理,把物理表名转换成了逻辑表名。例如userinfo123这张表名会被转换为userinfo,其Binlog数据存储在original_binlog.user表的table_name=userinfo分区中。这样做的目的是防止过多的HDFS小文件和Hive分区造成的底层压力。

 

 

实践二:删除事件的支持

 

Delete操作在MySQL中非常常见,由于Hive不支持Delete,如果想把MySQL中删除的数据在Hive中删掉,需要采用“迂回”的方式进行。

 

对需要处理Delete事件的Merge流程,采用如下两个步骤:

 

  • 首先,提取出发生了Delete事件的数据,由于Binlog本身记录了事件类型,这一步很容易做到。将存量数据(表A)与被删掉的数据(表B)在主键上做左外连接(Left outer join),如果能够全部join到双方的数据,说明该条数据被删掉了。因此,选择结果中表B对应的记录为NULL的数据,即是应当被保留的数据。

  • 然后,对上面得到的被保留下来的数据,按照前面描述的流程做常规的Merge。

 

 

 

总结与展望

 

作为数据仓库生产的基础,美团数据平台提供的基于Binlog的MySQL2Hive服务,基本覆盖了美团内部的各个业务线,目前已经能够满足绝大部分业务的数据同步需求,实现DB数据准确、高效地入仓。在后面的发展中,我们会集中解决CanalManager的单点问题,并构建跨机房容灾的架构,从而更加稳定地支撑业务的发展。

 

本文主要从Binlog流式采集和基于Binlog的ODS数据还原两方面,介绍了这一服务的架构,并介绍了我们在实践中遇到的一些典型问题和解决方案。希望能够给其他开发者一些参考价值,同时也欢迎大家和我们一起交流。

转载于:https://www.cnblogs.com/dadadechengzi/p/10100319.html

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/366316.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

java log.error_Logger.error打印错误异常的详细堆栈信息

一、问题场景使用Logger.error方法时只能打印出异常类型,无法打印出详细的堆栈信息,使得定位问题变得困难和不方便。二、先放出结论Logger类下有多个不同的error方法,根据传入参数的个数及类型的不同,自动选择不同的重载方法。当e…

笔记本电脑怎么清理灰尘_手机声音越用越小怎么办?一段黑科技音波就能清理扬声器灰尘...

大家好,欢迎收看科技狐,我是小狐。我们都知道,随着手机的使用时间越来越长,手机扬声器里面会积赞一些灰尘。因此手机的声音就会变得越来越小。有时候连电话铃声都听不清楚,说实话我就是这个样子,为此我困扰…

Java方法中的参数太多,第8部分:工具

在我的系列文章的前七篇文章中,有关处理Java方法中期望的参数过多的内容集中在减少方法或构造函数期望的参数数量的替代方法上。 在本系列的第八篇文章中,我将介绍一些工具,这些工具可帮助您确定可能存在过多参数的情况,并在出现这…

predict函数 R_学习|R语言做机器学习的常用函数总结

预测函数:predict() type"prob"判别该量度的昆虫归类为A、B和C的概率;type"response":判别该量度的昆虫的类别;预测分类的概率的函数predict(…, type)参数type:R语音里面不同模型,参数…

微信小程序:动画(Animation)

简单总结一下微信动画的实现及执行步骤。 一、实现方式 官方文档是这样说的:①创建一个动画实例 animation。②调用实例的方法来描述动画。③最后通过动画实例的 export 方法导出动画数据传递给组件的 animation 属性。 因为小程序是数据驱动的,给这句话…

SSM+solr 通过商品搜索学习solr的简单使用

学习了一下https://github.com/TyCoding/ssm-redis-solr这个github上的solr搜索功能,现在来记录一下。 我的理解就是solr有点类似于数据库,但它是有索引的数据库,按很多字段建立索引,可能是b树或者散列索引,然后就能够…

可以使用中文作为变量名_次氯酸可以作为伤口消毒使用吗?

次氯酸可以作为伤口消毒使用吗?次氯酸在经过2020年的洗礼,已然成为常态化,它对于人体是否有害,也是人们关注的焦点。对于那些还不太了解次氯酸的群体做一下简短科普。什么是次氯酸?次氯酸(HCIO)…

tomcat启动java项目_Java web项目启动Tomcat报错解决方案

点击运行项目时显示 A Java Exception has occurred.Starting Tomcat v9.0 Server at localhost has oncountered a problem.Server Tomcat v9.0 Server at localhost failed tostart.并显示以下两个弹框同时控制台报错org.apache.catalina.startup.Bootstraporg.apache.catali…

matlab 从 excel读取 日期_MATLAB批量修改文件名和选择性复制/剪切文件

今天解决的问题:1、如何利用MATLAB批量修改文件名?(前面写过一次bat命令法,这个应该也算一次改进,程序的初衷是想将Smartsolo导出的文件名批量修改为以炮点桩号为文件名)2、如何利用MATLAB选择性批量复制/剪切文件?(程…

CODE[VS] 1860 最大数 1998年NOIP全国联赛提高组

题目描述 Description设有n个正整数(n≤20),将它们联接成一排,组成一个最大的多位整数。 输入描述 Input Description第一行一个正整数n。 第二行n个正整数,空格隔开。 输出描述 Output Description连接成的多位数。…

您基于JEE的Web项目的结构是什么?

在本文中,我将主要与JSF讨论基于Web的项目的各种组织结构。 开始新项目时,首先想到的是如何组织Java包? 想象一下,您开发了一个基于Web的用户和组管理系统。 很长时间以来,我使用以下Java包结构来将Bean类与模型类分开…

自定义scoll样式

使用伪类自定义scroll样式 效果&#xff1a; 代码&#xff1a; <!DOCTYPE html> <html><head><meta charset"utf-8" /><meta name"viewport" content"widthdevice-width, initial-scale1.0,maximum-scale1.0,minimum1.0,…

关于常用的编码工具如何引入jar包

myeclipse和eclipse&#xff08;差不多&#xff09;引入jar包&#xff1a; 普通项目&#xff1a; 1.对准你的项目创建一个文件夹名字尽量命名成lib&#xff08;注意要和src平级&#xff0c;不要在src下创建文件夹&#xff09;。 2.将下载好的依赖放到lib文件夹下&#xff0c; 3…

win10商店打不开_win10自带的照片查看器打不开的修复方法

我们知道win10或win7等系统都自带有默认的照片查看器&#xff0c;安装好系统后&#xff0c;我们再不用安装第三方看图软件来查看照片了&#xff0c;给我们玩电脑带来了极大的方便。但有些朋友近来向我求教照片查看器打不开&#xff0c;或打开很慢不正常的问题。下面我来跟大家介…

休眠事实:访存策略的重要性

在使用ORM工具时&#xff0c;每个人都承认数据库设计和实体到表映射的重要性。 这些方面引起了很多关注&#xff0c;而诸如获取策略之类的事情可能只是推迟了。 我认为&#xff0c;不应将实体获取策略与实体映射设计分开&#xff0c;因为除非经过适当设计&#xff0c;否则它可…

自定义checkbox样式

通过选中时添加背景图片自定义CheckBox样式 效果&#xff1a; CSS样式&#xff1a; <style type"text/css>label {width: 10%;display: flex;display: -webkit-flex;display: -moz-flex;flex-direction: row;justify-content: center;align-items: center;}label i…

安装步骤

1、安装node&#xff0c;安装全局webpack&#xff0c;npm init 生成package.json文件全局打包命令webpack b.js -o bundle.js旧版本的是webpack b.js bundle.js2、npm install webpack --save-dev 引入本项目webpack&#xff0c;package.json文件中会生成对应的webpack版本号…

fpu测试_I510400性能及温度测试详解

5月20号&#xff0c;INTEL将正式销售十代民用桌面级处理器&#xff0c;此次上市的型号相对9代与8代来说要多了很多型号&#xff0c;仅I9系列就有4个型号&#xff0c;下图有此次INTEL更新所有型号的参数&#xff1a;从上图参数可知&#xff0c;Intel有史以来I3~I9全部支持超线程…

ADFLogger的SLF4J绑定–缺少的部分

由于最好的原因&#xff0c;在我的日常工作中&#xff0c;我希望为ADF Logger Oracle ADF提供一个SLF4J适配器。 毫不奇怪&#xff0c;slf4j没有用于ADFLogger的适配器&#xff0c;但是由于ADFLogger只是Java Util Logging的轻巧包装&#xff0c;因此花了一个多小时来填补这一空…

c语言int 转bool_C++代码实现逆波兰式_C 语言

100行以内C代码实现逆波兰式逆波兰式(Reverse Polish notation&#xff0c;RPN&#xff0c;或逆波兰记法)&#xff0c;也叫后缀表达式(将运算符写在操作数之后)。算术表达式转逆波兰式例子&#xff1a;逆波兰式整体的算法流程图如下&#xff1a;下面给出我基于C 语言对逆波兰式…