自定义InputFormat合并小文件
案例需求
无论hdfs还是mapreduce,对于小文件都有损效率,实践中,又难免面临处理大量小文件的场景,此时,就需要有相应解决方案。
案例分析
小文件的优化无非以下几种方式:
- 在数据采集的时候,就将小文件或小批数据合成大文件再上传HDFS
- 在业务处理之前,在HDFS上使用mapreduce程序对小文件进行合并
- 在mapreduce处理时,可采用combineInputFormat提高效率
案例实现
本节实现的是上述第二种方式
- 首先继承FileInputFormat类自定义MyInputFormat方法,在自定义的MyInputFormat类中需重写RecordReader方法。
- 然后自定义MyRecordReader继承RecordReader,在自定义的MyRecordReader类中需重新initialize(初始化方法)、nextKeyValue(该方法用于获取 <k1,v1>,读取源数据转换为<k1,v1>)、getCurrentKey(返回k1)、getCurrentValue(返回v1)、getProgress(获取文件读取进度)、close(释放资源)等方法。
- 然后在MyInputFormat类中实例化自定义的MyRecordReader类并传递源数据,设置文件是否允许被切割(本案例不允许被切割)。
- Mapper代码按照正常逻辑实现,可以省略Reduce代码,最后主类JobMain中设置文件的读取类为自定义的MyInputFormat。
程序的核心机制:
- 自定义一个InputFormat
- 改写RecordReader,实现一次读取一个完整文件封装为KV
- 在输出时使用SequenceFileOutPutFormat输出合并文件
// 第一步package myinput_format;import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;import java.io.IOException;public class MyRecordReader extends RecordReader<NullWritable, BytesWritable> {private Configuration configuration = null;private FileSplit fileSplit = null;// 定义标志位判断文件是否处理完成 ---false表示文件没有被读取完private boolean processed = false;private BytesWritable bytesWritable = new BytesWritable();private FileSystem fileSystem = null;private FSDataInputStream inputStream = null;// 初始化@Overridepublic void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {// inputSplit----文件切片包含文件名信息fileSplit = (FileSplit)inputSplit;// 获取Configuration 对象 ---向上提取(以便nextKeyValue函数使用)configuration = taskAttemptContext.getConfiguration();}// 该方法用于获取 <k1,v1>,读取源数据转换为<k1,v1>/** k1-------->NullWritable* v1-------->BytesWritable* */@Overridepublic boolean nextKeyValue() throws IOException, InterruptedException {// 判断文件是否读取完成if (!processed){// 1.获取源文件字节输入流// 1.1 获取源文件的文件系统(FileSystem)fileSystem = FileSystem.get(configuration);// 1.2 通过(FileSystem)获取文件字节输入流 ---源文件路径inputStream = fileSystem.open(fileSplit.getPath());// 2.获取源文件数据到普通的字节数组(byte[])long a = fileSplit.getLength();byte[] bytes = new byte[(int) a];
// byte[] bytes = new byte[(int) fileSplit.getLength() ];// 需要容纳下小文件字节大小,强行装换为int类型//2.1源文件数据读取到上述自定义的字节数组IOUtils.readFully(inputStream,bytes,0,(int) fileSplit.getLength());// 3.普通的字节数组转换封装到BytesWritable,得到 v1bytesWritable.set(bytes,0,(int) fileSplit.getLength());processed = true;return true;}return false;}//返回 k1@Overridepublic NullWritable getCurrentKey() throws IOException, InterruptedException {return NullWritable.get();}//返回 v1@Overridepublic BytesWritable getCurrentValue() throws IOException, InterruptedException {return bytesWritable;}// 获取文件读取进度@Overridepublic float getProgress() throws IOException, InterruptedException {return 0;}// 进行资源释放@Overridepublic void close() throws IOException {// 关闭输入流inputStream.close();fileSystem.close();}
}
// 第二步
package myinput_format;import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;import java.io.IOException;public class MyInputFormat extends FileInputFormat<NullWritable, BytesWritable> {// RecordReader抽象类返回子类,需要自定义LineRecordReader类@Overridepublic RecordReader<NullWritable, BytesWritable> createRecordReader(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {// 1.创建自定义RecordReader对象MyRecordReader myRecordReader = new MyRecordReader();// 2.将源数据InputSplit和TaskAttemptContext传递给自定义RecordReader对象myRecordReader.initialize(inputSplit,taskAttemptContext);return myRecordReader;}// 设置文件是否可以被切割(功能实现小文件合并设置为不可被切割)@Overrideprotected boolean isSplitable(JobContext context, Path filename) {return false;}
}
// 第三步
package myinput_format;import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;import java.io.IOException;public class SequenceFileMapper extends Mapper<NullWritable, BytesWritable, Text,BytesWritable> {@Overrideprotected void map(NullWritable key, BytesWritable value, Context context) throws IOException, InterruptedException {// <k1,v1>转为<k2,v2>// 1.获取文件名作为k2// 根据 context 获取文件切片--其中包含文件名FileSplit fileSplit = (FileSplit) context.getInputSplit();String fileName = fileSplit.getPath().getName();// 2.v2就是v1写入上下文context.write(new Text(fileName),value);}
}
// 第四步
package myinput_format;import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;import java.net.URI;public class JobMain extends Configured implements Tool {//该方法用于指定一个job任务@Overridepublic int run(String[] strings) throws Exception {// 1.创建 job任务对象 两个参数 1.configuration 2.job任务名称Job job = Job.getInstance(super.getConf(), "input_format");// 打包运行出错添加job.setJarByClass(JobMain.class);// 2.配置 job任务对象(八个步骤)// 2.1 读取文件 ---指定读取类job.setInputFormatClass(MyInputFormat.class);// 指点源文件路径MyInputFormat.addInputPath(job,new Path("hdfs://hadoop01:9000/hadoop_mapreduce/myinput_format"));// 2.2 进入指定map阶段处理方式和数据类型// 设置map阶段用的类job.setMapperClass(SequenceFileMapper.class);// 设置Map阶段K2的类型 --- 单词(字符串)job.setMapOutputKeyClass(Text.class);// 设置Map阶段V2的类型 ---job.setMapOutputValueClass(BytesWritable.class);// 2.3(4,5,6) 进入Shuffle阶段 --先采用默认方式处理// 2.7 指定Reduce阶段的数据类型job.setOutputKeyClass(Text.class);job.setOutputValueClass(BytesWritable.class);// 2.8 设置输出类型 -- 保存在二进制文件中job.setOutputFormatClass(SequenceFileOutputFormat.class);// 设置输出路径
// TextOutputFormat.setOutputPath(job,new Path("hdfs://hadoop01:9000/wordcount_out"));// 判断目标目录是否存在,存在则删除Path path = new Path("hdfs://hadoop01:9000/hadoop_mapreduce/myinput_format_out");SequenceFileOutputFormat.setOutputPath(job,path);// 获取hdfs文件系统FileSystem fileSystem = FileSystem.get(new URI("hdfs://hadoop01:9000/hadoop_mapreduce/"), new Configuration());// --本地测试--
// FileSystem fileSystem = FileSystem.get(new URI("file:///D:\\output"), new Configuration());// 判断目录是否存在boolean exists = fileSystem.exists(path);if (exists){// 删除目标目录fileSystem.delete(path,true);}// 等待任务结束boolean bl = job.waitForCompletion(true);return bl ? 0:1;}public static void main(String[] args) throws Exception {Configuration configuration = new Configuration();// 启动 job任务 --记录任务执行状态 0表示成功int run = ToolRunner.run(configuration, new JobMain(), args);System.exit(run);}
}
自定义outputFormat格式输出文件
案例需求
现在有一些订单的评论数据,需求,将订单的好评与差评进行区分开来,将最终的数据分开到不同的文件夹下面去,数据内容参见资料文件夹,其中数据第九个字段表示好评,中评,差评。0:好评,1:中评,2:差评
案例分析
程序的关键点是要在一个mapreduce程序中根据数据的不同输出两类结果到不同目录,这类灵活的输出需求可以通过自定义outputformat来实现。
案例实现
- 首先继承FileOutputFormat类自定义MyOutputFormat类,在自定义的MyOutputFormat类中需重写RecordWriter方法。
- 然后自定义MyRecordWriter继承RecordWriter,在自定义的MyRecordWriter类中需重新write(具体输出数据的方法)、close(释放资源)等方法。
- 然后在MyOutputFormat类中实例化自定义的MyRecotdWriter类(有参构造传递)传递源数据
- Mapper代码按照正常逻辑实现,可以省略Reduce代码,最后主类JobMain中设置文件的输出类为自定义的MyOutputFormat。
// 第一步
package myoutput_format;import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;import java.io.IOException;public class MyRecordWriter extends RecordWriter<Text,NullWritable> {private FSDataOutputStream goodCommentsOutputStream;private FSDataOutputStream badCommentsOutputStream;public MyRecordWriter() {}public MyRecordWriter(FSDataOutputStream goodCommentsOutputStream, FSDataOutputStream badCommentsOutputStream) {this.goodCommentsOutputStream = goodCommentsOutputStream;this.badCommentsOutputStream = badCommentsOutputStream;}/**** @param text 行文本内容* @param nullWritable* @throws IOException* @throws InterruptedException*/@Overridepublic void write(Text text, NullWritable nullWritable) throws IOException, InterruptedException {//1:从行文本数据中获取第9个字段String[] split = text.toString().split("\t");String numStr = split[9];//2:根据字段的值,判断评论的类型,然后将对应的数据写入不同的文件夹文件中if(Integer.parseInt(numStr) <= 1){//好评或者中评goodCommentsOutputStream.write(text.toString().getBytes());// 添加换行符goodCommentsOutputStream.write("\r\n".getBytes());}else{//差评badCommentsOutputStream.write(text.toString().getBytes());badCommentsOutputStream.write("\r\n".getBytes());}}@Overridepublic void close(TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {IOUtils.closeStream(goodCommentsOutputStream);IOUtils.closeStream(badCommentsOutputStream);}
}
// 第二步
package myoutput_format;import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import java.io.IOException;public class MyOutputFormat extends FileOutputFormat<Text, NullWritable> {@Overridepublic RecordWriter<Text, NullWritable> getRecordWriter(TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {// 1.获取目标文件输出流(两个)FileSystem fileSystem = FileSystem.get(taskAttemptContext.getConfiguration());// 文件写入路径FSDataOutputStream goodCommentsOutputStream = fileSystem.create(new Path("hdfs://hadoop01:9000/hadoop_mapreduce/good_comments/good_comments.txt"));FSDataOutputStream badCommentsOutputStream = fileSystem.create(new Path("hdfs://hadoop01:9000/hadoop_mapreduce/bad_comments/bad_comments.txt"));// 2.传递给----->MyRecotdWriter类使用(有参构造传递)MyRecordWriter myRecordWriter = new MyRecordWriter(goodCommentsOutputStream,badCommentsOutputStream);return myRecordWriter;}}
// 第三步
package myoutput_format;import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;import java.io.IOException;public class MyOutputFormatMapper extends Mapper<LongWritable, Text,Text, NullWritable> {@Overrideprotected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {context.write(value,NullWritable.get());}
}
// 第四步
package myoutput_format;import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;import java.net.URI;public class JobMain extends Configured implements Tool {//该方法用于指定一个job任务@Overridepublic int run(String[] strings) throws Exception {// 1.创建 job任务对象 两个参数 1.configuration 2.job任务名称Job job = Job.getInstance(super.getConf(), "output_format");// 打包运行出错添加job.setJarByClass(JobMain.class);// 2.配置 job任务对象(八个步骤)// 2.1 读取文件 ---指定读取类job.setInputFormatClass(TextInputFormat.class);// 指点源文件路径TextInputFormat.addInputPath(job,new Path("hdfs://hadoop01:9000/hadoop_mapreduce/myoutput_format"));// 2.2 进入指定map阶段处理方式和数据类型// 设置map阶段用的类job.setMapperClass(MyOutputFormatMapper.class);// 设置Map阶段K2的类型 --- 单词(字符串)job.setMapOutputKeyClass(Text.class);// 设置Map阶段V2的类型 ---job.setMapOutputValueClass(NullWritable.class);// 2.3(4,5,6) 进入Shuffle阶段 --先采用默认方式处理// 2.7 指定Reduce阶段的数据类型// 2.8 设置输出类型 -- 保存在二进制文件中job.setOutputFormatClass(MyOutputFormat.class);// 设置输出路径
// TextOutputFormat.setOutputPath(job,new Path("hdfs://hadoop01:9000/wordcount_out"));// 判断目标目录是否存在,存在则删除Path path = new Path("hdfs://hadoop01:9000/hadoop_mapreduce/myoutput_format_out");SequenceFileOutputFormat.setOutputPath(job,path);// 获取hdfs文件系统FileSystem fileSystem = FileSystem.get(new URI("hdfs://hadoop01:9000/hadoop_mapreduce/"), new Configuration());// --本地测试--
// FileSystem fileSystem = FileSystem.get(new URI("file:///D:\\output"), new Configuration());// 判断目录是否存在boolean exists = fileSystem.exists(path);if (exists){// 删除目标目录fileSystem.delete(path,true);}// 等待任务结束boolean bl = job.waitForCompletion(true);return bl ? 0:1;}public static void main(String[] args) throws Exception {Configuration configuration = new Configuration();// 启动 job任务 --记录任务执行状态 0表示成功int run = ToolRunner.run(configuration, new JobMain(), args);System.exit(run);}
}