大数据技术-Hadoop(三)Mapreduce的介绍与使用

目录

一、概念和定义

二、WordCount案例

1、WordCountMapper

2、WordCountReducer

3、WordCountDriver

三、序列化

1、为什么序列化

2、为什么不用Java的序列化

3、Hadoop序列化特点:

4、自定义bean对象实现序列化接口(Writable)

4.1、bean

4.2、FlowBeanMapper

4.3、FlowReducer

4.4、FlowDriver

四、MapReduce框架原理

1、mapreduce流程

 2、Shuffle机制

3、Partion分区

3.1、 默认分区方法

3.2、自定义分区

4、WritableComparable

5、Combiner合并

6、自定义FileOutputFormat

7、Reduce Join

8、数据清洗 ETL

五、数据压缩

1、参数说明

2、代码示例

六、完整代码

七、参考


一、概念和定义

        请看 https://blog.csdn.net/weixin_48935611/article/details/137856999,这个文章概括的很全面,本文主要展示MapReduce的使用。

二、WordCount案例

1、WordCountMapper

package com.xiaojie.hadoop.mapreduce.wordcount;import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;import java.io.IOException;/*** @author 熟透的蜗牛* @version 1.0* @description: TODO* @date 2024/12/27 9:00*/
public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {Text kOut = new Text();IntWritable vOut = new IntWritable(1);/*** @param key     偏移量* @param value   文本值* @param context 上下文* @description:* @return: void* @author 熟透的蜗牛* @date: 2024/12/27 9:01*/@Overrideprotected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, IntWritable>.Context context) throws IOException, InterruptedException {
//        hello world
//        hello mapreduce
//        hello haddop
//        hadoop
//        java
//        mysql
//        mysql orcale/**这里输出的结果为(hello,1)(world,1)(hello,1) (mapreduce,1)(hello,1)......*///获取一行,输入的内容String line = value.toString();//分隔String[] words = line.split(" ");for (String word : words) {kOut.set(word);//kout 即为单词 vout 单词出现的次数context.write(kOut, vOut);}}
}

2、WordCountReducer

package com.xiaojie.hadoop.mapreduce.wordcount;import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;import java.io.IOException;/*** @author 熟透的蜗牛* @version 1.0* @description: reduce把map的输出当作输入* @date 2024/12/27 9:17*/
public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {int sum;IntWritable v = new IntWritable();/*** @param key     map 输出的key kOut* @param values  map输出的value Vout* @param context* @description:* @return: void* @author 熟透的蜗牛* @date: 2024/12/27 9:22*/@Overrideprotected void reduce(Text key, Iterable<IntWritable> values, Reducer<Text, IntWritable, Text, IntWritable>.Context context) throws IOException, InterruptedException {//累加求和,合并map传递过来的值sum = 0;for (IntWritable val : values) {sum += val.get();}//输出结果v.set(sum);context.write(key, v);}
}

3、WordCountDriver

package com.xiaojie.hadoop.mapreduce.wordcount;import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import java.io.IOException;/*** @author 熟透的蜗牛* @version 1.0* @description: TODO* @date 2024/12/27 9:23*/
public class WordCountDriver {public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {// 1 获取配置信息以及获取job对象Configuration configuration = new Configuration();Job job = Job.getInstance(configuration);// 2 关联本Driver程序的jarjob.setJarByClass(WordCountDriver.class);// 3 关联Mapper和Reducer的jarjob.setMapperClass(WordCountMapper.class);job.setReducerClass(WordCountReducer.class);// 4 设置Mapper输出的kv类型job.setMapOutputKeyClass(Text.class);job.setMapOutputValueClass(IntWritable.class);// 5 设置最终输出kv类型job.setOutputKeyClass(Text.class);job.setOutputValueClass(IntWritable.class);// 6 设置输入和输出路径FileInputFormat.setInputPaths(job, new Path("D:\\hadoop\\hello.txt"));FileOutputFormat.setOutputPath(job, new Path("D:\\hadoop\\wordcount"));// 7 提交jobboolean result = job.waitForCompletion(true);System.exit(result ? 0 : 1);}
}

三、序列化

1、为什么序列化

一般来说,“活的”对象只生存在内存里,关机断电就没有了。而且“活的”对象只能由本地的进程使用,不能被发送到网络上的另外一台计算机。然而序列化可以存储“活的”对象,可以将“活的”对象发送到远程计算机。

2、为什么不用Java的序列化

Java的序列化是一个重量级序列化框架(Serializable),一个对象被序列化后,会附带很多额外的信息(各种校验信息,Header,继承体系等),不便于在网络中高效传输。所以,Hadoop自己开发了一套序列化机制(Writable)。

3、Hadoop序列化特点:

  • 1)紧凑高效使用存储空间。
  • 2)快速:读写数据的额外开销小。
  • (3)互操作:支持多语言的交互

