Using MapReduce to import MySQL data into HDFS: code link.
Importing HDFS data into MySQL, code example:
package com.zhen.mysqlToHDFS;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.db.DBConfiguration;
import org.apache.hadoop.mapreduce.lib.db.DBOutputFormat;
import org.apache.hadoop.mapreduce.lib.db.DBWritable;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/**
 * @author FengZhen
 * Import HDFS data into MySQL.
 * Uses DBOutputFormat to write structured data under an HDFS path into MySQL.
 * The input looks like the following: the first column is the key, the next
 * three columns are the record fields.
 * 0    1    Enzo    180.66
 * 1    2    Din     170.666
 */
public class DBOutputFormatApp extends Configured implements Tool {

    /**
     * JavaBean.
     * Must implement Hadoop's serialization interface Writable as well as
     * DBWritable, the serialization interface used when talking to the database.
     * The official API describes it as:
     *   public class DBInputFormat<T extends DBWritable>
     *       extends InputFormat<LongWritable, T> implements Configurable
     * i.e. the Mapper key is fixed to LongWritable and cannot be changed, while
     * the value is a custom JavaBean implementing DBWritable.
     */
    public static class BeanWritable implements Writable, DBWritable {

        private int id;
        private String name;
        private double height;

        public void readFields(ResultSet resultSet) throws SQLException {
            this.id = resultSet.getInt(1);
            this.name = resultSet.getString(2);
            this.height = resultSet.getDouble(3);
        }

        public void write(PreparedStatement preparedStatement) throws SQLException {
            preparedStatement.setInt(1, id);
            preparedStatement.setString(2, name);
            preparedStatement.setDouble(3, height);
        }

        public void readFields(DataInput dataInput) throws IOException {
            this.id = dataInput.readInt();
            this.name = dataInput.readUTF();
            this.height = dataInput.readDouble();
        }

        public void write(DataOutput dataOutput) throws IOException {
            dataOutput.writeInt(id);
            dataOutput.writeUTF(name);
            dataOutput.writeDouble(height);
        }

        public void set(int id, String name, double height) {
            this.id = id;
            this.name = name;
            this.height = height;
        }

        @Override
        public String toString() {
            return id + "\t" + name + "\t" + height;
        }
    }

    public static class DBOutputMapper extends Mapper<LongWritable, Text, NullWritable, BeanWritable> {

        private NullWritable outputKey;
        private BeanWritable outputValue;

        @Override
        protected void setup(Mapper<LongWritable, Text, NullWritable, BeanWritable>.Context context)
                throws IOException, InterruptedException {
            this.outputKey = NullWritable.get();
            this.outputValue = new BeanWritable();
        }

        @Override
        protected void map(LongWritable key, Text value,
                Mapper<LongWritable, Text, NullWritable, BeanWritable>.Context context)
                throws IOException, InterruptedException {
            // counter for records that were parsed and emitted
            final Counter successCounter = context.getCounter("exec", "successfully");
            // counter for records that were skipped as malformed
            final Counter failedCounter = context.getCounter("exec", "failed");
            // parse the structured data
            String[] fields = value.toString().split("\t");
            // the exported data carries a long key in the first column,
            // so it is ignored and parsing starts at index 1
            if (fields.length > 3) {
                int id = Integer.parseInt(fields[1]);
                String name = fields[2];
                double height = Double.parseDouble(fields[3]);
                this.outputValue.set(id, name, height);
                context.write(outputKey, outputValue);
                // record parsed and emitted successfully
                successCounter.increment(1L);
            } else {
                // malformed record, skipped
                failedCounter.increment(1L);
            }
        }
    }

    /**
     * The output key must be a type implementing DBWritable;
     * DBOutputFormat requires this.
     */
    public static class DBOutputReducer extends Reducer<NullWritable, BeanWritable, BeanWritable, NullWritable> {

        @Override
        protected void reduce(NullWritable key, Iterable<BeanWritable> values,
                Reducer<NullWritable, BeanWritable, BeanWritable, NullWritable>.Context context)
                throws IOException, InterruptedException {
            for (BeanWritable beanWritable : values) {
                context.write(beanWritable, key);
            }
        }
    }

    public int run(String[] arg0) throws Exception {
        Configuration configuration = getConf();
        // configure the database connection right after obtaining the Configuration
        DBConfiguration.configureDB(configuration, "com.mysql.jdbc.Driver",
                "jdbc:mysql://localhost:3306/hadoop", "root", "123qwe");

        Job job = Job.getInstance(configuration, DBOutputFormatApp.class.getSimpleName());
        job.setJarByClass(DBOutputFormatApp.class);

        job.setMapperClass(DBOutputMapper.class);
        job.setMapOutputKeyClass(NullWritable.class);
        job.setMapOutputValueClass(BeanWritable.class);

        job.setReducerClass(DBOutputReducer.class);
        job.setOutputFormatClass(DBOutputFormat.class);
        job.setOutputKeyClass(BeanWritable.class);
        job.setOutputValueClass(NullWritable.class);

        job.setInputFormatClass(TextInputFormat.class);
        FileInputFormat.setInputPaths(job, arg0[0]);
        // configure the target database table and columns for this job's output
        DBOutputFormat.setOutput(job, "people", new String[]{"id", "name", "height"});

        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static int createJob(String[] args) {
        Configuration conf = new Configuration();
        conf.set("dfs.datanode.socket.write.timeout", "7200000");
        conf.set("mapreduce.input.fileinputformat.split.minsize", "268435456");
        conf.set("mapreduce.input.fileinputformat.split.maxsize", "536870912");
        int status = 0;
        try {
            status = ToolRunner.run(conf, new DBOutputFormatApp(), args);
        } catch (Exception e) {
            e.printStackTrace();
        }
        return status;
    }

    public static void main(String[] args) {
        // fall back to the default input path when none is given on the command line
        if (args.length == 0) {
            args = new String[]{"/user/hadoop/mapreduce/mysqlToHdfs/people"};
        }
        int status = createJob(args);
        System.exit(status);
    }
}
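DBOutputFormat only issues INSERT statements, so the target people table has to exist in the hadoop database before the job runs. The column names match the ones passed to DBOutputFormat.setOutput above; the column types in the sketch below are an assumption inferred from the BeanWritable fields (int, String, double), and CreatePeopleTable is a hypothetical one-off helper, not part of the job.

package com.zhen.mysqlToHDFS;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.Statement;

// One-off helper that creates the target table with the same connection
// settings the job passes to DBConfiguration.configureDB.
public class CreatePeopleTable {
    public static void main(String[] args) throws Exception {
        Class.forName("com.mysql.jdbc.Driver");
        try (Connection conn = DriverManager.getConnection(
                "jdbc:mysql://localhost:3306/hadoop", "root", "123qwe");
             Statement stmt = conn.createStatement()) {
            // column types assumed from BeanWritable: int id, String name, double height
            stmt.executeUpdate("CREATE TABLE IF NOT EXISTS people ("
                    + "id INT, "
                    + "name VARCHAR(255), "
                    + "height DOUBLE)");
        }
    }
}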
Package the code as a jar, copy it to the server, and run it with the hadoop jar command:
hadoop jar /Users/FengZhen/Desktop/Hadoop/other/mapreduce_jar/HDFSToMysql.jar com.zhen.mysqlToHDFS.DBOutputFormatApp
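One detail worth checking before running the job: the MySQL JDBC driver (mysql-connector-java, which provides com.mysql.jdbc.Driver) must be visible both to the client and to the reduce tasks that open the database connection. One common approach, sketched below with a hypothetical path for the connector jar, is to pass it via the generic -libjars option that ToolRunner/GenericOptionsParser understands and to give the input path explicitly; bundling the driver into the application jar works as well.

hadoop jar /Users/FengZhen/Desktop/Hadoop/other/mapreduce_jar/HDFSToMysql.jar com.zhen.mysqlToHDFS.DBOutputFormatApp -libjars /path/to/mysql-connector-java.jar /user/hadoop/mapreduce/mysqlToHdfs/people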
When the job finishes, the data will show up in the MySQL table.
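A quick way to confirm the rows arrived is a small JDBC query using the same connection settings as the job; VerifyPeople below is a hypothetical helper, not part of the job.

package com.zhen.mysqlToHDFS;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

// Prints every row of the people table so the import can be verified.
public class VerifyPeople {
    public static void main(String[] args) throws Exception {
        Class.forName("com.mysql.jdbc.Driver");
        try (Connection conn = DriverManager.getConnection(
                "jdbc:mysql://localhost:3306/hadoop", "root", "123qwe");
             Statement stmt = conn.createStatement();
             ResultSet rs = stmt.executeQuery("SELECT id, name, height FROM people")) {
            while (rs.next()) {
                System.out.println(rs.getInt("id") + "\t"
                        + rs.getString("name") + "\t"
                        + rs.getDouble("height"));
            }
        }
    }
}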