文章目录
- 1、rand()和rand(int seed)
- 2、distribute by
- 3、distribute by rand和生成文件数的关系
- set hive.exec.reducers.max 对比 set mapred.reduce.tasks
- 4、distribute by rand的风险
- 5、hive 中什么场景下会使用 distirbute by rand() 呢?
- 学习链接
1、rand()和rand(int seed)
两种随机数生成函数,返回值: [0, 1)的随机浮点数。
说明: 如果指定种子seed,则会得到一个稳定的随机数序列。生成的数字序列中每个数是随机的,但是每次生成的序列是固定的。
2、distribute by
Hive官网解释:Hive uses the columns in Distribute By to distribute the rows among reducers. All rows with the same Distribute By columns will go to the same reducer.
翻译:Hive根据distribute by column中的column值将不同的行数据发送到不同reducer,具有相同的column值的所有行将被发送到相同的reducer中。实质是hash(column)%reducer数。
3、distribute by rand和生成文件数的关系
比如:insert into table b select xxx from a;
该sql语句的最后一个job是一个仅有map阶段的任务(以该情况为例,其他场景可能也会有reduce阶段)。
假如有1000个map,该情况下map任务在往hive分区(一个分区)中写数据的时候,每个map都要产生1个文件;如果用到动态分区,假如有10个分区,数据量在每个分区都比较大的情况下,每个map几乎都要产生10个文件,文件个数呈现乘法的放大;
假如distribute by rand() + set hive.exec.reducers.max = 500(或者set mapred.reduce.tasks = 500);
先对rand取哈希然后对reduce数目(500)取余,保证了每条数据分配到所有reducer的可能性是相等的,这样reducer处理的数据量就是均匀的,在数据量比较大的情况下,每个reducer产生的文件数为动态分区的个数,产生的文件总个数50010(reducer个数分区个数)。
假如使用distribute by pmod(hash(1000*rand()), 80) + set mapred.reduce.tasks = 200;
此时 200>80,会有200个redurcer启动,但是只有80个实际接收了数据(即distribute by rand可以控制生成文件的数量);
# 同时,如果设置
set hive.merge.mapfiles=true;
set hive.merge.mapredfiles=true;
set hive.merge.size.per.task=256000000;
set hive.merge.smallfiles.avgsize=64000000;
# 在落地前可能会进行文件合并,落地文件大小是256m,落地文件数<=min(reduce数*分区个数,distribute by pmod(hash(1000*rand()), xx)*分区个数);
set hive.exec.reducers.max 对比 set mapred.reduce.tasks
1)相同点:
这两个参数均与Hive作业中的reduce任务数量有关。
2)不同点:
set hive.exec.reducers.max是Hive层面的参数,用于限制Hive作业推测出的reduce任务的最大数量。指定10可能是10、也可能是7。
set mapred.reduce.tasks是Hadoop MapReduce框架层面的参数,用于指定一个MapReduce作业的reduce任务数量。指定1就是1,指定10就是10。
4、distribute by rand的风险
distribute by rand() 使用不带随机种子的函数,在mapreduce框架下这种使用方式存在一定风险,可能导致部分数据丢失或者重复;
具体原理如下:
部分reduce shuffle失败导致上游个别map task重新生成数据,如果shuffle分片逻辑包含随机因子,会导致新生成的数据shuffle分片与之前不完全一致,进而导致部分数据重复读取或者数据丢失。新生成的shuffle分片内数据会漏数据也会包含分发到其他reducer的数据。(reduce task从每个map task的结果文件中拉取对应分区的数据。数据在map阶段已经是分好区了,并且会有一个额外的索引文件记录每个分区的起始偏移量。所以reduce task取数的时候直接根据偏移量去拉取数据。)
重要!!! 建议使用rand(int seed)设置随机种子。如果shuffle过程中个别reducer失败,对应上游mapper重试发送数据时,使用带种子的随机函数可以保证重试时生成的随机值相同,发送到该reducer的数据和之前一致。
5、hive 中什么场景下会使用 distirbute by rand() 呢?
在Hive中,DISTRIBUTE BY 子句与 RAND() 函数一起使用时,通常是为了将数据随机(近似均匀)分布到不同的Reduce中。
1)负载均衡:当数据量非常大,并且需要在多个reduce任务之间平衡负载时,使用 DISTRIBUTE BY RAND() 可以随机(近似均匀)分配数据,从而避免某些reduce任务过载而其他任务却空闲的情况。
2)小文件问题:在Hive中,如果有很多小文件,它们可能会导致大量的Seek操作(指HDFS在处理文件时的开销),从而降低查询性能。使用 DISTRIBUTE BY RAND() 可以将数据随机写入更少的文件中,减少小文件的数量(distribute by rand可以控制生成文件的数量;见3、)。
3)数据倾斜:在某些情况下,数据倾斜可能导致某些键(key)的数据量远大于其他键。通过随机分布数据,可以减少数据倾斜的影响。
4)测试和样例:在开发或测试阶段,可能需要从生产表中随机抽取一部分数据进行测试。使用 DISTRIBUTE BY RAND 可以快速创建一个随机样本(每个reduce任务独立随机抽样,而非全局随机抽样)。
-- 使用 DISTRIBUTE BY RAND() 来获取随机样本的 Hive SQL 示例
INSERT OVERWRITE LOCAL DIRECTORY '/tmp/sample_dir'
SELECT *
FROM source_table
DISTRIBUTE BY RAND()
LIMIT 1000;
-- 由于 DISTRIBUTE BY RAND() 会随机分配行到reduce任务,每个reduce任务处理一部分数据,并且 LIMIT 是在每个reduce任务的结果上独立执行的,因此每个reduce任务将返回一定数量的行,最终合并后形成整个随机样本。
学习链接
https://zhuanlan.zhihu.com/p/252776975