实例讲解hadoop中的map/reduce查询(python语言实现)

条件,假设你已经装好了hadoop集群,配好了hdfs并可以正常运行。

$hadoop dfs -ls /data/dw/explorer
Found 1 items
drwxrwxrwx     - rsync supergroup                    0 2011-11-30 01:06 /data/dw/explorer/20111129


$ hadoop dfs -ls /data/dw/explorer/20111129
Found 4 items
-rw-r--r--     3 rsync supergroup     12294748 2011-11-29 21:10 /data/dw/explorer/20111129/explorer_20111129_19_part-00000.lzo
-rw-r--r--     3 rsync supergroup             1520 2011-11-29 21:11 /data/dw/explorer/20111129/explorer_20111129_19_part-00000.lzo.index
-rw-r--r--     3 rsync supergroup     12337366 2011-11-29 22:09 /data/dw/explorer/20111129/explorer_20111129_20_part-00000.lzo
-rw-r--r--     3 rsync supergroup             1536 2011-11-29 22:10 /data/dw/explorer/20111129/explorer_20111129_20_part-00000.lzo.index

数据格式如下

20111129/23:59:54 111.161.25.184 182.132.25.243 <Log_Explorer ProductVer="5.05.1026.1111" UUID="{C9B80A9B-704E-B106-9134-1ED3581D0123}"><UserDoubleClick FileExt="mp3" AssociateKey="Audio.mp3" Count="1"/></Log_Explorer>



1.map脚本取数据explorer_map.py

#!/usr/bin/python
#-*-coding:UTF-8 -*-
import sys
import cElementTree

debug = False#设置lzo文件偏移位
if debug:
        lzo = 0
else:
        lzo = 1

for line in sys.stdin:
        try:
                flags = line[:-1].split('\t')
#hadoop查询走标准输入,数据以\t分隔,去掉每行中的\n
                if len(flags) == 0:
                        break
                if len(flags) != 11+lzo:
#hadoop采用lzo则偏移位+1,lzo设置为False则+1
                        continue
                stat_date=flags[0+lzo]#日期
                stat_date_bar = stat_date[:4]+"-"+stat_date[4:6]+'-'+stat_date[6:8]#拼成2011-11-29格式
                version = flags[4+lzo]
                xmlstr = flags[10+lzo]
                #xmlstr=line
                dom = cElementTree.fromstring(xmlstr)
#xml字段对象,以下均为取值操作
                uuid = dom.attrib['UUID']
                node = dom.find('UserDoubleClick')
                associateKey=node.get('AssociateKey')
                associateKeys=associateKey.split('.')
                player = associateKeys[0]
                fileext=node.get('FileExt')
                count=node.get('Count')
                print stat_date_bar+','+version+','+fileext+','+player+','+associateKey+'\t'+count
#输出map后的数据,这里map不对数据做任何处理,只做取值,拼接操作
#将\t前的字符串作为key输入reduce,\t后的count作为reduce计算用的value
except Exception,e:
print e
#抛出异常        

2.reduce脚本计算结果并输出explorer_red.py

#!/usr/bin/python
#-*-coding:UTF-8 -*-
import sys
import cElementTree
import os
import string

res = {}

for line in sys.stdin:
        try:
                flags = line[:-1].split('\t')
#拆分\t以获得map传过来的key和value
                if len(flags) != 2:
#\t切割后,如果数据有问题,元素多于2或者少于2则认为数据不合法,跳出继续下一行
                        continue
                skey= flags[0]
#取出第一个元素作为key
                count=int(flags[1])
#取出第二个元素作为value
                if res.has_key(skey) == False:
                        res[skey]=0
                res[skey] += count
#计算count总和
        except Exception,e:
                pass
#不抛出,继续执行

for key in res.keys():
        print key+','+'%s' % res[key]
#格式化输出,以放入临时文件

3.放入crontab执行的脚本

#!/bin/sh

[ $1 ] && day=$1 DATE=`date -d "$1" +%Y%m%d`
[ $1 ] || day=`date -d "1 day ago" +%Y%m%d`     DATE=`date -d "1 day ago" +%Y%m%d`
#取昨天日期

cd /opt/modules/hadoop/hadoop-0.20.203.0/
#进入hadoop工作目录
bin/hadoop jar contrib/streaming/hadoop-streaming-0.20.203.0.jar -file /home/rsync/explorer/explorer_map.py -file /home/rsync/explorer/explorer_red.py -mapper /home/rsync/explorer/explorer_map.py -reducer /home/rsync/explorer/explorer_red.py -inputformat com.hadoop.mapred.DeprecatedLzoTextInputFormat -input /data/dw/explorer/$DATE -output /tmp/explorer_$DATE
#执行map/reduce,并将排序完结果放入hdfs:///tmp/explorer

bin/hadoop fs -copyToLocal /tmp/explorer_$DATE /tmp
#将m/r结果从hdfs://tmp/explorer_$DATE 保存到本地/tmp下
bin/hadoop dfs -rmr /tmp/explorer_$DATE
#删除hdfs下临时文件夹

