Hadoop Streaming详解

一: Hadoop Streaming详解

1、Streaming的作用

Hadoop Streaming框架,最大的好处是,让任何语言编写的map, reduce程序能够在hadoop集群上运行;map/reduce程序只要遵循从标准输入stdin读,写出到标准输出stdout即可

其次,容易进行单机调试,通过管道前后相接的方式就可以模拟streaming, 在本地完成map/reduce程序的调试

# cat inputfile | mapper | sort | reducer > output

最后,streaming框架还提供了作业提交时的丰富参数控制,直接通过streaming参数,而不需要使用java语言修改;很多mapreduce的高阶功能,都可以通过steaming参数的调整来完成

 

 2、Streaming的局限

Streaming默认只能处理文本数据Textfile,对于二进制数据,比较好的方法是将二进制的key, value进行base64编码,转化为文本

Mapper和reducer的前后都要进行标准输入和标准输出的转化,涉及数据拷贝和解析,带来了一定的开销

 

3、Streaming命令的相关参数    (普通选项、streaming选项)

Streaming命令的形式如下:

#  /usr/local/src/hadoop-1.2.1/bin/hadoop jar  hadoop-streaming.jar \

   [普通选项]  [Streaming选项]         #  注意:普通选项一定要写在streaming选项前面

 

普通选项

参数

可选/必选

解释

-conf  配置文件

可选

指定一个应用程序配置文件

-fs  host:port or local

可选

指定一个namenode

-jt   host:port  or local

可选

指定一个jobtracker

-files 文件1,文件2,

 

-files  hdfs://192.168.179.100:9000/file1.txt,

hdfs://192.168.179.100:9000/file2.txt

 

将代替-cacheFile选项

可选

类似-file, 不同的

1)将HDFS中的多个文件进行分发

 

2)文件已经位于HDFS上

 

3)框架会在该作业attemps目录内创建一个符号链接,指向该作业的jar目录(放置所有分发文件)

-archives

 

框架会在作业的attempt目录创建符号链接,指向作业的jar目录,jar目录中才是分发到本地的压缩文件

 

-archives hdfs://host:fs_port/user/testfile.tgz#tgzdir

testfile.tgz是用户上传到HDFS的打包压缩文件

#后的tgzdir是别名,hadoop-1.2.1中必须要别名

可选

逗号分隔的多个压缩文件,已经位于HDFS上

 

框架自动分发压缩文件到计算节点,并且Inputformat会自动进行解压

-D   property=value

可选

重点,很多属性通过-D指定

 

插曲1: mapred-site.xml 指定mapslotreduceslot

Map和reduce在datanode上的运行,会受到slot的限制,并且有各自的slot限制; 每个Datanode读取相应的配置文件, 从而确定每个datanode上能运行的最大map,reduce个数,以及节点能力是否充分发挥

Hadoop1.0中,slot在mapred-site.xml中配(mapreduce作业前配置好), 基本上每个slot在运行1个map, reduce作业后会占用1个CPU core,   最激进的做法是设置map和reduce的slot都是CPU core-1 (Map执行完后才会进行reduce),  预留1个CPU core给tasktracker(比如上报心跳等),  但通常reducer的slot要比reducer少,考虑大多数情况下mapper要比reducer多

默认map的slot为2,reduce的slot也为2

<configuration>

        <property>

                <name>mapred.job.tracker</name>

                <value>http://192.168.179.100:9001</value>

        </property>

        <property>

               <name>mapred.tasktracker.map.tasks.maximum</name>

               <value>15</value>

        </property>

        <property>

               <name>mapreduce.tasktracker.tasks.reduce.maximum</name>

               <value>10</value>

        </property>

</configuration>

 

插曲二: mapred-site.xml 指定map最终输出的merge文件的存放路径

<configuration>

        <property>

                <name>mapred.job.tracker</name>

                <value>http://192.168.179.100:9001</value>

        </property>

        <property>

               <name>mapred.local.dir</name>

               <value>/usr/loca/src/hadoop-1.2.1/tmp/mapoutput</value>

        </property>

</configuration>

 

当1个作业被提交并在tasktracer的管理下开始运行时,会对每个job创建1个目录,所有分发的文件,都放置在这里

${mapred.local.dir}/taskTracker/$user/jobcache/$jobid/jars/

 

普通选项中的-D  property=value
-D    普通选项,使用最多的高级参数,替代-jobconf(参数将被废弃),需要注意的是 -D选项要放在streaming参数的前面,一般我会放在参数的开头

类别

 

 

 

 

