[Hadoop] - Custom MapReduce InputFormat and OutputFormat

  When developing MapReduce programs you often run into input data that does not live on HDFS, or an output destination that is not HDFS. The design of MapReduce anticipates this: it exposes two pluggable components, and by supplying a suitable custom InputFormat and OutputFormat we can meet such requirements. This post walks through one such case, reading data from MongoDB and writing the results back to MongoDB. It is only a demo, so the data set was picked arbitrarily.


I. Custom InputFormat

  The data input of the Map phase is determined by the InputFormat. Looking at the source of org.apache.hadoop.mapreduce.InputFormat, shown below, we can see that besides extending the InputFormat abstract class we also need a custom InputSplit and a custom RecordReader. Their roles are: the split describes the size and location information of one data shard, and the record reader does the actual reading.

public abstract class InputFormat<K, V> {
    // Returns the list of input splits for the map phase
    public abstract List<InputSplit> getSplits(JobContext context) throws IOException, InterruptedException;

    // Creates the record reader that actually reads the data
    public abstract RecordReader<K, V> createRecordReader(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException;
}

  1. Custom InputSplit

    A custom InputSplit mainly needs to implement the following methods:

public abstract class InputSplit {
    // Size of this split
    public abstract long getLength() throws IOException, InterruptedException;

    // Location (host) information for this split
    public abstract String[] getLocations() throws IOException, InterruptedException;
}

  2. Custom RecordReader

    A custom RecordReader mainly needs to implement the following methods:

public abstract class RecordReader<KEYIN, VALUEIN> implements Closeable {
    // Called once before reading; may be empty if the constructor already did the setup
    public abstract void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException;

    // Advance to the next key/value pair; returns true if one exists, false otherwise
    public abstract boolean nextKeyValue() throws IOException, InterruptedException;

    // Current key
    public abstract KEYIN getCurrentKey() throws IOException, InterruptedException;

    // Current value
    public abstract VALUEIN getCurrentValue() throws IOException, InterruptedException;

    // Progress of this reader, a float in [0, 1]
    public abstract float getProgress() throws IOException, InterruptedException;

    // Release resources
    public abstract void close() throws IOException;
}
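
    To make the contract concrete, here is a minimal sketch of the order in which the framework invokes these methods for each split. The driver loop below is illustrative (it is not the actual MapTask source), but every call is the real org.apache.hadoop.mapreduce API:

import java.io.IOException;

import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;

public class RecordReaderLoop {
    // Mirrors the loop a map task runs against any RecordReader, showing
    // which of the methods above must cooperate and in what order.
    public static <K, V> void drive(InputFormat<K, V> format, InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
        RecordReader<K, V> reader = format.createRecordReader(split, context);
        reader.initialize(split, context);
        try {
            while (reader.nextKeyValue()) {          // advance to the next record
                K key = reader.getCurrentKey();      // current key
                V value = reader.getCurrentValue();  // current value
                // the framework would call map(key, value) here
            }
        } finally {
            reader.close();                          // always release resources
        }
    }
}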

II. Custom OutputFormat

  The data output of the Reducer phase is determined by the OutputFormat, which decides both the output destination and the job's commit object. Looking at the source of org.apache.hadoop.mapreduce.OutputFormat, shown below, we can see that besides extending the OutputFormat abstract class we also need a custom RecordWriter and an OutputCommitter. Since the OutputCommitter never touches the actual output destination, it normally does not need to be overridden and FileOutputCommitter can be used directly; the RecordWriter is what defines how data is actually written to the destination.

public abstract class OutputFormat<K, V> {
    // Get the writer that actually emits the data
    public abstract RecordWriter<K, V> getRecordWriter(TaskAttemptContext context) throws IOException, InterruptedException;

    // Validate the output configuration
    public abstract void checkOutputSpecs(JobContext context) throws IOException, InterruptedException;

    // Get the committer object that commits the job's output
    public abstract OutputCommitter getOutputCommitter(TaskAttemptContext context) throws IOException, InterruptedException;
}

  1. Custom RecordWriter

    Looking at the RecordWriter source, there are two methods to implement:

public abstract class RecordWriter<K, V> {
    // Write one key/value pair to the destination
    public abstract void write(K key, V value) throws IOException, InterruptedException;

    // Release resources
    public abstract void close(TaskAttemptContext context) throws IOException, InterruptedException;
}
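
    The write side mirrors the read loop shown earlier: for each reduce task the framework asks the OutputFormat for one RecordWriter, calls write() once per output key/value pair, and closes it at the end of the task. Again a minimal illustrative sketch, not the actual framework source:

import java.io.IOException;
import java.util.Map;

import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;

public class RecordWriterLoop {
    // Mirrors how a reduce task drives any RecordWriter.
    public static <K, V> void drive(OutputFormat<K, V> format, TaskAttemptContext context, Iterable<Map.Entry<K, V>> output) throws IOException, InterruptedException {
        RecordWriter<K, V> writer = format.getRecordWriter(context);
        try {
            for (Map.Entry<K, V> pair : output) {
                writer.write(pair.getKey(), pair.getValue()); // one call per reduce output pair
            }
        } finally {
            writer.close(context); // flush and release the connection
        }
    }
}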

III. Full Code

  Custom InputFormat & InputSplit

package com.gerry.mongo.hadoop2x.mr.mongodb.lib;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.log4j.Logger;

import com.mongodb.BasicDBObject;
import com.mongodb.BasicDBObjectBuilder;
import com.mongodb.DB;
import com.mongodb.DBCollection;
import com.mongodb.DBObject;
import com.mongodb.Mongo;
import com.mongodb.MongoException;

public class MongoDBInputFormat<T extends MongoDBWritable> extends InputFormat<LongWritable, T> implements Configurable {
    private static final Logger LOG = Logger.getLogger(MongoDBInputFormat.class);

    /**
     * An empty implementation that performs no operation, similar in spirit to NullWritable.
     */
    public static class NullMongoDBWritable implements MongoDBWritable, Writable {
        @Override
        public void write(DBCollection collection) throws MongoException {
            // no-op
        }

        @Override
        public void readFields(DBObject object) throws MongoException {
            // no-op
        }

        @Override
        public void write(DataOutput out) throws IOException {
            // no-op
        }

        @Override
        public void readFields(DataInput in) throws IOException {
            // no-op
        }

        @Override
        public DBObject fetchWriteDBObject(DBObject old) throws MongoException {
            return old;
        }
    }

    /**
     * InputSplit implementation for MongoDB.
     */
    public static class MongoDBInputSplit extends InputSplit implements Writable {
        private long end = 0;
        private long start = 0;

        /**
         * Default constructor, required for Writable deserialization.
         */
        public MongoDBInputSplit() {
        }

        /**
         * Convenience constructor.
         * 
         * @param start
         *            index of the first document of this split within the query result
         * @param end
         *            index one past the last document of this split
         */
        public MongoDBInputSplit(long start, long end) {
            this.start = start;
            this.end = end;
        }

        public long getEnd() {
            return end;
        }

        public long getStart() {
            return start;
        }

        @Override
        public void write(DataOutput out) throws IOException {
            out.writeLong(this.start);
            out.writeLong(this.end);
        }

        @Override
        public void readFields(DataInput in) throws IOException {
            this.start = in.readLong();
            this.end = in.readLong();
        }

        @Override
        public long getLength() throws IOException, InterruptedException {
            // Size of this split, in documents
            return this.end - this.start;
        }

        @Override
        public String[] getLocations() throws IOException, InterruptedException {
            // An empty array means no data-locality preference: map tasks for
            // this split may be scheduled on any node.
            return new String[] {};
        }
    }

    protected MongoDBConfiguration mongoConfiguration; // MongoDB-related configuration
    protected Mongo mongo; // MongoDB connection
    protected String databaseName; // name of the database to connect to
    protected String collectionName; // name of the collection to connect to
    protected DBObject conditionQuery; // selection criteria
    protected DBObject fieldQuery; // field projection

    @Override
    public List<InputSplit> getSplits(JobContext job) throws IOException, InterruptedException {
        DBCollection dbCollection = null;
        try {
            dbCollection = this.getDBCollection();
            // Count the matching documents
            long count = dbCollection.count(this.getConditionQuery());
            int chunks = job.getConfiguration().getInt(MRJobConfig.NUM_MAPS, 1);
            long chunkSize = (count / chunks); // documents per split

            // Build the splits, simply assigning each one an equal share of
            // the documents; the last split also takes the remainder.
            List<InputSplit> splits = new ArrayList<InputSplit>();
            for (int i = 0; i < chunks; i++) {
                MongoDBInputSplit split = null;
                if ((i + 1) == chunks) {
                    split = new MongoDBInputSplit(i * chunkSize, count);
                } else {
                    split = new MongoDBInputSplit(i * chunkSize, (i * chunkSize) + chunkSize);
                }
                splits.add(split);
            }
            return splits;
        } catch (Exception e) {
            throw new IOException(e);
        } finally {
            dbCollection = null;
            closeConnection(); // release the connection
        }
    }

    @Override
    public RecordReader<LongWritable, T> createRecordReader(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
        return createRecordReader((MongoDBInputSplit) split, context.getConfiguration());
    }

    protected RecordReader<LongWritable, T> createRecordReader(MongoDBInputSplit split, Configuration conf) {
        // The value class the MongoDB documents are converted into; defaults to NullMongoDBWritable
        Class<? extends MongoDBWritable> valueClass = this.mongoConfiguration.getValueClass();
        return new MongoDBRecordReader<T>(split, valueClass, conf, getDBCollection(), getConditionQuery(), getFieldQuery());
    }

    @Override
    public void setConf(Configuration conf) {
        mongoConfiguration = new MongoDBConfiguration(conf);
        databaseName = this.mongoConfiguration.getInputDatabaseName(); // input database
        collectionName = this.mongoConfiguration.getInputCollectionName(); // input collection
        getMongo(); // initialize the connection
        getConditionQuery(); // initialize the selection criteria
        getFieldQuery(); // initialize the field projection
    }

    @Override
    public Configuration getConf() {
        return this.mongoConfiguration.getConfiguration();
    }

    public Mongo getMongo() {
        try {
            if (null == this.mongo) {
                this.mongo = this.mongoConfiguration.getMongoConnection();
            }
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
        return mongo;
    }

    public DBObject getConditionQuery() {
        if (null == this.conditionQuery) {
            Map<String, String> conditions = this.mongoConfiguration.getInputConditions();
            BasicDBObjectBuilder builder = new BasicDBObjectBuilder();
            for (Map.Entry<String, String> entry : conditions.entrySet()) {
                if (entry.getValue() != null) {
                    builder.append(entry.getKey(), entry.getValue());
                } else {
                    builder.push(entry.getKey());
                }
            }
            if (builder.isEmpty()) {
                this.conditionQuery = new BasicDBObject();
            } else {
                this.conditionQuery = builder.get();
            }
        }
        return this.conditionQuery;
    }

    public DBObject getFieldQuery() {
        if (fieldQuery == null) {
            String[] fields = this.mongoConfiguration.getInputFieldNames();
            if (fields != null && fields.length > 0) {
                BasicDBObjectBuilder builder = new BasicDBObjectBuilder();
                for (String field : fields) {
                    builder.append(field, 1); // 1 marks the field as included in the projection
                }
                fieldQuery = builder.get();
            } else {
                fieldQuery = new BasicDBObject();
            }
        }
        return fieldQuery;
    }

    protected DBCollection getDBCollection() {
        DB db = getMongo().getDB(this.databaseName);
        if (this.mongoConfiguration.isEnableAuth()) {
            String username = this.mongoConfiguration.getUsername();
            String password = this.mongoConfiguration.getPassword();
            if (!db.authenticate(username, password.toCharArray())) {
                throw new RuntimeException("authenticate failure with the username:" + username + ",pwd:" + password);
            }
        }
        return db.getCollection(collectionName);
    }

    protected void closeConnection() {
        try {
            if (null != this.mongo) {
                this.mongo.close();
                this.mongo = null;
            }
        } catch (Exception e) {
            LOG.debug("Exception on close", e);
        }
    }
}
MongoDBInputFormat.java

  Custom RecordReader

package com.gerry.mongo.hadoop2x.mr.mongodb.lib;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.util.ReflectionUtils;

import com.mongodb.DBCollection;
import com.mongodb.DBCursor;
import com.mongodb.DBObject;

public class MongoDBRecordReader<T extends MongoDBWritable> extends RecordReader<LongWritable, T> {
    private Class<? extends MongoDBWritable> valueClass;
    private LongWritable key;
    private T value;
    private long pos;
    private Configuration conf;
    private MongoDBInputFormat.MongoDBInputSplit split;
    private DBCollection collection;
    private DBObject conditionQuery;
    private DBObject fieldQuery;
    private DBCursor cursor;

    public MongoDBRecordReader(MongoDBInputFormat.MongoDBInputSplit split, Class<? extends MongoDBWritable> valueClass, Configuration conf, DBCollection collection, DBObject conditionQuery,
            DBObject fieldQuery) {
        this.split = split;
        this.valueClass = valueClass;
        this.collection = collection;
        this.conditionQuery = conditionQuery;
        this.fieldQuery = fieldQuery;
        this.conf = conf;
    }

    @Override
    public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
        // Everything was set up in the constructor, so nothing to do here.
    }

    @SuppressWarnings("unchecked")
    @Override
    public boolean nextKeyValue() throws IOException, InterruptedException {
        try {
            if (key == null) {
                key = new LongWritable();
            }
            if (value == null) {
                value = (T) ReflectionUtils.newInstance(valueClass, conf);
            }
            if (null == cursor) {
                cursor = executeQuery();
            }
            if (!cursor.hasNext()) {
                return false;
            }
            key.set(pos + split.getStart()); // set the key: global document offset
            value.readFields(cursor.next()); // set the value from the current document
            pos++;
        } catch (Exception e) {
            throw new IOException("Exception in nextKeyValue", e);
        }
        return true;
    }

    protected DBCursor executeQuery() {
        try {
            return collection.find(conditionQuery, fieldQuery).skip((int) split.getStart()).limit((int) split.getLength());
        } catch (IOException | InterruptedException e) {
            throw new RuntimeException(e);
        }
    }

    @Override
    public LongWritable getCurrentKey() throws IOException, InterruptedException {
        return this.key;
    }

    @Override
    public T getCurrentValue() throws IOException, InterruptedException {
        return this.value;
    }

    @Override
    public float getProgress() throws IOException, InterruptedException {
        // Progress must be a fraction in [0, 1] of the split consumed so far
        long length = split.getLength();
        return length == 0 ? 1.0f : Math.min(1.0f, pos / (float) length);
    }

    @Override
    public void close() throws IOException {
        if (collection != null) {
            collection.getDB().getMongo().close();
        }
    }
}
MongoDBRecordReader.java

  Custom OutputFormat & RecordWriter

package com.gerry.mongo.hadoop2x.mr.mongodb.lib;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.log4j.Logger;

import com.mongodb.DB;
import com.mongodb.DBCollection;
import com.mongodb.DBObject;
import com.mongodb.Mongo;

public class MongoDBOutputFormat<K extends MongoDBWritable, V extends MongoDBWritable> extends OutputFormat<K, V> {
    private static Logger LOG = Logger.getLogger(MongoDBOutputFormat.class);

    /**
     * A RecordWriter that writes the reduce output to a MongoDB collection
     * 
     * @param <K>
     * @param <V>
     */
    public static class MongoDBRecordWriter<K extends MongoDBWritable, V extends MongoDBWritable> extends RecordWriter<K, V> {
        private Mongo mongo;
        private String databaseName;
        private String collectionName;
        private MongoDBConfiguration dbConf;
        private DBCollection dbCollection;
        private DBObject dbObject;
        private boolean enableFetchMethod;

        public MongoDBRecordWriter(MongoDBConfiguration dbConf, Mongo mongo, String databaseName, String collectionName) {
            this.mongo = mongo;
            this.databaseName = databaseName;
            this.collectionName = collectionName;
            this.dbConf = dbConf;
            this.enableFetchMethod = this.dbConf.isEnableUseFetchMethod();
            getDbCollection(); // establish the connection
        }

        protected DBCollection getDbCollection() {
            if (null == this.dbCollection) {
                DB db = this.mongo.getDB(this.databaseName);
                if (this.dbConf.isEnableAuth()) {
                    String username = this.dbConf.getUsername();
                    String password = this.dbConf.getPassword();
                    if (!db.authenticate(username, password.toCharArray())) {
                        throw new RuntimeException("authenticate failure, the username:" + username + ", pwd:" + password);
                    }
                }
                this.dbCollection = db.getCollection(this.collectionName);
            }
            return this.dbCollection;
        }

        @Override
        public void write(K key, V value) throws IOException, InterruptedException {
            if (this.enableFetchMethod) {
                this.dbObject = key.fetchWriteDBObject(null);
                this.dbObject = value.fetchWriteDBObject(this.dbObject);
                // Write the merged document. For large data volumes, documents
                // could be buffered here and inserted in batches instead.
                this.dbCollection.insert(this.dbObject);
                this.dbObject = null;
            } else {
                // Let the key/value objects write themselves directly
                key.write(dbCollection);
                value.write(dbCollection);
            }
        }

        @Override
        public void close(TaskAttemptContext context) throws IOException, InterruptedException {
            if (this.mongo != null) {
                this.dbCollection = null;
                this.mongo.close();
            }
        }
    }

    @Override
    public RecordWriter<K, V> getRecordWriter(TaskAttemptContext context) throws IOException, InterruptedException {
        try {
            MongoDBConfiguration dbConf = new MongoDBConfiguration(context.getConfiguration());
            String databaseName = dbConf.getOutputDatabaseName();
            String collectionName = dbConf.getOutputCollectionName();
            Mongo mongo = dbConf.getMongoConnection();
            return new MongoDBRecordWriter<K, V>(dbConf, mongo, databaseName, collectionName);
        } catch (Exception e) {
            LOG.error("Creating the record writer failed.", e);
            throw new IOException(e);
        }
    }

    @Override
    public void checkOutputSpecs(JobContext context) throws IOException, InterruptedException {
        // No output checks are performed
    }

    @Override
    public OutputCommitter getOutputCommitter(TaskAttemptContext context) throws IOException, InterruptedException {
        // The committer never touches our MongoDB output, so FileOutputCommitter is reused directly
        return new FileOutputCommitter(FileOutputFormat.getOutputPath(context), context);
    }

    /**
     * Set the output properties on the job
     * 
     * @param job
     * @param databaseName
     * @param collectionName
     */
    public static void setOutput(Job job, String databaseName, String collectionName) {
        job.setOutputFormatClass(MongoDBOutputFormat.class);
        job.setReduceSpeculativeExecution(false);
        MongoDBConfiguration mdc = new MongoDBConfiguration(job.getConfiguration());
        mdc.setOutputCollectionName(collectionName);
        mdc.setOutputDatabaseName(databaseName);
    }

    /**
     * Disable the fetch method, so write(DBCollection) is used instead
     * 
     * @param conf
     */
    public static void disableFetchMethod(Configuration conf) {
        conf.setBoolean(MongoDBConfiguration.OUTPUT_USE_FETCH_METHOD_PROPERTY, false);
    }
}
MongoDBOutputFormat.java

  Other Java classes involved

package com.gerry.mongo.hadoop2x.mr.mongodb.lib;

import java.net.UnknownHostException;
import java.util.HashMap;
import java.util.Map;
import java.util.regex.Pattern;

import org.apache.hadoop.conf.Configuration;

import com.gerry.mongo.hadoop2x.mr.mongodb.lib.MongoDBInputFormat.NullMongoDBWritable;
import com.mongodb.Mongo;
import com.mongodb.ServerAddress;

public class MongoDBConfiguration {
    public static final String BIND_HOST_PROPERTY = "mapreduce.mongo.host";
    public static final String BIND_PORT_PROPERTY = "mapreduce.mongo.port";
    public static final String AUTH_ENABLE_PROPERTY = "mapreduce.mongo.auth.enable";
    public static final String USERNAME_PROPERTY = "mapreduce.mongo.username";
    public static final String PASSWORD_PROPERTY = "mapreduce.mongo.password";
    public static final String PARTITION_PROPERTY = "mapreduce.mongo.partition";
    public static final String INPUT_DATABASE_NAME_PROPERTY = "mapreduce.mongo.input.database.name";
    public static final String INPUT_COLLECTION_NAME_PROPERTY = "mapreduce.mongo.input.collection.name";
    public static final String INPUT_FIELD_NAMES_PROPERTY = "mapreduce.mongo.input.field.names";
    public static final String INPUT_CONDITIONS_PROPERTY = "mapreduce.mongo.input.conditions";
    public static final String INPUT_CLASS_PROPERTY = "mapreduce.mongo.input.class";
    public static final String OUTPUT_DATABASE_NAME_PROPERTY = "mapreduce.mongo.output.database.name";
    public static final String OUTPUT_COLLECTION_NAME_PROPERTY = "mapreduce.mongo.output.collection.name";
    // Whether the record writer calls the fetch method (default) or the
    // write(DBCollection) method directly
    public static final String OUTPUT_USE_FETCH_METHOD_PROPERTY = "mapreduce.mongo.output.use.fetch.method";

    private Configuration conf;

    public MongoDBConfiguration(Configuration conf) {
        this.conf = conf;
    }

    /** Get the underlying Configuration object */
    public Configuration getConfiguration() {
        return this.conf;
    }

    /** Set the connection information, without authentication */
    public MongoDBConfiguration configureDB(String host, int port) {
        return this.configureDB(host, port, false, null, null);
    }

    /** Set the connection information, optionally with authentication */
    public MongoDBConfiguration configureDB(String host, int port, boolean enableAuth, String username, String password) {
        this.conf.set(BIND_HOST_PROPERTY, host);
        this.conf.setInt(BIND_PORT_PROPERTY, port);
        if (enableAuth) {
            this.conf.setBoolean(AUTH_ENABLE_PROPERTY, true);
            this.conf.set(USERNAME_PROPERTY, username);
            this.conf.set(PASSWORD_PROPERTY, password);
        }
        return this;
    }

    /** Get a MongoDB connection object */
    public Mongo getMongoConnection() throws UnknownHostException {
        return new Mongo(new ServerAddress(this.getBindHost(), this.getBindPort()));
    }

    /** Get the configured host, defaulting to localhost */
    public String getBindHost() {
        return this.conf.get(BIND_HOST_PROPERTY, "localhost");
    }

    /** Get the configured port, defaulting to 27017 */
    public int getBindPort() {
        return this.conf.getInt(BIND_PORT_PROPERTY, 27017);
    }

    /** Whether authentication is enabled; MongoDB itself defaults to off */
    public boolean isEnableAuth() {
        return this.conf.getBoolean(AUTH_ENABLE_PROPERTY, false);
    }

    /** Get the username required for authentication */
    public String getUsername() {
        return this.conf.get(USERNAME_PROPERTY);
    }

    /** Get the password required for authentication */
    public String getPassword() {
        return this.conf.get(PASSWORD_PROPERTY);
    }

    public String getPartition() {
        return conf.get(PARTITION_PROPERTY, "|");
    }

    public MongoDBConfiguration setPartition(String partition) {
        conf.set(PARTITION_PROPERTY, partition);
        return this;
    }

    public String getInputDatabaseName() {
        return conf.get(INPUT_DATABASE_NAME_PROPERTY, "test");
    }

    public MongoDBConfiguration setInputDatabaseName(String databaseName) {
        conf.set(INPUT_DATABASE_NAME_PROPERTY, databaseName);
        return this;
    }

    public String getInputCollectionName() {
        return conf.get(MongoDBConfiguration.INPUT_COLLECTION_NAME_PROPERTY, "test");
    }

    public void setInputCollectionName(String tableName) {
        conf.set(MongoDBConfiguration.INPUT_COLLECTION_NAME_PROPERTY, tableName);
    }

    public String[] getInputFieldNames() {
        return conf.getStrings(MongoDBConfiguration.INPUT_FIELD_NAMES_PROPERTY);
    }

    public void setInputFieldNames(String... fieldNames) {
        conf.setStrings(MongoDBConfiguration.INPUT_FIELD_NAMES_PROPERTY, fieldNames);
    }

    public Map<String, String> getInputConditions() {
        Map<String, String> result = new HashMap<String, String>();
        String[] conditions = conf.getStrings(INPUT_CONDITIONS_PROPERTY);
        if (conditions != null && conditions.length > 0) {
            String partition = this.getPartition();
            String[] values = null;
            for (String condition : conditions) {
                // Quote the separator: the default "|" is a regex metacharacter
                values = condition.split(Pattern.quote(partition));
                if (values != null && values.length == 2) {
                    result.put(values[0], values[1]);
                } else {
                    result.put(condition, null);
                }
            }
        }
        return result;
    }

    public void setInputConditions(Map<String, String> conditions) {
        if (conditions != null && conditions.size() > 0) {
            String[] values = new String[conditions.size()];
            String partition = this.getPartition();
            int k = 0;
            for (Map.Entry<String, String> entry : conditions.entrySet()) {
                if (entry.getValue() != null) {
                    values[k++] = entry.getKey() + partition + entry.getValue();
                } else {
                    values[k++] = entry.getKey();
                }
            }
            conf.setStrings(INPUT_CONDITIONS_PROPERTY, values);
        }
    }

    public Class<? extends MongoDBWritable> getValueClass() {
        return conf.getClass(INPUT_CLASS_PROPERTY, NullMongoDBWritable.class, MongoDBWritable.class);
    }

    public void setInputClass(Class<? extends MongoDBWritable> inputClass) {
        conf.setClass(MongoDBConfiguration.INPUT_CLASS_PROPERTY, inputClass, MongoDBWritable.class);
    }

    public String getOutputDatabaseName() {
        return conf.get(OUTPUT_DATABASE_NAME_PROPERTY, "test");
    }

    public MongoDBConfiguration setOutputDatabaseName(String databaseName) {
        conf.set(OUTPUT_DATABASE_NAME_PROPERTY, databaseName);
        return this;
    }

    public String getOutputCollectionName() {
        return conf.get(MongoDBConfiguration.OUTPUT_COLLECTION_NAME_PROPERTY, "test");
    }

    public void setOutputCollectionName(String tableName) {
        conf.set(MongoDBConfiguration.OUTPUT_COLLECTION_NAME_PROPERTY, tableName);
    }

    public boolean isEnableUseFetchMethod() {
        return conf.getBoolean(OUTPUT_USE_FETCH_METHOD_PROPERTY, true);
    }

    public void setOutputUseFetchMethod(boolean useFetchMethod) {
        conf.setBoolean(OUTPUT_USE_FETCH_METHOD_PROPERTY, useFetchMethod);
    }
}
MongoDBConfiguration.java
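
  A usage note: the Demo below relies on the localhost/27017 defaults, so configureDB is never called there. For a remote or authenticated MongoDB it would be wired in before the job is submitted. In the sketch below, the host, port, and credentials are placeholders:

import org.apache.hadoop.conf.Configuration;

import com.gerry.mongo.hadoop2x.mr.mongodb.lib.MongoDBConfiguration;

public class ConfigureExample {
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        // Placeholder host/port/credentials; substitute real connection details
        new MongoDBConfiguration(conf).configureDB("mongo01.example.com", 27017, true, "hadoop", "secret");
        // conf would then be passed to Job.getInstance(conf, "mongodb-demo")
    }
}
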
package com.gerry.mongo.hadoop2x.mr.mongodb.lib;

import com.mongodb.DBCollection;
import com.mongodb.DBObject;
import com.mongodb.MongoException;

public interface MongoDBWritable {
    /**
     * Write this object into the MongoDB collection
     * 
     * @param collection
     * @throws MongoException
     */
    public void write(DBCollection collection) throws MongoException;

    /**
     * Build the DBObject to be written, merging into old if it is non-null
     * 
     * @param old
     * @return
     * @throws MongoException
     */
    public DBObject fetchWriteDBObject(DBObject old) throws MongoException;

    /**
     * Populate this object from a document read from the MongoDB collection
     * 
     * @param object
     * @throws MongoException
     */
    public void readFields(DBObject object) throws MongoException;
}
MongoDBWritable.java
package com.gerry.mongo.hadoop2x.mr.mongodb.nw;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.Date;
import java.util.HashSet;
import java.util.Set;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;

import com.gerry.mongo.hadoop2x.mr.mongodb.lib.MongoDBConfiguration;
import com.gerry.mongo.hadoop2x.mr.mongodb.lib.MongoDBInputFormat;
import com.gerry.mongo.hadoop2x.mr.mongodb.lib.MongoDBOutputFormat;
import com.gerry.mongo.hadoop2x.mr.mongodb.lib.MongoDBWritable;
import com.mongodb.BasicDBObject;
import com.mongodb.BasicDBObjectBuilder;
import com.mongodb.DBCollection;
import com.mongodb.DBObject;
import com.mongodb.MongoException;

public class Demo {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();
        // Configure the input database, collection, and the value class.
        // The input database and collection must already exist; otherwise
        // there is simply no data, which is harmless but pointless.
        conf.set(MongoDBConfiguration.INPUT_COLLECTION_NAME_PROPERTY, "users");
        conf.set(MongoDBConfiguration.INPUT_DATABASE_NAME_PROPERTY, "db_java");
        conf.setClass(MongoDBConfiguration.INPUT_CLASS_PROPERTY, DemoInputValueAndOutputKey.class, MongoDBWritable.class);

        Job job = Job.getInstance(conf, "mongodb-demo");
        job.setJarByClass(Demo.class);
        job.setMapperClass(DemoMapper.class);
        job.setReducerClass(DemoReducer.class);
        job.setOutputKeyClass(DemoInputValueAndOutputKey.class);
        job.setOutputValueClass(DemoOutputValue.class);
        job.setMapOutputKeyClass(DemoInputValueAndOutputKey.class);
        job.setMapOutputValueClass(NullWritable.class);
        job.setInputFormatClass(MongoDBInputFormat.class);
        MongoDBOutputFormat.setOutput(job, "foobar2", "users"); // the output database/collection need not exist in advance

        job.waitForCompletion(true);
    }

    public static class DemoOutputValue implements Writable, MongoDBWritable {
        private Date clientTime;
        private long count;

        @Override
        public void write(DBCollection collection) throws MongoException {
            throw new UnsupportedOperationException();
        }

        @Override
        public DBObject fetchWriteDBObject(DBObject old) throws MongoException {
            BasicDBObjectBuilder builder = null;
            Set<String> keys = new HashSet<String>();
            if (old != null) {
                keys = old.keySet();
                builder = BasicDBObjectBuilder.start(old.toMap());
            } else {
                builder = new BasicDBObjectBuilder();
            }
            // Append this object's values; when a key already exists, a sequence number is appended to it
            builder.append(getKey(keys, "time", 0), clientTime).append(getKey(keys, "count", 0), this.count);
            return builder.get();
        }

        @Override
        public void readFields(DBObject object) throws MongoException {
            throw new UnsupportedOperationException();
        }

        @Override
        public void write(DataOutput out) throws IOException {
            out.writeLong(this.clientTime.getTime());
            out.writeLong(this.count);
        }

        @Override
        public void readFields(DataInput in) throws IOException {
            this.clientTime = new Date(in.readLong());
            this.count = in.readLong();
        }

        public Date getClientTime() {
            return clientTime;
        }

        public void setClientTime(Date clientTime) {
            this.clientTime = clientTime;
        }

        public long getCount() {
            return count;
        }

        public void setCount(long count) {
            this.count = count;
        }
    }

    public static class DemoInputValueAndOutputKey implements MongoDBWritable, WritableComparable<DemoInputValueAndOutputKey> {
        private String name;
        private Integer age;
        private String sex;

        @Override
        public void write(DataOutput out) throws IOException {
            if (this.name == null) {
                out.writeBoolean(false);
            } else {
                out.writeBoolean(true);
                out.writeUTF(this.name);
            }
            if (this.age == null) {
                out.writeBoolean(false);
            } else {
                out.writeBoolean(true);
                out.writeInt(this.age);
            }
            if (this.sex == null) {
                out.writeBoolean(false);
            } else {
                out.writeBoolean(true);
                out.writeUTF(this.sex);
            }
        }

        @Override
        public void readFields(DataInput in) throws IOException {
            this.name = in.readBoolean() ? in.readUTF() : null;
            this.age = in.readBoolean() ? Integer.valueOf(in.readInt()) : null;
            this.sex = in.readBoolean() ? in.readUTF() : null;
        }

        @Override
        public void write(DBCollection collection) throws MongoException {
            DBObject object = new BasicDBObject();
            object.put("name", this.name);
            object.put("age", this.age.intValue());
            object.put("sex", this.sex);
            collection.insert(object);
        }

        @Override
        public void readFields(DBObject object) throws MongoException {
            this.name = (String) object.get("name");
            this.age = (Integer) object.get("age");
            this.sex = (String) object.get("sex");
        }

        @Override
        public DBObject fetchWriteDBObject(DBObject old) throws MongoException {
            BasicDBObjectBuilder builder = null;
            Set<String> keys = new HashSet<String>();
            if (old != null) {
                keys = old.keySet();
                builder = BasicDBObjectBuilder.start(old.toMap());
            } else {
                builder = new BasicDBObjectBuilder();
            }
            // Append this object's values; when a key already exists, a sequence number is appended to it
            if (this.name != null) {
                builder.append(getKey(keys, "name", 0), this.name);
            }
            if (this.age != null) {
                builder.append(getKey(keys, "age", 0), this.age.intValue());
            }
            if (this.sex != null) {
                builder.append(getKey(keys, "sex", 0), this.sex);
            }
            return builder.get();
        }

        @Override
        public String toString() {
            return "DemoInputValue [name=" + name + ", age=" + age + ", sex=" + sex + "]";
        }

        @Override
        public int compareTo(DemoInputValueAndOutputKey o) {
            int tmp;
            if (this.name == null) {
                if (o.name != null) {
                    return -1;
                }
            } else if (o.name == null) {
                return 1;
            } else {
                tmp = this.name.compareTo(o.name);
                if (tmp != 0) {
                    return tmp;
                }
            }
            if (this.age == null) {
                if (o.age != null) {
                    return -1;
                }
            } else if (o.age == null) {
                return 1;
            } else {
                tmp = this.age - o.age;
                if (tmp != 0) {
                    return tmp;
                }
            }
            if (this.sex == null) {
                if (o.sex != null) {
                    return -1;
                }
            } else if (o.sex == null) {
                return 1;
            } else {
                return this.sex.compareTo(o.sex);
            }
            return 0;
        }
    }

    /**
     * Pass-through mapper: emits each input value as the output key
     * 
     * @author jsliuming
     */
    public static class DemoMapper extends Mapper<LongWritable, DemoInputValueAndOutputKey, DemoInputValueAndOutputKey, NullWritable> {
        @Override
        protected void map(LongWritable key, DemoInputValueAndOutputKey value, Context context) throws IOException, InterruptedException {
            context.write(value, NullWritable.get());
        }
    }

    /**
     * Writes out the data after a simple count of each key group
     * 
     * @author jsliuming
     */
    public static class DemoReducer extends Reducer<DemoInputValueAndOutputKey, NullWritable, DemoInputValueAndOutputKey, DemoOutputValue> {
        private DemoOutputValue outputValue = new DemoOutputValue();

        @Override
        protected void reduce(DemoInputValueAndOutputKey key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
            long sum = 0;
            for (@SuppressWarnings("unused")
            NullWritable value : values) {
                sum++;
            }
            outputValue.setClientTime(new Date());
            outputValue.setCount(sum);
            context.write(key, outputValue);
        }
    }

    /**
     * Adjust a key name: while the key already exists in the given set, append
     * an increasing sequence number to it
     * 
     * @param keys
     * @param key
     * @param index
     * @return
     */
    public static String getKey(Set<String> keys, String key, int index) {
        while (keys.contains(key)) {
            key = key + (index++);
        }
        return key;
    }
}
Demo.java

IV. Result Screenshot

[Figure: result screenshot]

Reposted from: https://www.cnblogs.com/liuming1992/p/4758504.html
