离线数仓-项目介绍

1. 系统架构 

 2. 介绍流程

  1. 公司的困难
  2. 数据的来源
    1. 业务
    2. 日志
  3. Flume采集日志数据

    1. 选型

    2. ETL

    3. flume内存不够,通过ganglia监控器发现

    4. 提高吞吐量,batchSize

  4. kafka

    1. 高效读写

    2. 提高吞吐量

    3. kafka挂了

    4. kafka丢数问题

    5. 数据重复问题

    6. 数据乱序问题

    7. 消费策略

  5. zookeeper

    1. CAP,只满足CP

    2. 非第一次选举

  6. ​​​​​​​Flume发到hadoop

    1. ​​​​​​​source、channel、sink选择

    2. 小文件问题

    3. har归档,hive合并

    4. 零点漂移问题(拦截器)

  7. ​​​​​​​业务数据

    1. ​​​​​​​全量、增量

  8. datax全量同步

    1. ​​​​​​​空值问题

  9. ​​​​​​​Maxwell增量数据

    1. ​​​​​​​优点

    2. 原理

  10. ​​​​​​​数仓计算引擎选择

    1. ​​​​​​​通过比较,选hive on Spark

  11. ​​​​​​​hive

    1. ​​​​​​​组成

    2. 元数据换成MySQL

    3. HQL转成MR

    4. 外部表、内部表

    5. 系统函数

    6. 自定义UDF

    7. hive优化

    8. 数据倾斜,举例:加购到支付的一个平均使用时长,用累积型快照事实表解决

  12. ​​​​​​​​​​​​​​建模

    1. ​​​​​​​数据调研,java的表,产品的指标,通过拆分指标确定能不能做

    2. 明确数据域

    3. 构建业务总线矩阵

    4. 建模,自下而上,首先从ODS层,然后是DIM层和DWD层,分别介绍

    5. ODS,三件事

    6. DWD,三类,事务型事实、周期型快照事实、累积型快照事实

    7. DIM,拉链表

    8. ETL

  13. ​​​​​​​​​​​​​​指标体系建设

    1. ​​​​​​​自上而下先有ADS再有DWS

    2. ADS的一些指标

    3. 指标拆分

    4. 拆分完之找公共的务过程,统计周期,统计粒度,然后建DWS层宽表

    5. DataX导入到MySQL

  14. ​​​​​​​​​​​​​​superset可视化

  15. DS调度

  16. 这就是整个离线项目的大致过程

3. 详细介绍

