E-commerce Recommendation System


This post documents the main implementation of a product recommendation system: an item-based collaborative-filtering pipeline built as a chain of Hadoop MapReduce jobs, with the final result loaded into MySQL.

1. Obtaining user preference values for products


Code implementation

package zb.grms;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import java.io.IOException;
import java.math.BigDecimal;

public class GoodsStep1 extends Configured implements Tool {

    public static void main(String[] args) {
        try {
            ToolRunner.run(new GoodsStep1(), args);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public static class GS1Mapper extends Mapper<LongWritable, Text, Text, DoubleWritable> {
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String[] split = value.toString().split(",");
            // Convert the user action into a preference weight
            double like = 0.0;
            if (split.length >= 3) {
                String str = split[2].toLowerCase();
                if (str.equals("paysuccess")) {          // payment succeeded
                    like = 0.3;
                } else if (str.equals("addreview")) {    // review added
                    like = 0.3;
                } else if (str.equals("createorder")) {  // order created
                    like = 0.2;
                } else if (str.equals("addcar")) {       // added to cart
                    like = 0.15;
                } else {                                 // anything else counts as a page view
                    like = 0.05;
                }
            }
            // key = user:item, value = preference weight
            Text outkey = new Text(split[0] + ":" + split[1]);
            DoubleWritable outvalue = new DoubleWritable(like);
            context.write(outkey, outvalue);
        }
    }

    public static class GS1Reducer extends Reducer<Text, DoubleWritable, Text, DoubleWritable> {
        @Override
        protected void reduce(Text key, Iterable<DoubleWritable> values, Context context)
                throws IOException, InterruptedException {
            // Sum with BigDecimal to avoid floating-point precision loss
            BigDecimal sum = new BigDecimal(0.0);
            for (DoubleWritable value : values) {
                BigDecimal v = new BigDecimal(value.get());
                sum = sum.add(v);
            }
            DoubleWritable outvalue = new DoubleWritable(sum.doubleValue());
            context.write(key, outvalue);
        }
    }

    @Override
    public int run(String[] strings) throws Exception {
        Configuration conf = getConf();
        Job job = Job.getInstance(conf, "step1");
        job.setJarByClass(this.getClass());
        // Configure the mapper and its output key/value types
        job.setMapperClass(GoodsStep1.GS1Mapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(DoubleWritable.class);
        // Configure the reducer and its output key/value types
        job.setReducerClass(GoodsStep1.GS1Reducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(DoubleWritable.class);
        // Input format and path
        job.setInputFormatClass(TextInputFormat.class);
        TextInputFormat.addInputPath(job, new Path("data/userLog.log"));
        // Output format and path
        job.setOutputFormatClass(TextOutputFormat.class);
        TextOutputFormat.setOutputPath(job, new Path("src/main/resources/step1"));
        // Run the job
        job.waitForCompletion(true);
        return 0;
    }
}
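
For reference, the mapper's behavior-to-preference mapping is: paySuccess 0.3, addReview 0.3, createOrder 0.2, addCar 0.15, and any other action (treated as a page view) 0.05; the reducer then sums these weights per user:item key. As a small worked example with hypothetical ids, assuming data/userLog.log holds comma-separated lines of the form userId,goodsId,action, the three lines 1001,375,PaySuccess / 1001,375,AddCar / 1001,203,Browse map to 1001:375 -> 0.3, 1001:375 -> 0.15 and 1001:203 -> 0.05, so step 1 finally writes 1001:375 0.45 and 1001:203 0.05.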

2. Organizing the preference data into a preference matrix


Code implementation

package zb.grms;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import java.io.IOException;

public class GoodsStep2 extends Configured implements Tool {

    public static class GS2Mapper extends Mapper<Text, Text, Text, Text> {
        @Override
        protected void map(Text key, Text value, Context context)
                throws IOException, InterruptedException {
            String uid_gid = key.toString();
            String[] split = uid_gid.split(":");
            // Use the item id as the output key
            Text outkey = new Text(split[1]);
            // Combine the user id and preference value as the output value
            Text outvalue = new Text(split[0] + ":" + value.toString());
            context.write(outkey, outvalue);
        }
    }

    public static class GS2Reducer extends Reducer<Text, Text, Text, Text> {
        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            // Concatenate all user:preference pairs for one item into a single matrix row
            StringBuffer buffer = new StringBuffer();
            for (Text value : values) {
                buffer.append(value.toString()).append(",");
            }
            buffer.setLength(buffer.length() - 1);
            context.write(key, new Text(buffer.toString()));
        }
    }

    public static void main(String[] args) {
        try {
            ToolRunner.run(new GoodsStep2(), args);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    @Override
    public int run(String[] strings) throws Exception {
        Configuration conf = getConf();
        conf.set("mapreduce.input.keyvaluelinerecordreader.key.value.separator", "\t");
        Job job = Job.getInstance(conf, "step2");
        job.setJarByClass(this.getClass());
        // Configure the mapper and its output key/value types
        job.setMapperClass(GoodsStep2.GS2Mapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        // Configure the reducer and its output key/value types
        job.setReducerClass(GoodsStep2.GS2Reducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        // Input format and path
        job.setInputFormatClass(KeyValueTextInputFormat.class);
        KeyValueTextInputFormat.addInputPath(job, new Path("src/main/resources/step1/part-r-00000"));
        // Output format and path
        job.setOutputFormatClass(TextOutputFormat.class);
        TextOutputFormat.setOutputPath(job, new Path("src/main/resources/step2"));
        // Run the job
        job.waitForCompletion(true);
        return 0;
    }
}
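
Step 2 simply pivots the step 1 output: the item id becomes the key and each user id is attached to its preference value, so every output line is one row of the preference matrix. A sample row of this shape also appears in the GoodsStep6 comments further down: 375 11:0.25,5:0.25,4:0.55, meaning item 375 has preference 0.25 from user 11, 0.25 from user 5 and 0.55 from user 4.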

3. Counting product co-occurrences


Code implementation

Cartesian product
package zb.grms;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import java.io.IOException;
import java.util.ArrayList;

public class GoodsStep3 extends Configured implements Tool {

    public static class GS3Mapper extends Mapper<Text, Text, Text, Text> {
        @Override
        protected void map(Text key, Text value, Context context)
                throws IOException, InterruptedException {
            // Input key is "user:item"; emit user -> item
            String uid_gid = key.toString();
            String[] split = uid_gid.split(":");
            Text outkey = new Text(split[0]);
            Text outvalue = new Text(split[1]);
            context.write(outkey, outvalue);
        }
    }

    public static class GS3Reducer extends Reducer<Text, Text, Text, Text> {
        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            // Collect all items this user has interacted with
            ArrayList<String> list = new ArrayList<>();
            for (Text value : values) {
                list.add(value.toString());
            }
            // Emit every ordered pair of distinct items (Cartesian product)
            for (String g1 : list) {
                for (String g2 : list) {
                    if (!g1.equals(g2)) {
                        Text outkey = new Text(g1);
                        Text outvalue = new Text(g2);
                        context.write(outkey, outvalue);
                    }
                }
            }
        }
    }

    public static void main(String[] args) {
        try {
            ToolRunner.run(new GoodsStep3(), args);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    @Override
    public int run(String[] strings) throws Exception {
        Configuration conf = getConf();
        conf.set("mapreduce.input.keyvaluelinerecordreader.key.value.separator", "\t");
        Job job = Job.getInstance(conf, "step3");
        job.setJarByClass(this.getClass());
        // Configure the mapper and its output key/value types
        job.setMapperClass(GS3Mapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        // Configure the reducer and its output key/value types
        job.setReducerClass(GS3Reducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        // Input format and path
        job.setInputFormatClass(KeyValueTextInputFormat.class);
        KeyValueTextInputFormat.addInputPath(job, new Path("src/main/resources/step1/part-r-00000"));
        // Output format and path
        job.setOutputFormatClass(TextOutputFormat.class);
        TextOutputFormat.setOutputPath(job, new Path("src/main/resources/step3"));
        // Run the job
        job.waitForCompletion(true);
        return 0;
    }
}
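
The reducer here collects all items a single user has interacted with and emits every ordered pair of distinct items. If one user's history contains items 375, 203 and 90, the job writes the pairs 375/203, 375/90, 203/375, 203/90, 90/375 and 90/203 (one tab-separated line per pair), and it does so once for every user who has both items, which is what the next step counts.
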
Co-occurrence counts
package zb.grms;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import java.io.IOException;

public class GoodsStep4 extends Configured implements Tool {

    public static class GS4Mapper extends Mapper<LongWritable, Text, Text, IntWritable> {
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // Each input line is "item1<TAB>item2"; emit "item1:item2" -> 1
            String[] split = value.toString().split("\t");
            String outkey = split[0] + ":" + split[1];
            context.write(new Text(outkey), new IntWritable(1));
        }
    }

    public static class GS4Reducer extends Reducer<Text, IntWritable, Text, IntWritable> {
        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            // Sum the counts for each item pair
            int sum = 0;
            for (IntWritable i : values) {
                sum += i.get();
            }
            context.write(key, new IntWritable(sum));
        }
    }

    public static void main(String[] args) {
        try {
            ToolRunner.run(new GoodsStep4(), args);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    @Override
    public int run(String[] strings) throws Exception {
        Configuration conf = getConf();
        Job job = Job.getInstance(conf, "step4");
        job.setJarByClass(this.getClass());
        // Configure the mapper and its output key/value types
        job.setMapperClass(GS4Mapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        // Configure the reducer and its output key/value types
        job.setReducerClass(GS4Reducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        // Input format and path
        job.setInputFormatClass(TextInputFormat.class);
        TextInputFormat.addInputPath(job, new Path("src/main/resources/step3/part-r-00000"));
        // Output format and path
        job.setOutputFormatClass(TextOutputFormat.class);
        TextOutputFormat.setOutputPath(job, new Path("src/main/resources/step4"));
        // Run the job
        job.waitForCompletion(true);
        return 0;
    }
}
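
Step 4 is essentially a word count over those pairs: the mapper rewrites each line as the key item1:item2 with the value 1, and the reducer adds the ones up. An output line such as 375:90 2 therefore means that two users had both item 375 and item 90 in their histories.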

4. Building the product co-occurrence matrix


Code implementation

package zb.grms;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import java.io.IOException;

public class GoodsStep5 extends Configured implements Tool {

    public static class GS5Mapper extends Mapper<Text, Text, Text, Text> {
        @Override
        protected void map(Text key, Text value, Context context)
                throws IOException, InterruptedException {
            // Input key is "item1:item2", value is the co-occurrence count
            String goods = key.toString();
            String[] split = goods.split(":");
            // key = first item, value = "second item:count"
            Text outkey = new Text(split[0]);
            Text outvalue = new Text(split[1] + ":" + value.toString());
            context.write(outkey, outvalue);
        }
    }

    public static class GS5Reducer extends Reducer<Text, Text, Text, Text> {
        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            // Concatenate all "item:count" pairs into one co-occurrence matrix row
            StringBuffer buffer = new StringBuffer();
            for (Text value : values) {
                buffer.append(value.toString()).append(",");
            }
            buffer.setLength(buffer.length() - 1);
            context.write(key, new Text(buffer.toString()));
        }
    }

    public static void main(String[] args) {
        try {
            ToolRunner.run(new GoodsStep5(), args);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    @Override
    public int run(String[] strings) throws Exception {
        Configuration conf = getConf();
        Job job = Job.getInstance(conf, "step5");
        job.setJarByClass(this.getClass());
        // Configure the mapper and its output key/value types
        job.setMapperClass(GS5Mapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        // Configure the reducer and its output key/value types
        job.setReducerClass(GS5Reducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        // Input format and path
        job.setInputFormatClass(KeyValueTextInputFormat.class);
        KeyValueTextInputFormat.addInputPath(job, new Path("src/main/resources/step4/part-r-00000"));
        // Output format and path
        job.setOutputFormatClass(TextOutputFormat.class);
        TextOutputFormat.setOutputPath(job, new Path("src/main/resources/step5"));
        // Run the job
        job.waitForCompletion(true);
        return 0;
    }
}
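
Step 5 folds the pair counts back into one row per item, mirroring the shape of step 2. The sample row quoted in the GoodsStep6 comments, 375 203:1,961:1,91:1,90:2,89:1, reads: item 375 co-occurred once with items 203, 961, 91 and 89, and twice with item 90.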

5. Computing recommendation values


Code implementation

package zb.grms;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import java.io.IOException;
import java.math.BigDecimal;
import java.util.HashMap;
import java.util.Map;

public class GoodsStep6 extends Configured implements Tool {

    public static void main(String[] args) {
        try {
            ToolRunner.run(new GoodsStep6(), args);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * Step 2 output:
     * 375	11:0.25,5:0.25,4:0.55
     * item	user:preference
     * Step 5 output:
     * 375	203:1,961:1,91:1,90:2,89:1
     * item	item:co-occurrence count
     * Output of this job:
     * user:item	recommendation value
     */
    public static class GS6Mapper extends Mapper<Text, Text, Text, Text> {
        @Override
        protected void map(Text key, Text value, Context context)
                throws IOException, InterruptedException {
            String[] split = value.toString().split(",");
            for (String str : split) {
                // key = item, value = "user:preference" or "item:co-occurrence count"
                context.write(key, new Text(str));
            }
        }
    }

    public static class GS6Reducer extends Reducer<Text, Text, Text, DoubleWritable> {
        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            // Preference entries [user -> preference]
            HashMap<String, String> like = new HashMap<>();
            // Co-occurrence entries [item -> count]
            HashMap<String, String> same = new HashMap<>();
            for (Text value : values) {
                String data = value.toString();
                String[] split = data.split(":");
                // Preference values contain a decimal point; co-occurrence counts are integers
                if (split[1].contains(".")) {
                    like.put(split[0], split[1]);
                } else {
                    same.put(split[0], split[1]);
                }
            }
            for (Map.Entry<String, String> l : like.entrySet()) {
                for (Map.Entry<String, String> s : same.entrySet()) {
                    // User preference value
                    BigDecimal lvalue = new BigDecimal(l.getValue());
                    // Item co-occurrence count
                    BigDecimal svalue = new BigDecimal(s.getValue());
                    // key = user:co-occurring item, value = preference * co-occurrence
                    Text outkey = new Text(l.getKey() + ":" + s.getKey());
                    double outvalue = lvalue.multiply(svalue).doubleValue();
                    context.write(outkey, new DoubleWritable(outvalue));
                }
            }
        }
    }

    @Override
    public int run(String[] strings) throws Exception {
        Configuration conf = getConf();
        Job job = Job.getInstance(conf, "step6");
        job.setJarByClass(this.getClass());
        // Configure the mapper and its output key/value types
        job.setMapperClass(GoodsStep6.GS6Mapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        // Configure the reducer and its output key/value types
        job.setReducerClass(GoodsStep6.GS6Reducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(DoubleWritable.class);
        // Input format and paths (step 2 and step 5 outputs)
        job.setInputFormatClass(KeyValueTextInputFormat.class);
        KeyValueTextInputFormat.setInputPaths(job,
                new Path("src/main/resources/step2"),
                new Path("src/main/resources/step5"));
        // Output format and path
        job.setOutputFormatClass(TextOutputFormat.class);
        TextOutputFormat.setOutputPath(job, new Path("src/main/resources/step6"));
        // Run the job
        job.waitForCompletion(true);
        return 0;
    }
}
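
Both kinds of rows arrive in the step 6 reducer under the same item key and are told apart by a simple convention: preference values always contain a decimal point (step 1 writes them as DoubleWritable, so even a whole number prints as 1.0), while co-occurrence counts come from IntWritable and never do. Each partial recommendation value is then preference × co-occurrence count. Taking the two sample rows for item 375 above, user 11 with preference 0.25 produces 11:203 -> 0.25, 11:90 -> 0.5, 11:961 -> 0.25, and so on; users 5 and 4 are handled the same way.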

6. Accumulating recommendation values and cleaning the data


Code implementation

Accumulating recommendation values
package zb.grms;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import java.io.IOException;
import java.math.BigDecimal;

public class GoodsStep7 extends Configured implements Tool {

    public static void main(String[] args) {
        try {
            ToolRunner.run(new GoodsStep7(), args);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public static class GS7Mapper extends Mapper<Text, Text, Text, Text> {
        @Override
        protected void map(Text key, Text value, Context context)
                throws IOException, InterruptedException {
            // Pass "user:item" -> partial recommendation value straight through
            context.write(key, value);
        }
    }

    public static class GS7Reducer extends Reducer<Text, Text, Text, DoubleWritable> {
        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            // Accumulate the partial values with BigDecimal to avoid precision loss
            BigDecimal sum = new BigDecimal(0.0);
            for (Text value : values) {
                BigDecimal v = new BigDecimal(value.toString());
                sum = sum.add(v);
            }
            DoubleWritable outvalue = new DoubleWritable(sum.doubleValue());
            context.write(key, outvalue);
        }
    }

    @Override
    public int run(String[] strings) throws Exception {
        Configuration conf = getConf();
        Job job = Job.getInstance(conf, "step7");
        job.setJarByClass(this.getClass());
        // Configure the mapper and its output key/value types
        job.setMapperClass(GoodsStep7.GS7Mapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        // Configure the reducer and its output key/value types
        job.setReducerClass(GoodsStep7.GS7Reducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(DoubleWritable.class);
        // Input format and path
        job.setInputFormatClass(KeyValueTextInputFormat.class);
        KeyValueTextInputFormat.addInputPath(job, new Path("src/main/resources/step6"));
        // Output format and path
        job.setOutputFormatClass(TextOutputFormat.class);
        TextOutputFormat.setOutputPath(job, new Path("src/main/resources/step7"));
        // Run the job
        job.waitForCompletion(true);
        return 0;
    }
}
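
Step 7 just groups these partial values by their user:item key and sums them. For instance, if user 11 picked up 0.5 for item 90 from item 375's row and another 0.25 for item 90 from some other item's row, the accumulated recommendation written for 11:90 would be 0.75 (the ids continue the earlier illustrative example).
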
Data cleaning
First, collect the user:item pairs that already have a successful payment (counted once).
package zb.grms;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import java.io.IOException;

public class GoodsStep8 extends Configured implements Tool {

    public static void main(String[] args) {
        try {
            ToolRunner.run(new GoodsStep8(), args);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public static class GS8Mapper extends Mapper<LongWritable, Text, Text, IntWritable> {
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // Emit "user:item" -> 1 for every successful payment record
            String[] split = value.toString().split(",");
            boolean paySuccess = split[2].toLowerCase().equals("paysuccess");
            if (paySuccess) {
                context.write(new Text(split[0] + ":" + split[1]), new IntWritable(1));
            }
        }
    }

    public static class GS8Reducer extends Reducer<Text, IntWritable, Text, IntWritable> {
        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            // Keep the user:item pairs that were paid for successfully exactly once
            int num = 0;
            for (IntWritable i : values) {
                num++;
            }
            if (num == 1) context.write(key, new IntWritable(num));
        }
    }

    @Override
    public int run(String[] strings) throws Exception {
        Configuration conf = getConf();
        Job job = Job.getInstance(conf, "step8.1");
        job.setJarByClass(this.getClass());
        // Configure the mapper and its output key/value types
        job.setMapperClass(GoodsStep8.GS8Mapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        // Configure the reducer and its output key/value types
        job.setReducerClass(GoodsStep8.GS8Reducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        // Input format and path (raw user log)
        job.setInputFormatClass(TextInputFormat.class);
        TextInputFormat.addInputPath(job, new Path("data/userLog.log"));
        // Output format and path
        job.setOutputFormatClass(TextOutputFormat.class);
        TextOutputFormat.setOutputPath(job, new Path("src/main/resources/step8"));
        // Run the job
        job.waitForCompletion(true);
        return 0;
    }
}
Then remove those successfully-paid pairs from the aggregated recommendation data.
package zb.grms;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import java.io.IOException;
import java.util.Iterator;

public class GoodsStep8_2 extends Configured implements Tool {

    public static void main(String[] args) {
        try {
            ToolRunner.run(new GoodsStep8_2(), args);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public static class GS8_1Mapper extends Mapper<Text, Text, Text, Text> {
        @Override
        protected void map(Text key, Text value, Context context)
                throws IOException, InterruptedException {
            // Pass through "user:item" -> value from both the step 7 and step 8 outputs
            context.write(key, value);
        }
    }

    public static class GS8_1Reducer extends Reducer<Text, Text, Text, Text> {
        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            // A key that appears in both inputs has already been purchased, so drop it;
            // keep only keys with a single value (the recommendation from step 7)
            Iterator<Text> iter = values.iterator();
            Text outvalue = iter.next();
            if (!iter.hasNext()) {
                context.write(key, outvalue);
            }
        }
    }

    @Override
    public int run(String[] strings) throws Exception {
        Configuration conf = getConf();
        Job job = Job.getInstance(conf, "step8.2");
        job.setJarByClass(this.getClass());
        // Configure the mapper and its output key/value types
        job.setMapperClass(GoodsStep8_2.GS8_1Mapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        // Configure the reducer and its output key/value types
        job.setReducerClass(GoodsStep8_2.GS8_1Reducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        // Input format and paths (step 7 and step 8 outputs)
        job.setInputFormatClass(KeyValueTextInputFormat.class);
        KeyValueTextInputFormat.setInputPaths(job,
                new Path("src/main/resources/step7"),
                new Path("src/main/resources/step8"));
        // Output format and path
        job.setOutputFormatClass(TextOutputFormat.class);
        TextOutputFormat.setOutputPath(job, new Path("src/main/resources/step8_2"));
        // Run the job
        job.waitForCompletion(true);
        return 0;
    }
}
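
This second job is effectively an anti-join. The accumulated recommendations from step 7 and the paid user:item pairs from step 8 are read under the same key, so a key that arrives with two values has already been purchased and is silently dropped, while a key with a single value keeps its recommendation. For example, if step 7 contains a line for 11:90 and step 8 also lists 11:90, the pair disappears from the final output; a pair that only appears in step 7 is kept.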

7. Writing to the database

package zb.grms;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.Statement;

public class GoodsStep9 {

    public static void main(String[] args) {
        try {
            toDB();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public static void toDB() throws Exception {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(conf);
        Class.forName("com.mysql.cj.jdbc.Driver");
        Connection conn = DriverManager.getConnection(
                "jdbc:mysql://127.0.0.1:3306/tmall?serverTimezone=Asia/Shanghai", "briup", "briup");
        Statement statement = null;
        FSDataInputStream open = fs.open(new Path("src/main/resources/step8_2/part-r-00000"));
        BufferedReader br = new BufferedReader(new InputStreamReader(open));
        String line = "";
        while ((line = br.readLine()) != null) {
            // Each line looks like "11:512	0.25", i.e. user:item <TAB> recommendation value
            String[] str = line.split("\t");
            String[] uid_gid = str[0].split(":");
            statement = conn.createStatement();
            // Delete any existing recommendation for this user/book, then insert the new one
            String sql = "delete from recommend where customerId = '" + uid_gid[0]
                    + "' and bookId = '" + uid_gid[1] + "'";
            String sql2 = "insert into recommend(customerId, bookId, recommendNum) values ('"
                    + uid_gid[0] + "','" + uid_gid[1] + "'," + str[1] + ")";
            statement.addBatch(sql);
            statement.addBatch(sql2);
            statement.executeBatch();
        }
    }
}
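
The loader above builds its SQL by string concatenation and creates a new Statement for every line, which works but is fragile (quoting, injection) on unusual ids. Below is a minimal alternative sketch using PreparedStatement; it keeps the table and column names (recommend, customerId, bookId, recommendNum) and the connection settings from the code above, while the class name GoodsStep9Prepared and the rest of the structure are illustrative, not part of the original project.

// A sketch of the same import step using PreparedStatement instead of string concatenation.
package zb.grms;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;

public class GoodsStep9Prepared {

    public static void toDB() throws Exception {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(conf);
        // The JDBC 4 MySQL driver auto-registers, so Class.forName is not strictly required
        try (Connection conn = DriverManager.getConnection(
                     "jdbc:mysql://127.0.0.1:3306/tmall?serverTimezone=Asia/Shanghai", "briup", "briup");
             PreparedStatement del = conn.prepareStatement(
                     "delete from recommend where customerId = ? and bookId = ?");
             PreparedStatement ins = conn.prepareStatement(
                     "insert into recommend(customerId, bookId, recommendNum) values (?, ?, ?)");
             BufferedReader br = new BufferedReader(new InputStreamReader(
                     fs.open(new Path("src/main/resources/step8_2/part-r-00000"))))) {
            String line;
            while ((line = br.readLine()) != null) {
                // Each line: "user:item<TAB>recommendation value"
                String[] str = line.split("\t");
                String[] uidGid = str[0].split(":");
                del.setString(1, uidGid[0]);
                del.setString(2, uidGid[1]);
                del.executeUpdate();
                ins.setString(1, uidGid[0]);
                ins.setString(2, uidGid[1]);
                ins.setDouble(3, Double.parseDouble(str[1]));
                ins.executeUpdate();
            }
        }
    }
}

Reusing two prepared statements also avoids re-parsing the SQL for every line; batching the updates in larger groups would reduce round trips further.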

8. Workflow

package zb.grms;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.jobcontrol.ControlledJob;
import org.apache.hadoop.mapreduce.lib.jobcontrol.JobControl;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class GoodsMain extends Configured implements Tool {

    public static void main(String[] args) {
        try {
            ToolRunner.run(new GoodsMain(), args);
            GoodsStep9.toDB();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    @Override
    public int run(String[] strings) throws Exception {
        Configuration conf = getConf();
//        String inpath = new String("inpath");
//        Path path = new Path(conf.get(inpath));
        Path path = new Path("data/userLog.log");
        Path outpath = new Path("src/main/resources/step1");
        Path outpath2 = new Path("src/main/resources/step2");
        Path outpath3 = new Path("src/main/resources/step3");
        Path outpath4 = new Path("src/main/resources/step4");
        Path outpath5 = new Path("src/main/resources/step5");
        Path outpath6 = new Path("src/main/resources/step6");
        Path outpath7 = new Path("src/main/resources/step7");
        Path outpath8 = new Path("src/main/resources/step8");
        Path outpath9 = new Path("src/main/resources/step8_2");

        // Configure a Job for every MapReduce step
        // step 1: user preference values
        Job job = Job.getInstance(conf);
        job.setJarByClass(this.getClass());
        job.setMapperClass(GoodsStep1.GS1Mapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(DoubleWritable.class);
        job.setReducerClass(GoodsStep1.GS1Reducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(DoubleWritable.class);
        job.setInputFormatClass(TextInputFormat.class);
        TextInputFormat.addInputPath(job, path);
        job.setOutputFormatClass(TextOutputFormat.class);
        TextOutputFormat.setOutputPath(job, outpath);

        // step 8: user:item pairs that have already been paid for
        Job job8 = Job.getInstance(conf);
        job8.setJarByClass(this.getClass());
        job8.setMapperClass(GoodsStep8.GS8Mapper.class);
        job8.setMapOutputKeyClass(Text.class);
        job8.setMapOutputValueClass(IntWritable.class);
        job8.setReducerClass(GoodsStep8.GS8Reducer.class);
        job8.setOutputKeyClass(Text.class);
        job8.setOutputValueClass(IntWritable.class);
        job8.setInputFormatClass(TextInputFormat.class);
        TextInputFormat.addInputPath(job8, path);
        job8.setOutputFormatClass(TextOutputFormat.class);
        TextOutputFormat.setOutputPath(job8, outpath8);

        // step 2: preference matrix
        Job job2 = Job.getInstance(conf);
        job2.setJarByClass(this.getClass());
        job2.setMapperClass(GoodsStep2.GS2Mapper.class);
        job2.setMapOutputKeyClass(Text.class);
        job2.setMapOutputValueClass(Text.class);
        job2.setReducerClass(GoodsStep2.GS2Reducer.class);
        job2.setOutputKeyClass(Text.class);
        job2.setOutputValueClass(Text.class);
        job2.setInputFormatClass(KeyValueTextInputFormat.class);
        KeyValueTextInputFormat.addInputPath(job2, outpath);
        job2.setOutputFormatClass(TextOutputFormat.class);
        TextOutputFormat.setOutputPath(job2, outpath2);

        // step 3: item pairs (Cartesian product)
        Job job3 = Job.getInstance(conf);
        job3.setJarByClass(this.getClass());
        job3.setMapperClass(GoodsStep3.GS3Mapper.class);
        job3.setMapOutputKeyClass(Text.class);
        job3.setMapOutputValueClass(Text.class);
        job3.setReducerClass(GoodsStep3.GS3Reducer.class);
        job3.setOutputKeyClass(Text.class);
        job3.setOutputValueClass(Text.class);
        job3.setInputFormatClass(KeyValueTextInputFormat.class);
        KeyValueTextInputFormat.addInputPath(job3, outpath);
        job3.setOutputFormatClass(TextOutputFormat.class);
        TextOutputFormat.setOutputPath(job3, outpath3);

        // step 4: co-occurrence counts
        Job job4 = Job.getInstance(conf);
        job4.setJarByClass(this.getClass());
        job4.setMapperClass(GoodsStep4.GS4Mapper.class);
        job4.setMapOutputKeyClass(Text.class);
        job4.setMapOutputValueClass(IntWritable.class);
        job4.setReducerClass(GoodsStep4.GS4Reducer.class);
        job4.setOutputKeyClass(Text.class);
        job4.setOutputValueClass(IntWritable.class);
        job4.setInputFormatClass(TextInputFormat.class);
        TextInputFormat.addInputPath(job4, outpath3);
        job4.setOutputFormatClass(TextOutputFormat.class);
        TextOutputFormat.setOutputPath(job4, outpath4);

        // step 5: co-occurrence matrix
        Job job5 = Job.getInstance(conf);
        job5.setJarByClass(this.getClass());
        job5.setMapperClass(GoodsStep5.GS5Mapper.class);
        job5.setMapOutputKeyClass(Text.class);
        job5.setMapOutputValueClass(Text.class);
        job5.setReducerClass(GoodsStep5.GS5Reducer.class);
        job5.setOutputKeyClass(Text.class);
        job5.setOutputValueClass(Text.class);
        job5.setInputFormatClass(KeyValueTextInputFormat.class);
        KeyValueTextInputFormat.addInputPath(job5, outpath4);
        job5.setOutputFormatClass(TextOutputFormat.class);
        TextOutputFormat.setOutputPath(job5, outpath5);

        // step 6: partial recommendation values
        Job job6 = Job.getInstance(conf);
        job6.setJarByClass(this.getClass());
        job6.setMapperClass(GoodsStep6.GS6Mapper.class);
        job6.setMapOutputKeyClass(Text.class);
        job6.setMapOutputValueClass(Text.class);
        job6.setReducerClass(GoodsStep6.GS6Reducer.class);
        job6.setOutputKeyClass(Text.class);
        job6.setOutputValueClass(DoubleWritable.class);
        job6.setInputFormatClass(KeyValueTextInputFormat.class);
        KeyValueTextInputFormat.setInputPaths(job6, outpath2, outpath5);
        job6.setOutputFormatClass(TextOutputFormat.class);
        TextOutputFormat.setOutputPath(job6, outpath6);

        // step 7: accumulated recommendation values
        Job job7 = Job.getInstance(conf);
        job7.setJarByClass(this.getClass());
        job7.setMapperClass(GoodsStep7.GS7Mapper.class);
        job7.setMapOutputKeyClass(Text.class);
        job7.setMapOutputValueClass(Text.class);
        job7.setReducerClass(GoodsStep7.GS7Reducer.class);
        job7.setOutputKeyClass(Text.class);
        job7.setOutputValueClass(DoubleWritable.class);
        job7.setInputFormatClass(KeyValueTextInputFormat.class);
        KeyValueTextInputFormat.setInputPaths(job7, outpath6);
        job7.setOutputFormatClass(TextOutputFormat.class);
        TextOutputFormat.setOutputPath(job7, outpath7);

        // step 9 (8.2): remove already-purchased items
        Job job9 = Job.getInstance(conf);
        job9.setJarByClass(this.getClass());
        job9.setMapperClass(GoodsStep8_2.GS8_1Mapper.class);
        job9.setMapOutputKeyClass(Text.class);
        job9.setMapOutputValueClass(Text.class);
        job9.setReducerClass(GoodsStep8_2.GS8_1Reducer.class);
        job9.setOutputKeyClass(Text.class);
        job9.setOutputValueClass(Text.class);
        job9.setInputFormatClass(KeyValueTextInputFormat.class);
        KeyValueTextInputFormat.setInputPaths(job9, outpath7, outpath8);
        job9.setOutputFormatClass(TextOutputFormat.class);
        TextOutputFormat.setOutputPath(job9, outpath9);

        // Wrap each job in a ControlledJob
        ControlledJob cj = new ControlledJob(conf);
        cj.setJob(job);
        ControlledJob cj2 = new ControlledJob(conf);
        cj2.setJob(job2);
        ControlledJob cj3 = new ControlledJob(conf);
        cj3.setJob(job3);
        ControlledJob cj4 = new ControlledJob(conf);
        cj4.setJob(job4);
        ControlledJob cj5 = new ControlledJob(conf);
        cj5.setJob(job5);
        ControlledJob cj6 = new ControlledJob(conf);
        cj6.setJob(job6);
        ControlledJob cj7 = new ControlledJob(conf);
        cj7.setJob(job7);
        ControlledJob cj8 = new ControlledJob(conf);
        cj8.setJob(job8);
        ControlledJob cj9 = new ControlledJob(conf);
        cj9.setJob(job9);

        // Declare the dependencies between jobs
        cj2.addDependingJob(cj);
        cj3.addDependingJob(cj);
        cj4.addDependingJob(cj3);
        cj5.addDependingJob(cj4);
        cj6.addDependingJob(cj2);
        cj6.addDependingJob(cj5);
        cj7.addDependingJob(cj6);
        cj9.addDependingJob(cj7);
        cj9.addDependingJob(cj8);

        // Create the workflow controller and register all jobs
        JobControl jobs = new JobControl("work_flow");
        jobs.addJob(cj);
        jobs.addJob(cj2);
        jobs.addJob(cj3);
        jobs.addJob(cj4);
        jobs.addJob(cj5);
        jobs.addJob(cj6);
        jobs.addJob(cj7);
        jobs.addJob(cj8);
        jobs.addJob(cj9);

        // Start the controller: it runs every MapReduce job in dependency order
        Thread t = new Thread(jobs);
        t.start();
        while (true) {
            if (jobs.allFinished()) {
                System.out.println("All jobs finished");
                System.out.println(jobs.getSuccessfulJobList());
                jobs.stop();
                return 0;
            } else if (jobs.getFailedJobList().size() > 0) {
                System.out.println("A job failed");
                System.out.println(jobs.getFailedJobList());
                jobs.stop();
                return -1;
            }
        }
    }
}
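
Read together, the addDependingJob calls encode the pipeline's data flow: step 1 feeds steps 2 and 3; step 3 feeds step 4, which feeds step 5; steps 2 and 5 together feed step 6; step 6 feeds step 7; and step 7 plus step 8 (which only needs the raw log) feed the final cleaning job. Once JobControl reports that everything has finished, main() calls GoodsStep9.toDB() to load the cleaned recommendations into MySQL.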

Summary

