Level 1: Compute the average hotel price for each city
package com.processdata;

import java.io.IOException;
import java.util.Scanner;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import com.util.HBaseUtil;

/**
 * Use a MapReduce job to process the data in HBase and write the final result into another table.
 */
public class HBaseMapReduce extends Configured implements Tool {

    public static class MyMapper extends TableMapper<Text, DoubleWritable> {
        public static final byte[] column = "price".getBytes();
        public static final byte[] family = "hotel_info".getBytes();

        @Override
        protected void map(ImmutableBytesWritable rowKey, Result result, Context context)
                throws IOException, InterruptedException {
            /********** Begin *********/
            // Read the city id from the cityInfo column family
            String cityId = Bytes.toString(result.getValue("cityInfo".getBytes(), "cityId".getBytes()));
            // Read the hotel price
            byte[] value = result.getValue(family, column);
            // Convert the price to a double
            double hotelPrice = Double.parseDouble(Bytes.toString(value));
            // Wrap the price in a DoubleWritable
            DoubleWritable price = new DoubleWritable(hotelPrice);
            // Emit (city id, hotel price)
            context.write(new Text(cityId), price);
            /********** End *********/
        }
    }

    public static class MyTableReducer extends TableReducer<Text, DoubleWritable, ImmutableBytesWritable> {
        @Override
        public void reduce(Text key, Iterable<DoubleWritable> values, Context context)
                throws IOException, InterruptedException {
            /********** Begin *********/
            // Sum the prices of all hotels in the city and count them
            double sum = 0;
            int count = 0;
            for (DoubleWritable num : values) {
                count++;
                sum += num.get();
            }
            double avePrice = sum / count;
            // Row key is the city id; the value is the average price stored as a string
            Put put = new Put(Bytes.toBytes(key.toString()));
            put.addColumn("average_infos".getBytes(), "price".getBytes(), Bytes.toBytes(String.valueOf(avePrice)));
            // initTableReducerJob already set the output table, so no table name is needed here
            context.write(null, put);
            /********** End *********/
        }
    }

    public int run(String[] args) throws Exception {
        // Configure the job
        Configuration conf = HBaseConfiguration.create(getConf());
        conf.set("hbase.zookeeper.quorum", "127.0.0.1");         // HBase service address
        conf.set("hbase.zookeeper.property.clientPort", "2181"); // port
        Scanner sc = new Scanner(System.in);
        String arg1 = sc.next();
        String arg2 = sc.next();
        //String arg1 = "t_city_hotels_info";
        //String arg2 = "average_table";
        try {
            HBaseUtil.createTable("average_table", new String[] {"average_infos"});
        } catch (Exception e) {
            // Table creation failed
            e.printStackTrace();
        }
        Job job = configureJob(conf, new String[]{arg1, arg2});
        return job.waitForCompletion(true) ? 0 : 1;
    }

    private Job configureJob(Configuration conf, String[] args) throws IOException {
        String tablename = args[0];
        String targetTable = args[1];
        Job job = new Job(conf, tablename);
        Scan scan = new Scan();
        scan.setCaching(300);
        scan.setCacheBlocks(false); // never enable block caching for a MapReduce scan
        // Initialize the mapper
        TableMapReduceUtil.initTableMapperJob(tablename, scan, MyMapper.class, Text.class, DoubleWritable.class, job);
        // Initialize the reducer
        TableMapReduceUtil.initTableReducerJob(targetTable, // output table
                MyTableReducer.class,                       // reducer class
                job);
        job.setNumReduceTasks(1);
        return job;
    }
}
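
This listing has no main entry point; on the training platform the job is launched by a separate driver. A minimal driver sketch is shown below (the HBaseMapReduceDriver class name is made up for illustration; it assumes HBaseMapReduce is on the classpath and ZooKeeper is reachable at 127.0.0.1:2181) and simply hands the class to ToolRunner:

package com.processdata;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.ToolRunner;

// Hypothetical driver, not part of the graded answer: ToolRunner parses generic
// Hadoop options and then calls HBaseMapReduce.run(args).
public class HBaseMapReduceDriver {
    public static void main(String[] args) throws Exception {
        int exitCode = ToolRunner.run(new Configuration(), new HBaseMapReduce(), args);
        System.exit(exitCode);
    }
}

Note that the reducer stores the average price with Bytes.toBytes(String.valueOf(avePrice)), so any client reading average_table back should decode the value with Bytes.toString rather than Bytes.toDouble.
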
Level 2: Count the high-frequency words in hotel reviews
package com.processdata;
import java.io.IOException;
import java.util.List;
import java.util.Scanner;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apdplat.word.WordSegmenter;
import org.apdplat.word.segmentation.Word;
import com.util.HBaseUtil;
import com.vdurmont.emoji.EmojiParser;

/**
 * Word frequency count.
 */
public class WorldCountMapReduce extends Configured implements Tool {

    public static class MyMapper extends TableMapper<Text, IntWritable> {
        private static byte[] family = "comment_info".getBytes();
        private static byte[] column = "content".getBytes();

        @Override
        protected void map(ImmutableBytesWritable rowKey, Result result, Context context)
                throws IOException, InterruptedException {
            /********** Begin *********/
            // Read the comment content
            byte[] value = result.getValue(family, column);
            String word = new String(value, "utf-8");
            if (!word.isEmpty()) {
                // Strip emojis, then segment the comment into words
                String filter = EmojiParser.removeAllEmojis(word);
                List<Word> segs = WordSegmenter.seg(filter);
                for (Word cont : segs) {
                    // Emit (word, 1)
                    Text text = new Text(cont.getText());
                    IntWritable v = new IntWritable(1);
                    context.write(text, v);
                }
            }
            /********** End *********/
        }
    }

    public static class MyReducer extends TableReducer<Text, IntWritable, ImmutableBytesWritable> {
        private static byte[] family = "word_info".getBytes();
        private static byte[] column = "count".getBytes();

        @Override
        public void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            /********** Begin *********/
            // 1. Count the occurrences of each word
            int sum = 0;
            for (IntWritable value : values) {
                sum += value.get();
            }
            // 2. Build a Put whose row key is the word and whose value is the count
            Put put = new Put(Bytes.toBytes(key.toString()));
            put.addColumn(family, column, Bytes.toBytes(sum));
            // 3. initTableReducerJob already set the output table, so write with a null key
            context.write(null, put);
            /********** End *********/
        }
    }

    public int run(String[] args) throws Exception {
        // Configure the job
        Configuration conf = HBaseConfiguration.create(getConf());
        conf.set("hbase.zookeeper.quorum", "127.0.0.1");         // HBase service address
        conf.set("hbase.zookeeper.property.clientPort", "2181"); // port
        Scanner sc = new Scanner(System.in);
        String arg1 = sc.next();
        String arg2 = sc.next();
        try {
            HBaseUtil.createTable("comment_word_count", new String[] {"word_info"});
        } catch (Exception e) {
            // Table creation failed
            e.printStackTrace();
        }
        Job job = configureJob(conf, new String[]{arg1, arg2});
        return job.waitForCompletion(true) ? 0 : 1;
    }

    private Job configureJob(Configuration conf, String[] args) throws IOException {
        String tablename = args[0];
        String targetTable = args[1];
        Job job = new Job(conf, tablename);
        Scan scan = new Scan();
        scan.setCaching(300);
        scan.setCacheBlocks(false); // never enable block caching for a MapReduce scan
        // Initialize the mapper and reducer
        TableMapReduceUtil.initTableMapperJob(tablename, scan, MyMapper.class, Text.class, IntWritable.class, job);
        TableMapReduceUtil.initTableReducerJob(targetTable, MyReducer.class, job);
        job.setNumReduceTasks(1);
        return job;
    }
}
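
To sanity-check the output, a small read-back sketch can scan comment_word_count. This is an illustrative example only (the WordCountReader class name is invented; it assumes the standard HBase 1.x client API and ZooKeeper at 127.0.0.1:2181). Because the reducer wrote the count with Bytes.toBytes(int), the value has to be decoded with Bytes.toInt, not parsed from a string:

package com.processdata;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;

// Hypothetical check, not part of the graded answer: print each word and its count.
public class WordCountReader {
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "127.0.0.1");
        conf.set("hbase.zookeeper.property.clientPort", "2181");
        try (Connection conn = ConnectionFactory.createConnection(conf);
             Table table = conn.getTable(TableName.valueOf("comment_word_count"));
             ResultScanner scanner = table.getScanner(new Scan())) {
            for (Result r : scanner) {
                String word = Bytes.toString(r.getRow());
                // The reducer stored the count as a 4-byte int, so decode it with Bytes.toInt.
                int count = Bytes.toInt(r.getValue("word_info".getBytes(), "count".getBytes()));
                System.out.println(word + "\t" + count);
            }
        }
    }
}
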