05 MapReduce应用案例02

6、統計每個月份中,最高的三個溫度。

輸入格式:年月日 空格 時分秒 TAB 溫度

inputfile:

1949-10-01 14:21:02    34c
1949-10-02 14:01:02    36c
1950-01-01 11:21:02    32c
1950-10-01 12:21:02    37c
1951-12-01 12:21:02    23c
1950-10-02 12:21:02    41c
1950-10-03 12:21:02    27c
1951-07-01 12:21:02    45c
1951-07-02 12:21:02    46c
1951-07-03 12:21:03    47c

import java.io.IOException;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Calendar;
import java.util.Date;import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;public class WRunner {public static void main(String[] args) throws Exception{Configuration conf = new Configuration();Job job = Job.getInstance(conf);job.setJobName("weather");job.setJarByClass(WRunner.class);job.setMapperClass(WMapper.class);job.setReducerClass(WReducer.class);job.setMapOutputKeyClass(MyKey.class);job.setMapOutputValueClass(DoubleWritable.class);job.setPartitionerClass(MyPartitioner.class);job.setSortComparatorClass(MySort.class);job.setGroupingComparatorClass(MyGroup.class);job.setInputFormatClass(KeyValueTextInputFormat.class);job.setNumReduceTasks(3);Path in = new Path("/home/jinzhao/mrtest/input");FileInputFormat.setInputPaths(job, in);Path out = new Path("/home/jinzhao/mrtest/output");FileSystem fs = FileSystem.get(conf);if (fs.exists(out))fs.delete(out, true);FileOutputFormat.setOutputPath(job, out);job.waitForCompletion(true);}static class WMapper extends Mapper<Text, Text, MyKey, DoubleWritable>{SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");NullWritable nw = NullWritable.get();@Overrideprotected void map(Text key, Text value, Context context)throws IOException, InterruptedException {try {Date date = sdf.parse(key.toString());Calendar c = Calendar.getInstance();c.setTime(date);int year = c.get(Calendar.YEAR);int month = c.get(Calendar.MONTH);int day = c.get(Calendar.DAY_OF_MONTH);String h = value.toString().trim();double hot = Double.parseDouble(h.substring(0, h.length()-1));context.write(new MyKey(year, month, day, hot), new DoubleWritable(hot));} catch (ParseException e) {e.printStackTrace();}}}static class WReducer extends Reducer<MyKey, DoubleWritable, Text, NullWritable>{@Overrideprotected void reduce(MyKey key, Iterable<DoubleWritable> values, Context context)throws IOException, InterruptedException {int i=0;for(DoubleWritable v : values){++i;String msg = key.getYear() + "\t" + (key.getMonth() + 1) + "\t" + (key.getDay()+1) + "\t" + v.get();context.write(new Text(msg), NullWritable.get());if (i == 3)break;}}}
}
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;import org.apache.hadoop.io.WritableComparable;/*** 序列化所传输的对象*/
public class MyKey implements WritableComparable<MyKey> {private int year;private int month;private int day;private double hot;public MyKey(){super();}public MyKey(int year, int month, int day, double hot){this.year = year;this.month = month;this.day = day;this.hot = hot;}public int getYear() {return year;}public void setYear(int year) {this.year = year;}public int getMonth() {return month;}public void setMonth(int month) {this.month = month;}public int getDay() {return day;}public void setDay(int day) {this.day = day;}public double getHot() {return hot;}public void setHot(double hot) {this.hot = hot;}@Overridepublic void readFields(DataInput arg0) throws IOException {this.year = arg0.readInt();this.month = arg0.readInt();this.hot = arg0.readDouble();this.day = arg0.readInt();}@Overridepublic void write(DataOutput arg0) throws IOException {arg0.writeInt(year);arg0.writeInt(month);arg0.writeDouble(hot);	arg0.writeInt(day);}/*** 判断是否是同一个对象,当对象作为key时。*/@Overridepublic int compareTo(MyKey arg0) {int r1 = Integer.compare(this.year, arg0.getYear());if (r1 == 0){int r2 = Integer.compare(this.month, arg0.getMonth());if (r2 == 0){return Double.compare(this.hot, arg0.getHot());}else{return r2;}}elsereturn r1;}}

import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;/*** 分组,将具有相同年份和月份的MyKey作为一组,即传递给一个reduce函数进行处理。*/
public class MyGroup extends WritableComparator{public MyGroup(){super(MyKey.class, true);}public int compare (WritableComparable a, WritableComparable b){MyKey k1 = (MyKey)a;MyKey k2 = (MyKey)b;int r1 = Integer.compare(k1.getYear(), k2.getYear());if (r1 == 0){return Integer.compare(k1.getMonth(), k2.getMonth());}elsereturn r1;}
}

package hadoop.wheather;import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;/*** 自定义的排序,先分组,再排序*/
public class MySort extends WritableComparator{public MySort(){super(MyKey.class, true);}public int compare (WritableComparable a, WritableComparable b){MyKey k1 = (MyKey)a;MyKey k2 = (MyKey)b;int r1 = Integer.compare(k1.getYear(), k2.getYear());if (r1 == 0){int r2 = Integer.compare(k1.getMonth(), k2.getMonth());if (r2 == 0){return -Double.compare(k1.getHot(), k2.getHot());}elsereturn r2;}elsereturn r1;}
}

package hadoop.wheather;import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.mapreduce.Partitioner;/*** 分区,每个分区由一个reduce进程来处理*/
public class MyPartitioner extends Partitioner<MyKey, DoubleWritable>{@Overridepublic int getPartition(MyKey key, DoubleWritable value, int numReduceTasks) {return(key.getYear() - 1949)%numReduceTasks;}}

7、社交網路的朋友推薦算法

格式:用戶 TAB 朋友1 空格 朋友2 空格 ...

inputfile:

小明    老王 如花 林志玲
老王    小明 凤姐
如花    小明 李刚 凤姐
林志玲    小明 李刚 凤姐 郭美美
李刚    如花 凤姐 林志玲
郭美美    凤姐 林志玲
凤姐    如花 老王 林志玲 郭美美

第一次輸出:

格式:用戶1 空格 用戶2 TAB 次數

第二次輸出:

格式:用戶 TAB 推薦1 空格 推薦2 空格...

import java.io.IOException;import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;public class Friends {static class FofMapper extends Mapper<Text, Text, Fof, IntWritable>{@Overrideprotected void map(Text key, Text value, Context context)throws IOException, InterruptedException {String user = key.toString();String[] friends = value.toString().split(" ");for (int i = 0; i < friends.length; ++i){context.write(new Fof(user, friends[i]), new IntWritable(0));for (int j = i + 1; j < friends.length; ++j)context.write(new Fof(friends[i], friends[j]), new IntWritable(1));}}}static class FofReducer extends Reducer<Fof, IntWritable, Fof, IntWritable>{@Overrideprotected void reduce(Fof key, Iterable<IntWritable> values, Context context)throws IOException, InterruptedException {int sum = 0;boolean flag = true;for (IntWritable i : values){if (i.get() == 0){flag = false;break;}else{sum = sum + i.get();}}if (flag)context.write(key, new IntWritable(sum));}}public static void main(String[] args){try {Configuration conf = new Configuration();Job job = Job.getInstance(conf);job.setJarByClass(Friends.class);job.setJobName("friend-I");job.setMapperClass(FofMapper.class);job.setReducerClass(FofReducer.class);job.setMapOutputKeyClass(Fof.class);job.setMapOutputValueClass(IntWritable.class);job.setInputFormatClass(KeyValueTextInputFormat.class);Path in = new Path("/home/jinzhao/mrtest/input");FileInputFormat.setInputPaths(job, in);Path out = new Path("/home/jinzhao/mrtest/output");FileSystem fs = FileSystem.get(conf);if (fs.exists(out))fs.delete(out, true);FileOutputFormat.setOutputPath(job,  out);if ( job.waitForCompletion(true)){Job job2 = Job.getInstance(conf);job2.setJarByClass(Friends.class);job2.setJobName("friend-II");job2.setMapperClass(SortMapper.class);job2.setReducerClass(SortReducer.class);job2.setMapOutputKeyClass(User.class);job2.setMapOutputValueClass(User.class);job2.setInputFormatClass(KeyValueTextInputFormat.class);job2.setSortComparatorClass(FSort.class);job2.setGroupingComparatorClass(FGroup.class);Path in2 = new Path("/home/jinzhao/mrtest/output");FileInputFormat.setInputPaths(job2, in2);Path out2 = new Path("/home/jinzhao/mrtest/output2");if (fs.exists(out2))fs.delete(out2, true);FileOutputFormat.setOutputPath(job2,  out2);job2.waitForCompletion(true);}} catch (Exception e){e.printStackTrace();}}static class SortMapper extends Mapper<Text, Text, User, User>{@Overrideprotected void map(Text key, Text value, Context context)throws IOException, InterruptedException {String[] friends = key.toString().split(" ");int count = Integer.parseInt(value.toString());context.write(new User(friends[0], count), new User(friends[1], count));context.write(new User(friends[1], count), new User(friends[0], count));}}static class SortReducer extends Reducer<User, User, Text, Text>{@Overrideprotected void reduce(User key, Iterable<User> values, Context context)throws IOException, InterruptedException {StringBuilder sb = new StringBuilder();for (User i : values)sb.append(i.getUsername() + "," + i.getCount() + " ");context.write(new Text(key.getUsername()), new Text(sb.toString().trim()));}}
}

import org.apache.hadoop.io.Text;public class Fof extends Text{public Fof(){super();}public Fof(String a, String b){super(getFof(a, b));}public static String getFof(String a, String b){int r = a.compareTo(b);if (r < 0)return a + " " + b;else return b + " " + a;} 
}

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;import org.apache.hadoop.io.WritableComparable;public class User implements WritableComparable<User>{private String username;private int count;public User(){}public User(String username, int count){this.username = username;this.count = count;}public String getUsername() {return username;}public void setUsername(String username) {this.username = username;}public int getCount() {return count;}public void setCount(int count) {this.count = count;}@Overridepublic void write(DataOutput out) throws IOException {out.writeUTF(username);out.writeInt(count);}@Overridepublic void readFields(DataInput in) throws IOException {this.username = in.readUTF();this.count = in.readInt();}@Overridepublic int compareTo(User arg0) {int c1 = this.username.compareTo(arg0.username);if (c1 == 0){return this.count - arg0.getCount();} elsereturn c1;}}

import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;public class FGroup extends WritableComparator{public FGroup(){super(User.class, true);}@Overridepublic int compare(WritableComparable a, WritableComparable b) {User u1 = (User)a;User u2 = (User)b;return  u1.getUsername().compareTo(u2.getUsername());}}

import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;public class FSort extends WritableComparator{public FSort(){super(User.class, true);}@Overridepublic int compare(WritableComparable a, WritableComparable b) {User u1 = (User)a;User u2 = (User)b;int c1 = u1.getUsername().compareTo(u2.getUsername());if (c1==0){return u2.getCount() - u1.getCount();} elsereturn c1;}
}





本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/387142.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

05 MapReduce应用案例03

8、PageRank Page-rank源于Google&#xff0c;用于衡量特定网页相对于搜索引擎索引中的其他网页而言的重要程度。 Page-rank实现了将链接价值概念作为排名因素。 算法原理 – 入链 投票 • Page-rank 让链接来“ 投票 “ ,到一个页面的超链接相当于对该页投一票。 – 入…

利用微信的weui框架上传、预览和删除图片

jQuery WeUI 是专为微信公众账号开发而设计的一个框架&#xff0c;jQuery WeUI的官网&#xff1a;http://jqweui.com/ 需求&#xff1a;需要在微信公众号网页添加上传图片功能 技术选型&#xff1a;实现上传图片功能可选百度的WebUploader、饿了么的Element和微信的jQuery WeUI…

【转】Java Socket编程基础及深入讲解

原文&#xff1a;https://www.cnblogs.com/yiwangzhibujian/p/7107785.html#q2.3.3 Socket是Java网络编程的基础&#xff0c;了解还是有好处的&#xff0c; 这篇文章主要讲解Socket的基础编程。Socket用在哪呢&#xff0c;主要用在进程间&#xff0c;网络间通信。本篇比较长&am…

使用 vue-i18n 切换中英文

使用 vue-i18n 切换中英文vue-i18n 仓库地址&#xff1a;https://github.com/kazupon/vue-i18n兼容性&#xff1a;支持 Vue.js 2.x 以上版本安装方法&#xff1a;&#xff08;此处只演示 npm&#xff09;npm install vue-i18n使用方法&#xff1a;1、在 main.js 中引入 vue-i18…

ZooKeeper数据模型

Zookeeper的数据模型 层次化的目录结构&#xff0c;命名符合常规文件系统规范&#xff08;Linux&#xff09; 每个节点在zookeeper中叫做znode,并且其有一个唯一的路径标识 节点Znode可以包含数据和子节点(即子目录)&#xff0c;但是EPHEMERAL类型的节点不能有子节点 Znod…

堆叠条形图

堆叠条形图 import pandas as pd import numpy as np import matplotlib.pyplot as plt import matplotlib as mpl import matplotlib.dates as mdates#解决能显示中文 mpl.rcParams[font.sans-serif][SimHei] #指定默认字体 SimHei为黑体 mpl.rcParams[axes.unicode_minus]Fal…

spring boot 服务器常用

ps aux|grep tgcwll /opt/nginx/html sudo cp -r /tmp/tgcw/dist/* /opt/nginx/html/design sudo cp -r /tmp/tgcw/dist/* /opt/nginx/html springboot 启动nohup java -jar tgcw-service-usermanagement-0.0.1-SNAPSHOT.jar --spring.profiles.activedemo > /dev/null 2&g…

PHP数组 转 对象/对象 转 数组

/*** 数组 转 对象** param array $arr 数组* return object*/ function array_to_object($arr) {if (gettype($arr) ! array) {return;}foreach ($arr as $k > $v) {if (gettype($v) array || getType($v) object) {$arr[$k] (object)array_to_object($v);}}return (obj…

ZooKeeper編程01--RMI服務的多服務器管理

服務器端與客戶端都要用到&#xff1a; public interface ZkInfo {String ZK_CONNECTION_STRING "192.168.1.201:2181,192.168.1.202:2181,192.168.1.203:2181";int ZK_SESSION_TIMEOUT 5000;String ZK_REGISTRY_PATH "/registry";String ZK_PROVIDER_…

org.activiti.engine.ActivitiOptimisticLockingException updated by another transaction concurrently

org.activiti.engine.ActivitiOptimisticLockingException: Task[id5905010, name审核(市场部)] was updated by another transaction concurrentlyat org.activiti.engine.impl.db.DbSqlSession.flushUpdates(DbSqlSession.java:872)at org.activiti.engine.impl.db.DbSqlSess…

DataTable不能通过已删除的行访问该行的信息解决方法

使用dt.Rows[0]["name", DataRowVersion.Original]可以获取转载于:https://www.cnblogs.com/heyiping/p/10616640.html

ZooKeeper編程02--多線程的分佈式鎖

面向過程版&#xff1a; package distributedLockProcess;import java.util.Collections; import java.util.List; import java.util.concurrent.CountDownLatch;import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; import org.apache.zoo…

01 Python变量和数据类型

Python变量和数据类型 1 数据类型 计算机&#xff0c;顾名思义就是可以做数学计算的机器&#xff0c;因此&#xff0c;计算机程序理所当然也可以处理各种数值。 但是&#xff0c;计算机能处理的远不止数值&#xff0c;还可以处理文本、图形、音频、视频、网页等各种各样的数…

初识Python-1

1&#xff0c;计算机基础。 2&#xff0c;python历史。 宏观上&#xff1a;python2 与 python3 区别&#xff1a; python2 源码不标准&#xff0c;混乱&#xff0c;重复代码太多&#xff0c; python3 统一 标准&#xff0c;去除重复代码。 3&#xff0c;python的环境。 编译型&…

02 List、Tuple、Dict、Set

List 线性表 创建List&#xff1a; >>> classmates [Michael, Bob, Tracy] >>> L [Michael, 100, True] #可以在list中包含各种类型的数据 >>> empty_list [] #空List 按索引访问List&#xff1a; >>> print L[0] #索引从0开始…

Jenkins的一些代码

pipeline {agent anyenvironment { def ITEMNAME "erp"def DESTPATH "/home/ops/testpipe"def codePATH"/var/lib/jenkins/workspace/test_pipeline"}stages { stage(代码拉取){steps {echo "checkout from ${ITEMNAME}"git url:…

利用layui前端框架实现对不同文件夹的多文件上传

利用layui前端框架实现对不同文件夹的多文件上传 问题场景&#xff1a; 普通的input标签实现多文件上传时&#xff0c;只能对同一个文件夹下的多个文件进行上传&#xff0c;如果要同时上传两个或多个文件夹下的文件&#xff0c;是无法实现的。这篇文章就是利用layui中的插件&am…

ps、grep和kill联合使用杀掉进程

ps、grep和kill联合使用杀掉进程例如要杀掉hello这个进程&#xff0c;使用下面这个命令就能直接实现。ps -ef |grep hello |awk {print $2}|xargs kill -9这里是输出ps -ef |grep hello 结果的第二列的内容然后通过xargs传递给kill -9,其实第二列内容就是hello的进程号&#xf…

03 控制語句

if语句 if age > 18 print your age is, age else print teenager Python代码的缩进规则&#xff1a;具有相同缩进的代码被视为代码块。 if age > 18 print adult elif age > 6 print teenager elif age > 3 print kid else print baby for循环 L [Adam, L…

yum 来安装 nodejs

要通过 yum 来安装 nodejs 和 npm 需要先给 yum 添加 epel 源&#xff0c;添加方法在 centos 添加epel和remi源 中##添加 epel 源 64位: rpm -ivh http://download.fedoraproject.org/pub/epel/6/x86_64/epel-release-6-8.noarch.rpm32位: rpm -ivh http://download.fedoraproj…