在上个公司老板遇到了一些困难,他想详细的了解公司的一些运营情况,比如说了解日活新增、留存转化率这些信息,但是公司数据量特别庞大直接用MySQL是统计不了的,而且统计历史数据的时候更不行,所以这时候,他决定组建这个大数据团队,首先是购买相关的一些服务器(阿里云或者物理机都可以),买完服务器之后,开始进行一个项目调研,调研的时了解了一下公司的数据来源,一共有两种,一个是Java后台对应的业务数据,在MySQL里面存放的还有就是前端埋点产生的用户行为数据,以文件形式存放在日志服务器中并且在接收数据的时候都采用了Nginx进行一个负载均衡。 用户行为数据是保存30天,起到一个备份的作用,防止后续过程中数仓当中任何位置出现异常,都有最原始的一个数据的备份,都知道,在企业当中,数据是最重要的,相对来说,磁盘比较廉价,所以我们备份30天,接下来就是考虑如何将数据传输到Hadoop当中进行后续的一个处理 在这里面,我调研过很多技术,我们可以直接写一个java程序,读取这个文件上传,我们也可以写一个JAVA程序,创建一个Hadoop客户端,进行put,一次性put上来也行,但是我们考虑了一下感觉这个直接put上来不怎么好,因为这个log文件越来越大,而且每天几百G,put上去的时间会比较长,比较慢最终决定通过Flume来实时的采集数据,你来一点,我就一点,这样就省去一次性同步一个大文件的过程并且可以大大提高效率,并且这个flume是专门为采集日志而生的,就是用来读取文件,然后进行一个传输,他就是干这个事儿了。 并且Flume采集这个文件,正常情况下可以直接就可以写到Hadoop当中,但是我们又进行了一个架构的思考,因为我们需求当中,需要我们离线出仓,还要兼顾未来实时数仓的一个架构,也就说离线数据和实时数据共用一套采集系统,那这时候我们就考虑说,那能不能加上一个kafka,那这样实时数据和离线数据都可从里面去,同时,这个kafka也起到了一定的消峰作用,比如说像双十一、618,采集过来这个海量数据,如果直接传到hadoop当中,就会出现阻塞的情况,但这时候呢,如果写到这个kafka中,然后慢慢的进行消费,还有就是前面这个Flume这块,涉及到source、channel、sink的选型问题,这个Source呢,我们选择的是Taildir Source,因为它能支持断点续传和对应的多目录。并且这个Taildir Source的底层原理也非常简单啊,比如说你这个重来数据,它会put到这个Channel当中,之后会记录一个offset,那么下次读取的时间会先看一下offset读到哪里了,然后再读,它不会导致数据丢数,但是极端情况有可能产生重复数据,比如说你数据发过来之后,这个提交offset的时侯失败了,因为这块没有做事物保护,那有可能产生重复数据,但是产生这个重复数据概率比较低,并且即使产生了重复数据我们后续也可以在数仓的dwd层也可以对它进行一个去重,那下一个呢,是Channel因为我们下一级是kafka,所以channel这块我们果断采用的Kafka Channel,因为Kafka Channel性能是高于memory Channel + Kafka Sink,因为后者的传输通道没有Kafka channel的传输通道快,所以说这里我们选择是Kafka channel直接把数据就传到Kafka中,首先是他的速度比较快,另一方面呢,它的可靠性也比较高,因为它的数据是直接存到Kafka中,kafka底层就是基于磁盘的,所以这个可靠性是有保障的,所以这里面选择Kafka Channel。 同时在这个flume当中,我们还做了一个ETL拦截器这个ETL拦截器,主要是判断JSON是否完整,因为这里主要是想减少网络上的带宽,比如说你这个JSON根本不完整的,你直接传过来,传到最终的Hadoop中,那就占用网络带宽,那就这种数据我能不能在源头上就把它干掉,因为就算存到Hadoop当中,在使用时解析失败,那为什么没有把这个JSON详细的解析出来,一个一个字段去进行判断,把ETL操作都放在这儿呢?那因为这个flume相当于下水道,你放的头发丝比较多的话,都容易堵塞,所以这个会影响到flume的吞吐量,降低它传输的一个性能,所以这块只能做简单的清洗操作,并且像这自定义拦截器的步骤也比较简单,我只需要定一个类,实现一个intercept的接口,再去重写里面四个方法,自主化关闭event,多event,然后再写一个静态内部类Builder。然后之后呢,打包上传到flume的lib包下,然后在配置文件当中全名拼接上到 $ 这样就可以实现,之后呢我们还用到这个ganglia监控器,可以监控这个flume的put事务,当然我用的kafka channel就没有这个take事务了,这里面有个put尝试提交的次数和最终success成功的次数,如果你发现这个put大量成功,但是这个尝试提交次数,远远大于最终success成功的次数,就说明发生了大量的这个回滚操作,那就说明flume运行不太良好,那这时候,般情况下就是你的内存不够导致的啊,因为flume的默认内存只有20M此时你需要把他增加到这个4-6G,可以去flume的env.sh当中修改对应的参数就可以并且在使用flume当中,想提高它对应的一个吞吐量,可以修改它里面参数叫batchSize,把这个值调大就可以提高它对应吞吐量,但是带来的问题就是导致这个数据时间有一定的延迟,如果能接受这种延迟,那问题,但正常情况我们是可以接受的了的,因为数据传到Hadoop也是在这等着,还有就是凌晨的时候才开始处理这个数据,所以慢慢传也没有问题。

接下来就是数据传输到kafka中,当时为什么选择kafka呢,主要是因为kafka能做到一个高效读写,比如说,它首先是集群,同时呢,它可以设置这个分区增加并行度,并且它底层是采用这个稀疏索引index进行存储的每4KB的数据去记录一条索引,那这样这个处理的速度要快一些。另一个就是底层采用的是顺序读写,可以达到600 M/S,速度非常快,还有就是零拷贝和缓存技术那什么是零拷贝和缓存呢?比如说这个生产者啊,发送到kafka对应的这个数据,那你发送过来的数据,这个kafka不对它进行处理,直接扔给缓存。 谁是页缓存呢?就是Linux系统的内存,直接给,那给之后呢,什么时候落盘呢?首先第一个它内存不够的时候,那肯定会落盘,还有一种呢,就是它这里面的这个数据不被经常访问了,也会进行落盘外部来访问kafka数据的时候,直接从内存里面返回数据 那万一落盘了,这个拿不到数据怎么办?盘之后可以再加载回来进行他的操作都是内存操作速度非常非常快,那另一方面什么叫零拷贝呢,消费者来消费数据,正常情况下框架设计都会走这个应用层,去页缓存里面读取数据,读过来之后,得通过网卡发送消费者,但是通过网卡发送的时候,都有一个发送缓冲流,这是网卡内存的一个备份,这样的话,就会多一次拷贝过程,但是,kafka设计的就非常精妙,他根本就没有这个应用层或者应用层不做任何操作,就直接访问缓存,那缓存中的数据直接通过网卡返回给消费者,就省去了网卡内存的一个拷贝过程,这就是零拷贝。 在使用kafka过程中,遇到一些问题,比如说提高这个kafka吞吐量,怎么提高kafka吞吐量呢?其实这里面涉及到的角色比较多,因为它是由生产者、broker和消费者这样一个构成,任何一个环节都有可能阻塞吞吐量的提高,我们需要进行综合的考量,比如说我可以对参数进行一些优化, 第一双端队列的32 M缓存,我给他提高到64 M,16K的批次大小给他提高到32K还有这个linger.ms默认0ms,我提高到10ms,当然可能会造成一定的延迟,那还有就是采用压缩,我们采用snappy进行压缩少网络IO,另一方面,在broker端,我可以增加对应的分区数,同时,我要求消费者端,要么增加消费者,要么增加CPU核数,否则的话没有用,并且,在这个消费数据的时候,默认一次拉取的数据量是50 M,我给他提高到60 M,甚至更高拉过来之后,把他放到这个队列里,我一次处理的条数,默认是500条,那我可以提高到1000到3000条,这样就可以去提高kafka的吞吐量,同时,在使用kafka中也遇到了那个挂了丢了、重了,乱序了等等一系列问题,那比如说首先这个挂了你要先看日志,看什么原因导致它挂了,一般情况下呢,有可能是你资源不够,那资源不够的话,你可以用linux的高级命令查看磁盘,比如说df -h,查看CPU,可以用这个top,查看这个内存可以用jmap -heap,有的时候内存不够可能性也比较多,因为它默认内存是1 G,生产环境中,一般我们得调到10~15 G,当然还有一种情况,就是这个,呃,像我之前有个同事误删了一个kafka节点,删掉之后,当时慌的一批,然后我分析了一下没啥问题,最后跟我的猜测是一样的,因为我们副本是两个把这个你正常删掉之后,重新去给他服役上来,就没有啥问题,不会导致数据这个丢失,这是挂了,那如果是数据了呢?如果你要想保证数据不丢,那你就将acks设置为-1,然后这个副本数设置为大于等于2,还有isr里面最小副本数也设置为大于等于2,那这样就可以了啊,就可以保证这就可以把丢数据的问题解决了。

