import org.apache.hadoop.util.bloom.BloomFilter; // import of the package/class this method depends on
@Override
public int run(String[] args) throws Exception {
    // Expects exactly 4 arguments: input path, member count, FP rate, output path.
    if (args.length != 4) {
        // Original message had the placeholders stripped; spell them out.
        System.err.println(
                "Usage: Trainer <input> <nummembers> <falseposrate> <bfoutfile>");
        return 1;
    }
    // Parse command line arguments
    Path inputFile = new Path(args[0]);
    int numMembers = Integer.parseInt(args[1]);
    float falsePosRate = Float.parseFloat(args[2]);
    Path bfFile = new Path(args[3]);

    // Connect to the local Redis instance and clear any previous training set
    // so this run starts from a clean slate.
    jedis = new Jedis("localhost", 6379);
    try {
        jedis.del(REDIS_SET_KEY);

        // Size the Bloom filter for the expected membership and false-positive rate.
        BloomFilter filter = createBloomFilter(numMembers, falsePosRate);

        FileSystem fs = FileSystem.get(getConf());
        int numRecords = 0;
        // try-with-resources guarantees the reader is closed even if a read fails.
        // Charset is pinned to UTF-8 rather than the platform default.
        try (BufferedReader rdr = new BufferedReader(
                new InputStreamReader(fs.open(inputFile), "UTF-8"))) {
            String line;
            while ((line = rdr.readLine()) != null) {
                if (!line.isEmpty()) {
                    // Train the filter and mirror the member into the Redis set.
                    filter.add(new Key(line.getBytes("UTF-8")));
                    jedis.sadd(REDIS_SET_KEY, line);
                    ++numRecords;
                }
            }
        }

        System.out.println("Trained Bloom filter with " + numRecords
                + " entries.");
        System.out.println("Serializing Bloom filter to HDFS at " + bfFile);

        // Persist the trained filter to HDFS; close() implies a flush.
        try (FSDataOutputStream strm = fs.create(bfFile)) {
            filter.write(strm);
        }
        System.out.println("Done training Bloom filter.");
        return 0;
    } finally {
        // Always release the Redis connection, even if training failed.
        jedis.disconnect();
    }
}