4、自定义bean对象实现序列化接口(Writable)

4.1、bean

package com.xiaojie.hadoop.mapreduce.flow;import org.apache.hadoop.io.Writable;import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;/*** @author 熟透的蜗牛* @version 1.0* @description: 定义一个bean 实现 writable接口* @date 2024/12/27 10:25*/
public class FlowBean implements Writable {private long upFlow; //上行流量private long downFlow; //下行流量private long sumFlow; //总流量//创建无参构造函数public FlowBean() {}//创建gettter setter 方法public long getUpFlow() {return upFlow;}public void setUpFlow(long upFlow) {this.upFlow = upFlow;}public long getDownFlow() {return downFlow;}public void setDownFlow(long downFlow) {this.downFlow = downFlow;}public long getSumFlow() {return sumFlow;}public void setSumFlow(long sumFlow) {this.sumFlow = sumFlow;}//重写setSumFlow 方法public void setSumFlow() {this.sumFlow = this.upFlow + this.downFlow;}//重写序列化方法,输出和输入的顺序要保持一致@Overridepublic void write(DataOutput out) throws IOException {out.writeLong(upFlow);out.writeLong(downFlow);out.writeLong(sumFlow);}@Overridepublic void readFields(DataInput in) throws IOException {this.upFlow = in.readLong();this.downFlow = in.readLong();this.sumFlow = in.readLong();}//结果显示在文本中,重写tostring 方法,@Overridepublic String toString() {return upFlow + "\t" + downFlow + "\t" + sumFlow;}
}

4.2、FlowBeanMapper

package com.xiaojie.hadoop.mapreduce.flow;import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;import java.io.IOException;/*** @author 熟透的蜗牛* @version 1.0* @description: 流量mapper* @date 2024/12/27 10:32*/
public class FlowBeanMapper extends Mapper<LongWritable, Text, Text, FlowBean> {//定义一个输出的keyprivate Text outKey = new Text();//定义输出的value 即 FlowBeanprivate FlowBean outValue = new FlowBean();/*** @param key     map的输入值偏移量* @param value   map 的输入value* @param context* @description:* @return: void* @author 熟透的蜗牛* @date: 2024/12/27 10:35*/@Overrideprotected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, FlowBean>.Context context) throws IOException, InterruptedException {//获取一行数据String line = value.toString();//切割数据String[] split = line.split("\t");//抓取我们需要的数据:手机号,上行流量,下行流量String phone = split[1];  //手机号//上行流量 ,由于有的数据没有,这里从后面取值Long upFlow = Long.parseLong(split[split.length - 3]);Long downFlow = Long.parseLong(split[split.length - 2]);//封装输出结果//设置输出的keyoutKey.set(phone);//设置输出的valueoutValue.setUpFlow(upFlow);outValue.setDownFlow(downFlow);outValue.setSumFlow();//写出outK outVcontext.write(outKey, outValue);}
}

4.3、FlowReducer