cd
#返回自身目录
cd explorer
#进入explorer文件夹
./rm.py $DATE
执行入库和删除临时文件夹脚本


4.将/tmp生成的结果入库并删除临时文件夹

#!/usr/bin/python

import os
import sys
import string

if len(sys.argv) == 2:
                date = sys.argv[1:][0] #取脚本参数
                os.system ("mysql -h192.168.1.229 -ujobs -p223238 -P3306    bf5_data    -e \"load data local infile '/tmp/explorer_"+date+"/part-00000' into table explorer FIELDS TERMINATED
BY '\,' (stat_date,ver,FileExt,player,AssociateKey,count)\"")#执行入库sql语句,并用load方式将数据加载到统计表中
                os.system ("rm -rf /tmp/explorer_"+date)#删除map/reduce过的数据
else:
                print "Argv error"

#因为没有安装MySQLdb包,所以用运行脚本的方式加载数据。

原始数据和最后完成的输出数据对比,红色为原数据,绿色为输出数据

20111129/23:59:54 111.161.25.184 182.132.25.243 <Log_Explorer ProductVer="5.05.1026.1111" UUID="{C9B80A9B-704E-B106-9134-1ED3581D0123}"><UserDoubleClick FileExt="mp3" AssociateKey="Audio.mp3" Count="1"/></Log_Explorer>

-----------------------------------------------------------

2011-11-29,5.05.1026.1111,mp3,Audio,Audio.mp3,1


5.调试技巧

因为这种方式比较抽象,所以你很难得到一个直观的调试过程。建议调试如下

#将hadoop中的数据文本copy出来一个,lzo需要解压缩,然后将map中的debug模式置为True,也就是不加hadoop中的lzo偏移量。
#用head输入hadoop里的文件,通过管道操作放入map/reduce中执行,看输出结果

$head explorer_20111129 | explorer_map.py | explorer_red.py

一天的数据大概几十个G,以前用awk和perl脚本跑需要至少半小时以上,改用map/reduce方式后,大概20几秒跑完,效率还是提高了很多的。

转载于:https://www.cnblogs.com/java20130722/p/3206898.html

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/474206.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

python中僵尸进程

⼦进程运⾏完成&#xff0c;但是⽗进程迟迟没有进⾏回收&#xff0c;此时⼦进程实际上并没有退出&#xff0c;其仍然占⽤着系统资源&#xff0c;这样的⼦进程称为僵⼫进程。 因为僵⼫进程的资源⼀直未被回收&#xff0c;造成了系统资源的浪费&#xff0c;过多的僵⼫进程将造成…

01.神经网络和深度学习 W3.浅层神经网络

文章目录1. 神经网络概览2. 神经网络的表示3. 神经网络的输出4. 多样本向量化5. 激活函数6. 为什么需要 非线性激活函数7. 激活函数的导数8. 随机初始化作业参考&#xff1a; 吴恩达视频课 深度学习笔记 1. 神经网络概览 xW[1]b[1]}⟹z[1]W[1]xb[1]⟹a[1]σ(z[1])\left.\begin…

多进程修改全局变量

多进程中&#xff0c;每个进程中所有数据&#xff08;包括全局变量&#xff09;都各有拥有⼀份&#xff0c;互不影响 (读时共享&#xff0c;写时复制) import os import time num 100 ret os.fork() # 创建新的进程 一次调用&#xff0c;两次返回 if ret 0: # 子进程…

多进程模块multiprocessing

multiprocessing模块就是跨平台版本的多进程模块&#xff0c;提供了⼀个Process类来代表一个进程对象 创建⼦进程时&#xff0c;只需要传⼊⼀个执⾏函数和函数的参数&#xff0c;创建⼀个 Process实例&#xff0c;⽤start&#xff08;&#xff09;方法启动 &#xff0c;join()…

01.神经网络和深度学习 W2.神经网络基础(作业:逻辑回归 图片识别)

文章目录编程题 11. numpy 基本函数1.1 编写 sigmoid 函数1.2 编写 sigmoid 函数的导数1.3 reshape操作1.4 标准化1.5 广播机制2. 向量化2.1 L1\L2损失函数编程题 2. 图片&#x1f431;识别1. 导入包2. 数据预览3. 算法的一般结构4. 建立算法4.1 辅助函数4.2 初始化参数4.3 前向…

PL/SQL程序设计以及安全管理实验遇到的问题及解决

问题一&#xff1a;当我书写PL/SQL语句调用所创建的函数时&#xff0c;报“此范围不存在名为XXX函数名”的错误。 解决&#xff1a; 我通过查阅相关资料&#xff0c;了解到&#xff1a;这种情况主要是调用的函数的参数或者函数名书写错误&#xff0c; 然而&#xff0c;我经过仔…

PowerDesigner使用教程 —— 概念数据模型 (转)