指定目录

-D  dfs.data.dir=/tmp

修改本地临时目录

 

-D  mapred.local.dir=/tmp/local

-D  mapred.system.dir=/tmp/system

-D  mapred.tmp.dir=/tmp/tmp

指定额外的本地临时目录

 

指定作业名

-D  mapred.job.name=”Test001”

 

 

指定只有map的作业

-D  mapred.reduce.tasks=0

该作业只有mapper, mapper的输出直接作为作业的输出

 

指定reducer个数

-D  mapred.reduce.tasks=2

 

 

指定mapper个数

-D  mapred.map.tasks=2

指定了不一定生效输入文件为压缩文件时,mapper和压缩文件个数一一对应,

输入数据为压缩文件时,mapper和文件个数一一对应,比较好的控制Mapper数量的方法

指定Mapper输出的key,value分隔符

-D stream.map.output.field.separator=.

-D stream.num.map.output.key.fields=4

Mapper的输出使用.做分割符,并且第4个.之前的部分作为key, 剩余的部分作为value (包含剩余的.)

如果mapper的输出没有4个., 则整体一行作为key, value为空

默认:

使用

\t做分隔符,第1个\t之前的部分作为key, 剩余为value, 如果mapper输出没有\t,则整体一行作为key,value为空

指定reducer输出的value, key分隔符

-D stream.reduce.output.field.seperator=.

-D stream.num.reduce.output.key.fields=4

指定reduce输出根据.分割,直到第4个.之前的内容为key,其他为value

Reducer程序要根据指定进行key,value的构造

不常用

-D stream.map.input.field.seperator

Inputformat如何分行,默认\n

 

不常用

-D stream.reduce.input.field.seperator

 

 

作业优先级

-D  mapred.job.priority=HIGH

VERY_LOW, LOW, NORMAL, HIGH, VERY_HIGH

 

最多同时运行的map任务数

-D mapred.job.map.capacity=5

 

 

最多同时运行的reduce任务数

-D mapred.job.reduce.capacity=3

 

 

Task没有响应(输入输出)的最大时间

-D mapred.task.timeout=6000

毫秒

超时后,该task被终止

Map的输出是否压缩

-D mapred.compress.map.output=True

 

 

Map的输出的压缩方式

-D mapred.map.output.comression.codec=

 

 

Reduce的输出是否压缩

-D mapred.output.compress=True

 

 

Reducer的输出的压缩方式

-D mapred.output.compression.codec=

 

 

 

-D 指定job名称
$HADOOP_HOME/bin/hadoop jar $HADOOP_HOME/hadoop-streaming.jar \
-D mapred.job.name=”Test001”
-D 指定reduce任务、map任务个数
$HADOOP_HOME/bin/hadoop jar $HADOOP_HOME/hadoop-streaming.jar \-D mapred.job.name=”Teset001”
-D mapred.reduce.tasks=2    # reduce task个数,一定生效
-D mapred.map.tasks=5       # map task个数,不一定生效
-D 指定mapper的输出分隔符
-D stream.map.output.field.seperator=.  # 指定mapper每条输出key,value分隔符
-D stream.num.map.output.key.fields=4   # 第4个.之前的部分为key,剩余为value
-D map.output.key.field.separator=.      # 设置map输出中,Key内部的分隔符

 

-D 指定基于哪些key进行分桶

基于指定的Key进行分桶,打标签

指定列数

-D num.key.fields.for.partition=1       # 只用1列Key做分桶
-D num.key.fields.for.partition=2       # 使用1,2共两列key做分桶

指定某些字段做key

-D mapred.text.key.partitioner.option =-k1,2   # 第1,2列Key做分桶
-D mapred.text.key.partitioner.option =-k2,2   # 第2列key做分桶

都要修改partition为能够只基于某些Key进行分桶的类

-partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner
-D 指定将reducer的输出进行压缩
-D mapred.output.compress=true-D mapred.output.compression.codec=org.apache.hadoop.io.compress.GzipCodec
-D 指定将mapper的输出进行压缩
-D mapred.compress.map.output=true-D mapred.map.output.compression.codec=org.apache.hadoop.io.compress.GzipCodec

 