那如果有重复数据呢?可以使用这个幂等性,那幂等性的原理,其实就是,你过来的数据,他在内存当中维护了一组数据的这个数据。他会判断你这个数据之前发没发过,发过的话,那我就不再往下发了,没发的话我再重新落盘,然后再记住这个数据,就是这样一个过程但是呢,他怎么判断这两个数据相不相同呢?条件有三个,第一个是PID,第二个是分号,第三个是序列号,那这里面PID要注意一下,因为PID是kafka每次一重启的时候就会发生变化,并且kafka的内存全部清空了,那以后也许你是重复的数据,那我也会认为是不重复的,就会极端情况产生一些重复数据,所以幂等也不能完全的保证数据不重,只能说他可以保证的是单分区单会话内数据不重,多会话是肯定不行的还有就是ack没有应答成功的情况下, 要想保证数据不重复,就需要开启事务,事是靠的五个API来处理的,当然它底层是基于这个幂等性的,也就说你使用事的时候必须得开启幂等性,那他5个API是哪5个呢,首先是初始化,开启事务然后是在事务内提交已经消费的偏移量API,后是提交事务或者是回滚事务,这5个API就可以控制事务的一个原子性,如果写入broker失败了,我们可以进行回滚

那还有就是乱序数据,那什么是乱序呢?比如说,第一个先保证有序吧,先说有序,有序怎么保证是有序呢?就是kafka可以保证单分区内是有序的,你往一个分区里面发,那数据肯定是有序的,如果非得要求多分区数据有序,那你就得把所有的分区数据通过下游处理器去出来然后进行重新排序,当然还有一些场景就是,希望把一张表的数据发送到某一个分区,那你就把这个表名设置为key就行了,就可以实现将一张表发送到某一个分区里面去。但是,单分区中有可能有乱序,那如何解决这个问题呢,第一个就是没有开启幂等性情况下发送失败时不进行重试不就可以了,就是将retries设置为0,同时将request那个参数in.flight也设置为1,就是一次只发送一条数据。因为这里面是涉及到一个broker 还有这个生产者,生产者往broker发送数据的时候,这里面有in.flight,默认是5,也就是说我可以源源不断的异步的往broker进行一个发送,发送过来之后,会判断这个数据的一个序列号是否是连续的,你不是连续的那就不行,不连续就不允许发送,就行了,你就源不断往这发就行了第二个就是,开启幂等性,那这个幂等性它在broker端会自动帮我们对这个数据进行一个排序,但最多一次排五个,所以可以把in.flight设置为小于等于5把retries设置为默认值,再加上这个幂等性,基本上就可以,而且这个in.flight如果你将他设置为大于等于5,它直接就给你报错了,所以你也设置不成功 kafka里面还有一个消费策略,它包含了Range、RoundRobin、粘性这三种 那这里面主要考虑一下Range、RoundRobin,Range的特点就是它的这个分配算法特别快,比如说你七个分区三个消费者,他给你七除三等于二余一,那就是第一个消费者给你三个,然后之后两个两个,然后这样就完事了所以这种分配算法速度非常非常快好多这个企业愿意用,但是呢,它有个毛病如果你针对这个多个topic这种情况,它是每个topic单独的进行这样的一个分配,那就会导致多出来的分区都砸到第一个消费者,最终,它容易造成数据倾斜,所以如果topic比较多,那就不要用它了,那就用RoundRobin,他的的特点,比如说还是七个分区三消费者它采用的是轮一人一个轮着来,但是这种方法相对来说是有点慢好处就是不论有多少topic,我都是,这个消费者与消费者之间,最多就差一个分区,所以就不会产生数据倾斜,另外就是粘性,正常分配的时候一般都不考虑它,都是在这个消费者挂了的时候,如果没有加上粘性,那再平衡时每个分区和消费者之间都可能重新分配这个改动就比较大,那反过来我加上粘性之后,那只是把挂了这个消费者的任务,重新分配给其他消费者,那这样的话,分配的速度要更快一些,影响范围也要更小一些,所以通常就是这个粘性合Range或者RoundRobin进行使用,这个官网默认,就是采用的这个Range + 粘性这种方式进行使用的 那kafka是依赖于zookeeper的,就是这个zookeeper是非常不错的量级框架,那在这个使用zookeeper的时候,我也对它进行详细的研究,那比如说zookeeper里面,它有一个CAP法则,但是满足对应的CP法则,C就是数据一致性,P是这个分区容错性,但是,它不满足的是A,就是可用性,因为,zookeeper这个重新选举的时候,是不可能对外提供服务的,所以说就不能保证这个可用性还有就是他在非第一次选举的时候,epoch,事务id,服务器id,它就遵循一个这样的选举规则,当然了,我也了解了一下,这个kafka 2.8以后,是可以不用这个zookeeper的,并且大胆的预言了一下,等这个kafka再升级稳定版,并且稳定一点之后,可以考虑把zookeeper干掉,省去一定的这个外部通讯的这个资源,那么效率也会适当提高一些 接着,就是考虑怎么将这个Kafka的数据传输到hadoop中,其实这块呢,有很多办法,比如说我们可以写个java程序,消费kafka,然后上传到hadoop还可以写一个flink程序,source对接它,然后这个Sink对接Hadoop也没问题,但是这都需要我们写代码,相对麻烦一点,然后又考虑了一下能不能用其他组件,因为我们前边是flume,那我这也可以用呀,最后我们项目选用的是kafka source和hdfs Sink,中间的channel为传输的是日志丢一些条数,无所谓,于是我们选用了Memory Channel。 但是我们用hdfs Sink时候,突然发现这个hdfs中产生大量的小文件,当时我同事就慌了,这时我还是比较淡定的,我说这个有问题是好事儿解决掉就可以了,然后果断的查阅这个官方手册一看,这里面确实有参数可以去控制可以控制生成文件的大小然后我把大小控制成128 M,但是以我的经验看,只控制大小肯定不行,万一这个过来的数据总量都到达不了128 M,怎么办,我寻思着如果再有个时间配合他就完美了,一看文档果然有这个时间参数又给他添加了一下,但是我又翻了一下这个文档,里面还有个设置event的个数,它默认给你设置成10,就是表示你过来10个event数据我给你生成一个文件,但这样还是有产生小文件这种风险,那果断的设置为0给他禁止掉,所以控制时间、大小、event个数,这样,在hdfs上就没有产生一些小的文件,那之前产生的小文件怎么办,也有办法,可以采用har归档,就是把这个大量的小文件归档在一起,主要是减少NameNode的一个压力,还有就是果你是MR程序,可以采用这个combinerTextinputFormat,如果你用的是hive的话,你可以采用这个combineHiveInputFormat,都可以,就是将这个大量的小文件放在一起,统一进行一个切片,那就减少了mapTask开启的个数,也就减少了占用的内存,并且同时也可以开启JVM重用,减少JVM开关的一个时间

