sql
insert overwrite table dwintdata.dw_f_da_enterprise2
select *
from dwintdata.dw_f_da_enterprise;
hdfs文件大小数量展示
注意这里文件数有17个 共计321M 最后是划分为了21个task
为什么会有21个task?不是128M 64M 或者说我这里小于128 每个文件一个map吗?
tez ui日志
再仔细看每个task的日志
map0 data_source=CSIG/HIVE_UNION_SUBDIR_1/000008_0:0+16903572
map1 data_source=CSIG/HIVE_UNION_SUBDIR_1/000000_0:0+16960450
map2 data_source=CSIG/HIVE_UNION_SUBDIR_1/000003_0:0+16808165
map3 data_source=CSIG/HIVE_UNION_SUBDIR_1/000001_0:0+17007259
map4 data_source=CSIG/HIVE_UNION_SUBDIR_1/000006_0:0+16877230
map5 data_source=CSIG/HIVE_UNION_SUBDIR_1/000004_0:0+16941186
map6 data_source=hehe/HIVE_UNION_SUBDIR_2/000004_0:0+16777216
map7 data_source=hehe/HIVE_UNION_SUBDIR_2/000002_0:0+16777216
map8 data_source=CSIG/HIVE_UNION_SUBDIR_1/000002_0:0+16946639
map9 data_source=CSIG/HIVE_UNION_SUBDIR_1/000009_0:0+16855768
map10 data_source=hehe/HIVE_UNION_SUBDIR_2/000001_0:0+16777216
map11 data_source=CSIG/HIVE_UNION_SUBDIR_1/000005_0:0+16872517
map12 data_source=hehe/HIVE_UNION_SUBDIR_2/000000_0:0+16777216
map13 data_source=hehe/HIVE_UNION_SUBDIR_2/000006_0:0+16777216
map14 data_source=hehe/HIVE_UNION_SUBDIR_2/000000_0:16777216+729642 注意这里啊
data_source=hehe/HIVE_UNION_SUBDIR_2/000001_0:16777216+7188613
map15 data_source=CSIG/HIVE_UNION_SUBDIR_1/000007_0:0+16761291
map16 data_source=hehe/HIVE_UNION_SUBDIR_2/000005_0:0+16777216
map17 data_source=hehe/HIVE_UNION_SUBDIR_2/000003_0:0+16777216
map18 data_source=hehe/HIVE_UNION_SUBDIR_2/000002_0:16777216+7404916
data_source=hehe/HIVE_UNION_SUBDIR_2/000005_0:16777216+7341669
map19 data_source=hehe/HIVE_UNION_SUBDIR_2/000003_0:16777216+7378488
data_source=hehe/HIVE_UNION_SUBDIR_2/000006_0:16777216+7268763
map20 data_source=hehe/HIVE_UNION_SUBDIR_2/000004_0:16777216+7070700
data_source=hehe/000001_0:0+12488
16777216 这是一个什么数?大家要敏感下 1024*1024*16=16777216=16M 说明map只能读16M?
我18个文件中其中总量是321M 因为有一些不满16M 所以最后分为21个map 。那为什么是16M的map呢?
tez.grouping.min-size=16777216
tez.grouping.max-size=134217728 --128M
tez.grouping.split-waves=1.7
借用以前看到过的一篇文章
Hive 基于Tez引擎 map和reduce数的参数控制原理与调优经验_tez.grouping.max-size_abcdggggggg的博客-CSDN博客
这里好像说的通,但是又好像说不通 最低16M最高128M 那如果我是64M的文件呢?
分成1个map 还是64/16=4个map 还是64+64 2个map
mapper数的测试
测试参数set tez.grouping.min-size
测试1
set tez.grouping.min-size=16777216;
map数21个,reduce数2个,文件数21个,花费时间6s
测试2
set tez.grouping.min-size=67108864; --64M
map数8个,reduce数2个,生成文件数12个 花费时间8s
测试3
set tez.grouping.min-size=134217728 ;
map数5个,reduce个数2个,生成文件数8个,花费时间11s
结论分析
说明map读取的越大,时间越快(不一定啊。你要是map设置1k。。) 生成的文件越少
源码解析
看到上面那位博主写到一个参数tez.grouping.split-count,始终找不到只能找源码了。我发现
老哥也没好好搞。
https://github.com/apache/tez/blob/master/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/grouper/TezSplitGrouper.java#L187
set tez.grouping.by-length=true 默认是true
set tez.grouping.by-count=false 默认是false
set tez.grouping.max-size=1024*1024*1024L --这是java定义的 你自己算了写好。
set tez.grouping.min-size=50*1024*1024
//originalSplits 数据文件分成了多少个切片
//之前算出来的切片数public List<GroupedSplitContainer> getGroupedSplits(Configuration conf,List<SplitContainer> originalSplits, int desiredNumSplits,String wrappedInputFormatName,SplitSizeEstimatorWrapper estimator,SplitLocationProviderWrapper locationProvider) throwsIOException, InterruptedException {LOG.info("Grouping splits in Tez");Objects.requireNonNull(originalSplits, "Splits must be specified");//这里获取设置的参数tez.grouping.by-countint configNumSplits = conf.getInt(TEZ_GROUPING_SPLIT_COUNT, 0);if (configNumSplits > 0) {// always use config override if specified//desiredNumSplits 是tez算大概要多少 我们设置了始终以我们的为准desiredNumSplits = configNumSplits;LOG.info("Desired numSplits overridden by config to: " + desiredNumSplits);}if (estimator == null) {estimator = DEFAULT_SPLIT_ESTIMATOR;}if (locationProvider == null) {locationProvider = DEFAULT_SPLIT_LOCATION_PROVIDER;}List<GroupedSplitContainer> groupedSplits = null;String emptyLocation = "EmptyLocation";String localhost = "localhost";String[] emptyLocations = {emptyLocation};groupedSplits = new ArrayList<GroupedSplitContainer>(desiredNumSplits);//看所有文件是不是都是本地,个人猜测是数据文件都是有3个节点么,这个任务比如说运行再node11,数据有可能再node12 和node11 node13boolean allSplitsHaveLocalhost = true;long totalLength = 0;Map<String, LocationHolder> distinctLocations = createLocationsMap(conf);// go through splits and add them to locationsfor (SplitContainer split : originalSplits) {totalLength += estimator.getEstimatedSize(split);String[] locations = locationProvider.getPreferredLocations(split);if (locations == null || locations.length == 0) {locations = emptyLocations;allSplitsHaveLocalhost = false;}//判断是不是本地。for (String location : locations ) {if (location == null) {location = emptyLocation;allSplitsHaveLocalhost = false;}if (!location.equalsIgnoreCase(localhost)) {allSplitsHaveLocalhost = false;}distinctLocations.put(location, null);}}//如果我们配置了group_count 并且文件切片数量>0//或者我们没有配置group_count 并且文件数==0 就走if 肯定是上面的情况if (! (configNumSplits > 0 ||originalSplits.size() == 0)) {// numSplits has not been overridden by config// numSplits has been set at runtime// there are splits generated// desired splits is less than number of splits generated// Do sanity checks//desiredNumSplits已经等于我们配置的数量了,int splitCount = desiredNumSplits>0?desiredNumSplits:originalSplits.size();//获取文件总大小 320M 337336647/ 3 =112,445,549long lengthPerGroup = totalLength/splitCount;//获取我们配置的group最大sizelong maxLengthPerGroup = conf.getLong(TEZ_GROUPING_SPLIT_MAX_SIZE,TEZ_GROUPING_SPLIT_MAX_SIZE_DEFAULT);//获取我们配置的group最小sizelong minLengthPerGroup = conf.getLong(TEZ_GROUPING_SPLIT_MIN_SIZE,TEZ_GROUPING_SPLIT_MIN_SIZE_DEFAULT);if (maxLengthPerGroup < minLengthPerGroup ||minLengthPerGroup <=0) {throw new TezUncheckedException("Invalid max/min group lengths. Required min>0, max>=min. " +" max: " + maxLengthPerGroup + " min: " + minLengthPerGroup);}//如果我们配置的group count 不合理? 比如100G的文件 你配置了1个count 此时1个group100G 属于 >128M或者这里1Gif (lengthPerGroup > maxLengthPerGroup) {//切片太大了。// splits too big to work. Need to override with max size.//就按照总大小/max +1来 因为没除尽所以+1 也就是按最大的来 int newDesiredNumSplits = (int)(totalLength/maxLengthPerGroup) + 1;LOG.info("Desired splits: " + desiredNumSplits + " too small. " +" Desired splitLength: " + lengthPerGroup +" Max splitLength: " + maxLengthPerGroup +" New desired splits: " + newDesiredNumSplits +" Total length: " + totalLength +" Original splits: " + originalSplits.size());desiredNumSplits = newDesiredNumSplits;} else if (lengthPerGroup < minLengthPerGroup) {// splits too small to work. Need to override with size.int newDesiredNumSplits = (int)(totalLength/minLengthPerGroup) + 1;/*** This is a workaround for systems like S3 that pass the same* fake hostname for all splits.*/if (!allSplitsHaveLocalhost) {desiredNumSplits = newDesiredNumSplits;}LOG.info("Desired splits: " + desiredNumSplits + " too large. " +" Desired splitLength: " + lengthPerGroup +" Min splitLength: " + minLengthPerGroup +" New desired splits: " + newDesiredNumSplits +" Final desired splits: " + desiredNumSplits +" All splits have localhost: " + allSplitsHaveLocalhost +" Total length: " + totalLength +" Original splits: " + originalSplits.size());}}if (desiredNumSplits == 0 ||originalSplits.size() == 0 ||desiredNumSplits >= originalSplits.size()) {// nothing set. so return all the splits as isLOG.info("Using original number of splits: " + originalSplits.size() +" desired splits: " + desiredNumSplits);groupedSplits = new ArrayList<GroupedSplitContainer>(originalSplits.size());for (SplitContainer split : originalSplits) {GroupedSplitContainer newSplit =new GroupedSplitContainer(1, wrappedInputFormatName, cleanupLocations(locationProvider.getPreferredLocations(split)),null);newSplit.addSplit(split);groupedSplits.add(newSplit);}return groupedSplits;}
//总大小处于切片数 by-lengthlong lengthPerGroup = totalLength/desiredNumSplits;
//数据所在的节点数 int numNodeLocations = distinctLocations.size();
//每个节点含有的切片数 by-nodeint numSplitsPerLocation = originalSplits.size()/numNodeLocations;
//每个group含有的切片数int numSplitsInGroup = originalSplits.size()/desiredNumSplits;// allocation loop here so that we have a good initial size for the listsfor (String location : distinctLocations.keySet()) {distinctLocations.put(location, new LocationHolder(numSplitsPerLocation+1));}Set<String> locSet = new HashSet<String>();
//对所有切片开始遍历for (SplitContainer split : originalSplits) {locSet.clear();String[] locations = locationProvider.getPreferredLocations(split);if (locations == null || locations.length == 0) {locations = emptyLocations;}for (String location : locations) {if (location == null) {location = emptyLocation;}locSet.add(location);}for (String location : locSet) {LocationHolder holder = distinctLocations.get(location);holder.splits.add(split);}}
//按大小划分group 默认trueboolean groupByLength = conf.getBoolean(TEZ_GROUPING_SPLIT_BY_LENGTH,TEZ_GROUPING_SPLIT_BY_LENGTH_DEFAULT);
//按指定count划分groupboolean groupByCount = conf.getBoolean(TEZ_GROUPING_SPLIT_BY_COUNT,TEZ_GROUPING_SPLIT_BY_COUNT_DEFAULT);
//按照节点划分groupboolean nodeLocalOnly = conf.getBoolean(TEZ_GROUPING_NODE_LOCAL_ONLY,TEZ_GROUPING_NODE_LOCAL_ONLY_DEFAULT);if (!(groupByLength || groupByCount)) {throw new TezUncheckedException("None of the grouping parameters are true: "+ TEZ_GROUPING_SPLIT_BY_LENGTH + ", "+ TEZ_GROUPING_SPLIT_BY_COUNT);}
//打印日志信息分析LOG.info("Desired numSplits: " + desiredNumSplits +" lengthPerGroup: " + lengthPerGroup +" numLocations: " + numNodeLocations +" numSplitsPerLocation: " + numSplitsPerLocation +" numSplitsInGroup: " + numSplitsInGroup +" totalLength: " + totalLength +" numOriginalSplits: " + originalSplits.size() +" . Grouping by length: " + groupByLength +" count: " + groupByCount +" nodeLocalOnly: " + nodeLocalOnly);// go through locations and group splits
//处理到第几个切片了int splitsProcessed = 0;List<SplitContainer> group = new ArrayList<SplitContainer>(numSplitsInGroup);Set<String> groupLocationSet = new HashSet<String>(10);boolean allowSmallGroups = false;boolean doingRackLocal = false;int iterations = 0;
//对每一个切片开始遍历while (splitsProcessed < originalSplits.size()) {iterations++;int numFullGroupsCreated = 0;for (Map.Entry<String, LocationHolder> entry : distinctLocations.entrySet()) {group.clear();groupLocationSet.clear();String location = entry.getKey();LocationHolder holder = entry.getValue();SplitContainer splitContainer = holder.getUnprocessedHeadSplit();if (splitContainer == null) {// all splits on node processedcontinue;}int oldHeadIndex = holder.headIndex;long groupLength = 0;int groupNumSplits = 0;do {
//这个groupgroup.add(splitContainer);groupLength += estimator.getEstimatedSize(splitContainer);groupNumSplits++;holder.incrementHeadIndex();splitContainer = holder.getUnprocessedHeadSplit();} while(splitContainer != null&& (!groupByLength ||(groupLength + estimator.getEstimatedSize(splitContainer) <= lengthPerGroup))&& (!groupByCount ||(groupNumSplits + 1 <= numSplitsInGroup)));if (holder.isEmpty()&& !allowSmallGroups&& (!groupByLength || groupLength < lengthPerGroup/2)&& (!groupByCount || groupNumSplits < numSplitsInGroup/2)) {// group too small, reset itholder.headIndex = oldHeadIndex;continue;}numFullGroupsCreated++;// One split group createdString[] groupLocation = {location};if (location == emptyLocation) {groupLocation = null;} else if (doingRackLocal) {for (SplitContainer splitH : group) {String[] locations = locationProvider.getPreferredLocations(splitH);if (locations != null) {for (String loc : locations) {if (loc != null) {groupLocationSet.add(loc);}}}}groupLocation = groupLocationSet.toArray(groupLocation);}GroupedSplitContainer groupedSplit =new GroupedSplitContainer(group.size(), wrappedInputFormatName,groupLocation,// pass rack local hint directly to AM((doingRackLocal && location != emptyLocation)?location:null));for (SplitContainer groupedSplitContainer : group) {groupedSplit.addSplit(groupedSplitContainer);Preconditions.checkState(groupedSplitContainer.isProcessed() == false,"Duplicates in grouping at location: " + location);groupedSplitContainer.setIsProcessed(true);splitsProcessed++;}if (LOG.isDebugEnabled()) {LOG.debug("Grouped " + group.size()+ " length: " + groupedSplit.getLength()+ " split at: " + location);}groupedSplits.add(groupedSplit);}if (!doingRackLocal && numFullGroupsCreated < 1) {// no node could create a regular node-local group.// Allow small groups if that is configured.if (nodeLocalOnly && !allowSmallGroups) {LOG.info("Allowing small groups early after attempting to create full groups at iteration: {}, groupsCreatedSoFar={}",iterations, groupedSplits.size());allowSmallGroups = true;continue;}// else go rack-localdoingRackLocal = true;// re-create locationsint numRemainingSplits = originalSplits.size() - splitsProcessed;Set<SplitContainer> remainingSplits = new HashSet<SplitContainer>(numRemainingSplits);// gather remaining splits.for (Map.Entry<String, LocationHolder> entry : distinctLocations.entrySet()) {LocationHolder locHolder = entry.getValue();while (!locHolder.isEmpty()) {SplitContainer splitHolder = locHolder.getUnprocessedHeadSplit();if (splitHolder != null) {remainingSplits.add(splitHolder);locHolder.incrementHeadIndex();}}}if (remainingSplits.size() != numRemainingSplits) {throw new TezUncheckedException("Expected: " + numRemainingSplits+ " got: " + remainingSplits.size());}// doing all this now instead of up front because the number of remaining// splits is expected to be much smallerRackResolver.init(conf);Map<String, String> locToRackMap = new HashMap<String, String>(distinctLocations.size());Map<String, LocationHolder> rackLocations = createLocationsMap(conf);for (String location : distinctLocations.keySet()) {String rack = emptyLocation;if (location != emptyLocation) {rack = RackResolver.resolve(location).getNetworkLocation();}locToRackMap.put(location, rack);if (rackLocations.get(rack) == null) {// splits will probably be located in all racksrackLocations.put(rack, new LocationHolder(numRemainingSplits));}}distinctLocations.clear();HashSet<String> rackSet = new HashSet<String>(rackLocations.size());int numRackSplitsToGroup = remainingSplits.size();for (SplitContainer split : originalSplits) {if (numRackSplitsToGroup == 0) {break;}// Iterate through the original splits in their order and consider them for grouping.// This maintains the original ordering in the list and thus subsequent grouping will// maintain that orderif (!remainingSplits.contains(split)) {continue;}numRackSplitsToGroup--;rackSet.clear();String[] locations = locationProvider.getPreferredLocations(split);if (locations == null || locations.length == 0) {locations = emptyLocations;}for (String location : locations ) {if (location == null) {location = emptyLocation;}rackSet.add(locToRackMap.get(location));}for (String rack : rackSet) {rackLocations.get(rack).splits.add(split);}}remainingSplits.clear();distinctLocations = rackLocations;// adjust split length to be smaller because the data is non localfloat rackSplitReduction = conf.getFloat(TEZ_GROUPING_RACK_SPLIT_SIZE_REDUCTION,TEZ_GROUPING_RACK_SPLIT_SIZE_REDUCTION_DEFAULT);if (rackSplitReduction > 0) {long newLengthPerGroup = (long)(lengthPerGroup*rackSplitReduction);int newNumSplitsInGroup = (int) (numSplitsInGroup*rackSplitReduction);if (newLengthPerGroup > 0) {lengthPerGroup = newLengthPerGroup;}if (newNumSplitsInGroup > 0) {numSplitsInGroup = newNumSplitsInGroup;}}LOG.info("Doing rack local after iteration: " + iterations +" splitsProcessed: " + splitsProcessed +" numFullGroupsInRound: " + numFullGroupsCreated +" totalGroups: " + groupedSplits.size() +" lengthPerGroup: " + lengthPerGroup +" numSplitsInGroup: " + numSplitsInGroup);// dont do smallGroups for the first passcontinue;}if (!allowSmallGroups && numFullGroupsCreated <= numNodeLocations/10) {// a few nodes have a lot of data or data is thinly spread across nodes// so allow small groups nowallowSmallGroups = true;LOG.info("Allowing small groups after iteration: " + iterations +" splitsProcessed: " + splitsProcessed +" numFullGroupsInRound: " + numFullGroupsCreated +" totalGroups: " + groupedSplits.size());}if (LOG.isDebugEnabled()) {LOG.debug("Iteration: " + iterations +" splitsProcessed: " + splitsProcessed +" numFullGroupsInRound: " + numFullGroupsCreated +" totalGroups: " + groupedSplits.size());}}LOG.info("Number of splits desired: " + desiredNumSplits +" created: " + groupedSplits.size() +" splitsProcessed: " + splitsProcessed);return groupedSplits;}
set tez.grouping.by-length=true 默认是true
set tez.grouping.by-count=false 默认是false
set tez.grouping.node.local.only 默认是false
set tez.grouping.max-size=1024*1024*1024L=1G --这是java定义的 你自己算了写好。
set tez.grouping.min-size=50*1024*1024=50M
set tez.grouping.split-count=0
以我的为例
max=128M min=16M count=10 by-count=true by-length=true split-count=10
文件总共是320M
前面逻辑不看。反正就是以min=16M读取 有21个切片分为了21个group
如果split-count=10 320M/10=32M 32M between min and max 最后数量就是10个
如果split-count=2 320M/2=160M 不在min和max之间 所以320m/max=3 3+1=4个
后面太长 懒得看了。
反正还要根据 tez.grouping.rack-split-reduction=0.75f 再去调整一波。。
总之这个参数有用by-count有点用
测试参数tez.grouping.split-count
测试1
set tez.grouping.by-count=true;
set tez.grouping.split-count=50;
26个containner 26core 104448MB
测试2
set tez.grouping.by-count=true;
set tez.grouping.split-count=15;
26个containner 26core 104448MB
测试3
set tez.grouping.by-count=true;
set tez.grouping.split-count=10;
16个container 16core 63488MB
测试4
set tez.grouping.by-count=true;
set tez.grouping.split-count=5;
9个container 9core 34816Mb
测试5
set tez.grouping.by-count=true;
set tez.grouping.split-count=2;
5个container 5core 18432Mb
说明啥 这个参数确实有用,但是不是特别好控制map数。(可能是我不太了解源码)
但是突然感觉不对
测试6
set tez.grouping.split-count=2;
set tez.grouping.by-count=true;
set tez.grouping.by-length=false;
说明这个bycount=true 和by-length=false时才会起作用
测试7
set tez.grouping.split-count=10;
set tez.grouping.by-count=true;
set tez.grouping.by-length=false;
测试fileinputformat.split.minsize
mapreduce.input.fileinputformat.split.maxsize=256000000
mapreduce.input.fileinputformat.split.minsize=1
测试1
set mapreduce.input.fileinputformat.split.minsize=128000000
这里是18个原因就是文件个数
测试2
set mapreduce.input.fileinputformat.split.minsize=64000000
这里也是文件个数
测试3
set mapreduce.input.fileinputformat.split.minsize=16000000;
这里23就是18+多出来的那部分
测试4
set mapreduce.input.fileinputformat.split.minsize=8000000;
这里25其实也差不多 为什么不是上面23*46呢? 因为tez.min.size=16M
set mapreduce.input.fileinputformat.split.minsize=8000000;
set tez.grouping.min-size=8388608; 8M
看 果然被我猜对了。!!!!!!!!!
至此mapper数的参数调整 好像也差不多
开始测试reduce的个数
参数 | 默认值 | 说明 |
mapred.reduce.tasks | -1 | 指定reduce的个数 |
hive.exec.reducers.bytes.per.reducer | 67108864 | 每个reduce的数据处理量 |
hive.exec.reducers.max | 1009 | reduce的最大个数 |
hive.tez.auto.reducer.parallelism | true | 是否启动reduce自动并行 |
有点累了。
测试mapred.reduce.tasks
set mapred.reduce.tasks=4
reduce数变多了。22个container 88064 MB
set mapred.reduce.tasks=10
22个container 88064Mb
set mapred.reduce.tasks=20
28个container112640MB
测试hive.exec.reducers.bytes.per.reducer=67108864
这个默认试64M 安导里我的reduce也差不多320M 也要分成5个reduce呀;
set hive.exec.reducers.bytes.per.reducer=33554432
set hive.exec.reducers.bytes.per.reducer=8388608
没啥用,我记得这个参数以前有用的。可能引擎不一样了吧
有点累了。后面在看怎么调整container的大小