HIVE 优化浅谈
hive不怕数据量大,导致运行慢的主要原因是数据倾斜。hive的运行机制这里就不再赘述,咱们直入正题,聊一下hive的优化方法。
优化点一:业务逻辑优化
1.去除冗余逻辑
对于复杂业务逻辑来说,在非数据倾斜的情况下,最有效的优化方式就是对业务逻辑的优化,去掉冗余的逻辑过程或无用的中间过程,能一步完成的不要分两步。尤其对于旧逻辑优化及数据迁移工作中较为常见。
2.重复逻辑落临时表
复杂的业务场景很可能会有复用的逻辑,把重复的逻辑落入临时表中不仅能减少资源消耗,还能有利于后期的代码维护。
优化点二:配置合理的参数
1.在hive-site.xml里面有个配置参数叫:
hive.fetch.task.conversion
配置成more,简单查询就不走map/reduce了,设置为minimal,就任何简单select都会走map/reduce;
2.其他见优化点九、十、十四
优化点三:设置合理的map reduce的task数量
1. map task数量设置
mapred.min.split.size: 指的是数据的最小分割单元大小;min的默认值是1B
mapred.max.split.size: 指的是数据的最大分割单元大小;max的默认值是128MB
通过调整max可以起到调整map数的作用:减小max可以增加map数,增大max可以减少map数。
*注意*:直接调整mapred.map.tasks这个参数是没有效果的。
因为map任务启动和初始化的时间远远大于逻辑处理的时间,如果map数过多,就会造成很大的资源浪费,同时可执行的map数是受限的,这样就会造成运行过慢。如果map数过少,一个map要处理上千万行的数据也肯定没有多个map并行处理的快,这种情况也需要优化。所以我们需要合理地调整map数。
1.1 减少map数
如果数据中有大量文件,每个文件都远小于128M,为避免每个小文件开启一个map程序,我们可以通过如下配置合并小文件,从而减少map数:
set mapred.max.split.size=100000000;(100M)
set mapred.min.split.size.per.node=100000000;(100M)
set mapred.min.split.size.per.rack=100000000;(100M)
set hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;
其中:set hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;表示执行前进行小文件合并,
前面三个参数确定合并文件块的大小,大于文件block大小128m的,按照128m来分隔;小于128m,大于100m的,按照100m来分隔;把那些小于100m的(包括小文件和分隔大文件剩下的)进行合并。
1.2 增加map数
当input的文件都很大,任务逻辑复杂或者字段少造成行数过多,map执行非常慢的情况下,可以考虑增加Map数,来减少每个map的任务量,从而提高执行效率。
2. reduce task数量设置
Reduce的个数对整个作业的运行性能有很大影响。如果Reduce数量设置的过多,那么将会产生很多小文件,对NameNode会产生一定的影响,而且整个作业的运行时间未必会减少;如果Reduce设置的过小,那么单个Reduce处理的数据将会加大,很可能会引起OOM异常。
如果设置了mapred.reduce.tasks/mapreduce.job.reduces参数,那么Hive会直接使用它的值作为Reduce的个数;如果mapred.reduce.tasks/mapreduce.job.reduces的值没有设置(也就是-1),那么Hive会根据输入文件的大小估算出Reduce的个数。根据输入文件估算Reduce的个数可能未必很准确,因为Reduce的输入是Map的输出,而Map的输出可能会比输入要小,所以最准确的数根据Map的输出估算Reduce的个数。
不指定reduce个数的情况下,Hive会基于以下两个设定reduce个数:
参数1:hive.exec.reducers.bytes.per.reducer(每个reduce任务处理的数据量,默认为1000^3=1G)参数2:hive.exec.reducers.max(每个任务最大的reduce数,默认为999)
计算reducer数的公式很简单N=min(参数2,总输入数据量/参数1)
即,如果reduce的输入(map的输出)总大小不超过1G,那么只会有一个reduce任务;
2.1 注意 1:reduce个数并不是越多越好;
同map一样,启动和初始化reduce也会消耗时间和资源;
另外,有多少个reduce,就会有个多少个输出文件,如果生成了很多个小文件,那么如果这些小文件作为下一个任务的输入,则也会出现小文件过多的问题;
2.2 注意2 :有的逻辑只会产生一个reduce
只有一个reduce的情况也比较多,一种情况是数据量比较少,这种当然无需优化。但有的时候我们会发现不管数据量多大,无论怎么调整reduce个数的参数,任务中一直都只有一个reduce任务,例如:
- 没有group by的汇总,比如:
select pt,count(1) from a where pt = '20220215' group by pt; -- 多个reduce
select count(1) from a where pt = '20220215'; -- 单个reduce
- 用了全局排序:Order by
- 有笛卡尔集
3.小文件合并
小文件数目过多,容易在文件存储端造成瓶颈,给HDFS带来压力,影响处理效率。对此,可以通过合并Map和Reduce的结果文件来消除这样的影响。
用于设置合并的参数有:
是否合并Map输出文件:hive.merge.mapfiles=true(默认值为true)
是否合并Reduce端输出文件:hive.merge.mapredfiles=false(默认值为false)
合并文件的大小:hive.merge.size.per.task=256*1000*1000(默认值为256000000)
小文件是如何产生的:
1.动态分区插入数据,产生大量的小文件,从而导致map数量剧增;
2.reduce数量越多,小文件也越多(reduce的个数和输出文件是对应的);
3.数据源本身就包含大量的小文件。
小文件问题的影响:
1.从Hive的角度看,小文件会开很多map,一个map开一个JVM去执行,所以这些任务的初始化,启动,执行会浪费大量的资源,严重影响性能。
2.在HDFS中,每个小文件对象约占150byte,如果小文件过多会占用大量内存。这样NameNode内存容量严重制约了集群的扩展。
小文件问题的解决方案:
从小文件产生的途径就可以从源头上控制小文件数量,方法如下:
1.使用Sequencefile作为表存储格式,不要用textfile,在一定程度上可以减少小文件;
2.减少reduce的数量(可以使用参数进行控制);
3.少用动态分区,用时记得按distribute by分区,distribute by 可以控制分发到reduce的方式,避免数据倾斜。
对于已有的小文件,我们可以通过以下几种方案解决:
1.使用hadoop archive命令把小文件进行归档;
2.重建表,建表时减少reduce数量;
3.通过参数进行调节,设置map/reduce端的相关参数,如下:
//每个Map最大输入大小(这个值决定了合并后文件的数量)
set mapred.max.split.size=256000000;
//一个节点上split的至少的大小(这个值决定了多个DataNode上的文件是否需要合并)
set mapred.min.split.size.per.node=100000000;
//一个交换机下split的至少的大小(这个值决定了多个交换机上的文件是否需要合并)
set mapred.min.split.size.per.rack=100000000;
//执行Map前进行小文件合并
set hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat; 设置map输出和reduce输出进行合并的相关参数:
[java] view plain copy
//设置map端输出进行合并,默认为true
set hive.merge.mapfiles = true
//设置reduce端输出进行合并,默认为false
set hive.merge.mapredfiles = true
//设置合并文件的大小
set hive.merge.size.per.task = 256*1000*1000
//当输出文件的平均大小小于该值时,启动一个独立的MapReduce任务进行文件merge。
set hive.merge.smallfiles.avgsize=16000000
优化点四:sql优化
1.减少读取数据量,节约读取开销
1.1 列裁剪
数据量较大的情况下,使用具体的所需字段代替select * 操作。
比如黑科技清单表中有a,b,c,d…z 26列数据,我们需要使用其中的a,b,c三列,使用 "select a,b,c from 黑科技清单"的性能要比"select * from 黑科技清单"好。实际上生产环境一般也会禁止使用select * 操作的。
1.2 分区裁剪
对于数据量比较大的表,我们一般会进行分区。业务查询过程中通过选择所需分区的方式过滤数据量,也会对查询性能有明显的提升。
1.3 过滤操作
如果表数据量较大,需要把对业务没有价值的数据过滤掉再进行关联等其他查询操作。
2. 用group by 语句替代COUNT(DISTINCT)
1.数据量小的时候COUNT(DISTINCT)性能可能比group by 好,因为group by语句需要子查询,里外层两个select 会产生两个job。
2.数据量大的情况下,由于COUNT DISTINCT操作需要只用一个Reduce Task来完成,这一个Reduce需要处理的数据量太大,就会导致整个Job很难完成,一般这种情况下,COUNT DISTINCT需要采用先GROUP BY再COUNT的方式替换。
3.优化in/exists语句
hive1.2.1后支持了in/exists操作,但还是推荐使用hive的一个高效替代方案:left semi join,例如:
select a.id, a.name from a where a.id in (select b.id from b);
select a.id, a.name from a where exists (select id from b where a.id = b.id);
应该转换成:
select a.id, a.name from a left semi join b on a.id = b.id;
4. Group by操作
默认情况下,Map阶段同一Key数据分发给一个reduce,当一个key数据过大时就倾斜了。进行GROUP BY操作时需要注意以下几点:
Map端部分聚合
事实上并不是所有的聚合操作都需要在reduce部分进行,很多聚合操作都可以先在Map端进行部分聚合,然后reduce端得出最终结果。
(1)开启Map端聚合参数设置set hive.map.aggr=true
(2)在Map端进行聚合操作的条数,group的键对应的记录条数超过这个值则会进行分拆,值根据具体数据量设置set hive.grouby.mapaggr.checkinterval=100000
(3)有数据倾斜的时候进行负载均衡(默认是false)set hive.groupby.skewindata = true
有数据倾斜时进行负载均衡
此处需要设定hive.groupby.skewindata,当选项设定为true时,生成的查询计划有两个MapReduce任务。在第一个MapReduce中,map的输出结果集合会随机分布到reduce中,每个reduce做部分聚合操作,并输出结果。这样处理的结果是,相同的Group By Key有可能分发到不同的reduce中,从而达到负载均衡的目的;第二个MapReduce任务再根据预处理的数据结果按照Group By Key分布到reduce中(这个过程可以保证相同的Group By Key分布到同一个reduce中),最后完成最终的聚合操作。
5. 利用Hive对UNION ALL优化的特性
多表union all会优化成一个job。
问题:比如推广效果表要和商品表关联,效果表中的auction_id列既有32位字符串商品id,也有数字id,和商品表关联得到商品的信息。
解决方法:Hive SQL性能会比较好
SELECT *
FROM effect a
JOIN(SELECT auction_id AS auction_id FROM auctionsUNION ALLSELECT auction_string_id AS auction_id FROM auctions
) bON a.auction_id=b.auction_id
比分别过滤数字id,字符串id然后分别和商品表关联性能要好。商品表只读一次,推广效果表只读取一次。
6.解决Hive对UNION ALL优化的短板
Hive对union all的优化的特性:对union all优化只局限于非嵌套查询。
6.1 消灭子查询内的group by
SELECT *
FROM(SELECT * FROM t1 GROUP BY c1,c2,c3UNION ALLSELECT * FROM t2 GROUP BY c1,c2,c3
) t3
GROUP BY c1,c2,c3
从业务逻辑上说,子查询内的GROUP BY怎么看都是多余(功能上的多余,除非有COUNT(DISTINCT)),如果不是因为Hive Bug或者性能上的考量(曾经出现如果不执行子查询GROUP BY,数据得不到正确的结果的Hive Bug)。所以这个Hive按经验转换成如下所示:
SELECT * FROM (SELECT * FROM t1 UNION ALL SELECT * FROM t2) t3 GROUP BY c1,c2,c3
经过测试,并未出现union all的Hive Bug,数据是一致的。MapReduce的作业数由3减少到1。
t1相当于一个目录,t2相当于一个目录,对Map/Reduce程序来说,t1、t2可以作为Map/Reduce作业的mutli inputs。这可以通过一个Map/Reduce来解决这个问题。Hadoop的计算框架,不怕数据多,就怕作业数多。
但如果换成是其他计算平台如Oracle,那就不一定了,因为把大输入拆成两个输入,分别排序汇总成merge(假如两个子排序是并行的话),是有可能性能更优的(比如希尔排序比冒泡排序的性能更优)。
6.2 消灭子查询内的COUNT(DISTINCT),MAX,MIN
6.3 消灭子查询内的JOIN
SELECT * FROM(SELECT * FROM t1UNION ALLSELECT * FROM t4UNION ALLSELECT * FROM t2 JOIN t3ON t2.id=t3.id
) x
GROUP BY c1,c2;
-- 上面代码运行会有5个jobs。加入先JOIN生存临时表的话t5,然后UNION ALL,会变成2个jobs。
INSERT OVERWRITE TABLE t5
SELECT * FROM t2 JOIN t3 ON t2.id=t3.id;
SELECT * FROM (t1 UNION ALL t4 UNION ALL t5);
-- 调优结果显示:针对千万级别的广告位表,由原先5个Job共15分钟,分解为2个job,一个8-10分钟,一个3分钟。
7. JOIN操作
7.1 小表、大表JOIN(新版本已经优化,纪念一下青春吧)
在使用写有Join操作的查询语句时有一条原则:应该将条目少的表/子查询放在Join操作符的左边。原因是在Join操作的Reduce阶段,位于Join操作符左边的表的内容会被加载进内存,将条目少的表放在左边,可以有效减少发生OOM错误的几率;再进一步,可以使用Group让小的维度表(1000条以下的记录条数)先进内存。在map端完成reduce。
实际测试发现:新版的hive已经对小表JOIN大表和大表JOIN小表进行了优化。小表放在左边和右边已经没有明显区别。
优化点五:使用向量化查询
向量化查询执行通过一次性批量执行1024行而不是每次单行执行,从而提供扫描、聚合、筛选器和连接等操作的性能。在Hive 0.13中引入,此功能显着提高了查询执行时间,并可通过两个参数设置轻松启用:
设置hive.vectorized.execution.enabled = true;设置hive.vectorized.execution.reduce.enabled = true;
向量化查询只支持orc等列存储格式。
优化点六:选择引擎
Hive可以使用Apache Tez执行引擎而不是古老的Map-Reduce引擎。在环境中没有默认打开,在Hive查询开头将以下内容设置为‘true’来使用Tez:
使用Tez 引擎:
set hive.execution.engine = tez;
使用spark 引擎:
set hive.execution.engine = spark;
优化点七:存储格式
存储格式 | 存储方式 | 特点 |
---|---|---|
TextFile | 行存储 | 存储空间消耗比较大,并且压缩的text 无法分割和合并 查询的效率最低,可以直接存储,加载数据的速度最高 |
SequenceFile | 行存储 | 存储空间消耗最大,压缩的文件可以分割和合并 查询效率高,需要通过text文件转化来加载 |
RCFile | 数据按行分块 每块按列存储 | 存储空间最小, 查询的效率最高 , 需要通过text文件转化来加载, 加载的速度最低。 压缩快 快速列存取。 读记录尽量涉及到的block最少 读取需要的列只需要读取每个row group 的头部定义。 读取全量数据的操作 性能可能比sequencefile没有明显的优势 |
ORCFile | 数据按行分块 每块按列存储 | 压缩快,快速列存取 ,效率比rcfile高,是rcfile的改良版本 |
Parquet | 列存储 | 相对于PRC,Parquet压缩比较低,查询效率较低,不支持update、insert和ACID.但是Parquet支持Impala查询引擎 |
推荐使用orcFile; |
优化点八:压缩格式
大数据场景下存储格式压缩格式尤为关键,可以提升计算速度,减少存储空间,降低网络io,磁盘io,所以要选择合适的压缩格式和存储格式。
压缩比率,压缩解压缩速度,是否支持Split,这三点是选择压缩格式要考虑的要素。
优化点九:运行模式选择
1.本地模式
对于大多数情况,Hive可以通过本地模式在单台机器上处理所有任务。对于小数据,执行时间可以明显被缩短。开启本地模式,简单查询将不会提交到yarn。
set hive.exec.mode.local.auto=true; -- 开启本地mr
-- 设置local mr的最大输入数据量,当输入数据量小于这个值时采用local mr的方式,默认为134217728,即128M
set hive.exec.mode.local.auto.inputbytes.max=50000000;
-- 设置local mr的最大输入文件个数,当输入文件个数小于这个值时采用local mr的方式,默认为4
set hive.exec.mode.local.auto.input.files.max=10;
2.并行模式
Hive会将一个查询转化成一个或多个阶段。这样的阶段可以是MapReduce阶段、抽样阶段、合并阶段、limit阶段。默认情况下,Hive一次只会执行一个阶段,由于job包含多个阶段,而这些阶段并非完全相互依赖,即:这些阶段可以并行执行,可以缩短整个job的执行时间。设置参数,set hive.exec.parallel=true,或者通过配置文件来完成:
3.严格模式
Hive提供一个严格模式,可以防止用户执行那些可能产生意想不到的影响查询,通过设置Hive.mapred.modestrict来完成。
set hive.mapred.mode=strict;
3.1 设置为严格模式后,可以禁止3种类型的查询:
a.不带分区或过滤的分区表查询
如果在一个分区表执行查询,除非where语句中包含分区字段过滤条件来显示数据范围,否则不允许执行。换句话说就是在严格模式下不允许用户扫描所有的分区。
b. 带有order by的查询
对于使用了orderby的查询,要求必须有limit语句。因为orderby为了执行排序过程会将所有的结果分发到同一个reducer中
进行处理,强烈要求用户增加这个limit语句可以防止reducer额外执行很长一段时间。
c. 限制笛卡尔积的查询
严格模式下,进行笛卡尔积的查询会报错,必须要有on条件。
优化点十:JVM重用
JVM重用是Hadoop调优参数的内容,其对Hive的性能具有非常大的影响,特别是对于很难避免小文件的场景或task特别多的场景,这类场景大多数task执行时间都很短。
Hadoop的默认配置通常是使用派生JVM来执行map和Reduce任务的。这时JVM的启动过程可能会造成相当大的开销,尤其是执行的job包含有成百上千task任务的情况。JVM重用可以使得JVM实例在同一个job中重新使用N次。N的值可以在Hadoop的mapred-site.xml文件中进行配置。通常在10-20之间,具体多少需要根据具体业务场景测试得出。
<property><name>mapreduce.job.jvm.numtasks</name><value>10</value><description>How many tasks to run per jvm. If set to -1, there is no limit. </description>
</property>
我们也可以在hive当中通过
set mapred.job.reuse.jvm.num.tasks=10;
这个设置来设置我们的jvm重用。
当然,这个功能也是有它的缺点的。开启JVM重用将一直占用使用到的task插槽,以便进行重用,直到任务完成后才能释放。如果某个“不平衡的”job中有某几个reduce task执行的时间要比其他Reduce task消耗的时间多的多的话,那么保留的插槽就会一直空闲着却无法被其他的job使用,直到所有的task都结束了才会释放。
优化点十一:推测执行
在分布式集群环境下,因为程序Bug(包括Hadoop本身的bug),负载不均衡或者资源分布不均等原因,会造成同一个作业的多个任务之间运行速度不一致,有些任务的运行速度可能明显慢于其他任务(比如一个作业的某个任务进度只有50%,而其他所有任务已经运行完毕),则这些任务会拖慢作业的整体执行进度。为了避免这种情况发生,Hadoop采用了推测执行(Speculative Execution)机制,它根据一定的法则推测出“拖后腿”的任务,并为这样的任务启动一个备份任务,让该任务与原始任务同时处理同一份数据,并最终选用最先成功运行完成任务的计算结果作为最终结果。
mapred.map.tasks.speculative.execution=true
mapred.reduce.tasks.speculative.execution=true
关于调优这些推测执行变量,还很难给一个具体的建议。如果用户对于运行时的偏差非常敏感的话,那么可以将这些功能关闭掉。如果用户因为输入数据量很大而需要执行长时间的map或者Reduce task的话,那么启动推测执行造成的浪费是非常巨大大。数据量过于庞大,备份task有可能直接打垮集群。
优化点十二:分区、分桶表
1.对于一张比较大的表,将其设计成分区表,避免全表扫描。
2.当很难在列上创建分区时,我们会使用分桶,比如某个经常被筛选的字段,如果将其作为分区字段,会造成大量的分区。在Hive中,会对分桶字段进行哈希,从而提供了中额外的数据结构,进行提升查询效率。
与分区表类似,分桶表的组织方式是将HDFS上的文件分割成多个文件。分桶可以加快数据采样,也可以提升join的性能(join的字段是分桶字段),因为分桶可以确保某个key对应的数据在一个特定的桶内(文件),所以巧妙地选择分桶字段可以大幅度提升join的性能。通常情况下,分桶字段可以选择经常用在过滤操作或者join操作的字段。
我们可以使用set.hive.enforce.bucketing = true启用分桶设置。
当使用分桶表时,最好将bucketmapjoin标志设置为true,具体配置参数为:
SET hive.optimize.bucketmapjoin = true
优化点十三:对中间数据启用压缩
复杂的Hive查询通常会转换为一系列多阶段的MapReduce作业,并且这些作业将由Hive引擎连接起来以完成整个查询。因此,此处的“中间输出”是指上一个MapReduce作业的输出,它将用作下一个MapReduce作业的输入数据。
压缩可以显著减少中间数据量,从而在内部减少了Map和Reduce之间的数据传输量。
我们可以使用以下属性在中间输出上启用压缩。
set hive.exec.compress.intermediate=true;set hive.intermediate.compression.codec=org.apache.hadoop.io.compress.SnappyCodec;set hive.intermediate.compression.type=BLOCK;
为了将最终输出到HDFS的数据进行压缩,可以使用以下属性:
set hive.exec.compress.output=true;
优化点十四:谓词下推
比如下面的查询:
select a.*,b.*
from a join b on a.col1 = b.col1
where a.col1 > 15 and b.col2 > 16
如果没有谓词下推,则在完成JOIN处理之后将执行过滤条件(a.col1> 15 and b.col2> 16)。因此,在这种情况下,JOIN将首先发生,并且可能产生更多的行,然后在进行过滤操作。
使用谓词下推,这两个谓词**(a.col1> 15和b.col2> 16)**将在JOIN之前被处理,因此它可能会从a和b中过滤掉连接中较早处理的大部分数据行,因此,建议启用谓词下推。
通过将hive.optimize.ppd设置为true可以启用谓词下推。
SET hive.optimize.ppd=true
如果使用外连接,则谓词下推会失效
select a.id,a.c1,b.c2 from a
left join b on a.id=b.id
where b.dt >= '20181201' and b.dt <'20190101'
优化点十五:基于成本的优化
Hive在提交最终执行之前会优化每个查询的逻辑和物理执行计划。基于成本的优化会根据查询成本进行进一步的优化,从而可能产生不同的决策:比如如何决定JOIN的顺序,执行哪种类型的JOIN以及并行度等。
可以通过设置以下参数来启用基于成本的优化。
set hive.cbo.enable=true;
set hive.compute.query.using.stats=true;
set hive.stats.fetch.column.stats=true;
set hive.stats.fetch.partition.stats=true;
可以使用统计信息来优化查询以提高性能。基于成本的优化器(CBO)还使用统计信息来比较查询计划并选择最佳计划。通过查看统计信息而不是运行查询,效率会很高。
收集表的列统计信息:
ANALYZE TABLE table_name COMPUTE STATISTICS FOR COLUMNS;
查看my_db数据库中my_table中my_id列的列统计信息:
DESCRIBE FORMATTED my_db.my_table my_id
优化点十六:insert into操作
1.插入数据量较大
如果插入过程中有多个union all(union all个数大于2),或者插入的数据量比较大,应该拆成多个insert into 语句并行执行,实际测试过程中,执行时间能提升50%。
2.多次扫描表
INSERT INTO temp_table_20201115 SELECT * FROM my_table WHERE dt ='2020-11-15';
INSERT INTO temp_table_20201116 SELECT * FROM my_table WHERE dt ='2020-11-16';
如上面的逻辑,将一张表多次查询写入多张表中,多次读表会消耗很多性能,我们可以优化为读一次表写入多个表:
FROM my_tableINSERT
INSERT INTO temp_table_20201115 SELECT * WHERE dt ='2020-11-15'
INSERT INTO temp_table_20201116 SELECT * WHERE dt ='2020-11-16'
优化点十七:好的模型设计
优化点十八:良好的sql开发能力
参考文献:
https://www.cnblogs.com/swordfall/p/11037539.html
https://blog.csdn.net/weixin_44318830/article/details/103336579
https://baijiahao.baidu.com/s?id=1721265748021502455&wfr=spider&for=pc
https://www.cnblogs.com/liuxinrong/articles/12695181.html
https://www.cnblogs.com/junstudys/p/10056830.html