Hadoop Streaming 编程

Hadoop Streaming 编程 | 董的博客

Hadoop Streaming 编程

 

Category: Hadoop-MapReduce

View: 5,678 阅
Author: Dong

1、概述

Hadoop Streaming是Hadoop提供的一个编程工具,它允许用户使用任何可执行文件或者脚本文件作为Mapper和Reducer,例如:

采用shell脚本语言中的一些命令作为mapper和reducer(cat作为mapper,wc作为reducer)

$HADOOP_HOME/bin/hadoop  jar $HADOOP_HOME/hadoop-streaming.jar \

-input myInputDirs \

-output myOutputDir \

-mapper cat \

-reducer wc

本文安排如下,第二节介绍Hadoop Streaming的原理,第三节介绍Hadoop Streaming的使用方法,第四节介绍Hadoop Streaming的程序编写方法,在这一节中,用C++、C、shell脚本 和python实现了WordCount作业,第五节总结了常见的问题。文章最后给出了程序下载地址。(本文内容基于Hadoop-0.20.2版本)

(注:如果你采用的语言为C或者C++,也可以使用Hadoop Pipes,具体可参考这篇文章:Hadoop Pipes编程。)

关于Hadoop Streaming高级编程方法,可参考这篇文章:Hadoop Streaming高级编程

2、Hadoop Streaming原理

mapper和reducer会从标准输入中读取用户数据,一行一行处理后发送给标准输出。Streaming工具会创建MapReduce作业,发送给各个tasktracker,同时监控整个作业的执行过程。

如果一个文件(可执行或者脚本)作为mapper,mapper初始化时,每一个mapper任务会把该文件作为一个单独进程启动,mapper任务运行时,它把输入切分成行并把每一行提供给可执行文件进程的标准输入。 同时,mapper收集可执行文件进程标准输出的内容,并把收到的每一行内容转化成key/value对,作为mapper的输出。 默认情况下,一行中第一个tab之前的部分作为key,之后的(不包括tab)作为value如果没有tab,整行作为key值,value值为null。

对于reducer,类似。

以上是Map/Reduce框架和streaming mapper/reducer之间的基本通信协议。

3、Hadoop Streaming用法

Usage: $HADOOP_HOME/bin/hadoop jar \

$HADOOP_HOME/hadoop-streaming.jar [options]

options:

(1)-input:输入文件路径

(2)-output:输出文件路径

(3)-mapper:用户自己写的mapper程序,可以是可执行文件或者脚本

(4)-reducer:用户自己写的reducer程序,可以是可执行文件或者脚本

(5)-file:打包文件到提交的作业中,可以是mapper或者reducer要用的输入文件,如配置文件,字典等。

(6)-partitioner:用户自定义的partitioner程序

(7)-combiner:用户自定义的combiner程序(必须用java实现)

(8)-D:作业的一些属性(以前用的是-jonconf),具体有:

             1)mapred.map.tasks:map task数目
             2)mapred.reduce.tasks:reduce task数目
             3)stream.map.input.field.separator/stream.map.output.field.separator: map task输入/输出数
据的分隔符,默认均为\t。
             4)stream.num.map.output.key.fields:指定map task输出记录中key所占的域数目
             5)stream.reduce.input.field.separator/stream.reduce.output.field.separator:reduce task输入/输出数据的分隔符,默认均为\t。
             6)stream.num.reduce.output.key.fields:指定reduce task输出记录中key所占的域数目
另外,Hadoop本身还自带一些好用的Mapper和Reducer:
(1)    Hadoop聚集功能

Aggregate提供一个特殊的reducer类和一个特殊的combiner类,并且有一系列的“聚合器”(例如“sum”,“max”,“min”等)用于聚合一组value的序列。用户可以使用Aggregate定义一个mapper插件类,这个类用于为mapper输入的每个key/value对产生“可聚合项”。Combiner/reducer利用适当的聚合器聚合这些可聚合项。要使用Aggregate,只需指定“-reducer aggregate”。

(2)字段的选取(类似于Unix中的‘cut’)

Hadoop的工具类org.apache.hadoop.mapred.lib.FieldSelectionMapReduc帮助用户高效处理文本数据,就像unix中的“cut”工具。工具类中的map函数把输入的key/value对看作字段的列表。 用户可以指定字段的分隔符(默认是tab),可以选择字段列表中任意一段(由列表中一个或多个字段组成)作为map输出的key或者value。 同样,工具类中的reduce函数也把输入的key/value对看作字段的列表,用户可以选取任意一段作为reduce输出的key或value。

4、Mapper和Reducer实现

本节试图用尽可能多的语言编写Mapper和Reducer,包括Java,C,C++,Shell脚本,python等。