并且在使用这个flume过程中,我们还解决了一个零点漂移的问题,这是阿里当年的一个难题,什么难题呢?因为我这个数据存储是按天创建分区的,比如说18号,19号每天一个分区。然后在产生数据的时候,比如说是这个18号23:59:59,产生了一条日志,但是由于整个传输通道肯定是有一定的延迟的,那传到我下一的时候,已经变成了19号0000:50。,他在存储时是用的系统的当前时间,那当前时间就是19号,那它就会把这个数据写到19号分区里面去,但是是18号产生的数据,不能给我放到19号里那像阿里当年是把下一个分区的最近15分钟的数据都读出来,然后通过时间过滤,将18号的数据过滤出来,再给它追加到18号分区,这显然很麻烦,但是现在这个技术都在进步直接在这块放上一个时间戳拦截器,我把你这条数据拦下来,拦下来之后取出他的时间戳,然后把它给hdfs sink的时间戳变量然后他就根据时间戳变量直接写到18号分区了,这样就省去了后续再处理这个麻烦事,解决零点漂移的问题,。 接下来就是对业务数据的处理处理业务数据,就分情况了,里面涉及到一个同步策略,同步策略全量增量,全量就是所有的数据一次性全拿过来,尤其是数据量较小的表,比如这个商品的SKU表,商品的SPU表,商品一级分类,二级分类,三级分类地区表,类似这些表,数据量都非常非常小,直接就全量同步过来了,但是还有一些大表。一些事实表,像加购,下单,支付,物流,这些数量比较大,如果每天都全量的话,压力太大,不光同步的速度比较慢,也很占用磁盘空间,这时候就考虑说用增量