package com.xiaojie.hadoop.mapreduce.flow;import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;import java.io.IOException;/*** @author 熟透的蜗牛* @version 1.0* @description: 定义流量输出的reduce* @date 2024/12/27 10:46*/
public class FlowReducer extends Reducer<Text, FlowBean, Text, FlowBean> {private FlowBean finalOutV = new FlowBean();@Overrideprotected void reduce(Text key, Iterable<FlowBean> values, Reducer<Text, FlowBean, Text, FlowBean>.Context context) throws IOException, InterruptedException {long totalUp = 0;long totalDown = 0;//遍历values,将其中的上行流量,下行流量分别累加for (FlowBean bean : values) {totalUp += bean.getUpFlow();totalUp += bean.getDownFlow();}//封装输出结果finalOutV.setUpFlow(totalUp);finalOutV.setDownFlow(totalDown);finalOutV.setSumFlow();//输出结果context.write(key, finalOutV);}
}

4.4、FlowDriver

package com.xiaojie.hadoop.mapreduce.flow;import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import java.io.IOException;/*** @author 熟透的蜗牛* @version 1.0* @description: 驱动* @date 2024/12/27 10:55*/
public class FlowDriver {public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {//获取job对象Configuration configuration = new Configuration();Job job = Job.getInstance(configuration);//设置jarjob.setJarByClass(FlowDriver.class);//设置manpper 和reducerjob.setMapperClass(FlowBeanMapper.class);job.setReducerClass(FlowReducer.class);//设置map输出kv类型job.setMapOutputKeyClass(Text.class);job.setMapOutputValueClass(FlowBean.class);//设置最终输出结果kvjob.setOutputKeyClass(Text.class);job.setOutputValueClass(FlowBean.class);//设置输入输出路径FileInputFormat.setInputPaths(job, new Path("d://hadoop//phone.txt"));FileOutputFormat.setOutputPath(job, new Path("d://hadoop//phone"));//提交任务boolean result = job.waitForCompletion(true);System.exit(result ? 0 : 1);}
}

四、MapReduce框架原理

1、mapreduce流程

直观的效果,图片来自 https://blog.csdn.net/weixin_48935611/article/details/137856999

 2、Shuffle机制

Map方法之后,Reduce方法之前的数据处理过程称之为Shuffle

(1)MapTask收集我们的map()方法输出的kv对,放到内存缓冲区中

(2)从内存缓冲区不断溢出本地磁盘文件,可能会溢出多个文件

(3)多个溢出文件会被合并成大的溢出文件

(4)在溢出过程及合并的过程中,都要调用Partitioner进行分区和针对key进行排序

(5)ReduceTask根据自己的分区号,去各个MapTask机器上拉取相应的结果分区数据

(6)ReduceTask会抓取到同一个分区的来自不同MapTask的结果文件,ReduceTask会将这些文件再进行合并(归并排序)

(7)合并成大文件后,Shuffle的过程也就结束了,后面进入ReduceTask的逻辑运算过程(从文件中取出一个一个的键值对Group,调用用户自定义的reduce()方法)

注意:

(1)Shuffle中的缓冲区大小会影响到MapReduce程序的执行效率,原则上说,缓冲区越大,磁盘io的次数越少,执行速度就越快。

(2)缓冲区的大小可以通过参数调整,参数:mapreduce.task.io.sort.mb默认100M。

3、Partion分区

3.1、 默认分区方法

public int getPartition(K key, V value,int numReduceTasks) {return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;}

分区个数小于1的时候,就不会再执行上面的分区计算

3.2、自定义分区

package com.xiaojie.hadoop.mapreduce.partitioner;import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;/*** @author 熟透的蜗牛* @version 1.0* @description: 自定义分区* @date 2024/12/29 15:52*/
public class ProvincePartitioner extends Partitioner<Text, FlowBean> {/*** @param text          键值* @param flowBean      值* @param numPartitions 返回的分区数* @description: 分区逻辑, 手机号136、137、138、139开头都分别放到一个独立的4个文件中,其他开头的放到一个文件中* @return: int* @author 熟透的蜗牛* @date: 2024/12/29 15:54*/@Overridepublic int getPartition(Text text, FlowBean flowBean, int numPartitions) {int partition;if (StringUtils.isNotBlank(text.toString())) {if (text.toString().startsWith("136")) {partition = 0;} else if (text.toString().startsWith("137")) {partition = 1;} else if (text.toString().startsWith("138")) {partition = 2;} else if (text.toString().startsWith("139")) {partition = 3;} else {partition = 4;}} else {partition = 4;}return partition;}
}
package com.xiaojie.hadoop.mapreduce.partitioner;import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import java.io.IOException;/*** @author 熟透的蜗牛* @version 1.0* @description: 驱动* @date 2024/12/27 10:55*/
public class FlowDriver {public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {//获取job对象Configuration configuration = new Configuration();Job job = Job.getInstance(configuration);//设置jarjob.setJarByClass(FlowDriver.class);//设置manpper 和reducerjob.setMapperClass(FlowBeanMapper.class);job.setReducerClass(FlowReducer.class);//设置map输出kv类型job.setMapOutputKeyClass(Text.class);job.setMapOutputValueClass(FlowBean.class);//设置最终输出结果kvjob.setOutputKeyClass(Text.class);job.setOutputValueClass(FlowBean.class);//设施任务数 ,这里设置的要和分区个数一致,如果任务数>分区数则输出文件会有多个为空的文件,如果任务数>1并且<分区数,会有数据无法处理发生异常,// 如果任务数为1 ,只会产生一个文件,分区号必须从0开始,逐渐累加job.setNumReduceTasks(5);//指定自定义分区类job.setPartitionerClass(ProvincePartitioner.class);//设置输入输出路径FileInputFormat.setInputPaths(job, new Path("d://hadoop//phone.txt"));FileOutputFormat.setOutputPath(job, new Path("d://hadoop//phone33"));//提交任务boolean result = job.waitForCompletion(true);System.exit(result ? 0 : 1);}
}

4、WritableComparable

