2019独角兽企业重金招聘Python工程师标准>>>
一、环境安装
1.安装hadoop
http://my.oschina.net/u/204498/blog/519789
2.安装spark
3.启动hadoop
4.启动spark
二、
1.数据准备
从MAPR官网上下载数据DEV360DATA.zip并上传到server上。
[hadoop@hftclclw0001 spark-1.5.1-bin-hadoop2.6]$ pwd
/home/hadoop/spark-1.5.1-bin-hadoop2.6[hadoop@hftclclw0001 spark-1.5.1-bin-hadoop2.6]$ cd test-data/[hadoop@hftclclw0001 test-data]$ pwd
/home/hadoop/spark-1.5.1-bin-hadoop2.6/test-data/DEV360Data[hadoop@hftclclw0001 DEV360Data]$ ll
total 337940
-rwxr-xr-x 1 hadoop root 575014 Jun 24 16:18 auctiondata.csv =>c测试用到的数据
-rw-r--r-- 1 hadoop root 57772855 Aug 18 20:11 sfpd.csv
-rwxrwxrwx 1 hadoop root 287692676 Jul 26 20:39 sfpd.json[hadoop@hftclclw0001 DEV360Data]$ more auctiondata.csv
8213034705,95,2.927373,jake7870,0,95,117.5,xbox,3
8213034705,115,2.943484,davidbresler2,1,95,117.5,xbox,3
8213034705,100,2.951285,gladimacowgirl,58,95,117.5,xbox,3
8213034705,117.5,2.998947,daysrus,10,95,117.5,xbox,3
8213060420,2,0.065266,donnie4814,5,1,120,xbox,3
8213060420,15.25,0.123218,myreeceyboy,52,1,120,xbox,3
...
...#数据结构如下
auctionid,bid,bidtime,bidder,bidrate,openbid,price,itemtype,daystolve#把数据上传到HDFS中
[hadoop@hftclclw0001 DEV360Data]$ hdfs dfs -mkdir -p /spark/exer/mapr
[hadoop@hftclclw0001 DEV360Data]$ hdfs dfs -put auctiondata.csv /spark/exer/mapr
[hadoop@hftclclw0001 DEV360Data]$ hdfs dfs -ls /spark/exer/mapr
Found 1 items
-rw-r--r-- 2 hadoop supergroup 575014 2015-10-29 06:17 /spark/exer/mapr/auctiondata.csv
2.运行spark-shell 我用的scala.并针对以下task,进行分析
tasks:
a.How many items were sold?
b.How many bids per item type?
c.How many different kinds of item type?
d.What was the minimum number of bids?
e.What was the maximum number of bids?
f.What was the average number of bids?
[hadoop@hftclclw0001 spark-1.5.1-bin-hadoop2.6]$ pwd
/home/hadoop/spark-1.5.1-bin-hadoop2.6[hadoop@hftclclw0001 spark-1.5.1-bin-hadoop2.6]$ ./bin/spark-shell
...
...
scala >#首先从HDFS加载数据生成RDD
scala > val originalRDD = sc.textFile("/spark/exer/mapr/auctiondata.csv")
...
...
scala > originalRDD ==>我们来分析下originalRDD的类型 RDD[String] 可以看做是一条条String的数组,Array[String]
res26: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[1] at textFile at <console>:21##根据“,”把每一行分隔使用map
scala > val auctionRDD = originalRDD.map(_.split(","))
scala> auctionRDD ==>我们来分析下auctionRDD的类型 RDD[Array[String]] 可以看做是String的数组,但元素依然是数组即,可以认为Array[Array[string]]
res17: org.apache.spark.rdd.RDD[Array[String]] = MapPartitionsRDD[5] at map at <console>:23
a.How many items were sold?
==> val count = auctionRDD.map(bid => bid(0)).distinct().count()
根据auctionid去重即可:每条记录根据“,”分隔,再去重,再计数
#获取第一列,即获取auctionid,依然用map
#可以这么理解下面一行,由于auctionRDD是Array[Array[String]]那么进行map的每个参数类型是Array[String],由于actionid是数组的第一位,即获取第一个元素Array(0),注意是()不是[]
scala> val auctionidRDD = auctionRDD.map(_(0))
...
...scala> auctionidRDD ==>我们来分析下auctionidRDD的类型 RDD[String] ,理解为Array[String],即所有的auctionid的数组
res27: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[17] at map at <console>:26#对auctionidRDD去重
scala > val auctionidDistinctRDD=auctionidRDD.distinct()#计数
scala > auctionidDistinctRDD.count()
...
...
b.How many bids per item type?
===> auctionRDD.map(bid => (bid(7),1)).reduceByKey((x,y) => x + y).collect()
#map每一行,获取出第7列,即itemtype那一列,输出(itemtype,1)
#可以看做输出的类型是(String,Int)的数组
scala > auctionRDD.map(bid=>(bid(7),1))
res30: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[26] at map at <console>:26
...#reduceByKey即按照key进行reduce
#解析下reduceByKey对于相同的key,
#(xbox,1)(xbox,1)(xbox,1)(xbox,1)...(xbox,1) ==> reduceByKey ==> (xbox,(..(((1 + 1) + 1) + ... + 1))
scala > auctionRDD.map(bid=>(bid(7),1)).reduceByKey((x,y) => x + y)
#类型依然是(String,Int)的数组 String=>itemtype Int已经是该itemtype的计数总和了
res31: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[28] at reduceByKey at <console>:26#通过collect() 转换成 Array类型数组
scala > auctionRDD.map(bid=>(bid(7),1)).reduceByKey((x,y) => x + y).collect()res32: Array[(String, Int)] = Array((palm,5917), (cartier,1953), (xbox,2784))