处理这个全量的数据的组件目前市场上比较主流的有sqoop和DataXDataX对接这个数据源比较多,像Oracle,MySQL,HDFS都可以很好的支持。而且们的数据量不大,全量的数据加在一起每天才1~2 G,并且这个DataX属于叫单节点,基于内存的一个同步,所以速度也是非常快,可能十几分钟就完事了但是使用DataX的时候,它是有点小bug的,比如说mysql里面的空值就是空,但是我们同步到hdfs中未来就是给hive使用的,而hive的空是\N,那明显这个需要转换一下,但是这个免费版本没有修改,当然我可以自己改源码,或者不改的话只能在hive创建表时进行空值转换,将转换成空串也是可以的,那对于这个DataX调优的话,我们之前那些数据量其实也涉及不到调优,但是,如果贵公司用的这个DataX导的这个数据量比较大,那就可能需要增加它内存,或者增加对应的线程数

接下来就是处理增量数据。我们可以用Maxwell进行一个同步。为什么选择Maxwell,这个同类型的产品有很多,比如说Flink CDC ,原因很简单,因为Maxwell支持断点续传和首日全量,可以进行一个初始化,Flink CDC,各方面这个Maxwell一样,但是比较遗憾的就是当时它是1.0版本,有锁表的bug,所以这个在生产环境当中没法正常使用,到了应该是2022年这个三月份的左右应该,他推出这个2.0解决了这个锁表bug,但是当时我们只能上Maxwell了Maxwell的底层原理也比较简单,就是MySQL的主从复制,伪装成一个Mysql的从库,获取这个BinLog的数据,接着就把增量数据同步来了,增量数据同步的都是一些事实表 接下来我们又去决定用什么计算引擎进行一个数仓的搭建我们考虑了hive Mr, Spark Sql,还hive on Spark,最后我们选择hive on Spark因为这几个里面性能最好的是Spark Sql,但是Spark的生态不是特别完善,比如说权限管理、元数据管理Spark是不支持,所以我们只能用hive on Spark进行解决。  在使用hive过程中,我们也对hive进行了研究,比如说hive组成,首先是客户端然后是元数据,它默认数据存储在debery数据库,不支持多人开发,所以我们给它放到mySql里面,同时有对应的解析器,数据生成器,还有逻辑计划生成器,逻辑计划优化器,物理计划生成器,物理计划优化器,以及对应的执行器,对这个Sql进行一个解析其实这些模块就是将你写的这个HQL翻译成可以执行MR程序首先将这个HQL,翻译成AST抽象语法,然后对这个抽象语法进行逻辑计划的一个生成,然后进行优化,然后再通过物理计划生成器生成物理计划,进行优化,最后一执行就完事了,hive是基于hdfs,它的底层存储是落在HDFS上,并且默认的计算引擎是MR,任务调度用的是Yarn的这一套这个MR引擎,之后我们换成Spark引擎进行计算。 还有就是,我们在hive创建表的时候,通常都会创建外部表,因为外部表在删除数据的时候,只删除数据,不会删除原始数据,起到一个安全作用,那像我们只有自己使用的那种临时表才会创建内部表,那个删了也没事还有像我们在公司中,也大量使用系统函数,我看了一下官网有289,但是我并没有那么多,但是也用了很多,比如date_add、date_sub datediff,last_date、next_date、nvl、 case whenif,等函数,但虽然说有这么多函数,但是在我们这个开发中,还可能遇到一些比较复杂的场景,需要我们去自定义函数来进行实现,比如说可以自定义UDF和UDTF,当然UDAF比较少,那像UDF,就是相当于我们用的算子map,那UDTF相当于Flat map,那UDF自定义的时候非常简单,只需要定义个类继承这个GenericUDF里面就一个核心方法叫evaluate里面写自己的逻辑就行了,当然还有一些初始化关闭一些东西,那UDTF,就是属于这个炸裂,一般的就是定义一个类继承GenericUDTF去写三个方法初始化关闭process,process里面是核心逻辑,初始化里面去声明类型及对应名称就可以了,接着就需要对这个自定义函数进行打包,上传到HDFS路径,然后在hive的客户端进行注册去使用,还有一些窗口函数,像over,它里面可以进行分区,又可以排序,去进行一个开窗处理,用它的时候,统计最多的就是像一些7天内连续3topN,还有这个同时在线人数一些场景。 还有就是在使用hive当中,也对其进行一些优化,比如说如果是大小表join的情况下,这个mapJoin默认打开就不要关闭了还有我们可以提前进行行列过滤,就类似于这个谓词下推,也可以把谓词下推功能开启,可以直接创建分区表,防止后续的全扫描,可以创建分表,当然,可以采用压缩,减少网络上数据的传输,去减少磁盘IO,可以采用列式存储加快查询的速度,可以处理小文件,提前开启merge功能,就是在执行MR程序的时候,产生大量小文件,会单独再开启个MR,将这些小文件进行合并,合并的规则就是小于16 M的文件,就会认为是小文件给它合并到256 M,还有就是如果只有map任务时是默认打开,如果是MR任务,需要手动把它打开,还有像小文件这种的,我们可以开启这个combineHiveInputFormat,将小文件打包在一起,统一进行切片,同时也可以开启JVM重用,减少一些JVM开关的时间。同时我们还可以合理的设置reduce个数,那还就是如果遇到group by的这种场景,要提前开启mapside进行预聚合,还有会将MR引擎更换为Spark引擎来进行相关的一些优化。 我们有时还会遇到数据倾斜,比如说我们之前统计一个各个省份的交易额这种的指标,yarn上就看到了个别省份有的运行的快,有的运行的慢,像快的能比这个慢的时间上差了20倍,像北京广东江苏这些地方,执行的特别慢,那这时候果断的把这个任务停掉,然后开启group bymapside进行预聚合同时又开启了skewgroup by,进行二次聚合还有除了group by这种数据倾斜的问题,我们还遇到这种join产生的数据倾斜,首先是大表join的场景一般就是事实表和维度表join,这种场景特别多,比如订单商品那这种一般就是先开启mapjoin进行处理就可以了,最怕的就是这种大表和大表的join,比如说之前我们统计了一个,加购到支付的一个平均使用时长,这种情况就遇到了这个大表join大表的场景接着我们就去可以开启skewjoin,也可以开启smb 分桶join,但是这个办法,就是他要求两张表必须得是分桶且有序,还有也可以左表随机,右表扩容来解决,但是会增加额外的一些操作,虽然说能够让每个reduce里面处理的数据量少,但是实际的工作量反而是大了,但是能把这个任务执行完,那最牛的解决方式就是从建模上对他进行解决比如说我可以采用这个维度建模当中累积型快照事实表这两张表没有变大的时候,我就开始进行了处理累积的过程,这样的来做就避免了大表join大表这种场景。

