05 MapReduce Application Examples 01

1. Word Count

To some extent, word counting reflects the original motivation behind MapReduce's design: analyzing log files.

// Common imports for the three classes below (each public class lives in its own source file).
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.StringUtils;

public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    // Called once per line of the input split: the key is the byte offset of the line,
    // the value is the line's content.
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String[] words = StringUtils.split(value.toString(), ' ');
        for (String w : words) {
            context.write(new Text(w), new IntWritable(1));
        }
    }
}

public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    // Called once per group: all records in a group share the same key, and there may be many values.
    protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        int sum = 0;
        for (IntWritable i : values) {
            sum = sum + i.get();
        }
        context.write(key, new IntWritable(sum));
    }
}

public class RunJob {
    public static void main(String[] args) {
        Configuration config = new Configuration();
//      config.set("fs.defaultFS", "hdfs://node1:8020");
//      config.set("yarn.resourcemanager.hostname", "node1");
//      config.set("mapred.jar", "C:\\Users\\Administrator\\Desktop\\wc.jar");
        try {
            FileSystem fs = FileSystem.get(config);
            Job job = Job.getInstance(config);
            job.setJarByClass(RunJob.class);
            job.setJobName("wc");
            job.setMapperClass(WordCountMapper.class);
            job.setReducerClass(WordCountReducer.class);
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(IntWritable.class);
            FileInputFormat.addInputPath(job, new Path("/usr/input/"));
            Path outpath = new Path("/usr/output/wc");
            if (fs.exists(outpath)) {
                fs.delete(outpath, true);
            }
            FileOutputFormat.setOutputPath(job, outpath);
            boolean f = job.waitForCompletion(true);
            if (f) {
                System.out.println("job completed!");
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
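
Because per-word counts can be summed in any order, the reduce logic also works as a combiner, which pre-aggregates counts on the map side and cuts shuffle traffic. This is not part of the original listing; it is an optional one-line addition to the job setup in RunJob above, placed next to job.setReducerClass(...):

// Optional (not in the original code): reuse the reducer as a map-side combiner.
job.setCombinerClass(WordCountReducer.class);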


2. Data Deduplication

The goal is that any value appearing more than once in the raw data appears exactly once in the output file.

The natural idea is to route all records of the same value to a single reduce task; no matter how many times that value occurs, it only needs to be written once in the final result.

A slight modification of the word count program is all that is needed.

// Imports are the same as in the word count example, plus org.apache.hadoop.io.NullWritable.

public class DedupMapper extends Mapper<LongWritable, Text, Text, NullWritable> {
    // Emit the whole line as the key; duplicate lines then collapse into a single reduce group.
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        context.write(value, NullWritable.get());
    }
}

public class DedupReducer extends Reducer<Text, NullWritable, Text, NullWritable> {
    // Each distinct key is written exactly once, regardless of how many values it carries.
    protected void reduce(Text key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
        context.write(key, NullWritable.get());
    }
}

public class RunJob {
    public static void main(String[] args) {
        Configuration config = new Configuration();
//      config.set("fs.defaultFS", "hdfs://node1:8020");
//      config.set("yarn.resourcemanager.hostname", "node1");
        config.set("mapred.jar", "C:\\Users\\Administrator\\Desktop\\wc.jar");
        try {
            FileSystem fs = FileSystem.get(config);
            Job job = Job.getInstance(config);
            job.setJarByClass(RunJob.class);
            job.setJobName("dedup");
            job.setMapperClass(DedupMapper.class);
            job.setReducerClass(DedupReducer.class);
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(NullWritable.class);
            FileInputFormat.addInputPath(job, new Path("/usr/input/"));
            Path outpath = new Path("/usr/output/dedup");
            if (fs.exists(outpath)) {
                fs.delete(outpath, true);
            }
            FileOutputFormat.setOutputPath(job, outpath);
            boolean f = job.waitForCompletion(true);
            if (f) {
                System.out.println("job completed!");
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
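
As with word count, the reduce logic here is safe to run per key on the map side as well, so it can also be registered as a combiner to drop duplicate lines before the shuffle. This line is not in the original listing; it is an optional addition to the job setup above:

// Optional (not in the original code): collapse duplicates map-side before the shuffle.
job.setCombinerClass(DedupReducer.class);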



3. Sorting

Sort the contents of the input files.

Each line of the input files contains a single number, i.e., one data item.

Each output line must contain two numbers separated by whitespace: the second is the original value, and the first is that value's rank.

Sample input:

file1:

2

32

654

32

15

765

65223

file2:

5956

22

650

92

file3:

26

54

6

Sample output:

1 2

2 6

3 15

4 22

5 26

6 32

7 32

8 54

9 92

10 650

11 654

12 765

13 5956

14 65223


Design approach:

We can rely on the sorting that MapReduce already performs on map output keys, instead of implementing the sort ourselves.

Key points:

1. Use the data to be sorted as the map output key.

2. A custom Partitioner is needed to guarantee global order. Divide the maximum input value by the number of partitions; the quotient is the bucket width, and the split boundaries are 1x, 2x, ..., (numPartitions-1)x that quotient. After partitioning, partition i holds only values smaller than those in partition i+1, so the concatenated output is globally ordered (a worked example of this arithmetic follows the list).

3. The reducer receives <key, value-list> and emits the key once per element of the value-list, pairing each emission with an incrementing rank, so duplicate values keep separate ranks.
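
A quick sketch of that boundary arithmetic, assuming the maximum value 65223 and the 5 reduce tasks hard-coded in the program below (both are assumptions baked into this example, not values discovered at runtime): bound = 65223 / 5 = 13044, so keys up to 13044 land in partition 0, keys in (13044, 26088] in partition 1, and so on, with anything above 4 x 13044 falling into the last partition.

// Standalone sketch of the SortPartitioner boundary logic (hypothetical demo class, not part of the job).
public class PartitionBoundaryDemo {
    public static void main(String[] args) {
        int maxNumber = 65223;                      // assumed maximum of the input data
        int numPartitions = 5;                      // matches job.setNumReduceTasks(5) below
        int bound = maxNumber / numPartitions;      // 13044
        int[] samples = {2, 15, 13044, 13045, 32000, 65223};
        for (int key : samples) {
            int partition = numPartitions - 1;      // keys above the last boundary stay in the last partition
            for (int i = 0; i < numPartitions; ++i) {
                if (key <= (i + 1) * bound) { partition = i; break; }
            }
            System.out.println(key + " -> partition " + partition);
        }
    }
}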

package hadoop;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class Sort {

    public static class SortMapper extends Mapper<Object, Text, IntWritable, NullWritable> {
        private NullWritable nw = NullWritable.get();

        @Override
        protected void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            // Each line holds one number; emit it as the key so the framework sorts it.
            context.write(new IntWritable(Integer.parseInt(value.toString().trim())), nw);
        }
    }

    public static class SortReducer extends Reducer<IntWritable, NullWritable, IntWritable, IntWritable> {
        // Rank counter; note that it restarts at 1 in every reduce task,
        // so each output file carries its own rank sequence.
        private IntWritable counter = new IntWritable(1);

        @Override
        protected void reduce(IntWritable key, Iterable<NullWritable> values, Context context)
                throws IOException, InterruptedException {
            // Write the key once per occurrence so duplicates (e.g. the two 32s) keep separate ranks.
            for (NullWritable nw : values) {
                context.write(counter, key);
                counter = new IntWritable(counter.get() + 1);
            }
        }
    }

    public static class SortPartitioner extends Partitioner<IntWritable, NullWritable> {
        // numPartitions equals the number of reduce tasks.
        @Override
        public int getPartition(IntWritable key, NullWritable value, int numPartitions) {
            int maxNumber = 65223;                  // hard-coded maximum of the sample data
            int bound = maxNumber / numPartitions;  // width of each partition's value range
            int keyNumber = key.get();
            for (int i = 0; i < numPartitions; ++i) {
                if (keyNumber <= (i + 1) * bound)
                    return i;
            }
            // Keys above the last boundary (possible because of integer division) go to the last partition.
            return numPartitions - 1;
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        job.setJarByClass(Sort.class);
        job.setJobName("sort");
        job.setMapperClass(SortMapper.class);
        job.setReducerClass(SortReducer.class);
        job.setOutputKeyClass(IntWritable.class);
        job.setOutputValueClass(IntWritable.class);
        job.setMapOutputKeyClass(IntWritable.class);
        job.setMapOutputValueClass(NullWritable.class);
        job.setNumReduceTasks(5);
        job.setPartitionerClass(SortPartitioner.class);

        String inputFile = "/home/jinzhao/dataset/input";
        String outputFile = "/home/jinzhao/dataset/output";
        FileInputFormat.setInputPaths(job, new Path(inputFile));
        Path output = new Path(outputFile);
        FileSystem fs = FileSystem.get(conf);
        if (fs.exists(output))
            fs.delete(output, true);
        FileOutputFormat.setOutputPath(job, output);
        job.waitForCompletion(true);
    }
}


4. Single-Table Join

Given a table of child-parent pairs, join the table with itself to list every grandchild-grandparent pair.

Sample input:

file:

child parent

Tom Lucy

Tom Jack

Jone Lucy

Jone Jack

Lucy Mary

Lucy Ben

Jack Alice

Jack Jesse

Terry Alice

Terry Jesse

Philip Terry

Philip Alma

Mark Terry

Mark Alma

Sample output:

file:

grandchild grandparent

Tom Alice

Tom Jesse

Jone Alice

Jone Jesse

Tom Mary

Tom Ben

Jone Mary

Jone Ben

Philip Alice

Philip Jesse

Mark Alice

Mark Jesse
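
To see where these pairs come from, consider the reduce group for the key Lucy (a walk-through of the sample data above; the list contents are assumed from the mapper logic in the code below). The mapper emits Lucy as a key tagged "child:Tom" and "child:Jone" from the rows where Lucy is the parent, and tagged "parent:Mary" and "parent:Ben" from the rows where Lucy is the child; the reducer's cross product of the two lists then yields Tom-Mary, Tom-Ben, Jone-Mary and Jone-Ben. A minimal standalone sketch of that cross product:

import java.util.Arrays;
import java.util.List;

// Hypothetical demo class, not part of the job: the cross product the reducer performs for the key "Lucy".
public class LucyGroupDemo {
    public static void main(String[] args) {
        List<String> children = Arrays.asList("Tom", "Jone");  // Lucy appears as parent in these rows
        List<String> parents = Arrays.asList("Mary", "Ben");   // Lucy appears as child in these rows
        for (String c : children)
            for (String p : parents)
                System.out.println(c + "\t" + p);              // grandchild   grandparent
    }
}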


package hadoop;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class stlink {

    // Used to write the "grandchild grandparent" header exactly once per reduce task.
    private static boolean flag = true;

    public static class stlinkMapper extends Mapper<Object, Text, Text, Text> {
        @Override
        protected void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            String[] names = value.toString().trim().split("\t");
            if (names[0].compareTo("child") != 0) {   // skip the header row
                // Emit each record twice: keyed by the child (tagged with its parent)
                // and keyed by the parent (tagged with its child), so both sides meet in one reduce group.
                context.write(new Text(names[0]), new Text("parent:" + names[1]));
                context.write(new Text(names[1]), new Text("child:" + names[0]));
            }
        }
    }

    public static class stlinkReducer extends Reducer<Text, Text, Text, Text> {
        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            if (flag) {
                context.write(new Text("grandchild"), new Text("grandparent"));
                flag = false;
            }
            List<String> children = new ArrayList<String>();
            List<String> parents = new ArrayList<String>();
            for (Text t : values) {
                String[] kv = t.toString().split(":");
                if (kv[0].compareTo("child") == 0)
                    children.add(kv[1]);
                else
                    parents.add(kv[1]);
            }
            // Cross product: every child of this person paired with every parent of this person.
            for (String c : children)
                for (String p : parents)
                    context.write(new Text(c), new Text(p));
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job stlinkJob = Job.getInstance(conf);
        stlinkJob.setJarByClass(stlink.class);
        stlinkJob.setJobName("single table link");
        stlinkJob.setMapperClass(stlinkMapper.class);
        stlinkJob.setReducerClass(stlinkReducer.class);
        stlinkJob.setOutputKeyClass(Text.class);
        stlinkJob.setOutputValueClass(Text.class);
        stlinkJob.setMapOutputKeyClass(Text.class);
        stlinkJob.setMapOutputValueClass(Text.class);

        Path input = new Path("/home/jinzhao/dataset/input");
        Path output = new Path("/home/jinzhao/dataset/output");
        FileInputFormat.setInputPaths(stlinkJob, input);
        FileSystem fs = FileSystem.get(conf);
        if (fs.exists(output))
            fs.delete(output, true);
        FileOutputFormat.setOutputPath(stlinkJob, output);
        stlinkJob.waitForCompletion(true);
    }
}


5. Multi-Table Join

Join the factory table (factory name, address ID) with the address table (address ID, address name) and output factory name / address name pairs.

Sample input:

factory:

factoryname addressID

Beijing Red Star 1

Shenzhen Thunder 3

Guangzhou Honda 2

Beijing Rising 1

Guangzhou Development Bank 2

Tencent 3

Bank of Beijing 1

address:

1 Beijing

2 Guangzhou

3 Shenzhen

4 Xian
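
The mapper distinguishes the two tables by whether the first token is numeric: address rows are keyed by their ID and tagged "right:", while factory rows are keyed by their trailing address ID and tagged "left:". For the key 1, the reduce group therefore contains (values assumed from the sample data above and the mapper logic in the code below) "right:Beijing" plus "left:Beijing Red Star", "left:Beijing Rising" and "left:Bank of Beijing", and the reducer pairs each factory with the address name. A minimal standalone sketch:

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical demo class, not part of the job: what the reducer does with the group for address ID "1".
public class AddressGroupDemo {
    public static void main(String[] args) {
        List<String> values = Arrays.asList(
                "right:Beijing",             // from the address table
                "left:Beijing Red Star",     // from the factory table
                "left:Beijing Rising",
                "left:Bank of Beijing");
        String place = null;
        List<String> companies = new ArrayList<String>();
        for (String v : values) {
            String[] kv = v.split(":");
            if (kv[0].equals("right"))
                place = kv[1];               // the address name for this ID
            else
                companies.add(kv[1]);        // a factory located at this ID
        }
        for (String c : companies)
            System.out.println(c + "\t" + place);  // factoryname   addressname
    }
}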

package hadoop;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class mtlink {

    // Used to write the "factoryname addressname" header exactly once per reduce task.
    private static boolean flag = true;

    public static class mtlinkMapper extends Mapper<Object, Text, Text, Text> {
        @Override
        protected void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            String str = value.toString();
            if (str.contains("factoryname") || str.contains("addressname"))
                return;   // skip header rows of either table
            String[] infos = str.trim().split(" ");
            if (infos[0].charAt(0) >= '0' && infos[0].charAt(0) <= '9')
                // Address table: "ID addressname" -> key by ID, tag as the right side of the join.
                context.write(new Text(infos[0]), new Text("right:" + strCombine(infos, "right")));
            else
                // Factory table: "factoryname ... ID" -> key by the trailing ID, tag as the left side.
                context.write(new Text(infos[infos.length - 1]), new Text("left:" + strCombine(infos, "left")));
        }

        // Rejoin the tokens that make up the name, dropping the ID token at the front or the back.
        private String strCombine(String[] strs, String direction) {
            StringBuilder sb = new StringBuilder();
            if (direction.compareTo("right") == 0)
                for (int i = 1; i < strs.length; ++i)
                    sb.append(strs[i] + " ");
            else
                for (int i = 0; i < strs.length - 1; ++i)
                    sb.append(strs[i] + " ");
            return sb.toString().trim();
        }
    }

    public static class mtlinkReducer extends Reducer<Text, Text, Text, Text> {
        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            if (flag) {
                context.write(new Text("factoryname"), new Text("addressname"));
                flag = false;
            }
            List<String> companies = new ArrayList<String>();
            String place = "huoxing";   // placeholder used when no matching address row exists
            for (Text t : values) {
                String[] kv = t.toString().trim().split(":");
                if (kv[0].compareTo("right") == 0)
                    place = kv[1];
                else
                    companies.add(kv[1]);
            }
            for (String s : companies)
                context.write(new Text(s), new Text(place));
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job mtlinkJob = Job.getInstance(conf);
        mtlinkJob.setJarByClass(mtlink.class);
        mtlinkJob.setJobName("multiple tables link");
        mtlinkJob.setMapperClass(mtlinkMapper.class);
        mtlinkJob.setReducerClass(mtlinkReducer.class);
        mtlinkJob.setOutputKeyClass(Text.class);
        mtlinkJob.setOutputValueClass(Text.class);

        Path input = new Path("/home/jinzhao/dataset/input");
        Path output = new Path("/home/jinzhao/dataset/output");
        FileInputFormat.setInputPaths(mtlinkJob, input);
        FileSystem fs = FileSystem.get(conf);
        if (fs.exists(output))
            fs.delete(output, true);
        FileOutputFormat.setOutputPath(mtlinkJob, output);
        mtlinkJob.waitForCompletion(true);
    }
}

