When developing MapReduce programs, you often run into cases where the input data does not live on HDFS, or where the output destination is not HDFS. The MapReduce design anticipates this: it exposes two extension points, and by supplying a suitable custom InputFormat and OutputFormat we can meet the requirement. This post walks through one such case, reading data from MongoDB and writing results back to MongoDB. It is only a demo, so the data set was picked arbitrarily.
I. Custom InputFormat
In MapReduce, the data input of the Map phase is determined by the InputFormat. Looking at the source of org.apache.hadoop.mapreduce.InputFormat below, we can see that besides implementing the InputFormat abstract class, we also need to provide a custom InputSplit and a custom RecordReader. Their roles are: the split determines the size of each data slice and its location information, while the record reader performs the actual reading of the data.
public abstract class InputFormat<K, V> {
    // Returns the collection of input splits for the Map phase.
    public abstract List<InputSplit> getSplits(JobContext context) throws IOException, InterruptedException;

    // Creates the object that actually reads the data.
    public abstract RecordReader<K, V> createRecordReader(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException;
}
1. Custom InputSplit
A custom InputSplit mainly needs to implement the following methods:
public abstract class InputSplit {
    // Returns the length (size) of the current split.
    public abstract long getLength() throws IOException, InterruptedException;

    // Returns the location information of the current split.
    public abstract String[] getLocations() throws IOException, InterruptedException;
}
2. Custom RecordReader
A custom RecordReader mainly needs to implement the following methods:
public abstract class RecordReader<KEYIN, VALUEIN> implements Closeable {
    // Initialization; may be left empty if everything is initialized in the constructor.
    public abstract void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException;

    // Returns true if another key/value pair exists, false otherwise.
    public abstract boolean nextKeyValue() throws IOException, InterruptedException;

    // Returns the current key.
    public abstract KEYIN getCurrentKey() throws IOException, InterruptedException;

    // Returns the current value.
    public abstract VALUEIN getCurrentValue() throws IOException, InterruptedException;

    // Returns progress information.
    public abstract float getProgress() throws IOException, InterruptedException;

    // Releases resources.
    public abstract void close() throws IOException;
}
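To see how these methods cooperate, it helps to look at the call pattern the framework uses. The sketch below is illustrative only (the real MapTask also calls initialize() and close() and handles failures); it simply drains any RecordReader the way a Mapper would consume it:

import java.io.IOException;

import org.apache.hadoop.mapreduce.RecordReader;

// Illustrative sketch: roughly how the framework consumes a RecordReader.
public class RecordReaderDrain {
    public static <K, V> long drain(RecordReader<K, V> reader) throws IOException, InterruptedException {
        long records = 0;
        while (reader.nextKeyValue()) {          // advance to the next record
            K key = reader.getCurrentKey();      // these two values are what the
            V value = reader.getCurrentValue();  // framework hands to Mapper.map()
            records++;
        }
        return records;
    }
}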
II. Custom OutputFormat
In MapReduce, the data output of the Reducer phase is determined by the OutputFormat, which decides both the output destination and the committer object for the job. Looking at the source of org.apache.hadoop.mapreduce.OutputFormat below, we can see that besides implementing the OutputFormat abstract class, we also need a custom RecordWriter and an OutputCommitter. Because the OutputCommitter does not deal with the concrete output destination, it usually does not need to be rewritten, and the stock FileOutputCommitter can be used directly; the RecordWriter is what concretely defines how data is written to the destination.
public abstract class OutputFormat<K, V> {
    // Returns the object that actually writes the data out.
    public abstract RecordWriter<K, V> getRecordWriter(TaskAttemptContext context) throws IOException, InterruptedException;

    // Checks that the output configuration is valid.
    public abstract void checkOutputSpecs(JobContext context) throws IOException, InterruptedException;

    // Returns the committer for the job's output.
    public abstract OutputCommitter getOutputCommitter(TaskAttemptContext context) throws IOException, InterruptedException;
}
1. Custom RecordWriter
Looking at the RecordWriter source, the two methods to implement are:
public abstract class RecordWriter<K, V> {
    // Writes one key/value pair to the destination.
    public abstract void write(K key, V value) throws IOException, InterruptedException;

    // Releases resources.
    public abstract void close(TaskAttemptContext context) throws IOException, InterruptedException;
}
III. Full Code
Custom InputFormat & InputSplit
package com.gerry.mongo.hadoop2x.mr.mongodb.lib;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.log4j.Logger;

import com.mongodb.BasicDBObject;
import com.mongodb.BasicDBObjectBuilder;
import com.mongodb.DB;
import com.mongodb.DBCollection;
import com.mongodb.DBObject;
import com.mongodb.Mongo;
import com.mongodb.MongoException;

public class MongoDBInputFormat<T extends MongoDBWritable> extends InputFormat<LongWritable, T> implements Configurable {
    private static final Logger LOG = Logger.getLogger(MongoDBInputFormat.class);

    /**
     * An empty writable whose operations are all no-ops, similar to NullWritable.
     */
    public static class NullMongoDBWritable implements MongoDBWritable, Writable {
        @Override
        public void write(DBCollection collection) throws MongoException {
        }

        @Override
        public void readFields(DBObject object) throws MongoException {
        }

        @Override
        public void write(DataOutput out) throws IOException {
        }

        @Override
        public void readFields(DataInput in) throws IOException {
        }

        @Override
        public DBObject fetchWriteDBObject(DBObject old) throws MongoException {
            return old;
        }
    }

    /**
     * The InputSplit implementation for MongoDB.
     */
    public static class MongoDBInputSplit extends InputSplit implements Writable {
        private long end = 0;
        private long start = 0;

        /**
         * Default constructor (required for Writable deserialization).
         */
        public MongoDBInputSplit() {
        }

        /**
         * Convenience constructor.
         *
         * @param start offset of the first document of this split within the query result
         * @param end   offset of the last document (exclusive) of this split
         */
        public MongoDBInputSplit(long start, long end) {
            this.start = start;
            this.end = end;
        }

        public long getEnd() {
            return end;
        }

        public long getStart() {
            return start;
        }

        @Override
        public void write(DataOutput out) throws IOException {
            out.writeLong(this.start);
            out.writeLong(this.end);
        }

        @Override
        public void readFields(DataInput in) throws IOException {
            this.start = in.readLong();
            this.end = in.readLong();
        }

        @Override
        public long getLength() throws IOException, InterruptedException {
            // Split size = number of documents in this split.
            return this.end - this.start;
        }

        @Override
        public String[] getLocations() throws IOException, InterruptedException {
            // An empty array disables data-locality optimization; map tasks run on arbitrary nodes.
            return new String[] {};
        }
    }

    protected MongoDBConfiguration mongoConfiguration; // MongoDB-related configuration
    protected Mongo mongo;                             // MongoDB connection
    protected String databaseName;                     // database to read from
    protected String collectionName;                   // collection to read from
    protected DBObject conditionQuery;                 // query filter
    protected DBObject fieldQuery;                     // field projection

    @Override
    public List<InputSplit> getSplits(JobContext job) throws IOException, InterruptedException {
        DBCollection dbCollection = null;
        try {
            dbCollection = this.getDBCollection();
            // Count the matching documents.
            long count = dbCollection.count(this.getConditionQuery());
            int chunks = job.getConfiguration().getInt(MRJobConfig.NUM_MAPS, 1);
            long chunkSize = (count / chunks); // documents per split

            // Naive splitting: simply give each split the same number of documents.
            List<InputSplit> splits = new ArrayList<InputSplit>();
            for (int i = 0; i < chunks; i++) {
                MongoDBInputSplit split = null;
                if ((i + 1) == chunks) {
                    // The last split absorbs the remainder.
                    split = new MongoDBInputSplit(i * chunkSize, count);
                } else {
                    split = new MongoDBInputSplit(i * chunkSize, (i * chunkSize) + chunkSize);
                }
                splits.add(split);
            }
            return splits;
        } catch (Exception e) {
            throw new IOException(e);
        } finally {
            dbCollection = null;
            closeConnection(); // release the connection
        }
    }

    @Override
    public RecordReader<LongWritable, T> createRecordReader(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
        return createRecordReader((MongoDBInputSplit) split, context.getConfiguration());
    }

    protected RecordReader<LongWritable, T> createRecordReader(MongoDBInputSplit split, Configuration conf) {
        // The value class that documents read from MongoDB are converted into; defaults to NullMongoDBWritable.
        Class<? extends MongoDBWritable> valueClass = this.mongoConfiguration.getValueClass();
        return new MongoDBRecordReader<T>(split, valueClass, conf, getDBCollection(), getConditionQuery(), getFieldQuery());
    }

    @Override
    public void setConf(Configuration conf) {
        mongoConfiguration = new MongoDBConfiguration(conf);
        databaseName = this.mongoConfiguration.getInputDatabaseName();     // input database
        collectionName = this.mongoConfiguration.getInputCollectionName(); // input collection
        getMongo();          // initialize the connection
        getConditionQuery(); // initialize the filter
        getFieldQuery();     // initialize the projection
    }

    @Override
    public Configuration getConf() {
        return this.mongoConfiguration.getConfiguration();
    }

    public Mongo getMongo() {
        try {
            if (null == this.mongo) {
                this.mongo = this.mongoConfiguration.getMongoConnection();
            }
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
        return mongo;
    }

    public DBObject getConditionQuery() {
        if (null == this.conditionQuery) {
            Map<String, String> conditions = this.mongoConfiguration.getInputConditions();
            BasicDBObjectBuilder builder = new BasicDBObjectBuilder();
            for (Map.Entry<String, String> entry : conditions.entrySet()) {
                if (entry.getValue() != null) {
                    builder.append(entry.getKey(), entry.getValue());
                } else {
                    builder.push(entry.getKey());
                }
            }
            if (builder.isEmpty()) {
                this.conditionQuery = new BasicDBObject();
            } else {
                this.conditionQuery = builder.get();
            }
        }
        return this.conditionQuery;
    }

    public DBObject getFieldQuery() {
        if (fieldQuery == null) {
            String[] fields = this.mongoConfiguration.getInputFieldNames();
            if (fields != null && fields.length > 0) {
                BasicDBObjectBuilder builder = new BasicDBObjectBuilder();
                for (String field : fields) {
                    builder.push(field);
                }
                fieldQuery = builder.get();
            } else {
                fieldQuery = new BasicDBObject();
            }
        }
        return fieldQuery;
    }

    protected DBCollection getDBCollection() {
        DB db = getMongo().getDB(this.databaseName);
        if (this.mongoConfiguration.isEnableAuth()) {
            String username = this.mongoConfiguration.getUsername();
            String password = this.mongoConfiguration.getPassword();
            if (!db.authenticate(username, password.toCharArray())) {
                throw new RuntimeException("authenticate failure with the username:" + username + ",pwd:" + password);
            }
        }
        return db.getCollection(collectionName);
    }

    protected void closeConnection() {
        try {
            if (null != this.mongo) {
                this.mongo.close();
                this.mongo = null;
            }
        } catch (Exception e) {
            LOG.debug("Exception on close", e);
        }
    }
}
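As a quick sanity check of the split arithmetic in getSplits() above, the following standalone snippet reproduces it with assumed numbers (count = 100 matching documents, chunks = 3 map tasks): chunkSize = 33, so the splits cover [0, 33), [33, 66), and [66, 100), with the last split absorbing the remainder because its end is count.

// Standalone reproduction of the getSplits() arithmetic with assumed numbers.
public class SplitMathCheck {
    public static void main(String[] args) {
        long count = 100; // assumed number of matching documents
        int chunks = 3;   // assumed number of map tasks
        long chunkSize = count / chunks;
        for (int i = 0; i < chunks; i++) {
            long start = i * chunkSize;
            long end = (i + 1) == chunks ? count : start + chunkSize;
            // prints: split 0: [0, 33), split 1: [33, 66), split 2: [66, 100)
            System.out.printf("split %d: [%d, %d)%n", i, start, end);
        }
    }
}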
Custom RecordReader
package com.gerry.mongo.hadoop2x.mr.mongodb.lib;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.util.ReflectionUtils;

import com.mongodb.DBCollection;
import com.mongodb.DBCursor;
import com.mongodb.DBObject;

public class MongoDBRecordReader<T extends MongoDBWritable> extends RecordReader<LongWritable, T> {
    private Class<? extends MongoDBWritable> valueClass;
    private LongWritable key;
    private T value;
    private long pos;
    private Configuration conf;
    private MongoDBInputFormat.MongoDBInputSplit split;
    private DBCollection collection;
    private DBObject conditionQuery;
    private DBObject fieldQuery;
    private DBCursor cursor;

    public MongoDBRecordReader(MongoDBInputFormat.MongoDBInputSplit split, Class<? extends MongoDBWritable> valueClass, Configuration conf, DBCollection collection, DBObject conditionQuery, DBObject fieldQuery) {
        this.split = split;
        this.valueClass = valueClass;
        this.collection = collection;
        this.conditionQuery = conditionQuery;
        this.fieldQuery = fieldQuery;
        this.conf = conf;
    }

    @Override
    public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
        // Everything was initialized in the constructor, so nothing to do here.
    }

    @SuppressWarnings("unchecked")
    @Override
    public boolean nextKeyValue() throws IOException, InterruptedException {
        try {
            if (key == null) {
                key = new LongWritable();
            }
            if (value == null) {
                value = (T) ReflectionUtils.newInstance(valueClass, conf);
            }
            if (null == cursor) {
                cursor = executeQuery();
            }
            if (!cursor.hasNext()) {
                return false;
            }
            key.set(pos + split.getStart()); // set the key
            value.readFields(cursor.next()); // set the value
            pos++;
        } catch (Exception e) {
            throw new IOException("Exception in nextKeyValue", e);
        }
        return true;
    }

    protected DBCursor executeQuery() {
        try {
            // Note: the int casts limit a split to Integer.MAX_VALUE documents.
            return collection.find(conditionQuery, fieldQuery).skip((int) split.getStart()).limit((int) split.getLength());
        } catch (IOException | InterruptedException e) {
            throw new RuntimeException(e);
        }
    }

    @Override
    public LongWritable getCurrentKey() throws IOException, InterruptedException {
        return this.key;
    }

    @Override
    public T getCurrentValue() throws IOException, InterruptedException {
        return this.value;
    }

    @Override
    public float getProgress() throws IOException, InterruptedException {
        // Progress must be in [0, 1]: report the fraction of this split consumed so far.
        long length = split.getLength();
        return length == 0 ? 1.0f : Math.min(1.0f, pos / (float) length);
    }

    @Override
    public void close() throws IOException {
        if (collection != null) {
            collection.getDB().getMongo().close();
        }
    }
}
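The heart of the reader is executeQuery(): each split reads a skip/limit window over the filtered collection. The following standalone sketch (assuming a mongod on localhost and the same legacy com.mongodb driver used throughout this post) shows the window a split with start = 20 and length = 10 would read. One design caveat: skip() forces the server to scan past all skipped documents, so this splitting strategy becomes expensive on very large collections.

import com.mongodb.BasicDBObject;
import com.mongodb.DBCollection;
import com.mongodb.DBCursor;
import com.mongodb.Mongo;

// Standalone sketch of the skip/limit window executeQuery() issues per split.
public class SkipLimitWindow {
    public static void main(String[] args) throws Exception {
        Mongo mongo = new Mongo("localhost", 27017); // assumed local mongod
        DBCollection users = mongo.getDB("db_java").getCollection("users");
        // The window [20, 30): what a split with start=20, length=10 reads.
        DBCursor cursor = users.find(new BasicDBObject(), new BasicDBObject()).skip(20).limit(10);
        while (cursor.hasNext()) {
            System.out.println(cursor.next());
        }
        mongo.close();
    }
}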
Custom OutputFormat & RecordWriter
package com.gerry.mongo.hadoop2x.mr.mongodb.lib;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.log4j.Logger;

import com.mongodb.DB;
import com.mongodb.DBCollection;
import com.mongodb.DBObject;
import com.mongodb.Mongo;

public class MongoDBOutputFormat<K extends MongoDBWritable, V extends MongoDBWritable> extends OutputFormat<K, V> {
    private static Logger LOG = Logger.getLogger(MongoDBOutputFormat.class);

    /**
     * A RecordWriter that writes the reduce output to a MongoDB collection.
     *
     * @param <K>
     * @param <V>
     */
    public static class MongoDBRecordWriter<K extends MongoDBWritable, V extends MongoDBWritable> extends RecordWriter<K, V> {
        private Mongo mongo;
        private String databaseName;
        private String collectionName;
        private MongoDBConfiguration dbConf;
        private DBCollection dbCollection;
        private DBObject dbObject;
        private boolean enableFetchMethod;

        public MongoDBRecordWriter(MongoDBConfiguration dbConf, Mongo mongo, String databaseName, String collectionName) {
            this.mongo = mongo;
            this.databaseName = databaseName;
            this.collectionName = collectionName;
            this.dbConf = dbConf;
            this.enableFetchMethod = this.dbConf.isEnableUseFetchMethod();
            getDbCollection(); // open the connection
        }

        protected DBCollection getDbCollection() {
            if (null == this.dbCollection) {
                DB db = this.mongo.getDB(this.databaseName);
                if (this.dbConf.isEnableAuth()) {
                    String username = this.dbConf.getUsername();
                    String password = this.dbConf.getPassword();
                    if (!db.authenticate(username, password.toCharArray())) {
                        throw new RuntimeException("authenticate failure, the username:" + username + ", pwd:" + password);
                    }
                }
                this.dbCollection = db.getCollection(this.collectionName);
            }
            return this.dbCollection;
        }

        @Override
        public void write(K key, V value) throws IOException, InterruptedException {
            if (this.enableFetchMethod) {
                // Merge the key's and the value's fields into one document.
                this.dbObject = key.fetchWriteDBObject(null);
                this.dbObject = value.fetchWriteDBObject(this.dbObject);
                // Write the merged document. For large volumes, documents could be
                // buffered here and committed together in batches.
                this.dbCollection.insert(this.dbObject);
                this.dbObject = null;
            } else {
                // Let the key and the value write themselves directly.
                key.write(dbCollection);
                value.write(dbCollection);
            }
        }

        @Override
        public void close(TaskAttemptContext context) throws IOException, InterruptedException {
            if (this.mongo != null) {
                this.dbCollection = null;
                this.mongo.close();
            }
        }
    }

    @Override
    public RecordWriter<K, V> getRecordWriter(TaskAttemptContext context) throws IOException, InterruptedException {
        try {
            MongoDBConfiguration dbConf = new MongoDBConfiguration(context.getConfiguration());
            String databaseName = dbConf.getOutputDatabaseName();
            String collectionName = dbConf.getOutputCollectionName();
            Mongo mongo = dbConf.getMongoConnection();
            return new MongoDBRecordWriter<K, V>(dbConf, mongo, databaseName, collectionName);
        } catch (Exception e) {
            LOG.error("Create the record writer occur exception.", e);
            throw new IOException(e);
        }
    }

    @Override
    public void checkOutputSpecs(JobContext context) throws IOException, InterruptedException {
        // No output checks are performed.
    }

    @Override
    public OutputCommitter getOutputCommitter(TaskAttemptContext context) throws IOException, InterruptedException {
        // The committer does not touch the MongoDB output destination,
        // so the stock FileOutputCommitter is reused directly.
        return new FileOutputCommitter(FileOutputFormat.getOutputPath(context), context);
    }

    /**
     * Sets the output properties on the job.
     *
     * @param job
     * @param databaseName
     * @param collectionName
     */
    public static void setOutput(Job job, String databaseName, String collectionName) {
        job.setOutputFormatClass(MongoDBOutputFormat.class);
        job.setReduceSpeculativeExecution(false);
        MongoDBConfiguration mdc = new MongoDBConfiguration(job.getConfiguration());
        mdc.setOutputCollectionName(collectionName);
        mdc.setOutputDatabaseName(databaseName);
    }

    /**
     * Disables the fetch method, so write() is called directly instead.
     *
     * @param conf
     */
    public static void disableFetchMethod(Configuration conf) {
        conf.setBoolean(MongoDBConfiguration.OUTPUT_USE_FETCH_METHOD_PROPERTY, false);
    }
}
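The comment in write() above suggests buffering documents and committing them together when the data volume is large. Here is a minimal hedged sketch of that idea (the class and field names are hypothetical, not part of the code above): buffer the merged DBObjects and flush them using the legacy driver's bulk insert(List) overload.

import java.util.ArrayList;
import java.util.List;

import com.mongodb.DBCollection;
import com.mongodb.DBObject;

// Hypothetical batching helper sketching the "buffer and commit together" idea.
public class BufferedMongoWriter {
    private final DBCollection collection;
    private final List<DBObject> buffer = new ArrayList<DBObject>();
    private final int batchSize;

    public BufferedMongoWriter(DBCollection collection, int batchSize) {
        this.collection = collection;
        this.batchSize = batchSize;
    }

    public void write(DBObject doc) {
        buffer.add(doc);
        if (buffer.size() >= batchSize) {
            flush();
        }
    }

    // Must also be called from close() so the tail of the buffer is not lost.
    public void flush() {
        if (!buffer.isEmpty()) {
            collection.insert(buffer); // the legacy driver accepts a List<DBObject>
            buffer.clear();
        }
    }
}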
Other supporting Java code
package com.gerry.mongo.hadoop2x.mr.mongodb.lib;

import java.net.UnknownHostException;
import java.util.HashMap;
import java.util.Map;
import java.util.regex.Pattern;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.lib.db.DBWritable;

import com.gerry.mongo.hadoop2x.mr.mongodb.lib.MongoDBInputFormat.NullMongoDBWritable;
import com.mongodb.Mongo;
import com.mongodb.ServerAddress;

public class MongoDBConfiguration {
    public static final String BIND_HOST_PROPERTY = "mapreduce.mongo.host";
    public static final String BIND_PORT_PROPERTY = "mapreduce.mongo.port";
    public static final String AUTH_ENABLE_PROPERTY = "mapreduce.mongo.auth.enable";
    public static final String USERNAME_PROPERTY = "mapreduce.mongo.username";
    public static final String PASSWORD_PROPERTY = "mapreduce.mongo.password";
    public static final String PARTITION_PROPERTY = "mapreduce.mongo.partition";
    public static final String INPUT_DATABASE_NAME_PROPERTY = "mapreduce.mongo.input.database.name";
    public static final String INPUT_COLLECTION_NAME_PROPERTY = "mapreduce.mongo.input.collection.name";
    public static final String INPUT_FIELD_NAMES_PROPERTY = "mapreduce.mongo.input.field.names";
    public static final String INPUT_CONDITIONS_PROPERTY = "mapreduce.mongo.input.conditions";
    public static final String INPUT_CLASS_PROPERTY = "mapreduce.mongo.input.class";
    public static final String OUTPUT_DATABASE_NAME_PROPERTY = "mapreduce.mongo.output.database.name";
    public static final String OUTPUT_COLLECTION_NAME_PROPERTY = "mapreduce.mongo.output.collection.name";
    // Whether the record writer calls the fetch method (the default); if disabled,
    // the write method is used directly instead.
    public static final String OUTPUT_USE_FETCH_METHOD_PROPERTY = "mapreduce.mongo.output.use.fetch.method";

    private Configuration conf;

    public MongoDBConfiguration(Configuration conf) {
        this.conf = conf;
    }

    /**
     * Returns the underlying Configuration object.
     *
     * @return
     */
    public Configuration getConfiguration() {
        return this.conf;
    }

    /**
     * Sets the connection information (no authentication).
     *
     * @param host
     * @param port
     * @return
     */
    public MongoDBConfiguration configureDB(String host, int port) {
        return this.configureDB(host, port, false, null, null);
    }

    /**
     * Sets the connection information, optionally with authentication.
     *
     * @param host
     * @param port
     * @param enableAuth
     * @param username
     * @param password
     * @return
     */
    public MongoDBConfiguration configureDB(String host, int port, boolean enableAuth, String username, String password) {
        this.conf.set(BIND_HOST_PROPERTY, host);
        this.conf.setInt(BIND_PORT_PROPERTY, port);
        if (enableAuth) {
            this.conf.setBoolean(AUTH_ENABLE_PROPERTY, true);
            this.conf.set(USERNAME_PROPERTY, username);
            this.conf.set(PASSWORD_PROPERTY, password);
        }
        return this;
    }

    /**
     * Creates a MongoDB connection object.
     *
     * @return
     * @throws UnknownHostException
     */
    public Mongo getMongoConnection() throws UnknownHostException {
        return new Mongo(new ServerAddress(this.getBindHost(), this.getBindPort()));
    }

    /**
     * Returns the configured host, defaulting to localhost.
     *
     * @return
     */
    public String getBindHost() {
        return this.conf.get(BIND_HOST_PROPERTY, "localhost");
    }

    /**
     * Returns the configured port, defaulting to 27017.
     *
     * @return
     */
    public int getBindPort() {
        return this.conf.getInt(BIND_PORT_PROPERTY, 27017);
    }

    /**
     * Returns whether authentication is enabled; MongoDB disables it by default.
     *
     * @return
     */
    public boolean isEnableAuth() {
        return this.conf.getBoolean(AUTH_ENABLE_PROPERTY, false);
    }

    /**
     * Returns the username required for authentication.
     *
     * @return
     */
    public String getUsername() {
        return this.conf.get(USERNAME_PROPERTY);
    }

    /**
     * Returns the password required for authentication.
     *
     * @return
     */
    public String getPassword() {
        return this.conf.get(PASSWORD_PROPERTY);
    }

    public String getPartition() {
        return conf.get(PARTITION_PROPERTY, "|");
    }

    public MongoDBConfiguration setPartition(String partition) {
        conf.set(PARTITION_PROPERTY, partition);
        return this;
    }

    public String getInputDatabaseName() {
        return conf.get(INPUT_DATABASE_NAME_PROPERTY, "test");
    }

    public MongoDBConfiguration setInputDatabaseName(String databaseName) {
        conf.set(INPUT_DATABASE_NAME_PROPERTY, databaseName);
        return this;
    }

    public String getInputCollectionName() {
        return conf.get(MongoDBConfiguration.INPUT_COLLECTION_NAME_PROPERTY, "test");
    }

    public void setInputCollectionName(String tableName) {
        conf.set(MongoDBConfiguration.INPUT_COLLECTION_NAME_PROPERTY, tableName);
    }

    public String[] getInputFieldNames() {
        return conf.getStrings(MongoDBConfiguration.INPUT_FIELD_NAMES_PROPERTY);
    }

    public void setInputFieldNames(String... fieldNames) {
        conf.setStrings(MongoDBConfiguration.INPUT_FIELD_NAMES_PROPERTY, fieldNames);
    }

    public Map<String, String> getInputConditions() {
        Map<String, String> result = new HashMap<String, String>();
        String[] conditions = conf.getStrings(INPUT_CONDITIONS_PROPERTY);
        if (conditions != null && conditions.length > 0) {
            String partition = this.getPartition();
            String[] values = null;
            for (String condition : conditions) {
                // Pattern.quote is needed because the default partition "|"
                // is a regex metacharacter and would otherwise split every character.
                values = condition.split(Pattern.quote(partition));
                if (values != null && values.length == 2) {
                    result.put(values[0], values[1]);
                } else {
                    result.put(condition, null);
                }
            }
        }
        return result;
    }

    public void setInputConditions(Map<String, String> conditions) {
        if (conditions != null && conditions.size() > 0) {
            String[] values = new String[conditions.size()];
            String partition = this.getPartition();
            int k = 0;
            for (Map.Entry<String, String> entry : conditions.entrySet()) {
                if (entry.getValue() != null) {
                    values[k++] = entry.getKey() + partition + entry.getValue();
                } else {
                    values[k++] = entry.getKey();
                }
            }
            conf.setStrings(INPUT_CONDITIONS_PROPERTY, values);
        }
    }

    public Class<? extends MongoDBWritable> getValueClass() {
        return conf.getClass(INPUT_CLASS_PROPERTY, NullMongoDBWritable.class, MongoDBWritable.class);
    }

    public void setInputClass(Class<? extends DBWritable> inputClass) {
        conf.setClass(MongoDBConfiguration.INPUT_CLASS_PROPERTY, inputClass, DBWritable.class);
    }

    public String getOutputDatabaseName() {
        return conf.get(OUTPUT_DATABASE_NAME_PROPERTY, "test");
    }

    public MongoDBConfiguration setOutputDatabaseName(String databaseName) {
        conf.set(OUTPUT_DATABASE_NAME_PROPERTY, databaseName);
        return this;
    }

    public String getOutputCollectionName() {
        return conf.get(MongoDBConfiguration.OUTPUT_COLLECTION_NAME_PROPERTY, "test");
    }

    public void setOutputCollectionName(String tableName) {
        conf.set(MongoDBConfiguration.OUTPUT_COLLECTION_NAME_PROPERTY, tableName);
    }

    public boolean isEnableUseFetchMethod() {
        return conf.getBoolean(OUTPUT_USE_FETCH_METHOD_PROPERTY, true);
    }

    public void setOutputUseFetchMethod(boolean useFetchMethod) {
        conf.setBoolean(OUTPUT_USE_FETCH_METHOD_PROPERTY, useFetchMethod);
    }
}
package com.gerry.mongo.hadoop2x.mr.mongodb.lib;

import com.mongodb.DBCollection;
import com.mongodb.DBObject;
import com.mongodb.MongoException;

public interface MongoDBWritable {
    /**
     * Writes this object into the MongoDB collection.
     *
     * @param collection
     * @throws MongoException
     */
    public void write(DBCollection collection) throws MongoException;

    /**
     * Builds the DBObject to be written to MongoDB, merging into {@code old} if given.
     *
     * @param old
     * @return
     * @throws MongoException
     */
    public DBObject fetchWriteDBObject(DBObject old) throws MongoException;

    /**
     * Populates this object from a document read from a MongoDB collection.
     *
     * @param object
     * @throws MongoException
     */
    public void readFields(DBObject object) throws MongoException;
}
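Putting MongoDBConfiguration and MongoDBWritable together, a driver program might configure the job input roughly as follows. This is a hedged usage sketch of the helpers above; the host, port, and filter value are placeholders.

import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;

import com.gerry.mongo.hadoop2x.mr.mongodb.lib.MongoDBConfiguration;

// Hedged usage sketch for MongoDBConfiguration; all values are placeholders.
public class ConfigureJobInput {
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        MongoDBConfiguration mdc = new MongoDBConfiguration(conf).configureDB("localhost", 27017);
        mdc.setInputDatabaseName("db_java");
        mdc.setInputCollectionName("users");
        Map<String, String> conditions = new HashMap<String, String>();
        conditions.put("sex", "male"); // hypothetical filter: only male users
        mdc.setInputConditions(conditions);
        // The resulting conf can then be passed to Job.getInstance(conf, ...).
    }
}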
package com.gerry.mongo.hadoop2x.mr.mongodb.nw;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.Date;
import java.util.HashSet;
import java.util.Set;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;

import com.gerry.mongo.hadoop2x.mr.mongodb.lib.MongoDBConfiguration;
import com.gerry.mongo.hadoop2x.mr.mongodb.lib.MongoDBInputFormat;
import com.gerry.mongo.hadoop2x.mr.mongodb.lib.MongoDBOutputFormat;
import com.gerry.mongo.hadoop2x.mr.mongodb.lib.MongoDBWritable;
import com.mongodb.BasicDBObject;
import com.mongodb.BasicDBObjectBuilder;
import com.mongodb.DBCollection;
import com.mongodb.DBObject;
import com.mongodb.MongoException;

public class Demo {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();
        // Set the input MongoDB database/collection and the input value class.
        // The input database and collection should already exist; otherwise there
        // is simply no data, although an empty input does not cause an error.
        conf.set(MongoDBConfiguration.INPUT_COLLECTION_NAME_PROPERTY, "users");
        conf.set(MongoDBConfiguration.INPUT_DATABASE_NAME_PROPERTY, "db_java");
        conf.setClass(MongoDBConfiguration.INPUT_CLASS_PROPERTY, DemoInputValueAndOutputKey.class, MongoDBWritable.class);
        Job job = Job.getInstance(conf, "mongodb-demo");
        job.setJarByClass(Demo.class);
        job.setMapperClass(DemoMapper.class);
        job.setReducerClass(DemoReducer.class);
        job.setOutputKeyClass(DemoInputValueAndOutputKey.class);
        job.setOutputValueClass(DemoOutputValue.class);
        job.setMapOutputKeyClass(DemoInputValueAndOutputKey.class);
        job.setMapOutputValueClass(NullWritable.class);
        job.setInputFormatClass(MongoDBInputFormat.class);
        MongoDBOutputFormat.setOutput(job, "foobar2", "users"); // the output database/collection need not exist
        job.waitForCompletion(true);
    }

    public static class DemoOutputValue implements Writable, MongoDBWritable {
        private Date clientTime;
        private long count;

        @Override
        public void write(DBCollection collection) throws MongoException {
            throw new UnsupportedOperationException();
        }

        @Override
        public DBObject fetchWriteDBObject(DBObject old) throws MongoException {
            BasicDBObjectBuilder builder = null;
            Set<String> keys = new HashSet<String>();
            if (old != null) {
                keys = old.keySet();
                builder = BasicDBObjectBuilder.start(old.toMap());
            } else {
                builder = new BasicDBObjectBuilder();
            }
            // Append this object's values; if a key already exists, a sequence number is appended to the key.
            builder.append(getKey(keys, "time", 0), clientTime).append(getKey(keys, "count", 0), this.count);
            return builder.get();
        }

        @Override
        public void readFields(DBObject object) throws MongoException {
            throw new UnsupportedOperationException();
        }

        @Override
        public void write(DataOutput out) throws IOException {
            out.writeLong(this.clientTime.getTime());
            out.writeLong(this.count);
        }

        @Override
        public void readFields(DataInput in) throws IOException {
            this.clientTime = new Date(in.readLong());
            this.count = in.readLong();
        }

        public Date getClientTime() {
            return clientTime;
        }

        public void setClientTime(Date clientTime) {
            this.clientTime = clientTime;
        }

        public long getCount() {
            return count;
        }

        public void setCount(long count) {
            this.count = count;
        }
    }

    public static class DemoInputValueAndOutputKey implements MongoDBWritable, WritableComparable<DemoInputValueAndOutputKey> {
        private String name;
        private Integer age;
        private String sex;

        @Override
        public void write(DataOutput out) throws IOException {
            // Each nullable field is preceded by a presence flag.
            if (this.name == null) {
                out.writeBoolean(false);
            } else {
                out.writeBoolean(true);
                out.writeUTF(this.name);
            }
            if (this.age == null) {
                out.writeBoolean(false);
            } else {
                out.writeBoolean(true);
                out.writeInt(this.age);
            }
            if (this.sex == null) {
                out.writeBoolean(false);
            } else {
                out.writeBoolean(true);
                out.writeUTF(this.sex);
            }
        }

        @Override
        public void readFields(DataInput in) throws IOException {
            this.name = in.readBoolean() ? in.readUTF() : null;
            this.age = in.readBoolean() ? Integer.valueOf(in.readInt()) : null;
            this.sex = in.readBoolean() ? in.readUTF() : null;
        }

        @Override
        public void write(DBCollection collection) throws MongoException {
            DBObject object = new BasicDBObject();
            object.put("name", this.name);
            object.put("age", this.age.intValue());
            object.put("sex", this.sex);
            collection.insert(object);
        }

        @Override
        public void readFields(DBObject object) throws MongoException {
            this.name = (String) object.get("name");
            this.age = (Integer) object.get("age");
            this.sex = (String) object.get("sex");
        }

        @Override
        public DBObject fetchWriteDBObject(DBObject old) throws MongoException {
            BasicDBObjectBuilder builder = null;
            Set<String> keys = new HashSet<String>();
            if (old != null) {
                keys = old.keySet();
                builder = BasicDBObjectBuilder.start(old.toMap());
            } else {
                builder = new BasicDBObjectBuilder();
            }
            // Append this object's values; if a key already exists, a sequence number is appended to the key.
            if (this.name != null) {
                builder.append(getKey(keys, "name", 0), this.name);
            }
            if (this.age != null) {
                builder.append(getKey(keys, "age", 0), this.age.intValue());
            }
            if (this.sex != null) {
                builder.append(getKey(keys, "sex", 0), this.sex);
            }
            return builder.get();
        }

        @Override
        public String toString() {
            return "DemoInputValue [name=" + name + ", age=" + age + ", sex=" + sex + "]";
        }

        @Override
        public int compareTo(DemoInputValueAndOutputKey o) {
            // Null-safe ordering by name, then age, then sex.
            int tmp;
            if (this.name == null) {
                if (o.name != null) {
                    return -1;
                }
            } else if (o.name == null) {
                return 1;
            } else {
                tmp = this.name.compareTo(o.name);
                if (tmp != 0) {
                    return tmp;
                }
            }
            if (this.age == null) {
                if (o.age != null) {
                    return -1;
                }
            } else if (o.age == null) {
                return 1;
            } else {
                tmp = this.age - o.age;
                if (tmp != 0) {
                    return tmp;
                }
            }
            if (this.sex == null) {
                if (o.sex != null) {
                    return -1;
                }
            } else if (o.sex == null) {
                return 1;
            } else {
                return this.sex.compareTo(o.sex);
            }
            return 0;
        }
    }

    /**
     * Emits each input value directly as the map output key.
     *
     * @author jsliuming
     */
    public static class DemoMapper extends Mapper<LongWritable, DemoInputValueAndOutputKey, DemoInputValueAndOutputKey, NullWritable> {
        @Override
        protected void map(LongWritable key, DemoInputValueAndOutputKey value, Context context) throws IOException, InterruptedException {
            context.write(value, NullWritable.get());
        }
    }

    /**
     * Writes out the data, performing only a simple count per key.
     *
     * @author jsliuming
     */
    public static class DemoReducer extends Reducer<DemoInputValueAndOutputKey, NullWritable, DemoInputValueAndOutputKey, DemoOutputValue> {
        private DemoOutputValue outputValue = new DemoOutputValue();

        @Override
        protected void reduce(DemoInputValueAndOutputKey key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
            long sum = 0;
            for (@SuppressWarnings("unused")
            NullWritable value : values) {
                sum++;
            }
            outputValue.setClientTime(new Date());
            outputValue.setCount(sum);
            context.write(key, outputValue);
        }
    }

    /**
     * Resolves key collisions: while the key already exists in the given set,
     * a sequence number is appended to it.
     *
     * @param keys
     * @param key
     * @param index
     * @return
     */
    public static String getKey(Set<String> keys, String key, int index) {
        while (keys.contains(key)) {
            key = key + (index++);
        }
        return key;
    }
}
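Since the post notes the data set was picked arbitrarily, here is a hypothetical seeding snippet for the demo's input: a few {name, age, sex} documents in db_java.users, including one deliberate duplicate so the reducer's count exceeds 1. After the job runs, foobar2.users would hold merged documents with name, age, sex, time, and count fields.

import com.mongodb.BasicDBObject;
import com.mongodb.DBCollection;
import com.mongodb.Mongo;

// Hypothetical seed data for the demo; the values are made up.
public class SeedDemoData {
    public static void main(String[] args) throws Exception {
        Mongo mongo = new Mongo("localhost", 27017); // assumed local mongod
        DBCollection users = mongo.getDB("db_java").getCollection("users");
        users.insert(new BasicDBObject("name", "tom").append("age", 21).append("sex", "male"));
        users.insert(new BasicDBObject("name", "tom").append("age", 21).append("sex", "male")); // duplicate: count becomes 2
        users.insert(new BasicDBObject("name", "lucy").append("age", 20).append("sex", "female"));
        mongo.close();
    }
}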
IV. Result Screenshots