一、概念数据模型概述 概念数据模型也称信息模型&#xff0c;它以实体&#xff0d;联系(Entity-RelationShip,简称E-R)理论为基础&#xff0c;并对这一理论进行了扩充。它从用户的观点出发对信息进行建模&#xff0c;主要用于数据库的概念级设计。 通常人们先将现实世界抽…

进程的创建-Process⼦类

from multiprocessing import Process&#xff08;P必须大写 import os import time classSubProcess(Process): """创建Process的子类""" def __init__(self, num, a): super(SubProcess, self).__init__() # 执行父类Process默认的初始化方法…

阿里云 超级码力在线编程大赛初赛 第1场(第245名)

文章目录1. 比赛结果2. 题目1. 树木规划2. 正三角形拼接3. 大楼间穿梭4. 对称前后缀1. 比赛结果 通过了 3 题&#xff0c;第245名&#xff0c;进入复赛了&#xff0c;收获 T恤 一件&#xff0c;哈哈。 2. 题目 1. 树木规划 题目链接 描述 在一条直的马路上&#xff0c;…

python中的进程池Pool

初始化Pool时&#xff0c;可以指定⼀个最大进程池&#xff0c;当有新进程提交时&#xff0c;如果池还没有满&#xff0c;那么就会创建新进程请求&#xff1b;但如果池中达到最大值&#xff0c;那么就会等待&#xff0c;待池中有进程结束&#xff0c;新进程来执行。 非阻塞式&a…

AS3.0面向对象的写法,类和实例

package /*package是包路径&#xff0c;例如AS文件在ActionScript文件夹下&#xff0c;此时路径应为package ActionScript。必须有的。package中只能有一个class&#xff0c;在一个AS文件中可以有若干个package*/ {public class hello /*类的名字*/{public var helloString:Str…

小小算法题(CCF)

题目 淘金 题目描述 在一片n*m的土地上&#xff0c;每一块1*1的区域里都有一定数量的金子。这一天&#xff0c;你到这里来淘金&#xff0c;然而当地人告诉你&#xff0c;如果你要挖(x, y)区域的金子&#xff0c;就不能挖(x-1&#xff0c;y),(x1, y)以及横坐标为y-1和y1的金…

01.神经网络和深度学习 W3.浅层神经网络(作业:带一个隐藏层的神经网络)

文章目录1. 导入包2. 预览数据3. 逻辑回归4. 神经网络4.1 定义神经网络结构4.2 初始化模型参数4.3 循环4.3.1 前向传播4.3.2 计算损失4.3.3 后向传播4.3.4 梯度下降4.4 组建Model4.5 预测4.6 调节隐藏层单元个数4.7 更改激活函数4.8 更改学习率4.9 其他数据集下的表现选择题测试…

python中的异步与同步

异步&#xff1a; 多任务&#xff0c; 多个任务之间执行没有先后顺序&#xff0c;可以同时运行&#xff0c;执行的先后顺序不会有什么影响&#xff0c;存在的多条运行主线 同步&#xff1a; 多任务&#xff0c; 多个任务之间执行的时候要求有先后顺序&#xff0c;必须一个先执行…

C语言指针祥讲

前言:复杂类型说明 要了解指针,多多少少会出现一些比较复杂的类型,所以我先介绍一下如何完全理解一个复杂类型,要理解复杂类型其实很简单,一个类型里会出现很多运算符,他们也像普通的表达式一样,有优先级,其优先级和运算优先级一样,所以我总结了一下其原则:从变量名处起,根…

编程挑战(6)

组合算法&#xff1a;开一个数组&#xff0c;其下标表示1到m个数&#xff0c;数组元素的值为1表示其下标代表的数被选中&#xff0c;为0则没有选中。 首先初始化&#xff0c;将数组前n个元素置1&#xff0c;表示第一个组合为前n个数&#xff1b;然后从左到右扫描数组元素值的“…

[编程启蒙游戏] 2. 奇偶数

文章目录1. 游戏前提2. 游戏目的3. python代码1. 游戏前提 孩子知道奇偶数是什么&#xff0c;不知道也没关系 还可以采用掰手指演示&#xff0c;伸出两个手指能配对&#xff0c;所有伸出来的手指都两两配对了&#xff0c;伸出来的手指个数就是偶数如果还有1个没有找到朋友的手…

进程间通信-Queue 消息队列 先进先出

Process之间有时需要通信&#xff0c;操作系统提供了很多机制来实现进程间的通信。 multiprocessing模块的Queue实现多进程之间的数据传递&#xff0c;Queue本身是一个消息列队程序 初始化Queue()对象时&#xff08;例如&#xff1a;qQueue()&#xff09;&#xff0c;若括号中没…

过压保护(1)

征一个简单、可靠的电源过压保护电路 http://www.amobbs.com/thread-5542005-1-1.html 防过压&#xff1a;过压之后TVS导通&#xff0c;电流由正极流经自恢复保险再流经TVS到负极&#xff0c;自恢复保险升温&#xff0c;阻值变大&#xff0c;相当于断开&#xff0c;等电流撤去&…