接着我们就开始进行正常的建模,首先呢,就是进行数据调研JAVA后台所有的表的数据都拿过来,详细看,看完之后,对这个业务有大概的一些了解,我是怎么看的呢,一般就是模拟我自己是一个用户,然后去在页面上浏览他能干哪些事儿,可以跟后台哪些表有这个联系,了解差不多之后,我就找这个JAVA后台人员进行一个沟通,聊一下我的猜测是否是对的跟他沟通差不多之后,我就跟那个产品经理聊,聊对应的需求。 需求里面有原子指标,派生指标,衍生指标,像 派生指标 = 原子指标 + 统计周期 + 统计粒度 + 业务限定。原子指标 = 业务过程 + 度量值 + 聚合逻辑。按这样一个过程拆这里面的业务逻辑,那如果业务逻辑里面有这个业务,那我就能做,没有这个业务就做不了所以这个跟他沟通清楚,下面就开始这个第二步,明确数据域,其实也是跟需求来的,需求当中有哪些域,就处理哪些域指标,那首先呢,是用户来到网站进行一个登录注册,那就有了用户域,然后准备逛一逛,产生了流量,那流量就用户的这个启动页面动作错误曝光,就是流量域,逛差不多了,就去买东西了,那不久产生了交易,那交易的话就加购、下单、支付、物流、退单等,交易域就出来了,在这个交易得过程当中,又进行了优惠券得使用,活动的参加,可以把他归结为工具域,最后呢,收到货了,用了后可以进行一个互动,去点赞、评价、收藏互动域也出来了。

明确这些数据之后,就是去构建业务总线矩阵,构建业务矩阵就是把事实表往左边一放,上面放上维度表,他们之间有关联的,根据坐标一打勾就

  构建业务总线矩阵之后才开始真正的建模。建模的话,是自下而上,首先从ODS层,然后是DIM层和DWD

ODS层里面就干了三件事。第一件事,保持数据原貌,不做任何修改,起到一个备份的作用,防止数仓上面任何一层发生变化,有最原始的数据,第二件事就是创建分区表,防止后续的全表扫描第三件事,就是采用压缩,减少磁盘的存储空间

接下来就是这个DWD主要处理的就是事实,这里面就涉及到事实表的分类了,主要有三类,事务型事实、周期型快照事实、累积型快照事实表,大多数情况下呢,我们是优先考虑事务性事实表,因为事务性事实表特点就是处理原子操作,是不可再切割的,比如说加购下单支付物流这些,但是他不能解决所有问题,比如说这个连续指标,或者是这个多张事实表关联的场景他不擅长,那这种连续型指标,得用周期型快照,如果是多事实表关联,我们累积型快照,具体的事务型事实表处理标准的四步,第一步选择业务过程,第二步度,第三步确定维度,第四步确定事实,那选择业务过程就选择产品经理感兴趣的,也就是指标当中需要统计这个业务,第二步声明粒度,就一行信息代表什么含义,可以代表一次下单,一个月下单,一年下单,那如果给你的一年下单,那你没法统计一次下单,那只给你一次下单,你就可以统计一年下单,所以这里面要保持最细粒度,只要保证不做聚合操作就行。第三步确定维度,是我们产品经理统计指标当中需要的,比如说我们需要这个用户商品活动时间地区优惠券这些,那就留下,不需要就把它干掉第四确定事实,确定事实表的度量值,什么叫度量值,就是可以累加值,例如个数件数金额,把它确定好,那这些确定完之后,再看周期型快照就是在这个声明粒度时候,变成这个一天,或者是一年,看你自己这个周期是多少,自己确定,那这个累积型快照事实表,只不过是在确定事实的时候确定多个事实表的度量值,就完事了