  @Overridepublic int compareTo(FlowBean o) {//按照总流量比较,倒序排列if (this.sumFlow > o.sumFlow) {return -1;} else if (this.sumFlow < o.sumFlow) {return 1;} else {//如果总流量一样,按照上行流量排if (this.upFlow > o.upFlow) {return -1;} else if (this.upFlow < o.upFlow) {return 1;}return 0;}}

5、Combiner合并

package com.xiaojie.hadoop.mapreduce.combiner;import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;import java.io.IOException;/*** @author 熟透的蜗牛* @version 1.0* @description: TODO* @date 2024/12/29 18:50*/
public class WordCountCombiner extends Reducer<Text, IntWritable, Text, IntWritable> {IntWritable outV= new IntWritable(0);@Overrideprotected void reduce(Text key, Iterable<IntWritable> values, Reducer<Text, IntWritable, Text, IntWritable>.Context context) throws IOException, InterruptedException {int sum = 0;for (IntWritable val : values) {sum+=val.get();}outV.set(sum);context.write(key, outV);}
}
package com.xiaojie.hadoop.mapreduce.combiner;import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import java.io.IOException;/*** @author 熟透的蜗牛* @version 1.0* @description: TODO* @date 2024/12/27 9:23*/
public class WordCountDriver {public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {// 1 获取配置信息以及获取job对象Configuration configuration = new Configuration();Job job = Job.getInstance(configuration);// 2 关联本Driver程序的jarjob.setJarByClass(WordCountDriver.class);// 3 关联Mapper和Reducer的jarjob.setMapperClass(WordCountMapper.class);job.setReducerClass(WordCountReducer.class);// 4 设置Mapper输出的kv类型job.setMapOutputKeyClass(Text.class);job.setMapOutputValueClass(IntWritable.class);// 5 设置最终输出kv类型job.setOutputKeyClass(Text.class);job.setOutputValueClass(IntWritable.class);//设置Combinerjob.setCombinerClass(WordCountCombiner.class);// 6 设置输入和输出路径FileInputFormat.setInputPaths(job, new Path("D:\\hadoop\\hello.txt"));FileOutputFormat.setOutputPath(job, new Path("D:\\hadoop\\wordcount13"));// 7 提交jobboolean result = job.waitForCompletion(true);System.exit(result ? 0 : 1);}
}

6、自定义FileOutputFormat

package com.xiaojie.hadoop.mapreduce.outputformat;import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import java.io.IOException;/*** @author 熟透的蜗牛* @version 1.0* @description: TODO* @date 2024/12/29 20:29*/
public class LogOutputFormat extends FileOutputFormat<Text, NullWritable> {@Overridepublic RecordWriter<Text, NullWritable> getRecordWriter(TaskAttemptContext job) throws IOException, InterruptedException {//创建一个自定义的RecordWriter返回LogRecordWriter logRecordWriter = new LogRecordWriter(job);return logRecordWriter;}
}
package com.xiaojie.hadoop.mapreduce.outputformat;import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;import java.io.IOException;/*** @author 熟透的蜗牛* @version 1.0* @description: TODO* @date 2024/12/29 20:31*/
public class LogRecordWriter extends RecordWriter<Text, NullWritable> {private FSDataOutputStream fileOut;private FSDataOutputStream otherOut;public LogRecordWriter(TaskAttemptContext job) {try {//获取文件系统对象FileSystem fs = FileSystem.get(job.getConfiguration());//用文件系统对象创建两个输出流对应不同的目录fileOut = fs.create(new Path("d:/hadoop/file.log"));otherOut = fs.create(new Path("d:/hadoop/other.log"));} catch (IOException e) {e.printStackTrace();}}@Overridepublic void write(Text key, NullWritable value) throws IOException, InterruptedException {String log = key.toString();//根据一行的log数据是否包含atguigu,判断两条输出流输出的内容if (log.contains("atguigu")) {fileOut.writeBytes(log + "\n");} else {otherOut.writeBytes(log + "\n");}}@Overridepublic void close(TaskAttemptContext context) throws IOException, InterruptedException {//关流IOUtils.closeStream(fileOut);IOUtils.closeStream(otherOut);}
}

7、Reduce Join

package com.xiaojie.hadoop.mapreduce.join2;import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;public class MapJoinDriver {public static void main(String[] args) throws IOException, URISyntaxException, ClassNotFoundException, InterruptedException {// 1 获取job信息Configuration conf = new Configuration();Job job = Job.getInstance(conf);// 2 设置加载jar包路径job.setJarByClass(MapJoinDriver.class);// 3 关联mapperjob.setMapperClass(MapJoinMapper.class);// 4 设置Map输出KV类型job.setMapOutputKeyClass(Text.class);job.setMapOutputValueClass(NullWritable.class);// 5 设置最终输出KV类型job.setOutputKeyClass(Text.class);job.setOutputValueClass(NullWritable.class);// 加载缓存数据job.addCacheFile(new URI("file:///D:/hadoop/pd.txt"));// Map端Join的逻辑不需要Reduce阶段,设置reduceTask数量为0job.setNumReduceTasks(0);// 6 设置输入输出路径FileInputFormat.setInputPaths(job, new Path("D:\\hadoop\\order"));FileOutputFormat.setOutputPath(job, new Path("D:\\hadoop\\output2222"));// 7 提交boolean b = job.waitForCompletion(true);System.exit(b ? 0 : 1);}
}
package com.xiaojie.hadoop.mapreduce.join2;import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URI;
import java.util.HashMap;
import java.util.Map;public class MapJoinMapper extends Mapper<LongWritable, Text, Text, NullWritable> {private Map<String, String> pdMap = new HashMap<>();private Text text = new Text();//任务开始前将pd数据缓存进pdMap@Overrideprotected void setup(Context context) throws IOException, InterruptedException {//通过缓存文件得到小表数据pd.txtURI[] cacheFiles = context.getCacheFiles();Path path = new Path(cacheFiles[0]);//获取文件系统对象,并开流FileSystem fs = FileSystem.get(context.getConfiguration());FSDataInputStream fis = fs.open(path);//通过包装流转换为reader,方便按行读取BufferedReader reader = new BufferedReader(new InputStreamReader(fis, "UTF-8"));//逐行读取,按行处理String line;while (StringUtils.isNotEmpty(line = reader.readLine())) {//切割一行    //01	小米String[] split = line.split("\t");pdMap.put(split[0], split[1]);}//关流IOUtils.closeStream(reader);}@Overrideprotected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, NullWritable>.Context context) throws IOException, InterruptedException {//读取大表数据//1001	01	1String[] fields = value.toString().split("\t");//通过大表每行数据的pid,去pdMap里面取出pnameString pname = pdMap.get(fields[1]);//将大表每行数据的pid替换为pnametext.set(fields[0] + "\t" + pname + "\t" + fields[2]);//写出context.write(text,NullWritable.get());}
}

8、数据清洗 ETL

package com.xiaojie.hadoop.mapreduce.etl;import com.xiaojie.hadoop.mapreduce.outputformat.LogDriver;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;public class WebLogDriver {public static void main(String[] args) throws Exception {// 输入输出路径需要根据自己电脑上实际的输入输出路径设置args = new String[]{"D:\\hadoop\\weblog", "D:\\hadoop\\outlog"};// 1 获取job信息Configuration conf = new Configuration();Job job = Job.getInstance(conf);// 2 加载jar包job.setJarByClass(LogDriver.class);// 3 关联mapjob.setMapperClass(WebLogMapper.class);// 4 设置最终输出类型job.setOutputKeyClass(Text.class);job.setOutputValueClass(NullWritable.class);// 设置reducetask个数为0job.setNumReduceTasks(0);// 5 设置输入和输出路径FileInputFormat.setInputPaths(job, new Path(args[0]));FileOutputFormat.setOutputPath(job, new Path(args[1]));// 6 提交boolean b = job.waitForCompletion(true);System.exit(b ? 0 : 1);}
}
package com.xiaojie.hadoop.mapreduce.etl;import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;import java.io.IOException;/*** @author 熟透的蜗牛* @version 1.0* @description: 数据清洗,清洗掉不符合格式的数据* @date 2024/12/29 21:37*/
public class WebLogMapper extends Mapper<LongWritable, Text, Text, NullWritable> {@Overrideprotected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {// 1 获取1行数据String line = value.toString();// 2 解析日志boolean result = parseLog(line, context);// 3 日志不合法退出if (!result) {return;}// 4 日志合法就直接写出context.write(value, NullWritable.get());}// 2 封装解析日志的方法private boolean parseLog(String line, Context context) {// 1 截取String[] fields = line.split(" ");// 2 日志长度大于11的为合法if (fields.length > 11) {return true;} else {return false;}}
}

五、数据压缩

1、参数说明

参数

默认值

阶段

建议

io.compression.codecs

(在core-site.xml中配置)

无,这个需要在命令行输入hadoop checknative查看

输入压缩

Hadoop使用文件扩展名判断是否支持某种编解码器

mapreduce.map.output.compress(在mapred-site.xml中配置)

false

mapper输出

这个参数设为true启用压缩

mapreduce.map.output.compress.codec(在mapred-site.xml中配置)

org.apache.hadoop.io.compress.DefaultCodec

mapper输出

企业多使用LZO或Snappy编解码器在此阶段压缩数据

mapreduce.output.fileoutputformat.compress(在mapred-site.xml中配置)

false

reducer输出

这个参数设为true启用压缩

mapreduce.output.fileoutputformat.compress.codec(在mapred-site.xml中配置)

org.apache.hadoop.io.compress.DefaultCodec

reducer输出

使用标准工具或者编解码器,如gzip和bzip2

2、代码示例

package com.xiaojie.hadoop.mapreduce.zip;import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.BZip2Codec;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import java.io.IOException;/*** @author 熟透的蜗牛* @version 1.0* @description: TODO* @date 2024/12/27 9:23*/
public class WordCountDriver {public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {// 1 获取配置信息以及获取job对象Configuration configuration = new Configuration();Job job = Job.getInstance(configuration);// 2 关联本Driver程序的jarjob.setJarByClass(WordCountDriver.class);// 3 关联Mapper和Reducer的jarjob.setMapperClass(WordCountMapper.class);job.setReducerClass(WordCountReducer.class);// 4 设置Mapper输出的kv类型job.setMapOutputKeyClass(Text.class);job.setMapOutputValueClass(IntWritable.class);// 5 设置最终输出kv类型job.setOutputKeyClass(Text.class);job.setOutputValueClass(IntWritable.class);//设置压缩格式FileOutputFormat.setCompressOutput(job, true);// 设置压缩的方式FileOutputFormat.setOutputCompressorClass(job, BZip2Codec.class);// 6 设置输入和输出路径FileInputFormat.setInputPaths(job, new Path("D:\\hadoop\\hello.txt"));FileOutputFormat.setOutputPath(job, new Path("D:\\hadoop\\wordcount111"));// 7 提交jobboolean result = job.waitForCompletion(true);System.exit(result ? 0 : 1);}
}

六、完整代码

spring-boot: Springboot整合redis、消息中间件等相关代码 - Gitee.com

七、参考

https://blog.csdn.net/weixin_48935611/article/details/137856999

参考内容来自尚硅谷大数据学习

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/891059.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【数据仓库】SparkSQL数仓实践

文章目录 集成hive metastoreSQL测试spark-sql 语法SQL执行流程两种数仓架构的选择hive on spark数仓配置经验 spark-sql没有元数据管理功能&#xff0c;只有sql 到RDD的解释翻译功能&#xff0c;所以需要和hive的metastore服务集成在一起使用。 集成hive metastore 在spark安…

基本算法——回归

本节将通过分析能源效率数据集&#xff08;Tsanas和Xifara&#xff0c;2012&#xff09;学习基本的回归算法。我们将基 于建筑的结构特点&#xff08;比如表面、墙体与屋顶面积、高度、紧凑度&#xff09;研究它们的加热与冷却负载要 求。研究者使用一个模拟器设计了12种不…

V-Express - 一款针对人像视频生成的开源软件

V-Express是腾讯AI Lab开发的一款针对人像视频生成的开源软件。它旨在通过条件性丢弃&#xff08;Conditional Dropout&#xff09;技术&#xff0c;实现渐进式训练&#xff0c;以改善使用单一图像生成人像视频时的控制信号平衡问题。 在生成过程中&#xff0c;不同的控制信号&…

Java与SQL Server数据库连接的实践与要点

本文还有配套的精品资源&#xff0c;点击获取 简介&#xff1a;Java和SQL Server数据库交互是企业级应用开发中的重要环节。本文详细探讨了使用Java通过JDBC连接到SQL Server数据库的过程&#xff0c;包括加载驱动、建立连接、执行SQL语句、处理异常、资源管理、事务处理和连…

学习记录—正则表达式-基本语法

正则表达式简介-《菜鸟教程》 正则表达式是一种用于匹配和操作文本的强大工具&#xff0c;它是由一系列字符和特殊字符组成的模式&#xff0c;用于描述要匹配的文本模式。 正则表达式可以在文本中查找、替换、提取和验证特定的模式。 本期内容将介绍普通字符&#xff0c;特殊…

企业安装加密软件有什么好处?

加密软件为企业的安全提供了很多便利&#xff0c;从以下几点我们看看比较重要的几个优点&#xff1a; 1、数据保护&#xff1a;企业通常拥有大量的商业机密、客户数据、技术文档等敏感信息。加密软件可以对这些信息进行加密处理&#xff0c;防止未经授权的人员访问。即使数据被…

京东供应链创新与实践:应用数据驱动的库存选品和调拨算法提升履约效率

2024 年度总结系列 2024 年 10 月&#xff0c;京东零售供应链技术团队凭借其在库存选品与调拨技术上的创新与实践&#xff0c;荣获运筹与管理学领域的国际顶级奖项 Daniel H. Wagner Prize。本文为您介绍获奖背后的供应链技术创新和落地应用。 00 摘要 在电商行业中&#x…

大数据技术-Hadoop(二)HDFS的介绍与使用

目录 1、HDFS简介 1.1 什么是HDFS 1.2 HDFS的优点 1.3、HDFS的架构 1.3.1、 NameNode 1.3.2、 NameNode的职责 1.3.3、DataNode 1.3.4、 DataNode的职责 1.3.5、Secondary NameNode 1.3.6、Secondary NameNode的职责 2、HDFS的工作原理 2.1、文件存储 2.2 、数据写…

在 C# 中优化 JPEG 压缩级别和文件大小

此示例可让您检查不同 JPEG 压缩级别的图像质量。使用文件菜单的打开命令加载图像文件。然后使用“JPEG 压缩指数 (CI)”组合框选择压缩级别。程序将图像保存到具有该压缩级别的临时文件中&#xff0c;并显示生成的图像和文件大小。 该程序的关键是以下SaveJpg方法&#xff0c;…

Pandas02

Pandas01: Pandas01 文章目录 内容回顾1 数据的读取和保存1.1 读写Excel文件1.2 读写CSV1.3 读写Mysql 2 DataFrame 数据查询2.1 筛选多列数据2.2 loc 和 iloc2.3 query查询方法和isin 方法 3 DataFrame增 删 改数据3.1 增加一列数据3.2 删除一行/一列数据3.3 数据去重3.4 数据…

Flink定时器

flink的定时器都是基于事件时间&#xff08;event time&#xff09;或事件处理时间&#xff08;processing time&#xff09;的变化来触发响应的。对一部分新手玩家来说&#xff0c;可能不清楚事件时间和事件处理时间的区别。我这里先说一下我的理解&#xff0c;防止下面懵逼。…

Docker中的分层(Layer)

docker中有分层的概念&#xff0c;如下图所示 上面是容器层&#xff08;Container layer&#xff09;&#xff0c;下面是镜像层&#xff08;Image layers&#xff09;。 镜像层的内容是静态的&#xff0c;读和写的操作&#xff0c;都是在容器层发生&#xff0c;专门为容器的读…

RoboMIND:多体现基准 机器人操纵的智能规范数据

我们介绍了 RoboMIND&#xff0c;这是机器人操纵的多体现智能规范数据的基准&#xff0c;包括 4 个实施例、279 个不同任务和 61 个不同对象类别的 55k 真实世界演示轨迹。 工业机器人企业 埃斯顿自动化 | 埃夫特机器人 | 节卡机器人 | 珞石机器人 | 法奥机器人 | 非夕科技 | C…

python报错ModuleNotFoundError: No module named ‘visdom‘

在用虚拟环境跑深度学习代码时&#xff0c;新建的环境一般会缺少一些库&#xff0c;而一般解决的方法就是直接conda install&#xff0c;但是我在conda install visdom之后&#xff0c;安装是没有任何报错的&#xff0c;conda list里面也有visdom的信息&#xff0c;但是再运行代…

C语言性能优化:从基础到高级的全面指南

引言 C 语言以其高效、灵活和功能强大而著称&#xff0c;被广泛应用于系统编程、嵌入式开发、游戏开发等领域。然而&#xff0c;要写出高性能的 C 语言代码&#xff0c;需要对 C 语言的特性和底层硬件有深入的了解。本文将详细介绍 C 语言性能优化的背后技术&#xff0c;并通过…

go多版本管理工具g win安装配置

go多版本管理工具g 基本介绍仓库安装配置配置环境配置系统变量配置path变量测试使用配置完环境变量之后&#xff0c;打开终端进行测试使用查看 g 的环境变量配置&#xff0c;g env 为环境变量配置&#xff0c;g -v为当前版本信息查看可下载列表下载安装指定版本go&#xff0c;并…

PlasmidFinder:质粒复制子的鉴定和分型

质粒&#xff08;Plasmid&#xff09;是一种细菌染色体外的线性或环状DNA分子&#xff0c;也是一种重要的遗传元素&#xff0c;它们具有自主复制能力&#xff0c;可以在细菌之间传播&#xff0c;并携带多种重要的基因(如耐药基因与毒力基因等)功能。根据质粒传播的特性&#xf…

细说STM32F407单片机通过IIC读写EEPROM 24C02

目录 一、操作说明 二、工程配置 1、时钟、DEBUG、GPIO、USART6、NVIC、Code Generator 2、 IIC2 &#xff08;1&#xff09;Master Features组&#xff0c;主设备参数 &#xff08;2&#xff09;Slave Features组&#xff0c;从设备参数 三、软件设计 1、KELED 2、E…

神经网络-Inception

Inception网络是由Google开发的一种深度卷积神经网络架构&#xff0c;旨在解决计算机视觉领域中的图像分类和物体识别任务。 Inception网络最初在2014年被提出&#xff0c;并在ImageNet图像分类挑战赛上取得了很好的结果。其设计灵感来自于模块化的思想&#xff0c;将不同尺度…

PyTorch Instance Normalization介绍

Instance Normalization(实例归一化) 是一种标准化技术,与 Batch Normalization 类似,但它对每个样本独立地对每个通道进行归一化,而不依赖于小批量数据的统计信息。这使得它非常适合小批量训练任务以及图像生成任务(如风格迁移)。 Instance Normalization 的原理 对每…