在大数据领域,Hive SQL 是一种常用的查询语言,用于在 Hadoop上进行数据分析和处理。为了确保代码的可读性、维护性和性能,制定一套规范化的 Hive SQL 开发规范至关重要。本文将介绍 Hive SQL 的基础知识,并提供一些规范化的开发指南,帮助您高效地编写 Hive SQL 查询。
本系列分为
Hive SQL 开发指南(一)数据类型及函数
Hive SQL 开发指南(二)使用(DDL、DML,DQL)
Hive SQL 开发指南(三)优化及常见异常
一、数据定义命令(DDL)
建表
CREATE [EXTERNAL] TABLE [IF NOT EXISTS] table_name[(col_name data_type [COMMENT col_comment], ...)][COMMENT table_comment][PARTITIONED BY (col_name data_type [COMMENT col_comment], ...)][CLUSTERED BY (col_name, col_name, ...)[SORTED BY (col_name [ASC|DESC], ...)] INTO num_buckets BUCKETS][ROW FORMAT row_format][STORED AS file_format][LOCATION hdfs_path]
注意事项
- CREATE TABLE 创建一个指定名字的表。如果相同名字的表已经存在,则抛出异常;用户可以用 IF NOT EXIST 选项来忽略这个异常
- EXTERNAL 关键字可以让用户创建一个外部表,在建表的同时指定一个指向实际数据的路径(LOCATION)
- LIKE 允许用户复制现有的表结构,但是不复制数据
- COMMENT可以为表与字段增加描述
- ROW FORMAT
DELIMITED [FIELDS TERMINATED BY char] [COLLECTION ITEMS TERMINATED BY char]
[MAP KEYS TERMINATED BY char] [LINES TERMINATED BY char]
| SERDE serde_name [WITH SERDEPROPERTIES (property_name=property_value, property_name=property_value, ...)]
用户在建表的时候可以自定义 SerDe 或者使用自带的 SerDe。如果没有指定 ROW FORMAT 或者 ROW FORMAT DELIMITED,将会使用自带的 SerDe。在建表的时候,用户还需要为表指定列,用户在指定表的列的同时也会指定自定义的 SerDe,Hive 通过 SerDe 确定表的具体的列的数据。
6.STORED AS
SEQUENCEFILE
| TEXTFILE
| RCFILE
| INPUTFORMAT input_format_classname OUTPUTFORMAT output_format_classname
如果文件数据是纯文本,可以使用 STORED AS TEXTFILE。如果数据需要压缩,使用 STORED AS SEQUENCE 。
使用示例
创建简单表:
CREATE TABLE pokes (foo INT, bar STRING);
创建外部表:
CREATE EXTERNAL TABLE page_view(viewTime INT, userid BIGINT,page_url STRING, referrer_url STRING,ip STRING COMMENT 'IP Address of the User',country STRING COMMENT 'country of origination')COMMENT 'This is the staging page view table'ROW FORMAT DELIMITED FIELDS TERMINATED BY '\054'STORED AS TEXTFILELOCATION '<hdfs_location>';
建分区表
CREATE TABLE par_table(viewTime INT, userid BIGINT,page_url STRING, referrer_url STRING,ip STRING COMMENT 'IP Address of the User')COMMENT 'This is the page view table'PARTITIONED BY(date STRING, pos STRING)ROW FORMAT DELIMITED ‘\t’FIELDS TERMINATED BY '\n'STORED AS SEQUENCEFILE;
建Bucket表
CREATE TABLE par_table(viewTime INT, userid BIGINT,page_url STRING, referrer_url STRING,ip STRING COMMENT 'IP Address of the User')COMMENT 'This is the page view table'PARTITIONED BY(date STRING, pos STRING)CLUSTERED BY(userid) SORTED BY(viewTime) INTO 32 BUCKETSROW FORMAT DELIMITED ‘\t’FIELDS TERMINATED BY '\n'STORED AS SEQUENCEFILE;
创建表并创建索引字段ds
CREATE TABLE invites (foo INT, bar STRING) PARTITIONED BY (ds STRING);
复制一个空表
CREATE TABLE empty_key_value_storeLIKE key_value_store;
修改表结构
增加分区、删除分区
增加
alter table 表名 add if not exists partition(dt='2024-02-12');
alter table table1 add if not exists partition(dt='2024-02-12') location '/dws/table1/dt=2024-02-12';
删除
alter table 表名 drop if exists partition(dt='2024-03-06');
重命名表
ALTER TABLE table_name RENAME TO new_table_name
修改列的名字、类型、位置、注释
ALTER TABLE table_name CHANGE [COLUMN] col_old_name col_new_name column_type [COMMENT col_comment] [FIRST|AFTER column_name]
增加/更新列
ALTER TABLE table_name ADD|REPLACE COLUMNS (col_name data_type [COMMENT col_comment], ...)
增加表的元数据信息
ALTER TABLE table_name SET TBLPROPERTIES table_properties table_properties::[property_name = property_value…..]
删表
DROP TABLE [IF EXISTS] table_name
二、 数据操作命令(DML)
LOAD DATA
语法规则
LOAD DATA [LOCAL] INPATH 'filepath' [OVERWRITE] INTO TABLE tablename [PARTITION (partcol1=val1, partcol2=val2 ...)]
使用说明
- LOAD DATA命令主要用于装载已有文件到新的TABLE中,只是拷贝或搬移文件,并不做内容的校验。
- 语法规则中,LOCAL表示从本地文件系统LOAD文件,否则就是从HDFS中取文件,OVERWRITE表示覆盖已有的数据。
使用示例
LOAD DATA LOCAL INPATH './examples/files/kv1.txt' OVERWRITE INTO TABLE pokes;
INSERT
基本模式
INSERT OVERWRITE TABLE tablename PARTITION (partcol1[=val1], partcol2[=val2] ...) select_statement FROM from_statement
多插入模式
FROM from_statementINSERT OVERWRITE TABLE tablename1 [PARTITION (partcol1=val1, partcol2=val2 ...)] select_statement1[INSERT OVERWRITE TABLE tablename2 [PARTITION ...] select_statement2] ...
将查询结果写入HDFS文件系统
INSERT OVERWRITE [LOCAL] DIRECTORY directory1 SELECT ... FROM ...FROM from_statementINSERT OVERWRITE [LOCAL] DIRECTORY directory1 select_statement1[INSERT OVERWRITE [LOCAL] DIRECTORY directory2 select_statement2]
INSERT INTO
INSERT INTO TABLE tablename1 [PARTITION (partcol1=val1, partcol2=val2 ...)] select_statement1 FROM from_statement
使用示例
将查询数据输出至目录:
INSERT OVERWRITE DIRECTORY '/tmp/hdfs_out' SELECT a.* FROM invites a WHERE a.ds='<DATE>';
将查询结果输出至本地目录:
INSERT OVERWRITE LOCAL DIRECTORY '/tmp/local_out' SELECT a.* FROM pokes a;
选择所有列到本地目录 :
hive> INSERT OVERWRITE TABLE events SELECT a.* FROM profiles a;
hive> INSERT OVERWRITE TABLE events SELECT a.* FROM profiles a WHERE a.key < 100;
hive> INSERT OVERWRITE LOCAL DIRECTORY '/tmp/reg_3' SELECT a.* FROM events a;
hive> INSERT OVERWRITE DIRECTORY '/tmp/reg_4' select a.invites, a.pokes FROM profiles a;
hive> INSERT OVERWRITE DIRECTORY '/tmp/reg_5' SELECT COUNT(1) FROM invites a WHERE a.ds='<DATE>';
hive> INSERT OVERWRITE DIRECTORY '/tmp/reg_5' SELECT a.foo, a.bar FROM invites a;
hive> INSERT OVERWRITE LOCAL DIRECTORY '/tmp/sum' SELECT SUM(a.pc) FROM pc1 a;
将一个表的统计结果插入另一个表中:
hive> FROM invites a INSERT OVERWRITE TABLE events SELECT a.bar, count(1) WHERE a.foo > 0 GROUP BY a.bar;
hive> INSERT OVERWRITE TABLE events SELECT a.bar, count(1) FROM invites a WHERE a.foo > 0 GROUP BY a.bar;
JOIN
hive> FROM pokes t1 JOIN invites t2 ON (t1.bar = t2.bar) INSERT OVERWRITE TABLE events SELECT t1.bar, t1.foo, t2.foo;
将多表数据插入到同一表中:
FROM src
INSERT OVERWRITE TABLE dest1 SELECT src.* WHERE src.key < 100
INSERT OVERWRITE TABLE dest2 SELECT src.key, src.value WHERE src.key >= 100 and src.key < 200
INSERT OVERWRITE TABLE dest3 PARTITION(ds='2008-04-08', hr='12') SELECT src.key WHERE src.key >= 200 and src.key < 300
INSERT OVERWRITE LOCAL DIRECTORY '/tmp/dest4.out' SELECT src.value WHERE src.key >= 300;
将文件流直接插入文件:
hive> FROM invites a INSERT OVERWRITE TABLE events SELECT TRANSFORM(a.foo, a.bar) AS (oof, rab) USING '/bin/cat' WHERE a.ds > '2008-08-09';
三、数据查询DQL
SELECT [ALL | DISTINCT] select_expr, select_expr, ...
FROM table_reference
[WHERE where_condition]
[GROUP BY col_list [HAVING condition]]
[ CLUSTER BY col_list| [DISTRIBUTE BY col_list] [SORT BY| ORDER BY col_list]
]
[LIMIT number]
使用说明
1)使用ALL和DISTINCT选项区分对重复记录的处理。默认是ALL,表示查询所有记录。DISTINCT表示去掉重复的记录
2)Where 条件类似我们传统SQL的where 条件,目前支持 AND,OR ,0.9版本支持between,IN, NOT IN,不支持EXIST ,NOT EXIST
3)ORDER BY与SORT BY的不同
ORDER BY 全局排序,只有一个Reduce任务
SORT BY 只在本机做排序
4)Limit
Limit 可以限制查询的记录数
例如:SELECT * FROM t1 LIMIT 5
实现Top k 查询,下面的查询语句查询销售记录最大的 5 个销售代表。
例如:
SET mapred.reduce.tasks = 1
SELECT * FROM test SORT BY amount DESC LIMIT 5
5)REGEX Column Specification
SELECT 语句可以使用正则表达式做列选择,下面的语句查询除了 ds 和 hr 之外的所有列:
SELECT `(ds|hr)?+.+` FROM test
基于Partition的查询
使用说明
1)一般 SELECT 查询会扫描整个表,使用 PARTITIONED BY 子句建表,查询就可以利用分区剪枝(input pruning)的特性
2)Hive 当前的实现是,只有分区断言出现在离 FROM 子句最近的那个WHERE 子句中,才会启用分区剪枝
JOIN
语法规则
join_table: table_reference JOIN table_factor [join_condition] | table_reference {LEFT|RIGHT|FULL} [OUTER] JOIN table_reference join_condition | table_reference LEFT SEMI JOIN table_reference join_condition table_reference: table_factor | join_table table_factor: tbl_name [alias] | table_subquery alias | ( table_references ) join_condition: ON equality_expression ( AND equality_expression )* equality_expression: expression = expression
使用说明
1)Hive 只支持等值连接(equality joins)、外连接(outer joins)和(left semi joins)。Hive 不支持所有非等值的连接,因为非等值连接非常难转化到 map/reduce 任务
2)LEFT,RIGHT和FULL OUTER关键字用于处理join中空记录的情况
3)LEFT SEMI JOIN 是 IN/EXISTS 子查询的一种更高效的实现
4)join 时,每次 map/reduce 任务的逻辑是这样的:reducer 会缓存 join 序列中除了最后一个表的所有表的记录,再通过最后一个表将结果序列化到文件系统
5)实践中,应该把最大的那个表写在最后
6)join 查询时,需要注意几个关键点
6.1) 只支持等值joinSELECT a.* FROM a JOIN b ON (a.id = b.id) SELECT a.* FROM a JOIN b ON (a.id = b.id AND a.department = b.department)
6.2) 可以 join 多于 2 个表,例如
SELECT a.val, b.val, c.val FROM a JOIN b ON (a.key = b.key1) JOIN c ON (c.key = b.key2)
6.3) 如果join中多个表的 join key 是同一个,则 join 会被转化为单个 map/reduce 任务
7)LEFT,RIGHT和FULL OUTER
例子SELECT a.val, b.val FROM a LEFT OUTER JOIN b ON (a.key=b.key)
7.1) 如果你想限制 join 的输出,应该在 WHERE 子句中写过滤条件——或是在 join 子句中写
7.2) 容易混淆的问题是表分区的情况
SELECT c.val, d.val FROM c LEFT OUTER JOIN d ON (c.key=d.key) WHERE a.ds='2010-07-07' AND b.ds='2010-07-07‘
如果 d 表中找不到对应 c 表的记录,d 表的所有列都会列出 NULL,包括 ds 列。也就是说,join 会过滤 d 表中不能找到匹配 c 表 join key 的所有记录。这样的话,LEFT OUTER 就使得查询结果与 WHERE 子句无关
解决办法SELECT c.val, d.val FROM c LEFT OUTER JOIN d ON (c.key=d.key AND d.ds='2009-07-07' AND c.ds='2009-07-07')
8)LEFT SEMI JOIN
LEFT SEMI JOIN 的限制是, JOIN 子句中右边的表只能在 ON 子句中设置过滤条件,在 WHERE 子句、SELECT 子句或其他地方过滤都不行SELECT a.key, a.value FROM a WHERE a.key in (SELECT b.key FROM B);可以被重写为:SELECT a.key, a.val FROM a LEFT SEMI JOIN b on (a.key = b.key)
9)UNION ALL
用来合并多个select的查询结果,需要保证select中字段须一致
select_statement UNION ALL select_statement UNION ALL select_statement ...
四、HIVE SQL使用注意事项
HIVE不支持非等值连接
-- SQL中对两表内联可以写成:
select * from dual a,dual b where a.key = b.key;
-- Hive中应为
select * from dual a join dual b on a.key = b.key;
-- 而不是传统的格式:
SELECT t1.a1 as c1, t2.b1 as c2FROM t1, t2 WHERE t1.a2 = t2.b2
分号字符
分号是SQL语句结束标记,在HiveQL中也是,但是在HiveQL中,对分号的识别没有那么智慧,例如:
select concat(key,concat(';',key)) from dual;
但HiveQL在解析语句时提示:
FAILED: Parse Error: line 0:-1 mismatched input '<EOF>' expecting ) in function specification
解决的办法是,使用分号的八进制的ASCII码进行转义,那么上述语句应写成:
select concat(key,concat('\073',key)) from dual;
IS [NOT] NULL
SQL中null代表空值, 值得警惕的是, 在HiveQL中String类型的字段若是空(empty)字符串, 即长度为0, 那么对它进行IS NULL的判断结果是False
Hive支持动态设置环境变量
例如,当启动一个执行环境后,可以调用如下命令:
set mapred.job.queue.nam=queue01;
Hive环境默认是default数据库,需用use切换数据库
例如,当启动一个执行环境后,可以调用如下命令:
use test_databases;
查询注意事项
inner join中的join key可以作为过滤条件,过滤条件放置位置必须规范,where条件放置过滤条件,on条件放置等值条件,例如下面语句
select * from test a join test b on a.id=b.id and a.id=3;
select * from test a join test b on a.id=b.id where a.id=3;
select * from test a join test b where a.id=b.id and a.id=3;
这三个语句执行结果一致,且执行计划一致(因为Hive内部帮我们优化了执行计划),但我们还是应该规范写法,按照语句二写SQL
left/right outer join
left/right outer join类的join key不能作为驱动表的过滤条件,要实现过滤的话可以通过on + where组合
select * from test a left join test b on a.id=b.id and a.id=3;
select * from test a left join test b on a.id=b.id where a.id=3;
其中第一个语句中没有把条件放入过滤中,而是直接作为join key值,而语句二放入where中,针对这种情况我们需要特殊情况特殊处理,特别针对partition的时候,如果希望过滤某批数据,应放到where后面处理
hive mapjoin查询注意
1.有一个极小的表<1000行
2.需要做不等值join的where操作(a.x < b.y 或者 a.x like b.y等,注:目前版本join下不支持不等值操作,不等值需加到where条件里)
3.默认情况下,hive会优化join,自动转换为mapjoin,但是某些情况下,mapjoin执行会失败,主要是由于hive依靠文件大小判断是否进入mapjoin,然后由于文件行数过大,导致map过程中出现OOM而失败,比如下面语句:
SELECT T.BD_ID AS BD_ID,T.BD_NAME AS BD_NAME,T.ITEM_CODE AS ITEM_CODE,T.ITEM_DESC AS ITEM_DESC,T.VENDOR_CODE AS VENDOR_CODE,T.VENDOR_NAME AS VENDOR_NAME,48 AS DATASOURCE_NUM_IDFROM GKEI.G_SRM_PRICE_ORDER_ALL_DTL TLEFT JOIN TEMP.G_SRM_PRICE_ORDER_TEMP TEMPON T.ORGANIZATION_ID = TEMP.ORGANIZATION_IDAND T.ITEM_ID = TEMP.INVENTORY_ITEM_IDWHERE TEMP.INVENTORY_ITEM_ID IS NULL;
其中主表不到40万,重表 30万,但是一直跑不出来,经过查看,map执行失败,这种情况下,我们应该设置参数set hive.auto.convert.join=false;阻止进行map join查询,让它走common join
特殊不等值查询
-- 语句一
SELECT t.statdate,c.cname,count(t.cookieid)
FROM tmpdb.city c
JOIN ecdata.ext_trackflow t ON (t.area1= c.cnameOR t.area2 =c.cnameOR t.area3 = c.cname)
WHERE t.statdate>='20140818' and t.statdate<='20140824'AND platform='pc'
GROUP BY t.statdate,
c.cname;-- 把上面的语句修改为
-- 语句二
SELECT dt,name,count(cid)
FROM(SELECT t.statdate dt,c.cname name,t.cookieid cidFROM tmpdb.city cJOIN ecdata.ext_trackflow t ON t.area1 =c.cnameWHERE t.statdate>='20140818'AND t.statdate<='20140824'AND platform='pc'UNION ALL SELECT t.statdate dt,c.cname name,t.cookieid cidFROM tmpdb.city cJOIN ecdata.ext_trackflow t ON t.area2 =c.cnameWHERE t.statdate>='20140818'AND t.statdate<='20140824'AND platform='pc'UNION ALL SELECT t.statdate dt,c.cname name,t.cookieid cidFROM tmpdb.city cJOIN ecdata.ext_trackflow t ON t.area3 =c.cnameWHERE t.statdate>='20140818'AND t.statdate<='20140824'AND platform='pc') tmp_trackflow
GROUP BY dt,name;-- 而不是修改为
-- 语句三
SELECT t.statdate,c.cname,count(t.cookieid)
FROM tmpdb.city c
JOIN ecdata.ext_trackflow t
WHERE t.statdate>='20140818'AND t.statdate<='20140824'AND platform='pc'AND (t.area1= c.cnameOR t.area2 =c.cnameOR t.area3 = c.cname)
GROUP BY t.statdate,c.cname;
-- 因为语句三会做笛卡尔积,很慢
谨防数据倾斜
如:
insert overwrite table temp.temp_css_phone_addr
select distinct a.user_id, b.mobile, a.longitude, a.latitude
fromdw_dmp.dw_user_addr_info a
left outer joindw_dmp.dw_user_phone_info b
on(a.user_id = b.user_id)
whereb.mobile is not NULL and b.mobile <> "" anda.longitude is not NULL and a.latitude is not NULL anda.longitude <> "" and a.latitude <> "" and a.user_id not in (select distinct user_id from temp.temp_css_id_integration_result);
其实这个语句不是那么复杂,可是整个语句跑了10几个小时也跑不出来,并且还失败了,为什么呢?
其实这里就这个reduce执行的时间过长,且还把磁盘空间撑爆了,原来是内部有条数据特别多,导致reduce数据倾斜,这种怎么处理呢
1. 设置参数
hive.map.aggr=true
Map 端部分聚合,相当于Combiner
hive.groupby.skewindata=true
有数据倾斜的时候进行负载均衡,当选项设定为 true,生成的查询计划会有两个 MR Job。第一个 MR Job 中,Map 的输出结果集合会随机分布到 Reduce 中,每个 Reduce 做部分聚合操作,并输出结果,这样处理的结果是相同的 Group By Key 有可能被分发到不同的 Reduce 中,从而达到负载均衡的目的;第二个 MR Job 再根据预处理的数据结果按照 Group By Key 分布到 Reduce 中(这个过程可以保证相同的 Group By Key 被分布到同一个 Reduce 中),最后完成最终的聚合操作
2.在业务逻辑优化效果的不大情况下,有些时候是可以将倾斜的数据单独拿出来处理。最后union回去(具体参考后文数据倾斜处理)