接着就是DIM层,主要是维度相关的一些处理,就是一般没有度量值只要一些属性信息,这些通常都是维度,那在这里面主要做的比如说对用户做拉链表,因为用户表的特点就是缓慢变化,有的时候一天变化一次,一变化一次,确定,而且数据量还比较大,所以要做拉链表,对它进行处理,拉链表也比较简单,只需要在用户表末尾加上开始日期结束日期,然后进行初始化,接下来跟第二天获取的新增和变化的数据进行关联,之后取出最新的数据放到新分,旧的数据放到旧分区,并且在DIM层我们还进行了维度整合,比如说我们将商品表,商品品表,商品一级分类,二级分类,三级分类整合成商品维度表,将省份地区整合成地区维度表将这个活动信息,活动规则整合成活动维度表就完事了

DWD和DIM层还做了一些ETL清洗,比如说清洗我们用的是HQL,这里面判断一些核心字段不能、一些重复数据的处理、相关数据的脱敏等一系列操作

这之后,整个建模就结束了,那接着我们就开始进行一个指标体系建设它是自上而下先有ADS再有DWSADS我们统计的指标日活新增、留存率,最近7日内连续3日下单用户数各品牌商品收藏次数Top3等等,有了这些指标之后,需要进行拆分,把派生指标变换成原指标 + 统计周期 + 统计粒度 + 业务限定,拆分完之找公共的务过程,统计周期,统计粒度,然后建DWS层宽表,去进行构建,构建完成之后,DWS建完,那整个数仓也就建完了,那建完之后就是要把数仓中处理好的数据往外部系统中导那这里就可以用DataX进行同步,DataX这边还会产生空值问题,如果说你这里面是\N,再往MySql里面导,DataX就有对应的参数。 最终我们用superset进行可视化,虽然说它的页面比较丑,但免费,这点老板比较喜欢,后面调度时我们用的DS,进行数仓的调度,因为它一方面是国产的并且这团队也比较强大,各方面性能也比较OK,最后就选了他,当然在DS里面,每天跑的指标,平时也就100多个节假日的情况下,一般是150到200个左右。 之后数据管理这块,都是我同事做的,可以用atlist,也可以自己实现,自己去解析相关的这个程序。这个不是我主导的了解的不是太多。  这些整完之后呢,就是整个集群的监控,是Prometheus+Grafana,可以监控各个组件,某一个进程挂了可以直接触发报警

这就是整个离线项目的大致过程。​​​​​​​

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/26916.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

深度学习部署:FastDeploy部署教程(CSharp版本)

FastDeploy部署教程(CSharp版本) 1. FastDeploy介绍 FastDeploy是一款全场景、易用灵活、极致高效的AI推理部署工具, 支持云边端部署。提供超过 🔥160 Text,Vision, Speech和跨模态模型📦开箱即用的部署体验&#xf…

消息队列常见问题(1)-如何保障不丢消息

目录 1. 为什么消息队列会丢消息? 2. 怎么保障消息可靠传递? 2.1 生产者不丢消息 2.2 服务端不丢消息 2.3 消费者不丢消息 3. 消息丢失如何快速止损? 3.1 完善监控 3.2 完善止损工具 1. 为什么消息队列会丢消息? 现在主流…

支付模块功能实现(小兔鲜儿)【Vue3】