由于Hadoop会自动解析数据文件到Mapper或者Reducer的标准输入中,以供它们读取使用,所有应先了解各个语言获取标准输入的方法。

(1)    Java语言:

见Hadoop自带例子

(2)    C++语言

1
2
3
4
string key;
while(cin>>key){
cin>>value;
 ….

(3)  C语言

1
2
3
4
5
char buffer[BUF_SIZE];
while(fgets(buffer, BUF_SIZE - 1, stdin)){
int len = strlen(buffer);
}

(4)  Shell脚本

用管道

(5)  Python脚本

1
2
3
import sys
for line in sys.stdin:
.......

为了说明各种语言编写Hadoop Streaming程序的方法,下面以WordCount为例,WordCount作业的主要功能是对用户输入的数据中所有字符串进行计数。

(1)C语言实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
//mapper
#include <stdio.h>
#include <string.h>
#include <stdlib.h>
#define BUF_SIZE        2048
#define DELIM   "\n"
int main(int argc, char *argv[]){
     char buffer[BUF_SIZE];
     while(fgets(buffer, BUF_SIZE - 1, stdin)){
            int len = strlen(buffer);
            if(buffer[len-1] == '\n')
             buffer[len-1] = 0;
            char *querys  = index(buffer, ' ');
            char *query = NULL;
            if(querys == NULL) continue;
            querys += 1; /*  not to include '\t' */
            query = strtok(buffer, " ");
            while(query){
                   printf("%s\t1\n", query);
                   query = strtok(NULL, " ");
            }
     }
     return 0;
}
//---------------------------------------------------------------------------------------
//reducer
#include <stdio.h>
#include <string.h>
#include <stdlib.h>
#define BUFFER_SIZE     1024
#define DELIM   "\t"
int main(int argc, char *argv[]){
 char strLastKey[BUFFER_SIZE];
 char strLine[BUFFER_SIZE];
 int count = 0;
 *strLastKey = '\0';
 *strLine = '\0';
 while( fgets(strLine, BUFFER_SIZE - 1, stdin) ){
 char *strCurrKey = NULL;
 char *strCurrNum = NULL;
 strCurrKey  = strtok(strLine, DELIM);
 strCurrNum = strtok(NULL, DELIM); /* necessary to check error but.... */
 if( strLastKey[0] == '\0'){
 strcpy(strLastKey, strCurrKey);
 }
 if(strcmp(strCurrKey, strLastKey)){
 printf("%s\t%d\n", strLastKey, count);
 count = atoi(strCurrNum);
 }else{
 count += atoi(strCurrNum);
 }
 strcpy(strLastKey, strCurrKey);
 }
 printf("%s\t%d\n", strLastKey, count); /* flush the count */
 return 0;
}

(2)C++语言实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
//mapper
#include <stdio.h>
#include <string>
#include <iostream>
using namespace std;
int main(){
        string key;
        string value = "1";
        while(cin>>key){
                cout<<key<<"\t"<<value<<endl;
        }
        return 0;
}
//------------------------------------------------------------------------------------------------------------
//reducer
#include <string>
#include <map>
#include <iostream>
#include <iterator>
using namespace std;
int main(){
        string key;
        string value;
        map<string, int> word2count;
        map<string, int>::iterator it;
        while(cin>>key){
                cin>>value;
                it = word2count.find(key);
                if(it != word2count.end()){
                        (it->second)++;
                }
                else{
                        word2count.insert(make_pair(key, 1));
                }
        }
        for(it = word2count.begin(); it != word2count.end(); ++it){
                cout<<it->first<<"\t"<<it->second<<endl;
        }
        return 0;
}

(3)shell脚本语言实现

1
2
3
4
5
$HADOOP_HOME/bin/hadoop  jar $HADOOP_HOME/hadoop-streaming.jar \
    -input myInputDirs \
    -output myOutputDir \
    -mapper cat \
   -reducer  wc

(4)Python脚本语言实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
#!/usr/bin/env python
import sys
# maps words to their counts
word2count = {}
# input comes from STDIN (standard input)
for line in sys.stdin:
    # remove leading and trailing whitespace
    line = line.strip()
    # split the line into words while removing any empty strings
    words = filter(lambda word: word, line.split())
    # increase counters
    for word in words:
        # write the results to STDOUT (standard output);
        # what we output here will be the input for the
        # Reduce step, i.e. the input for reducer.py
        #
        # tab-delimited; the trivial word count is 1
        print '%s\t%s' % (word, 1)
#---------------------------------------------------------------------------------------------------------
#!/usr/bin/env python
from operator import itemgetter
import sys
# maps words to their counts
word2count = {}
# input comes from STDIN
for line in sys.stdin:
    # remove leading and trailing whitespace
    line = line.strip()
    # parse the input we got from mapper.py
    word, count = line.split()
    # convert count (currently a string) to int
    try:
        count = int(count)
        word2count[word] = word2count.get(word, 0) + count
    except ValueError:
        # count was not a number, so silently
        # ignore/discard this line
        pass
# sort the words lexigraphically;
#
# this step is NOT required, we just do it so that our
# final output will look more like the official Hadoop
# word count examples
sorted_word2count = sorted(word2count.items(), key=itemgetter(0))
# write the results to STDOUT (standard output)
for word, count in sorted_word2count:
    print '%s\t%s'% (word, count)

5、常见问题

(1)作业总是运行失败:

需要把mapper文件和reducer文件放到各个tasktracker上,保证各个节点均有一份。也可在提交作业时,采用-file选项指定这些文件。

(2)用脚本编写时,第一行需注明脚本解释器,默认是shell

6、参考资料

【1】C++&Python实现Hadoop Streaming的paritioner和模块化

【2】如何在Hadoop中使用Streaming编写MapReduce

【3】Hadoop如何与C++结合

【4】Hadoop Streaming和pipes理解

7、程序打包下载

文章中用到的程序源代码可在此处下载

原创文章,转载请注明: 转载自董的博客

本文链接地址: http://dongxicheng.org/mapreduce/hadoop-streaming-programming/

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/302823.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

数学不好、英语不好、非本专业,想学python数据分析,能安排吗?

全世界有3.14 % 的人已经关注了数据与算法之美“非本专业想转型做数据分析&#xff0c;有救吗&#xff1f;”“数学不好&#xff0c;英语不好&#xff0c;想学Python数据分析&#xff0c;有救吗&#xff1f;”“不懂Python数据分析到底是什么&#xff0c;有救吗&#xff1f;”我…

在 .NET 中使用 C# 处理 YAML

在 .NET 中&#xff0c;可以使用 YamlDotNet 类库解析和生成 YAML 文件。YamlDotNet &#xff1a;https://github.com/aaubry/YamlDotNetNuGet 下载&#xff1a;https://www.nuget.org/packages/YamlDotNet/帮助文档&#xff1a;https://github.com/aaubry/YamlDotNet/wiki序列…

js数组去重,合集等操作

<html> <head><script type"text/javascript"> var a[a,b,c]; var b[a,b,d,f]; var arr1 intersection(a,b); alert("a,b的合集-不重复:"arr1); var arr2 chaji(a,b); alert("a与b不重复的部分:"arr2); var arr3 inANotInB…

java读取图片缩略方法_java 图片缩略图的两种方法

最近网上看到两种不同的java图片缩略图的绘制方案第一种&#xff0c;使用Graphics().drawImage按照一定的比例重新绘制图像。Java代码package com.image.suoluetu;import java.io.*;import java.awt.*;import java.awt.image.*;import com.sun.image.codec.jpeg.*;public class…

Python项目可以有多大?最多可以有多少行代码?

全世界有3.14 % 的人已经关注了数据与算法之美导读&#xff1a;总是看到有人说&#xff0c;动态一时爽&#xff0c;重构火葬场。然而这世界上有的是著名的开源项目&#xff0c; 也有像 Github、Instagram 这样流量巨大的知名网站是基于动态语言开发的&#xff0c;经过了这么多年…

从好买辞职后,为什么我会加入一家开源创业公司?

这是头哥侃码的第240篇原创熟悉我的朋友都知道&#xff0c;我是一个闲不住的人。工作之余&#xff0c;我不仅愿意把自己的经验拿出来与大家分享&#xff0c;而且还总是喜欢在字里行间中表达情感&#xff0c;并抒发命运的奇妙与无常。为什么&#xff1f;因为在我看来&#xff0c…

Apache 虚拟主机 VirtualHost 配置

为什么80%的码农都做不了架构师&#xff1f;>>> 虚拟主机 (Virtual Host) 是在同一台机器搭建属于不同域名或者基于不同 IP 的多个网站服务的技术. 可以为运行在同一物理机器上的各个网站指配不同的 IP 和端口, 也可让多个网站拥有不同的域名. Apache 是世界上使用…

selenium java 参数化_Java+selenium 自动化测试【03】-- 数据驱动之参数化

目录1.前言2.读取txt文件实现参数化3.使用Excel表格参数化4.使用TestNG.xml文件参数化5.使用DataProvider传参前言在上一个随笔中&#xff0c;我们已经自动化测试模型&#xff0c;讲到数据驱动模型。数据驱动&#xff1a;是自动化的一个进步&#xff0c;从本意来讲&#xff0c;…

IT公司老板落水,各部门员工怎么救?

公司高层公司副总A&#xff1a;咱们开个会研究一下这个事情怎么处理。公司副总B&#xff1a;如果老板没有救成功&#xff0c;下任是谁呢&#xff1f;会不会影响公司的上市&#xff1f;公司副总C&#xff1a;我认为咱们开会应该讨论两个方案&#xff0c;一个是救人方案&#xff…

这样用Docker 搭建 Jenkins 实现自动部署,你知道吗?

一、为什么用jenkins主要是我们现在的项目都是采用手动部署的&#xff0c;每当给项目中新增一个功能就必须执行(打包--测试--上传测试修复的包到指定服务器--部署上线)&#xff0c;这个流程操作起来繁杂&#xff0c;不小心就可能导致部署失败&#xff1b;急需找到一个方式来解决…

struts.properties属性解释

Struts 2框架有两个核心配置文件:struts.xml和struts.properties 其中struts.xml文件主要负责管理应用中的Action映射&#xff0c;以及该Action包含的Result定义等。除此之外&#xff0c;Struts2框架还包含一个struts.properties文件&#xff0c;该文件定义了Struts 2框架的大量…

java if or android_RxJava switchIfEmpty操作符实现Android检查本地缓存逻辑判断

switchIfEmpty(Observable emptyObservable)操作符从字面意思上就很好理解&#xff0c;就是当为空的时候跳转到emptyObservable。那末如何理解当为空的时候. 下面将会使用实际案例解释这个switchIfEmpty的使用方法。业务需求假设我们的app里有加载文章列表功能&#xff0c;要求…

这是对R的误解!R的应用原来这么广!

R作为一种统计分析软件&#xff0c;广泛应用于生物、医学、电商、新闻等数据相关行业&#xff0c;是目前主流数据应用软件之一。为了更好地帮助大家了解并快速入门R语言&#xff0c;现超级数学建模携手柯老师以R语言为基础&#xff0c;向大家隆重推出《R语言基础》系列课。柯老…

WinDBg定位asp.net mvc项目异常崩溃源码位置

项目介绍&#xff1a;asp.net mvc angular iiswindows server系统莫名崩溃最近有个系统默认奇妙崩溃50x&#xff0c;服务整体变成无响应&#xff0c;当运维告知我只有重启应用程序池项目才能正常。我问他如何重现&#xff0c;得到的回复是我这里无法重现&#xff0c;但客户使用…

selenium webdirver之ruby-开发ide安装

这里用到的ide是netbeans ruby环境安装好后&#xff0c;下载netbeans&#xff0c;然后进行安装 netbeans ruby plugins download URL&#xff1a;http://jruby.org.s3.amazonaws.com/downloads/community-ruby/community-ruby_7_1_preview1.zip 解压 打开netbeans&#xff0c;工…

mysql如何和qt连接使用_Qt5学习:连接MySQL数据库

一、环境QT版本&#xff1a;QT 5.8.0(msvc2013_64)MySQL版本&#xff1a;mysql 5.7.19二、配置之前 mysql 数据库一直都连接不上&#xff0c;网上也搜了很多资料&#xff0c;主要还是库文件的问题。重新将 mysql.pro 编译一下&#xff0c;将生成的 .dll 和 .lib 文件拷贝到 QT …

TensorFlow框架的这些操作你肯定不知道!

谷歌在上周正式推出了深度学习框架TensorFlow 1.11.0 版本&#xff0c;那么TensorFlow框架到底是什么&#xff1f;TensorFlow™ 是一个采用数据流图&#xff08;data flow graphs&#xff09;&#xff0c;用于数值计算的开源软件库。最初由Google大脑小组的研究员和工程师们开发…

C#中HashTable、Dictionary、ConcurrentDictionary区别

一、HashTableHashTable表示键/值对的集合。在.NET Framework中&#xff0c;Hashtable是System.Collections命名空间提供的一个容器&#xff0c;用于处理和表现类似key-value的键值对&#xff0c;其中key通常可用来快速查找&#xff0c;同时key是区分大小写&#xff1b;value用…

CDA数据分析师备考必看,L1L2通用

CDA数据分析师L1,L2均可 可安排当月月底的线上考试 线上考的为双机位监考&#xff0c;但是不用担心 安全无隐患&#xff0c;需要联系 当月拿证

[SIR数据集实验][2]Java类数据集相应工具使用的小经验

这段时间安排一个师弟在继续学习SIR数据集的使用和实验方法&#xff0c;这里我先总结点经验。 如果要生成某一个数据集的Fault Matrix&#xff0c;&#xff08;虽然一般在info目录下有已经生成好的Fault Matrix&#xff0c;但按照Java Object Handbook里面的建议&#xff0c;最…