1. 系统架构
2. 介绍流程
- 公司的困难
- 数据的来源
- 业务
- 日志
-
Flume采集日志数据
-
选型
-
ETL
-
flume内存不够,通过ganglia监控器发现
-
提高吞吐量,batchSize
-
-
kafka
-
高效读写
-
提高吞吐量
-
kafka挂了
-
kafka丢数问题
-
数据重复问题
-
数据乱序问题
-
消费策略
-
-
zookeeper
-
CAP,只满足CP
-
非第一次选举
-
-
Flume发到hadoop
-
source、channel、sink选择
-
小文件问题
-
har归档,hive合并
-
零点漂移问题(拦截器)
-
-
业务数据
-
全量、增量
-
-
datax全量同步
-
空值问题
-
-
Maxwell增量数据
-
优点
-
原理
-
-
数仓计算引擎选择
-
通过比较,选hive on Spark
-
-
hive
-
组成
-
元数据换成MySQL
-
HQL转成MR
-
外部表、内部表
-
系统函数
-
自定义UDF
-
hive优化
-
数据倾斜,举例:加购到支付的一个平均使用时长,用累积型快照事实表解决
-
-
建模
-
数据调研,java的表,产品的指标,通过拆分指标确定能不能做
-
明确数据域
-
构建业务总线矩阵
-
建模,自下而上,首先从ODS层,然后是DIM层和DWD层,分别介绍
-
ODS,三件事
-
DWD,三类,事务型事实表、周期型快照事实表、累积型快照事实表
-
DIM,拉链表
-
ETL
-
-
指标体系建设
-
自上而下先有ADS,再有DWS
-
ADS的一些指标
-
指标拆分
-
拆分完之后去找公共的业务过程,统计周期,统计粒度,然后建DWS层宽表
-
DataX导入到MySQL
-
-
superset可视化
-
DS调度
-
这就是整个离线项目的大致过程
3. 详细介绍
在上个公司老板遇到了一些困难,他想详细的了解公司的一些运营情况,比如说了解日活、新增、留存转化率这些信息,但是公司数据量特别庞大,直接用MySQL是统计不了的,而且统计历史数据的时候更不行,所以这时候,他决定组建这个大数据团队,首先是购买相关的一些服务器(阿里云或者物理机都可以),买完服务器之后,开始进行一个项目调研,在调研的时间了解了一下公司的数据来源,一共有两种,一个是Java后台对应的业务数据,在MySQL里面存放的,还有就是前端埋点产生的用户行为数据,以文件形式存放在日志服务器中,并且在接收数据的时候都采用了Nginx进行一个负载均衡。 用户行为数据是保存30天,起到一个备份的作用,防止后续过程中数仓当中任何位置出现异常,都有最原始的一个数据的备份,都知道,在企业当中,数据是最重要的,相对来说,磁盘比较廉价,所以我们备份30天,接下来就是考虑如何将数据传输到Hadoop当中,去进行后续的一个处理? 在这里面,我调研过很多技术,我们也可以直接写一个java程序,读取这个文件上传,我们也可以写一个JAVA程序,创建一个Hadoop客户端,去进行put,一次性put上来也行,但是我们考虑了一下,感觉这个直接put上来不怎么好,因为这个log文件越来越大,而且每天几百G,put上去的时间会比较长,比较慢,最终决定通过Flume来实时的采集数据,你来一点,我就采一点,这样就省去一次性同步一个大文件的过程,并且可以大大提高效率,并且这个flume是专门为采集日志而生的,就是用来读取文件,然后进行一个传输,他就是干这个事儿了。 并且Flume采集这个文件,正常情况下可以直接就可以写到Hadoop当中,但是我们又进行了一个架构上的思考,因为我们需求当中,需要我们的离线出仓,还要兼顾未来实时数仓的一个架构,也就说离线数据和实时数据共用一套采集系统,那这时候我们就考虑说,那能不能加上一个kafka,那这样实时数据和离线数据都可从里面去取,同时,这个kafka也起到了一定的消峰作用,比如说像双十一、618,采集过来这个海量数据,如果直接传到hadoop当中,就会出现阻塞的情况,但这时候呢,如果写到这个kafka中,然后慢慢的进行消费,还有就是前面这个Flume这块,涉及到source、channel、sink的选型问题,这个Source呢,我们选择的是Taildir Source,因为它能支持断点续传和对应的多目录。并且这个Taildir Source的底层原理也非常简单啊,比如说你这个重来数据,它会put到这个Channel当中,之后会记录一个offset,那么下次读取的时间会先看一下offset读到哪里了,然后再读,它不会导致数据丢数,但是在极端情况有可能产生重复数据,比如说你数据发过来之后,这个提交offset的时侯,它失败了,因为这块没有做事物保护,那就有可能产生重复数据,但是产生这个重复数据的概率是比较低,并且即使产生了重复数据我们后续也可以在数仓的dwd层也可以对它进行一个去重,那下一个呢,是Channel,因为我们下一级是kafka,所以channel这块我们果断采用的Kafka Channel,因为Kafka Channel的性能是高于memory Channel + Kafka Sink的,因为后者的传输通道没有Kafka channel的传输通道快,所以说这里我们选择是Kafka channel直接把数据就传到Kafka中,首先是他的速度比较快,另一方面呢,它的可靠性也比较高,因为它的数据是直接存到Kafka中,kafka底层就是基于磁盘的,所以这个可靠性是有保障的,所以这里面选择了Kafka Channel。 同时在这个flume当中,我们还做了一个ETL拦截器,这个ETL拦截器,主要是判断JSON是否完整,因为这里主要是想减少网络上的带宽,比如说你这个JSON根本不完整的,你直接传过来,传到最终的Hadoop中,那就占用网络带宽,那就这种数据我能不能在源头上就把它干掉,因为就算你存到Hadoop当中,在使用时也会解析失败,那为什么没有把这个JSON详细的解析出来,一个一个字段去进行判断,把ETL的操作都放在这儿呢?那因为这个flume相当于下水道,你放的头发丝比较多的话,都容易堵塞,所以这个会影响到flume的吞吐量,降低它传输的一个性能,所以这块只能做简单的清洗操作,并且像这种自定义拦截器的步骤也比较简单,我只需要定一个类,实现一个intercept的接口,再去重写里面四个方法,自主化,关闭,单event,多event,然后再写一个静态内部类Builder。然后之后呢,打包上传到flume的lib包下,然后在配置文件当中全类名拼接上到 $ 这样就可以实现,之后呢,我们还用到这个ganglia监控器,可以监控这个flume的put事务,当然我用的kafka channel就没有这个take事务了,这里面有个put尝试提交的次数和最终success成功的次数,如果你发现这个put大量成功,但是这个尝试提交次数,远远大于最终success成功的次数,就说明发生了大量的这个回滚操作,那就说明flume运行不太良好,那这时候,一般情况下就是你的内存不够导致的啊,因为flume的默认内存只有20M。此时你需要把他增加到这个4-6个G,可以去flume的env.sh当中修改对应的参数就可以了,并且在使用flume当中,想提高它对应的一个吞吐量,可以去修改它里面的参数叫batchSize,把这个值调大就可以提高它对应吞吐量,但是带来的问题就是导致这个数据时间会有一定的延迟,如果能接受这种延迟,那就没有问题,但正常情况我们是可以接受的了的,因为数据传到Hadoop也是在这等着,还有就是凌晨的时候才开始处理这个数据,所以慢慢传也没有问题。
接下来就是数据传输到kafka中,当时为什么选择kafka呢,主要是因为kafka能做到一个高效读写,比如说,它首先是集群,同时呢,它可以设置这个分区增加并行度,并且它底层是采用这个稀疏索引index进行存储的,每4KB的数据去记录一条索引,那这样这个处理的速度要快一些。另一个就是底层采用的是顺序读写,可以达到600 M/S,速度非常快,还有就是零拷贝和页缓存技术,那什么是零拷贝和页缓存呢?比如说这个生产者啊,发送到kafka对应的这个数据,那你发送过来的数据,这个kafka不对它进行处理,直接扔给页缓存。 谁是页缓存呢?就是Linux系统的内存,直接给它,那给它之后呢,什么时候落盘呢?首先第一个它内存不够的时候,那肯定会落盘,还有一种呢,就是它这里面的这个数据不被经常访问了,也会去进行落盘。当外部来访问kafka数据的时候,会直接从内存里面返回数据。 那万一落盘了,这个拿不到数据怎么办?落盘之后可以再加载回来,再进行应答,他的操作都是内存操作,速度非常非常快,那另一方面什么叫零拷贝呢,消费者来消费数据,正常情况下框架设计都会走这个应用层,去页缓存里面读取数据,读过来之后,得通过网卡发送给消费者,但是通过网卡发送的时候,都有一个发送缓冲流,这是网卡内存的一个备份,这样的话,就会多一次拷贝过程,但是,kafka设计的就非常精妙,他根本就没有这个应用层或者是应用层不做任何操作,就直接去访问页缓存,那页缓存中的数据直接就通过网卡返回给消费者,就省去了网卡内存的一个拷贝过程,这就是零拷贝。 在使用kafka过程中,遇到过一些问题,比如说提高这个kafka的吞吐量,怎么提高kafka吞吐量呢?其实这里面涉及到的角色比较多,因为它是由生产者、broker和消费者这样一个构成,任何一个环节都有可能阻塞吞吐量的提高,我们需要进行综合的考量,比如说我可以对参数进行一些优化, 第一,双端队列的32 M缓存,我给他提高到64 M,16K的批次大小给他提高到32K,还有这个linger.ms默认0ms,我给提高到10ms,当然可能会造成一定的延迟,那还有就是采用压缩,我们采用snappy进行压缩以减少网络IO,另一方面,在broker端,我可以增加对应的分区数,同时,我要求消费者端,要么增加消费者,要么增加CPU核数,否则的话没有用,并且,在这个消费数据的时候呢,默认一次拉取的数据量是50 M,我给他提高到60 M,甚至更高,拉过来之后,把他放到这个队列里,我一次处理的条数,它默认是500条,那我可以提高到1000到3000条,这样就可以去提高kafka的吞吐量,同时,在使用kafka中也遇到了那个挂了、丢了、重了,乱序了等等一系列问题,那比如说首先这个挂了,你要先看日志,看是什么原因导致它挂了,一般情况下呢,有可能是你资源不够,那资源不够的话,你可以用linux的高级命令查看磁盘,比如说df -h,查看CPU,可以用这个top,查看这个内存可以用jmap -heap,有的时候内存不够的可能性也比较多,因为它默认内存是1 G,生产环境中,一般我们得调到10~15 G,当然还有一种情况,就是这个,呃,像我之前有个同事误删了一个kafka节点,删掉之后,他当时慌的一批,然后我分析了一下没啥问题,最后跟我的猜测是一样的,因为我们副本是两个,把这个你正常删掉之后,重新去给他服役上来,就没有啥问题,也不会导致数据这个丢失,这是挂了,那如果是丢数据了呢?如果你要想保证数据不丢,那你就将acks设置为-1,然后将这个副本数设置为大于等于2,还有isr里面最小副本数也设置为大于等于2,那这样就可以了啊,就可以保证了,这就可以把丢数据的问题解决了。
那如果有重复数据呢?可以使用这个幂等性,那幂等性的原理,其实就是,你整过来的数据,他在内存当中维护了一组数据的这个元数据。他会判断你这个数据之前发没发过,发过的话,那我就不再往下发了,没发的话我再重新落盘,然后再记住这个元数据,就是这样一个过程。但是呢,他怎么判断这两个数据相不相同呢?条件有三个,第一个是PID,第二个是分区号,第三个是序列号,那这里面PID要注意一下,因为PID是kafka每次一重启的时候就会发生变化,并且kafka的内存全部就清空了,那以后也许你是重复的数据,那我也会认为是不重复的,就会在极端情况产生一些重复数据,所以幂等的话,也不能完全的保证数据不重,只能说他可以保证的是单分区单会话内数据不重,多会话是肯定不行的。还有就是ack没有应答成功的情况下, 要想保证数据不重复,就需要开启事务,事务是靠的五个API来处理的,当然它底层是基于这个幂等性的,也就是说你使用事务的时候必须得开启幂等性,那他5个API是哪5个呢,首先是初始化,开启事务,然后是在事务内提交已经消费的偏移量API,最后是提交事务或者是回滚事务,这5个API就可以控制事务的一个原子性,如果写入broker失败了,我们可以进行回滚。
那还有就是乱序数据,那什么是乱序呢?比如说,第一个先保证有序吧,先说有序,有序怎么保证是有序呢?就是kafka可以保证单分区内是有序的,你往一个分区里面发,那数据肯定是有序的,如果非得要求多分区数据有序,那你就得把所有的分区数据通过下游处理器去读出来,然后进行重新排序,当然还有一些场景就是,希望把一张表的所有数据发送到某一个分区,那你就把这个表名设置为key就行了,就可以实现将一张表发送到某一个分区里面去。但是,单分区中有可能会有乱序,那如何解决这个问题呢,第一个就是,没有开启幂等性的情况下,发送失败时不进行重试不就可以了,就是将retries设置为0,同时将request的那个参数in.flight也设置为1,就是一次只发送一条数据。因为这里面是涉及到一个broker ,还有这个生产者,生产者往broker发送数据的时候,这里面有in.flight,默认是5,也就是说我可以源源不断的异步的往broker进行一个发送,发送过来之后,会判断这个数据的一个序列号是否是连续的,你不是连续的那就不行,不连续就不允许发送,那不就行了,那你就源源不断往这发就行了。第二个就是,开启幂等性,那这个幂等性它在broker端会自动帮我们对这个数据进行一个排序,但最多一次排五个,所以可以把in.flight设置为小于等于5,把retries设置为默认值,再加上这个幂等性,基本上就可以了,而且这个in.flight如果你将他设置为大于等于5,它直接就给你报错了,所以你也设置不成功。 kafka里面还有一个消费策略,它包含了Range、RoundRobin、粘性这三种。 那这里面主要考虑一下Range、RoundRobin,Range的特点就是它的这个分配算法特别快,比如说你七个分区三个消费者,他给你七除三等于二余一,那就是第一个消费者给你三个,然后之后两个两个的,然后这样就完事了,所以这种分配算法速度非常非常快,好多这个企业愿意用,但是呢,它有个毛病,如果你针对这个多个topic的这种情况,它是每个topic都单独的进行这样的一个分配,那就会导致多出来的分区都砸到第一个消费者,最终,它容易造成数据倾斜,所以如果topic比较多,那就不要用它了,那就用RoundRobin,他的的特点,就比如说还是七个分区三个消费者它采用的是轮循,一人一个轮着来,但是这种方法相对来说是有点慢,但好处就是不论有多少topic,我都是轮循,这个消费者与消费者之间,最多就差一个分区,所以就不会产生数据倾斜,另外就是粘性,正常分配的时候一般都不考虑它,都是在这个消费者挂了的时候,如果没有加上粘性,那再平衡时每个分区和消费者之间都可能重新分配,这个改动就比较大,那反过来我加上粘性之后,那只是把挂了的这个消费者的任务,重新分配给其他消费者,那这样的话,分配的速度要更快一些,影响范围也要更小一些,所以通常就是这个粘性结合Range或者RoundRobin进行使用,这个官网默认,就是采用的这个Range + 粘性这种方式进行使用的。 那kafka是依赖于zookeeper的,就是这个zookeeper,是非常不错的轻量级框架,那在这个使用zookeeper的时候,我也对它进行过详细的研究,那比如说zookeeper里面,它有一个CAP法则,但是它只满足对应的CP法则,C就是数据一致性,P是这个分区容错性,但是,它不满足的是A,就是可用性,因为,zookeeper在这个重新选举的时候,是不可能对外提供服务的,所以说就不能保证这个可用性。还有就是他在非第一次选举的时候,epoch,事务id,服务器id,它就遵循一个这样的选举规则,当然了,我也了解了一下,这个kafka 2.8以后,是可以不用这个zookeeper的,并且我大胆的预言了一下,等这个kafka再升级到稳定版,并且稳定一点之后,就可以考虑把zookeeper干掉,省去一定的这个外部通讯的这个资源,那么效率也会适当提高一些。 接着,就是考虑怎么将这个Kafka的数据去传输到hadoop中,其实这块呢,有很多办法,比如说我们可以写个java程序,消费kafka,然后上传到hadoop中,也还可以写一个flink程序,source源对接它,然后这个Sink端对接Hadoop也没问题,但是这都需要我们写代码,相对就麻烦一点,然后又考虑了一下能不能用其他组件,因为我们前边是flume,那我这边也可以用呀,最后我们项目选用的是kafka source和hdfs Sink,中间的channel,因为传输的是日志,丢一些条数,也无所谓,于是我们选用了Memory Channel。 但是我们用hdfs Sink的时候,突然发现这个hdfs中产生了大量的小文件,当时我同事就慌了,这时我还是比较淡定的,我说这个有问题是好事儿,解决掉就可以了,然后果断的查阅这个官方手册一看,这里面确实有参数可以去控制,可以控制生成文件的大小,然后我把大小控制成128 M,但是以我的经验来看,只控制大小是肯定不行的,万一这个过来的数据总量都到达不了128 M,那怎么办,我寻思着如果再有个时间配合他就完美了,一看文档,果然有这个时间参数,又给他添加了一下,但是我又翻了一下这个文档,里面还有个设置event的个数,它默认给你设置成10,就是表示你过来10个event数据我给你生成一个文件,但这样还是有产生小文件的这种风险,那果断的设置为0给他禁止掉,所以控制时间、大小、event个数,这样,在hdfs上就没有产生一些小的文件,那之前产生的小文件怎么办,也有办法,可以采用har归档,就是把这个大量的小文件归档在一起,主要是减少NameNode的一个压力,还有就是,如果你是MR程序,可以采用这个combinerTextinputFormat,如果你用的是hive的话,你可以采用这个combineHiveInputFormat,都可以,就是将这个大量的小文件放在一起,统一进行一个切片,那就减少了mapTask开启的个数,也就减少了占用的内存,并且同时也可以开启JVM重用,来减少JVM开关的一个时间。
并且在使用这个flume过程中,我们还解决了一个零点漂移的问题,这是阿里当年的一个难题,什么难题呢?因为我这个数据存储是按天创建分区的,比如说18号,19号每天一个分区。然后在产生数据的时候,比如说是这个18号23:59:59,产生了一条日志,但是由于整个传输通道肯定是有一定的延迟的,那传到我下一级的时候,已经变成了19号00:00:50。,他在存储时是用的系统的当前时间,那当前时间就是19号,那它就会把这个数据写到19号分区里面去,但是这是18号产生的数据,不能给我放到19号里,那像阿里当年是把下一个分区的最近15分钟的数据都读出来,然后通过时间过滤,将18号的数据过滤出来,再给它追加到18号分区,这显然就很麻烦,但是现在这个技术都在进步,直接在这块放上一个时间戳拦截器,我把你这条数据拦下来,拦下来之后取出他的时间戳,然后把它给hdfs sink的时间戳变量,然后他就根据时间戳变量直接写到18号分区了,这样就省去了后续再处理这个麻烦事,解决了零点漂移的问题,。 接下来就是对业务数据的处理。处理业务数据,就分情况了,里面涉及到一个同步策略,同步策略有全量和增量,全量就是把所有的数据一次性全拿过来,尤其是数据量较小的表,比如这个商品的SKU表,商品的SPU表,商品一级分类,二级分类,三级分类,地区表,类似这些表,数据量都非常非常小,直接就全量同步过来了,但是还有一些大表。例如一些事实表,像加购,下单,支付,物流,这些数量比较大,如果每天都全量的话,那压力太大,不光同步的速度比较慢,也很占用磁盘空间,这时候就考虑说用增量,
处理这个全量的数据的组件目前市场上比较主流的有sqoop和DataX,DataX对接这个数据源比较多,像Oracle,MySQL,HDFS都可以很好的支持。而且我们的数据量也不大,全量的数据加在一起每天才1~2 G,并且这个DataX属于叫单节点,基于内存的一个同步,所以速度也是非常快,可能十几分钟就完事了,但是使用DataX的时候,它是有点小bug的,比如说mysql里面的空值就是空,但是我们同步到hdfs中未来就是给hive使用的,而hive的空是\N,那明显这个需要转换一下,但是这个免费版本没有修改,当然我可以自己改源码,或者不改的话只能在hive创建表时进行空值转换,将其转换成空串也是可以的,那对于这个DataX调优的话,我们之前的那些数据量其实也涉及不到调优,但是,如果贵公司用的这个DataX导的这个数据量比较大,那就可能需要去增加它的内存,或者增加对应的线程数。
接下来就是处理增量数据。我们可以用Maxwell进行一个同步。为什么选择Maxwell,这个同类型的产品有很多,比如说Flink CDC 等,原因很简单,因为Maxwell支持断点续传和首日全量,可以进行一个初始化,那Flink CDC,各方面和这个Maxwell一样,但是比较遗憾的就是当时它是1.0版本,有锁表的bug,所以这个在生产环境当中没法正常使用,到了应该是2022年的这个三月份的左右吧应该,他推出的这个2.0解决了这个锁表bug,但是当时我们只能上Maxwell了,Maxwell的底层原理也比较简单,就是MySQL的主从复制,伪装成一个Mysql的从库,获取这个BinLog的数据,接着就把增量数据同步过来了,增量数据同步的都是一些事实表。 接下来我们又去决定用什么计算引擎去进行一个数仓的搭建,我们考虑了hive的 Mr, Spark Sql,还有hive on Spark,最后我们选择了hive on Spark。因为这几个里面性能最好的就是Spark Sql,但是Spark的生态不是特别完善,比如说权限管理、元数据管理Spark都是不支持的,所以我们只能用hive on Spark进行解决。 在使用hive过程中,我们也对hive进行了研究,比如说hive的组成,首先是客户端,然后是元数据,它默认元数据存储在debery数据库,不支持多人开发,所以我们给它放到mySql里面,同时还有对应的解析器,数据生成器,还有逻辑计划生成器,逻辑计划优化器,物理计划生成器,物理计划优化器,以及对应的执行器,对这个Sql进行一个解析,其实这些模块就是将你写的这个HQL翻译成可以执行的MR程序,首先将这个HQL,翻译成AST抽象语法树,然后对这个抽象语法树进行逻辑计划的一个生成,然后进行优化,然后再通过物理计划生成器生成物理计划,再进行优化,最后一执行就完事了,hive是基于hdfs,它的底层存储是落在HDFS上,并且默认的计算引擎是MR,任务调度用的是Yarn的这一套,这个MR引擎,之后我们换成了Spark引擎进行计算。 还有就是,我们在hive中创建表的时候,通常都会创建外部表,因为外部表在删除数据的时候,只删除元数据,不会删除原始数据,起到一个安全作用,那像我们只有自己使用的那种临时表才会创建内部表,那个删了也没事,还有像我们在公司中,也大量使用系统函数,我看了一下官网上有289个,但是我们并没有用那么多,但是也用了很多,比如date_add、date_sub、 datediff,last_date、next_date、nvl、 case when、if,等函数,但虽然说有这么多函数,但是在我们这个开发中,还可能遇到一些比较复杂的场景,就需要我们去自定义函数来进行实现,比如说可以自定义UDF和UDTF,当然UDAF比较少,那像UDF,就是相当于我们用的算子map,那UDTF就相当于Flat map,那UDF自定义的时候非常简单,只需要定义一个类继承这个GenericUDF里面就一个核心方法叫evaluate。在里面写自己的逻辑就行了,当然还有一些初始化关闭的一些东西,那UDTF,就是属于这个炸裂,一般的就是定义一个类继承GenericUDTF,去写三个方法:初始化、关闭、process,process里面是核心逻辑,初始化里面去声明类型及对应名称就可以了,接着就需要对这个自定义函数进行打包,上传到HDFS路径,然后在hive的客户端进行注册去使用,还有一些窗口函数,像over,它里面既可以进行分区,又可以排序,去进行一个开窗处理,用它的时候,统计最多的就是像一些7天内连续3天、topN,还有这个同时在线人数等一些场景。 还有就是在使用hive当中,也对其进行过一些优化,比如说如果是大、小表join的情况下,这个mapJoin默认打开就不要关闭了,还有我们可以提前进行行列过滤,就类似于这个谓词下推,也可以把谓词下推功能开启,也可以直接创建分区表,去防止后续的全表扫描,也可以创建分桶表,当然,还可以采用压缩,去减少网络上数据的传输,去减少磁盘IO,还可以采用列式存储,来加快查询的速度,可以处理小文件,去提前开启merge功能,就是在执行MR程序的时候,产生了大量的小文件,会单独再开启个MR,将这些小文件进行合并,合并的规则就是小于16 M的文件,就会认为是小文件,给它合并到256 M,还有就是如果只有map任务时是默认打开,如果是MR任务,就需要手动把它打开,还有像小文件这种的,我们可以开启这个combineHiveInputFormat,将小文件打包在一起,统一进行切片,同时也可以开启JVM重用,减少一些JVM开关的时间。同时我们还可以合理的设置reduce的个数,那还有就是如果遇到group by的这种场景,需要提前去开启mapside进行预聚合,还有会将MR引擎更换为Spark引擎来进行相关的一些优化。 我们有时还会遇到数据倾斜,比如说我们之前在统计一个各个省份的交易额这种的指标,在yarn上就看到了个别省份有的运行的快,有的运行的慢,像快的能比这个慢的时间上差了20倍,像北京、广东、江苏的这些地方,执行的特别慢,那这时候果断的把这个任务停掉,然后开启group by的mapside进行预聚合,同时又开启了skewgroup by,进行二次聚合,还有除了group by这种数据倾斜的问题,我们还遇到这种join产生的数据倾斜,首先就是大、小表join的场景,一般就是事实表和维度表join,这种场景特别多,比如订单跟商品,那这种我一般就是先开启mapjoin进行处理就可以了,最怕的就是这种大表和大表的join,比如说之前我们就统计了一个,加购到支付的一个平均使用时长,这种情况下就遇到了这个大表join大表的场景。接着我们就去可以开启skewjoin,也可以开启smb 来分桶join,但是这个办法,就是他要求两张表必须得是分桶且有序,还有也可以用左表随机,右表扩容来解决,但是会增加额外的一些操作,虽然说能够让每个reduce里面处理的数据量少,但是实际的工作量反而是大了,但是它能把这个任务执行完,那最牛的解决方式就是从建模上对他进行解决,比如说我可以采用这个维度建模当中的累积型快照事实表,在这两张表没有变大的时候,我就开始进行了处理累积的过程,这样的来做就避免了大表join大表这种场景。
接着我们就开始进行正常的建模,首先呢,就是进行数据调研,先把JAVA后台的所有的表的数据都拿过来,详细去看,看完之后,对这个业务有大概的一些了解,我是怎么看的呢,一般就是我模拟我自己是一个用户,然后去在页面上浏览他能干哪些事儿,可以跟后台哪些表有这个联系,了解差不多之后,我就找这个JAVA后台人员进行一个沟通,去聊一下我的猜测是否是对的,跟他沟通差不多之后,我就跟那个产品经理聊,聊对应的需求。 需求里面有原子指标,派生指标,衍生指标,像 派生指标 = 原子指标 + 统计周期 + 统计粒度 + 业务限定。原子指标 = 业务过程 + 度量值 + 聚合逻辑。按这样一个过程去拆这里面的业务逻辑,那如果业务逻辑里面有这个业务,那我就能做,没有这个业务就做不了,所以这个得跟他沟通清楚,下面就开始这个第二步,去明确数据域,其实也是跟着需求来的,需求当中有哪些域,就去处理哪些域的指标,那首先呢,是用户来到网站进行一个登录注册,那就有了用户域,然后准备逛一逛,就产生了流量,那流量就是用户的这个启动、页面、动作、错误、曝光,就是流量域,逛差不多了,就去买东西了,那不久产生了交易,那交易的话就得是加购、下单、支付、物流、退单等,交易域就出来了,在这个交易得过程当中,又进行了优惠券得使用,活动的参加,可以把他归结为工具域,最后呢,收到货了,用了后可以进行一个互动,去点赞、评价、收藏,互动域也出来了。
明确这些数据域之后,就是去构建业务总线矩阵,构建业务矩阵就是把事实表往左边一放,上面放上维度表,他们之间有关联的,根据坐标一打勾就行了。
构建业务总线矩阵之后才开始真正的建模。建模的话,是自下而上,首先从ODS层,然后是DIM层和DWD层,
ODS层里面就干了三件事。第一件事,保持数据原貌,不做任何修改,起到一个备份的作用,去防止数仓上面任何一层发生变化,都有最原始的数据,第二件事就是创建分区表,防止后续的全表扫描,第三件事,就是采用压缩,减少磁盘的存储空间。
接下来就是这个DWD,主要处理的就是事实表,这里面就涉及到事实表的分类了,主要有三类,事务型事实表、周期型快照事实表、累积型快照事实表,大多数情况下呢,我们是优先考虑事务性事实表,因为事务性事实表特点就是处理原子操作,是不可再切割的,比如说加购、下单、支付、物流这些,但是他不能解决所有问题,比如说这个连续型指标,或者是这个多张事实表关联的场景他不擅长,那这种连续型指标,就得用周期型快照,如果是多事实表关联,我们就用累积型快照,具体的事务型事实表处理有标准的四步,第一步选择业务过程,第二步声明粒度,第三步确定维度,第四步确定事实,那选择业务过程就选择产品经理感兴趣的,也就是指标当中需要统计的这个业务,第二步声明粒度,就是一行信息代表什么含义,可以代表一次下单,一个月下单,或一年下单,那如果给你的一年下单,那你没法统计一次下单,那只给你一次下单,你就可以统计一年下单,所以这里面要保持最细粒度,只要保证不做聚合操作就行。第三步确定维度,就是我们产品经理统计指标当中需要的,比如说我们需要这个用户商品活动、时间、地区、优惠券这些,那就留下,不需要的就把它干掉,第四步确定事实,确定事实表的度量值,什么叫度量值,就是可以累加的值,例如个数、件数、金额,把它确定好,那这些确定完之后,再看周期型快照,就是在这个声明粒度的时候,变成这个一天,或者是一年,看你自己这个周期是多少,自己确定,那这个累积型快照事实表,只不过是在确定事实的时候,要确定多个事实表的度量值,就完事了。
接着就是DIM层,主要是维度相关的一些处理,就是一般没有度量值,只要一些属性信息,这些通常都是维度,那在这里面主要做的比如说对用户做的拉链表,因为用户表的特点就是缓慢变化,有的时候一天变化一次,一月变化一次,不确定,而且数据量还比较大,所以要做拉链表,对它进行处理,拉链表也比较简单,只需要在用户表末尾加上开始日期、结束日期,然后进行初始化,接下来跟第二天获取的新增和变化的数据进行关联,之后取出最新的数据放到新分区,旧的数据放到旧分区,并且在DIM层我们还进行了维度整合,比如说我们将商品表,商品品牌表,商品一级分类,二级分类,三级分类整合成商品维度表,将省份、地区整合成地区维度表,将这个活动信息,活动规则整合成活动维度表就完事了。
DWD和DIM层还做了一些ETL清洗,比如说清洗我们用的是HQL,在这里面判断一些核心字段不能为空、一些重复数据的处理、相关数据的脱敏等一系列操作。
这之后,整个建模就结束了,那接着我们就开始进行一个指标体系建设,它是自上而下先有ADS,再有DWS。ADS我们统计的指标日活、新增、留存率,最近7日内连续3日下单用户数,各品牌商品收藏次数Top3等等,有了这些指标之后,需要进行拆分,把派生指标变换成原子指标 + 统计周期 + 统计粒度 + 业务限定,拆分完之后去找公共的业务过程,统计周期,统计粒度,然后建DWS层宽表,去进行构建,构建完成之后,DWS建完,那整个数仓也就建完了,那建完之后就是要把数仓中处理好的数据往外部系统中导,那这里就可以用DataX进行同步,DataX这边还会产生空值问题,如果说你这里面是\N,再往MySql里面导,DataX就有对应的参数。 最终我们用superset进行可视化,虽然说它的页面比较丑,但是免费,这点老板比较喜欢,后面调度时我们用的DS,进行数仓的调度,因为它一方面是国产的,并且这团队也比较强大,各方面性能也比较OK,最后就选了他,当然在DS里面,每天跑的指标,平时也就100多个,节假日的情况下,一般是150个到200个左右。 之后元数据管理这块,都是我同事做的,可以用atlist,也可以自己实现,自己去解析相关的这个程序。这个不是我主导的,了解的不是太多。 这些整完之后呢,就是整个集群的监控,是Prometheus+Grafana,可以监控各个组件,某一个进程挂了可以直接触发报警。
这就是整个离线项目的大致过程。