MR任务
mr任务参考链接
set hive.exec.reducers.max=3
set hive.exec.dynamic.partition.mode = true; --使用动态分区时,设置为ture。 set hive.exec.dynamic.partition.mode = nonstrict; --动态分区模式,默认值:strict,表示必须指定一个分区为静态分区;nonstrict模式表示允许所有的分区字段都可以使用动态分区。一般需要设置为nonstrict。 set hive.exec.max.dynamic.partitions.pernode =10; --在每个执行MR的节点上,最多可以创建多少个动态分区,默认值:100。 set hive.exec.max.dynamic.partitions =1000; --在所有执行MR的节点上,最多一共可以创建多少个动态分区,默认值:1000。 set hive.exec.max.created.files = 100000; --整个MR Job中最多可以创建多少个HDFS文件,默认值:100000。 set hive.error.on.empty.partition = false; --当有空分区产生时,是否抛出异常,默认值:false。 Hive文件产生大量小文件的原因: 一是文件本身的原因:小文件多,以及文件的大小; 二是使用动态分区,可能会导致产生大量分区,从而产生很多小文件,也会导致产生很多Mapper; 三是Reduce数量较多,Hive SQL输出文件的数量和Reduce的个数是一样的。 小文件带来的影响: 文件的数量和大小决定Mapper任务的数量,小文件越多,Mapper任务越多,每一个Mapper都会启动一个JVM来运行,所以这些任务的初始化和执行会花费大量的资源,严重影响性能。 在NameNode中每个文件大约占150字节,小文件多,会严重影响NameNode性能。 解决小文件问题: 如果动态分区数量不可预测,最好不用。如果用,最好使用distributed by分区字段,这样会对字段进行一个hash操作,把相同的分区给同一个Reduce处理; 减少Reduce数量; 进行以一些参数调整。
Hdfs文件数
指定目录下的文件夹,文件,容量大小
[root@mz-hadoop-01 ~]# hdfs dfs -count /user/hive/warehouse/paascloud_tcm.db/dwd/dwd_t_record_detailed568 7433 6065483664 /user/hive/warehouse/paascloud_tcm.db/dwd/dwd_t_record_detailed[root@mz-hadoop-01 ~]# hdfs dfs -count -h /user/hive/warehouse/paascloud_tcm.db/dwd/dwd_t_record_detailed568 7.3 K 5.6 G /user/hive/warehouse/paascloud_tcm.db/dwd/dwd_t_record_detailed
Hive文件数
SELECT tbl_id,SUM(PARAM_VALUE) AS file_cnts
FROM
(
SELECT * FROM PARTITIONS WHERE tbl_id='97387'
) aLEFT JOIN (SELECT * FROM partition_params WHERE PARAM_KEY='numFiles' ) b
ON a.part_id=b.part_idGROUP BY tbl_id
ORDER BY file_cnts DESC;TBL_ID file_cnts
------ -----------97387 2082
所有文件数
SELECT SUM(PARAM_VALUE) AS file_cnts
FROM
(
SELECT * FROM PARTITIONS
) aLEFT JOIN (SELECT * FROM partition_params WHERE PARAM_KEY='numFiles' ) b
ON a.part_id=b.part_idfile_cnts
-----------340323
表文件数topN
SELECT e.*,f.*
FROM
(SELECT c.*,d.db_id,d.tbl_name
FROM
(
SELECT tbl_id,SUM(PARAM_VALUE) AS file_cnts
FROM
(
SELECT * FROM PARTITIONS
) aLEFT JOIN (SELECT * FROM partition_params WHERE PARAM_KEY='numFiles' ) b
ON a.part_id=b.part_idGROUP BY tbl_id
ORDER BY file_cnts DESC
) cLEFT JOIN (SELECT * FROM tbls
) d
ON c.tbl_id=d.tbl_id) e LEFT JOIN
(SELECT db_id AS db_id2,`desc`,DB_LOCATION_URI,NAME as db_name,OWNER_NAME,OWNER_TYPE FROM dbs
)f ON e.db_id=f.DB_ID2
库文件数topN
select
db_id,db_name,DB_LOCATION_URI,sum(file_cnts) as file_cnts
from (SELECT e.*,f.*
FROM
(SELECT c.*,d.db_id,d.tbl_name
FROM
(
SELECT tbl_id,SUM(PARAM_VALUE) AS file_cnts
FROM
(
SELECT * FROM PARTITIONS
) aLEFT JOIN (SELECT * FROM partition_params WHERE PARAM_KEY='numFiles' ) b
ON a.part_id=b.part_idGROUP BY tbl_id
ORDER BY file_cnts DESC
) cLEFT JOIN (SELECT * FROM tbls
) d
ON c.tbl_id=d.tbl_id) e LEFT JOIN
(SELECT db_id AS db_id2,`desc`,DB_LOCATION_URI,NAME as db_name,OWNER_NAME,OWNER_TYPE FROM dbs
)f ON e.db_id=f.DB_ID2)g group by db_id,db_name,DB_LOCATION_URI order by file_cnts desc
小文件压缩任务
package com.mingzhi.common.universalimport com.mingzhi.common.interf.{IDate, MySaveMode}
import com.mingzhi.common.utils.{HiveUtil, SinkUtil, SparkUtils, TableUtils}
import org.apache.commons.lang3.StringUtils
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.storage.StorageLevel/*** 处理只有一个分区dt的表*/
object table_compress_process {private var hive_dbs: String = "paascloud"private var hive_tables: String = "dwd_order_info_abi"private var dt: String = "2023-06-30"private var dt1: String = "2023-06-30"def main(args: Array[String]): Unit = {System.setProperty("HADOOP_USER_NAME", "root")val builder = SparkUtils.getBuilderif (System.getProperties.getProperty("os.name").contains("Windows")) {builder.master("local[*]")} else {hive_dbs = args(0)hive_tables = args(1)dt = args(2)dt1 = args(3)}val spark: SparkSession = builder.appName("clean_process").getOrCreate()HiveUtil.openDynamicPartition(spark)spark.sql("set spark.sql.shuffle.partitions=1")if ("all".equalsIgnoreCase(hive_dbs)) {val builder = new StringBuilder()val frame_db = spark.sql("show databases").select("databaseName")frame_db.show(false)frame_db.collect().foreach(db => {builder.append(db.toString().replace("[", "").replace("]", ","))})println("dbs:" + builder.toString())hive_dbs = builder.toString()}hive_dbs.split(",").foreach(db => {if (StringUtils.isNotBlank(db)) {if ("all".equalsIgnoreCase(hive_tables)) {compress_all_table(spark, db)} else {hive_tables.split(",").foreach(t => {compress_the_table(spark, db, t)})}}})spark.stop()}private def compress_the_table(spark: SparkSession, hive_db: String, table: String): Unit = {println("compress_the_table======>:" + hive_db + "." + table)spark.sql(s"use $hive_db")if (TableUtils.tableExists(spark, hive_db, table)) {try {new IDate {override def onDate(dt: String): Unit = {/*** 建议:对需要checkpoint的RDD,先执行persist(StorageLevel.DISK_ONLY)*/val f1 = spark.sql(s"""||select * from $hive_db.$table where dt='$dt'|""".stripMargin).persist(StorageLevel.MEMORY_ONLY)val r_ck: (DataFrame, String) = SparkUtils.persistDataFrame(spark, f1)val f2 = r_ck._1println("f2 show===>")f2.show(false)val type_ = TableUtils.getCompressType(spark, hive_db, table)if ("HiveFileFormat".equalsIgnoreCase(type_)) {println("sink HiveFileFormat table:" + table)SinkUtil.sink_to_hive_HiveFileFormat(spark, f2, hive_db, table, null)} else {//spark表SinkUtil.sink_to_hive(dt, spark, f2, hive_db, table, type_, MySaveMode.OverWriteByDt, 1)}spark.sql(s"drop table ${r_ck._2} ")}}.invoke(dt, dt1)} catch {case e: org.apache.spark.sql.AnalysisException => {println("exception1:" + e)}case e: Exception => println("exception:" + e)}}}private def compress_all_table(spark: SparkSession, hive_db: String): Unit = {spark.sql(s"use $hive_db")val frame_table = spark.sql(s"show tables")frame_table.show(100, false)frame_table.printSchema()frame_table.filter(r => {!r.getAs[Boolean]("isTemporary")}).select("tableName").collect().foreach(r => {//r:[ads_order_topn]val table = r.toString().replace("[", "").replace("]", "")println("compress table:" + hive_db + "." + table)if (TableUtils.tableExists(spark, hive_db, table)) {try {new IDate {override def onDate(dt: String): Unit = {val f1 = spark.sql(s"""||select * from $hive_db.$table where dt='$dt'|""".stripMargin)SinkUtil.sink_to_hive(dt, spark, f1, hive_db, table, "orc", MySaveMode.OverWriteByDt, 1)}}.invoke(dt, dt1)} catch {case e: org.apache.spark.sql.AnalysisException => {println("exception1:" + e)}case e: Exception => println("exception:" + e)}}})}
}