-D 指定Comparatorkey进行数字、倒序排序
$HADOOP_HOME/bin/hadoop jar $HADOOP_HOME/hadoop-streaming.jar \-D mapred.output.key.comparator.class=org.apache.hadoop.mapred.lib.KeyFieldBasedComparator \  # 使用keyFieldBasedComparator进行key排序-D stream.map.output.field.separator=. \-D stream.num.map.output.key.fields=4 \-D map.output.key.field.separator=. \-D mapred.text.key.comparator.options=-k2,2nr \# -k2,2只用第二列排序,n数字排序,r倒序(从大到小)-input myInputDirs \-output myOutputDir \-mapper org.apache.hadoop.mapred.lib.IdentityMapper \-reducer org.apache.hadoop.mapred.lib.IdentityReducer

 

-D 指定每个reduce task申请的内存数量
-D mapreduce.reduce.memory.mb=512  #单位为M

 

 

Streaming选项

参数

可选/必选

参数描述

-input <HDFS目录或文件路径>

支持*通配符,指定多个文件或目录,多次-input,指定多个输入文件/目录

必选

Mapper的输入数据,文件要在任务提交前手动上传到HDFS

-output <HDFS目录>

# 路径不能已存在,否则认为是其他job的输出

必选

reducer输出结果的HDFS存放路径,  不能已存在,但脚本中一定要配置

-mapper <可执行命令或java>

 

-mapper “python map.py”

-mapper “bash map.sh”

-mapper “perl map.perl”

必选

Mapper程序

-reducer <可执行命令或java>

 

-reducer “python reducer.py”

-reducer “bash reducer.sh”

-reducer “perl reducer.sh”

可选

Reducer程序,不需要reduce处理就不指定

-combiner <可执行命令或java>

 

-combiner “python map.py”

-combiner “bash map.sh”

-combiner “perl map.perl”

可选

处理mapper输出的combiner程序

-file

<本地mapperreducer程序文件、程序运行需要的其他文件>

 

-file map.py

-file reduce.py

-file white_list

可选                            文件在本地,小文件

将本地文件分发给计算节点

 

文件作为作业的一部分,一起被打包并提交,所有分发的文件最终会被放置在datanodejob的同一个专属目录下:jobcache/job_xxx/jar

 

-cacheFile

hdfs://master:9000/cachefile_dir/white_list

 

分发HDFS文件

 

Job运行需要的程序,辅助文件都先放到HDFS上,指定HDFS文件路径,将HDFS文件拷贝到计算节点,也是都放置在job的同一个专属目录下:

jobcache/job_xxx/jar

-cacheArchive

 

hdfs://master:9000/w.tar.gz#WLDIR

 

分发HDFS压缩文件、压缩文件内部具有目录结构

 

 

-numReduceTasks  <数字>

 

-numReduceTasks  2

可选

指定该任务的reducer个数

-inputformat  <Java类名>

可选

指定自己定义的inputformat类,默认TextInputformat类

-outputformat  <Java类名>

可选

指定自己定义的outputformat类,默认TextOutputformat类

-cmdenv  name=value

可选

传递给streaming命令的环境变量

 

 

二、Mapper输入/输出,根据哪些key分桶,根据哪些key进行排序

 

先看看Hadoop-1.2.1 文档原文中的解释

As the mapper task runs, it converts its inputs into lines and feed the lines to the stdin of the process. In the meantime, the mapper collects the line oriented outputs from the stdout of the process and converts each line into a key/value pair, which is collected as the output of the mapper. By default, the prefix of a line up to the first tab character is the key and the rest of the line (excluding the tab character) will be the value. If there is no tab character in the line, then entire line is considered as key and the value is null. However, this can be customized, as discussed later.

 

Mapper输入:

每一个mapper开始运行时,输入文件会被转换成多行(TextInputformat根据\n来进行分行),并将每一行传递给stdin, 作为Mapper的输入 mapper直接对stdin中的每行内容做处理

 

Mapper输出分隔符:

默认情况下hadoop设置mapper输出的key, value通过tab进行分隔,可以重新指定

-D stream.map.output.field.seperator=.    # 指定mapper每条输出key,value分隔符

-D stream.num.map.output.key.fields=4   # 第4个.之前的部分为key,剩余为value

 

mapper的输出会经历

1、 partition前,根据mapper输出分隔符分离出KeyValue

-D stream.map.output.field.separator=.    # 指定mapper每条输出key,value分隔符

-D stream.num.map.output.key.fields=4   # 第4个.之前的为key, 剩下的为value

-D  map.output.key.field.separator=.        # 设置map输出中,Key内部的分隔符

 

2、 根据 “分桶分隔符”,确定哪些key被用来做partition(默认是用所有key, 只有1列; 或者是Mapper输出分隔符分离出的所有key都被用于Partition)

基于指定的Key进行分桶,打标签

指定列数