支付 渲染基础数据 支付页有俩个关键数据,一个是要支付的钱数,一个是倒计时数据(超时不支付商品释放) 准备接口 import request from /utils/httpexport const getOrderAPI (id) > {return request({url: /member/order/$…

PyTorch深度学习实战(10)——过拟合及其解决方法

PyTorch深度学习实战(10)——过拟合及其解决方法 0. 前言1. 过拟合基本概念2. 添加 Dropout 解决过拟合3. 使用正则化解决过拟合3.1 L1 正则化3.2 L2 正则化 4. 学习率衰减小结系列链接 0. 前言 过拟合 (Overfitting) 是指在机器学习中,模型…

android studio内存分析之Memory profiler的使用

目录 Android Studio中内存分析工具Memory profiler的使用1. 打开Memory Profiler2. 工具使用3. 内存选项说明4. 内存性能分析器概览5. 内存计算方式6. 查看内存分配7. 捕获java/kotlin方式查看内存分配8. 堆转储文件导入和导出 内存性能分析器中的泄漏检测 Android Studio中内…

【ArcGIS Pro二次开发】(58):数据的本地化存储

在做村规工具的过程中,需要设置一些参数,比如说导图的DPI,需要导出的图名等等。 每次导图前都需要设置参数,虽然有默认值,但还是需要不时的修改。 在使用的过程中,可能会有一些常用的参数,希望…

Sentinel 2.0 微服务零信任的探索与实践

作者:涯客、十眠 从古典朴素的安全哲学谈起 网络安全现状 现在最常见的企业网络安全架构便是在企业网络边界处做安全防护,而在企业网络内部不做安全防范。这确实为企业的安全建设省了成本也为企业提供了一定的防护能力。但是这类比于现实情况的一个小…

单通道 6GSPS 16位采样DAC子卡模块--【资料下载】

FMC147是一款单通道6.4GSPS(或者配置成2通道3.2GSPS)采样率的12位AD采集、单通道6GSPS(或配置成2通道3GSPS)采样率16位DA输出子卡模块,该板卡为FMC标准,符合VITA57.4规范,该模块可以作为一个理想…

力扣:54. 螺旋矩阵(Python3)

题目: 给你一个 m 行 n 列的矩阵 matrix ,请按照 顺时针螺旋顺序 ,返回矩阵中的所有元素。 来源:力扣(LeetCode) 链接:力扣 示例: 示例 1: 输入:matrix [[1,…

手机便签内容不见了怎么恢复正常?

在日常生活和工作中,很多人都需要随手记录事情,例如家庭琐事、孩子相关的事情、指定时间需要完成的工作任务、会议安排等。当我们需要随时随地记录事情的时候,手机便签应用就是非常不多的选择,我们直接打开手机上的便签APP就可以新…

安全基础 --- https详解 + 数组(js)

CIA三属性:完整性(Confidentiality)、保密性(Integrity)、可用性(Availability),也称信息安全三要素。 https 核心技术:用非对称加密传输对称加密的密钥,然后…

【多线程】synchronized 原理

1. 写在前面 本章节主要介绍 synchronized 的一些内部优化机制,这些机制存在的目的呢就是让 synchronized 这把锁更高效更好用! 2. 锁升级/锁膨胀 JVM 将 synchronized 锁分为以下四种状态: 无锁,偏向锁,轻量级锁&…

服务器测试之GPU shoc-master测试

精简版指导 lspci | grep -i nvidia lspci -s 4f:00.0 -vvv 适用版本 cuda_11.8.0_520.61.05_linux.run cuda-samples-11.8.tar.gz NVIDIA-Linux-x86_64-525.116.04.run 安装: ./NVIDIA-Linux-x86_64-525.116.04.run 查看是否为一拖八:nvidia-smi topo …

算法通关村第四关——最大栈问题解析

力扣716,设计一个最大栈数据结构,既支持栈操作,又支持查找栈中最大元素。 分析: 在最大栈的问题上,除了实现普通栈拥有的方法pop、push、top外,还需要实现getMax方法来找到当前栈里的最大值。为了在最短事件…

【CSS】说说对BFC的理解

目录 一、概念 二、BFC的布局规则 三、设置BFC的常用方式 四、BFC的应用场景 1、解决浮动元素令父元素高度坍塌的问题 2、解决非浮动元素被浮动元素覆盖问题 3、解决外边距垂直方向重合的问题 五、总结 一、概念 我们在页面布局的时候,经常出现以下情况&am…

网络安全进阶学习第十二课——SQL手工注入3(Access数据库)

文章目录 注入流程:1、判断数据库类型2、判断表名3、判断列名4、判断列数1)判断显示位 5、判断数据长度6、爆破数据内容 注入流程: 判断数据库类型 ——> 判断表名 ——> 判断列名 ——> 判断列名长度 ——> 查出数据。 asp的网…

商用服务机器人公司【Richtech Robotics】申请纳斯达克IPO上市

来源:猛兽财经 作者:猛兽财经 猛兽财经获悉,总部位于美国内华达州拉斯维加斯由华人领导的商用服务机器人公司【Richtech Robotics】近期已向美国证券交易委员会(SEC)提交招股书,申请在纳斯达克IPO上市&am…

Linux的shell脚本常用命令

1、前提 使用shell脚本可以将所要执行的命令行进行汇总,统一执行,制作为脚本工具,简化重复性工作 1.1、常用命令 1.1.1、启动命令 假设我们拥有一个halloWord.sh的脚本,通过cd 命令进入相对应的目录下 ./halloWord.sh1.1.2、…

SpringBoot 依赖管理和自动配置---带你了解什么是版本仲裁

😀前言 本篇博文是关于SpringBoot 依赖管理和自动配置,希望能够帮助到您😊 🏠个人主页:晨犀主页 🧑个人简介:大家好,我是晨犀,希望我的文章可以帮助到大家,您…

Vue——webpack

webpack 一、Install1.全局安装2.局部安装 二、总结1.打包2.定义脚本3.配置文件定义(webpack.config.js)4.项目重新加载依赖5.webpack打包Css6.style-loader 一、Install 1.全局安装 npm install webpack webpack-cli -g2.局部安装 以项目为单位,一个项…