-D num.key.fields.for.partition=1        # 只用1列Key做分桶,也就是第一列

-D num.key.fields.for.partition=2       # 使用1,2共两列key做分桶(列数)

 

指定某些字段做key

-D mapred.text.key.partitioner.option =-k1,2   # 第1,2列Key做分桶

-D mapred.text.key.partitioner.option =-k2,2   # 第2列key做分桶

 

#都要修改partition为能够只基于某些Key进行分桶的类

-partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner

 

3、Spill时根据Partition标签和所有Key进行排序

4Partition标签和key之间,也是通过mapper输出分隔符来隔离

5、reducer前的文件会删除partition标签,并根据Mapper输出分隔符确定出key, 进行Reducer前的归并排序;(reducer前的归并排序,基于所有mapper的key进行排序

因此如果要定义新的Mapper输出分隔符就要做到:1)mapper代码中根据新分隔符来构建输出到stdout的内容;2)提交作业时,通过—D 指定新的Mapper输出分隔符,以及第几个分隔符来分离Key

 

Reducer的输入:

每个Reducer的每条输入,就是去除Partition标签(根据Mapper分隔符分离出partition标签)后的内容,和Mapper输出到stdout中的内容相同,但不同记录之间已经做了排序;因此如果重新指定了Mapper的输出分隔符,Reducer程序就要修改为根据新的Mapper输出分隔符来分离Key,value;

 

Reducer的输出:

Reducer的输出,默认也是根据tab来分离key,value, 这也是reducer程序要根据tab来组合key,value输出给stdout的原因;  Reducer输出分隔符重新指定,Reducer程序中输出给stdout的内容也要配合新的分隔符来构造(Reducer->stdout-> outputformat ->file,  outputformat根据reducer的输出分隔符来分离key,value,  并写入文件

-D stream.reduce.output.field.seperator=.      # reducer输出key,value间的分隔符

-D stream.num.reduce.output.key.fields=4     # 第4个.之前的内容为key, 其他为value

转载于:https://www.cnblogs.com/shay-zhangjin/p/7714868.html

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/391027.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

mongodb分布式集群搭建手记

一、架构简介 目标 单机搭建mongodb分布式集群(副本集 分片集群)&#xff0c;演示mongodb分布式集群的安装部署、简单操作。 说明 在同一个vm启动由两个分片组成的分布式集群&#xff0c;每个分片都是一个PSS(Primary-Secondary-Secondary)模式的数据副本集&#xff1b; Confi…

归约归约冲突_JavaScript映射,归约和过滤-带有代码示例的JS数组函数

归约归约冲突Map, reduce, and filter are all array methods in JavaScript. Each one will iterate over an array and perform a transformation or computation. Each will return a new array based on the result of the function. In this article, you will learn why …

为什么Java里面的静态方法不能是抽象的

为什么Java里面的静态方法不能是抽象的&#xff1f; 问题是为什么Java里面不能定义一个抽象的静态方法&#xff1f;例如&#xff1a; abstract class foo {abstract void bar( ); // <-- this is okabstract static void bar2(); //<-- this isnt why? }回答一 因为抽…

python16_day37【爬虫2】

一、异步非阻塞 1.自定义异步非阻塞 1 import socket2 import select3 4 class Request(object):5 def __init__(self,sock,func,url):6 self.sock sock7 self.func func8 self.url url9 10 def fileno(self): 11 return self.soc…

朴素贝叶斯实现分类_关于朴素贝叶斯分类及其实现的简短教程

朴素贝叶斯实现分类Naive Bayes classification is one of the most simple and popular algorithms in data mining or machine learning (Listed in the top 10 popular algorithms by CRC Press Reference [1]). The basic idea of the Naive Bayes classification is very …

python:改良廖雪峰的使用元类自定义ORM

概要本文仅仅是对廖雪峰老师的使用元类自定义ORM进行改进&#xff0c;并不是要创建一个ORM框架 编写fieldclass Field(object):def __init__(self, column_type,max_length,**kwargs):1&#xff0c;删除了参数name&#xff0c;field参数全部为定义字段类型相关参数&#xff0c;…

2019年度年中回顾总结_我的2019年回顾和我的2020年目标(包括数量和收入)

2019年度年中回顾总结In this post were going to take a look at how 2019 was for me (mostly professionally) and were also going to set some goals for 2020! &#x1f929; 在这篇文章中&#xff0c;我们将了解2019年对我来说(主要是职业)如何&#xff0c;我们还将为20…

在Java里重写equals和hashCode要注意什么问题

问题&#xff1a;在Java里重写equals和hashCode要注意什么问题 重写equals和hashCode有哪些问题或者陷阱需要注意&#xff1f; 回答一 理论&#xff08;对于语言律师或比较倾向于数学的人&#xff09;&#xff1a; equals() (javadoc) 必须定义为一个相等关系&#xff08;它…

vray阴天室内_阴天有话:第1部分

vray阴天室内When working with text data and NLP projects, word-frequency is often a useful feature to identify and look into. However, creating good visuals is often difficult because you don’t have a lot of options outside of bar charts. Lets face it; ba…

【codevs2497】 Acting Cute

这个题个人认为是我目前所做的最难的区间dp了&#xff0c;以前把环变成链的方法在这个题上并不能使用&#xff0c;因为那样可能存在重复计算 我第一遍想的时候就是直接把环变成链了&#xff0c;wa了5个点&#xff0c;然后仔细思考一下就发现了问题 比如这个样例 5 4 1 2 4 1 1 …

渐进式web应用程序_渐进式Web应用程序与加速的移动页面:有什么区别,哪种最适合您?

渐进式web应用程序Do you understand what PWAs and AMPs are, and which might be better for you? Lets have a look and find out.您了解什么是PWA和AMP&#xff0c;哪一种可能更适合您&#xff1f; 让我们看看并找出答案。 So many people own smartphones these days. T…

高光谱图像分类_高光谱图像分析-分类

高光谱图像分类初学者指南 (Beginner’s Guide) This article provides detailed implementation of different classification algorithms on Hyperspectral Images(HSI).本文提供了在高光谱图像(HSI)上不同分类算法的详细实现。 目录 (Table of Contents) Introduction to H…

在Java里如何给一个日期增加一天

在Java里如何给一个日期增加一天 我正在使用如下格式的日期: yyyy-mm-dd. 我怎么样可以给一个日期增加一天&#xff1f; 回答一 这样应该可以解决问题 String dt "2008-01-01"; // Start date SimpleDateFormat sdf new SimpleDateFormat("yyyy-MM-dd&q…

CentOS 7安装和部署Docker

版权声明&#xff1a;本文为博主原创文章&#xff0c;未经博主允许不得转载。 https://blog.csdn.net/u010046908/article/details/79553227 Docker 要求 CentOS 系统的内核版本高于 3.10 &#xff0c;查看本页面的前提条件来验证你的CentOS 版本是否支持 Docker 。通过 uname …

JavaScript字符串方法终极指南-拆分

The split() method separates an original string into an array of substrings, based on a separator string that you pass as input. The original string is not altered by split().split()方法根据您作为输入传递的separator字符串&#xff0c;将原始字符串分成子字符串…

机器人的动力学和动力学联系_通过机器学习了解幸福动力学(第2部分)

机器人的动力学和动力学联系Happiness is something we all aspire to, yet its key factors are still unclear.幸福是我们所有人都渴望的东西&#xff0c;但其关键因素仍不清楚。 Some would argue that wealth is the most important condition as it determines one’s li…

在Java里怎将字节数转换为我们可以读懂的格式?

问题&#xff1a;在Java里怎将字节数转换为我们可以读懂的格式&#xff1f; 在Java里怎将字节数转换为我们可以读懂的格式 像1024应该变成"1 Kb"&#xff0c;而1024*1024应该变成"1 Mb". 我很讨厌为每个项目都写一个工具方法。在Apache Commons有没有这…

ubuntu 16.04 安装mysql

2019独角兽企业重金招聘Python工程师标准>>> 1) 安装 sudo apt-get install mysql-server apt-get isntall mysql-client apt-get install libmysqlclient-dev 2) 验证 sudo netstat -tap | grep mysql 如果有 就代表已经安装成功。 3&#xff09;开启远程访问 1、 …

shell:多个文件按行合并

paste file1 file2 file3 > file4 file1内容为&#xff1a; 1 2 3 file2内容为&#xff1a; a b c file3内容为&#xff1a; read write add file4内容为&#xff1a; 1 a read 2 b write 3 c add 转载于:https://www.cnblogs.com/seaBiscuit0922/p/7728444.html

form子句语法错误_用示例语法解释SQL的子句

form子句语法错误HAVING gives the DBA or SQL-using programmer a way to filter the data aggregated by the GROUP BY clause so that the user gets a limited set of records to view.HAVING为DBA或使用SQL的程序员提供了一种过滤由GROUP BY子句聚合的数据的方法&#xff…