
How to implement WordCount and movie rating prediction with MapReduce in Hadoop

Published: 2021-11-24 15:49:03  Source: 亿速云 (Yisu Cloud)  Author: 柒染  Column: Cloud Computing

This article explains in detail how to implement WordCount and movie rating prediction with MapReduce in Hadoop. It is shared here as a reference; after reading it you should have a reasonable understanding of the relevant concepts.

In MapReduce, "map" refers to mapping and "reduce" refers to reduction.

MapReduce is a programming model that processes data as key-value pairs: the map phase transforms one set of key-value pairs into another set of key-value pairs, which the framework then hands to the reduce phase. In reduce, all key-value pairs emitted by the map phase are reduced, and values that share the same key are grouped together. Internally, MapReduce also sorts the keys before they reach the reducers.
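To make that data flow concrete, here is a minimal, self-contained sketch written for this article (plain Java, no Hadoop APIs; the class name KeyValueFlowSketch is made up for illustration). It simulates the map, shuffle, and reduce stages of word counting on two lines of text:

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class KeyValueFlowSketch {
    public static void main(String[] args) {
        String[] lines = { "Hello Hadoop", "hello MapReduce" };

        // "map": turn each line into (word, 1) pairs. A pair is just a 2-element array here.
        List<Object[]> mapped = new ArrayList<Object[]>();
        for (String line : lines) {
            for (String word : line.toLowerCase().split("\\W+")) {
                mapped.add(new Object[] { word, 1 });
            }
        }

        // "shuffle": group the values of identical keys together.
        Map<String, List<Integer>> grouped = new HashMap<String, List<Integer>>();
        for (Object[] kv : mapped) {
            String word = (String) kv[0];
            if (!grouped.containsKey(word)) {
                grouped.put(word, new ArrayList<Integer>());
            }
            grouped.get(word).add((Integer) kv[1]);
        }

        // "reduce": collapse each key's value list into a single sum.
        for (Map.Entry<String, List<Integer>> e : grouped.entrySet()) {
            int sum = 0;
            for (int n : e.getValue()) {
                sum += n;
            }
            // Prints hello:2, hadoop:1, mapreduce:1 (order not guaranteed).
            System.out.println(e.getKey() + ":" + sum);
        }
    }
}

The Hadoop WordCount below is the same idea, except that the framework performs the shuffle and sorting, and map and reduce run distributed over the cluster.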

1. WordCount

import java.io.IOException;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import main.java.org.conan.myhadoop.hdfs.HdfsDAO;

public class WordCount {

    public static final String HDFS = "hdfs://localhost:8888";
    public static final Pattern DELIMITER = Pattern.compile("\\b([a-zA-Z]+)\\b");

    // Custom Map class implementing the "map" phase.
    public static class Map extends Mapper<LongWritable, Text, Text, IntWritable> {
        // In MapReduce, Text plays the role of String and IntWritable the role of int.
        // LongWritable is a data type that implements WritableComparable.
        private final static IntWritable one = new IntWritable(1);
        private Text word = new Text();

        // Override the parent class map() function.
        @Override
        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // Read one line of input.
            String line = value.toString();
            // Convert the whole line to lower case.
            line = line.toLowerCase();
            // Split the line into words using the regular expression defined above.
            Matcher matcher = DELIMITER.matcher(line);
            while (matcher.find()) {
                // Wrap each extracted word in a Text object.
                word.set(matcher.group());
                // Emit the key-value pair: key is the word, value is 1.
                context.write(word, one);
            }
        }
    }

    // Custom Combine class: runs a local reduce over each node's map output
    // to cut down the amount of data shipped to the reducers.
    public static class Combine extends Reducer<Text, IntWritable, Text, IntWritable> {
        @Override
        public void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            int sum = 0;
            // All values for the same key arrive together in one Iterable.
            for (IntWritable line : values) {
                sum += line.get();
            }
            IntWritable value = new IntWritable(sum);
            // Write the key-value pair in the configured output format.
            context.write(key, value);
        }
    }

    public static class Reduce extends Reducer<Text, IntWritable, Text, IntWritable> {
        @Override
        public void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable line : values) {
                sum += line.get();
            }
            IntWritable value = new IntWritable(sum);
            context.write(key, value);
        }
    }

    public static void main(String[] args) throws Exception {
        JobConf conf = WordCount.config();
        String input = "data/1.txt";
        String output = HDFS + "/user/hdfs/wordcount";

        // Custom HDFS utility class.
        HdfsDAO hdfs = new HdfsDAO(WordCount.HDFS, conf);
        // Remove the output directory if it exists; otherwise the job fails
        // with an "output directory already exists" error.
        hdfs.rmr(output);

        Job job = new Job(conf);
        job.setJarByClass(WordCount.class);

        // Type of the output key.
        job.setOutputKeyClass(Text.class);
        // Type of the output value.
        job.setOutputValueClass(IntWritable.class);

        job.setMapperClass(WordCount.Map.class);
        job.setCombinerClass(WordCount.Combine.class);
        job.setReducerClass(WordCount.Reduce.class);

        job.setInputFormatClass(TextInputFormat.class);
        // Output format: the custom ParseTextOutputFormat class shown below.
        job.setOutputFormatClass(ParseTextOutputFormat.class);

        FileInputFormat.setInputPaths(job, new Path(input));
        FileOutputFormat.setOutputPath(job, new Path(output));

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }

    public static JobConf config() {
        JobConf conf = new JobConf(WordCount.class);
        conf.setJobName("WordCount");
        conf.addResource("classpath:/hadoop/core-site.xml");
        conf.addResource("classpath:/hadoop/hdfs-site.xml");
        conf.addResource("classpath:/hadoop/mapred-site.xml");
//        conf.set("io.sort.mb", "");
        return conf;
    }
}

Custom file output format

import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UnsupportedEncodingException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.ReflectionUtils;

public class ParseTextOutputFormat<K, V> extends FileOutputFormat<K, V> {

    protected static class LineRecordWriter<K, V> extends RecordWriter<K, V> {
        private static final String utf8 = "UTF-8";
        private static final byte[] newline;
        static {
            try {
                newline = "\n".getBytes(utf8);
            } catch (UnsupportedEncodingException uee) {
                throw new IllegalArgumentException("can't find " + utf8 + " encoding");
            }
        }

        protected DataOutputStream out;
        private final byte[] keyValueSeparator;

        public LineRecordWriter(DataOutputStream out, String keyValueSeparator) {
            this.out = out;
            try {
                this.keyValueSeparator = keyValueSeparator.getBytes(utf8);
            } catch (UnsupportedEncodingException uee) {
                throw new IllegalArgumentException("can't find " + utf8 + " encoding");
            }
        }

        public LineRecordWriter(DataOutputStream out) {
            this(out, "\t");
        }

        /**
         * Write the object to the byte stream, handling Text as a special case.
         * @param o the object to print
         * @throws IOException if the write throws, we pass it on
         */
        private void writeObject(Object o) throws IOException {
            if (o instanceof Text) {
                Text to = (Text) o;
                out.write(to.getBytes(), 0, to.getLength());
            } else {
                out.write(o.toString().getBytes(utf8));
            }
        }

        public synchronized void write(K key, V value) throws IOException {
            boolean nullKey = key == null || key instanceof NullWritable;
            boolean nullValue = value == null || value instanceof NullWritable;
            if (nullKey && nullValue) {
                return;
            }
            if (!nullKey) {
                writeObject(key);
            }
            if (!(nullKey || nullValue)) {
                out.write(keyValueSeparator);
            }
            if (!nullValue) {
                writeObject(value);
            }
            out.write(newline);
        }

        public synchronized void close(TaskAttemptContext context) throws IOException {
            out.close();
        }
    }

    public RecordWriter<K, V> getRecordWriter(TaskAttemptContext job)
            throws IOException, InterruptedException {
        Configuration conf = job.getConfiguration();
        boolean isCompressed = getCompressOutput(job);
        // Default key/value separator is ":" instead of the usual "\t".
        String keyValueSeparator = conf.get("mapred.textoutputformat.separator", ":");
        CompressionCodec codec = null;
        String extension = "";
        if (isCompressed) {
            Class<? extends CompressionCodec> codecClass =
                    getOutputCompressorClass(job, GzipCodec.class);
            codec = (CompressionCodec) ReflectionUtils.newInstance(codecClass, conf);
            extension = codec.getDefaultExtension();
        }
        Path file = getDefaultWorkFile(job, extension);
        FileSystem fs = file.getFileSystem(conf);
        if (!isCompressed) {
            FSDataOutputStream fileOut = fs.create(file, false);
            return new LineRecordWriter<K, V>(fileOut, keyValueSeparator);
        } else {
            FSDataOutputStream fileOut = fs.create(file, false);
            return new LineRecordWriter<K, V>(
                    new DataOutputStream(codec.createOutputStream(fileOut)),
                    keyValueSeparator);
        }
    }
}
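Compared with Hadoop's stock TextOutputFormat, the only substantive difference is the default key/value separator, which falls back to ":" instead of "\t" when mapred.textoutputformat.separator is not set. Assuming, for example, an input file containing the two lines

hello hadoop
Hello world

the WordCount job would produce output of the form word:count, with keys sorted by the framework:

hadoop:1
hello:2
world:1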

2. Movie rating prediction

The implementation uses the Slope One algorithm to predict ratings; the custom output format class used here is the same as the one above.

The input file format is userId::movieId::score.
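For context, Slope One predicts an unknown rating from average rating differences between item pairs: if, across users who rated both, movie A tends to score dev(A, B) points higher than movie B, then a user who gave B the rating r is predicted to give A roughly r + dev(A, B). The snippet below is a toy sketch written for this article (made-up class name and data, not part of the MapReduce job) showing that arithmetic for a single pair:

public class SlopeOneToyExample {
    public static void main(String[] args) {
        // Ratings from users who rated both movie A and movie B: {ratingForA, ratingForB}.
        double[][] bothRated = {
                { 4.0, 3.0 },   // user 1: A=4,   B=3   -> difference +1.0
                { 5.0, 3.5 },   // user 2: A=5,   B=3.5 -> difference +1.5
        };

        // Average difference dev(A, B) between the two movies.
        double dev = 0;
        for (double[] pair : bothRated) {
            dev += pair[0] - pair[1];
        }
        dev /= bothRated.length;                 // (1.0 + 1.5) / 2 = 1.25

        // A new user rated only B; predict their rating for A.
        double ratingForB = 4.0;
        double predictedForA = ratingForB + dev;
        System.out.println("dev(A,B) = " + dev);               // 1.25
        System.out.println("predicted A = " + predictedForA);  // 5.25
    }
}

In the MapReduce implementation below, Step 2 approximates the dev(·,·) matrix (sampling one random partner item per rating instead of enumerating all pairs), and Step 4 adds the stored difference to each user's baseline rating.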

package main.java.org.conan.myhadoop.recommend;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Pattern;

import org.apache.hadoop.mapred.JobConf;

import main.java.org.conan.myhadoop.hdfs.HdfsDAO;

public class Recommend {

    public static final String HDFS = "hdfs://localhost:8888";
    public static final Pattern DELIMITER = Pattern.compile("[\t,]");
    public static final Pattern STRING = Pattern.compile("[\t,:]");

    // All movie IDs seen in the data.
    public static List movieList = new ArrayList();
    // For each user, the first non-zero rating seen ("movieId,score"), used as the prediction baseline.
    public static Map userScore = new HashMap();

    public static void main(String[] args) throws Exception {
        Map<String, String> path = new HashMap<String, String>();
        String in = "logfile/4.txt";
        String out = HDFS + "/user/hdfs/recommend" + "/step5";

//        path.put("data", "logfile/small.csv");
//        path.put("data", "logfile/ratings.dat");
        if (args.length == 2) {
            in = args[0];
            out = HDFS + args[1];
            System.out.println(out);
        }

        // Input data path.
        path.put("data", in);

        // Step 1 input path.
        path.put("Step1Input", HDFS + "/user/hdfs/recommend");
        // Step 1 output path.
        path.put("Step1Output", path.get("Step1Input") + "/step1");

        // Step 2 input path.
        path.put("Step2Input", path.get("Step1Output"));
        // Step 2 output path.
        path.put("Step2Output", path.get("Step1Input") + "/step2");

        // Step 3 input path.
        path.put("Step3Input1", path.get("data"));
//        path.put("Step3Input2", "logfile/movie/movies.dat");
        // Step 3 output path.
        path.put("Step3Output", path.get("Step1Input") + "/step3");

        // Step 4 input path 1.
        path.put("Step4Input1", path.get("Step2Output"));
        // Step 4 input path 2.
        path.put("Step4Input2", path.get("Step3Output"));
        // Step 4 output path.
        path.put("Step4Output", path.get("Step1Input") + "/step4");

        // Step 5 input path.
        path.put("Step5Input", path.get("Step4Output"));
        // Step 5 output path.
        path.put("Step5Output", out);

        // Step 1: from the user rating file, build each user's rating vector over items.
        Step1.run(path);
        // Step 2: from the Step 1 output, compute the item rating-difference ("co-occurrence") matrix.
        Step2.run(path);
        // Step 3: collect every rated movie and output each user's rating for every movie,
        // writing 0 for movies the user has not rated.
        Step3.run(path);
        // Step 4: combine the Step 2 and Step 3 results to predict each user's rating for every movie.
        Step4.run(path);
        // Step 5: normalize the output format.
        Step5.run(path);

        System.exit(0);
    }

    public static JobConf config() {
        JobConf conf = new JobConf(Recommend.class);
        conf.setJobName("Recommand");
        conf.addResource("classpath:/hadoop/core-site.xml");
        conf.addResource("classpath:/hadoop/hdfs-site.xml");
        conf.addResource("classpath:/hadoop/mapred-site.xml");
//        conf.set("io.sort.mb", "");
        return conf;
    }
}
// Step 1: build the user-to-item rating matrix, i.e. each user's ratings over movies.
// Each output line represents one user's ratings for all movies they have scored.
// The key is the userId; the value is movieId:score,movieId:score,...
public class Step1 {

    public static class Step1_ToItemPreMapper extends MapReduceBase
            implements Mapper<Object, Text, Text, Text> {
        private final static Text k = new Text();
        private final static Text v = new Text();

        @Override
        public void map(Object key, Text value, OutputCollector<Text, Text> output, Reporter reporter)
                throws IOException {
            // Input lines look like userId::movieId::score.
            String[] tokens = value.toString().split("::");
            String itemID = tokens[1];
            String pref = tokens[2];
            k.set(tokens[0]);
            v.set(itemID + ":" + pref);
            output.collect(k, v);
        }
    }

    public static class Step1_ToUserVectorReducer extends MapReduceBase
            implements Reducer<Text, Text, Text, Text> {
        @Override
        public void reduce(Text key, Iterator<Text> values, OutputCollector<Text, Text> output, Reporter reporter)
                throws IOException {
            String value = "";
            int num = 0;
            while (values.hasNext()) {
                num++;
                value += values.next();
                value += ",";
                // Emit at most 400 ratings per user to keep the line length bounded.
                if (num >= 400) {
                    value = value.substring(0, value.length() - 1);
                    Text v = new Text(value);
                    output.collect(key, v);
                    value = "";
                    num = 0;
                    break;
                }
            }
            if (num != 0) {
                value = value.substring(0, value.length() - 1);
                Text v = new Text(value);
                output.collect(key, v);
            }
        }
    }

    public static void run(Map<String, String> path) throws IOException {
        JobConf conf = Recommend.config();

        String input = path.get("Step1Input");
        String output = path.get("Step1Output");

        HdfsDAO hdfs = new HdfsDAO(Recommend.HDFS, conf);
//        hdfs.rmr(output);
        hdfs.rmr(input);
        hdfs.mkdirs(input);
        hdfs.copyFile(path.get("data"), input);

        conf.setMapOutputKeyClass(Text.class);
        conf.setMapOutputValueClass(Text.class);
        conf.setOutputKeyClass(Text.class);
        conf.setOutputValueClass(Text.class);

        conf.setMapperClass(Step1_ToItemPreMapper.class);
        conf.setReducerClass(Step1_ToUserVectorReducer.class);

        conf.setInputFormat(TextInputFormat.class);
        conf.setOutputFormat(TextOutputFormat.class);

        FileInputFormat.setInputPaths(conf, new Path(input));
        FileOutputFormat.setOutputPath(conf, new Path(output));

        RunningJob job = JobClient.runJob(conf);
        while (!job.isComplete()) {
            job.waitForCompletion();
        }
    }
}
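For orientation, assuming MovieLens-style input lines such as 1::1193::5, one Step 1 output line (key and value separated by a tab) would look roughly like:

1	1193:5,661:3,914:3

that is, the userId followed by up to 400 movieId:score entries for that user.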
// Step 2: from the Step 1 output, compute the item "co-occurrence" matrix of rating differences.
// Algorithmically there was no good way to handle the full double for-loop over item pairs,
// so while building the matrix a random index is drawn instead, producing a single
// movieA:movieB difference per item occurrence.
public class Step2 {

    public static class Step2_UserVectorToCooccurrenceMapper extends MapReduceBase
            implements Mapper<LongWritable, Text, Text, DoubleWritable> {
        private final static Text k = new Text();
        private final static DoubleWritable v = new DoubleWritable();

        @Override
        public void map(LongWritable key, Text values, OutputCollector<Text, DoubleWritable> output, Reporter reporter)
                throws IOException {
            String[] tokens = Recommend.DELIMITER.split(values.toString());
            for (int i = 1; i < tokens.length; i++) {
                String itemID = tokens[i].split(":")[0];
//                // Full pairwise version, commented out because it is too expensive:
//                for (int j = 1; j < i + 1; j++) {
//                    String itemID2 = tokens[j].split(":")[0];
//                    double sum = Double.parseDouble(tokens[i].split(":")[1])
//                            - Double.parseDouble(tokens[j].split(":")[1]);
//                    k.set(itemID + ":" + itemID2 + ":");
//                    v.set(sum);
//                    output.collect(k, v);
//                    k.set(itemID2 + ":" + itemID + ":");
//                    v.set(sum);
//                    output.collect(k, v);
//                }

                // Sample one random partner item and emit the rating difference.
                Random random = new Random();
                int j = random.nextInt(tokens.length - 1) + 1;
                String itemID2 = tokens[j].split(":")[0];
                double sum = Double.parseDouble(tokens[i].split(":")[1])
                        - Double.parseDouble(tokens[j].split(":")[1]);
                k.set(itemID + ":" + itemID2 + ":");
                v.set(sum);
                output.collect(k, v);
            }
        }
    }

    public static class Step2_UserVectorToConoccurrenceReducer extends MapReduceBase
            implements Reducer<Text, DoubleWritable, Text, DoubleWritable> {
        private DoubleWritable result = new DoubleWritable();

        @Override
        public void reduce(Text key, Iterator<DoubleWritable> values, OutputCollector<Text, DoubleWritable> output, Reporter reporter)
                throws IOException {
            // Average the rating differences collected for this item pair.
            double sum = 0;
            int count = 0;
            while (values.hasNext()) {
                sum += values.next().get();
                count++;
            }
            sum = sum / count * 1.0;
            // Round to four decimal places.
            DecimalFormat df = new DecimalFormat("#.0000");
            sum = Double.valueOf(df.format(sum));
            result.set(sum);
            output.collect(key, result);
        }
    }

    public static void run(Map<String, String> path) throws IOException {
        JobConf conf = Recommend.config();

        String input = path.get("Step2Input");
        String output = path.get("Step2Output");

        HdfsDAO hdfs = new HdfsDAO(Recommend.HDFS, conf);
        hdfs.rmr(output);

        conf.setOutputKeyClass(Text.class);
        conf.setOutputValueClass(DoubleWritable.class);

        conf.setMapperClass(Step2_UserVectorToCooccurrenceMapper.class);
//        conf.setCombinerClass(Step2_UserVectorToConoccurrenceReducer.class);
        conf.setReducerClass(Step2_UserVectorToConoccurrenceReducer.class);

        conf.setInputFormat(TextInputFormat.class);
        conf.setOutputFormat(TextOutputFormat.class);

        FileInputFormat.setInputPaths(conf, new Path(input));
        FileOutputFormat.setOutputPath(conf, new Path(output));

        RunningJob job = JobClient.runJob(conf);
        while (!job.isComplete()) {
            job.waitForCompletion();
        }
    }
}
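Each line of the Step 2 output then carries one sampled movie pair and its average rating difference, rounded to four decimal places, for example (illustrative values):

1193:661:	1.25

meaning that, among the sampled co-ratings, movie 1193 was scored on average 1.25 points higher than movie 661.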
// Step 3: collect every movie that has been rated and output each user's rating for every
// movie, writing 0 for movies the user has not rated.
// Because there is no separate file listing all movies, the movie IDs have to be gathered
// from the data file itself. They are kept in a list, but with a large data file every record
// requires a containment check against that list, so efficiency keeps degrading.
// A static map also records the first rated movie of each user; it later serves as the baseline
// for computing predictions with the item difference matrix.
public class Step3 {

    public static class Step4_PartialMultiplyMapper extends Mapper<LongWritable, Text, Text, Text> {
        private final static Text k = new Text();
        private final static Text v = new Text();
        private String flag;    // which dataset is being read

        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            FileSplit split = (FileSplit) context.getInputSplit();
            flag = split.getPath().getParent().getName();  // determine the dataset from the parent directory
        }

        @Override
        public void map(LongWritable key, Text values, Context context)
                throws IOException, InterruptedException {
            String[] tokens = values.toString().split("::");

            // Collect all movie IDs. With a dedicated movie file this containment
            // check would be unnecessary.
            if (!Recommend.movieList.contains(tokens[1])) {
                Recommend.movieList.add(tokens[1]);
            }

            k.set(tokens[0]);
            v.set(tokens[1] + "," + tokens[2]);
            context.write(k, v);
        }
    }

    public static class Step4_AggregateAndRecommendReducer extends Reducer<Text, Text, Text, Text> {
        private final static Text v = new Text();

        @Override
        public void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            // Ratings this user actually gave: movieId -> score.
            Map userMovieList = new HashMap();
            for (Text line : values) {
                String[] tokens = Recommend.DELIMITER.split(line.toString());
                userMovieList.put(tokens[0], tokens[1]);
            }
            // Emit one record per known movie, with 0 for movies the user has not rated.
            for (int i = 0; i < Recommend.movieList.size(); i++) {
                if (!userMovieList.containsKey(Recommend.movieList.get(i))) {
                    v.set(Recommend.movieList.get(i) + "," + 0);
                    context.write(key, v);
                } else {
                    v.set(Recommend.movieList.get(i) + "," + userMovieList.get(Recommend.movieList.get(i)));
                    context.write(key, v);
                }
            }
        }
    }

    public static void run(Map<String, String> path)
            throws IOException, InterruptedException, ClassNotFoundException {
        JobConf conf = Recommend.config();

        String input1 = path.get("Step3Input1");
        String output = path.get("Step3Output");

        HdfsDAO hdfs = new HdfsDAO(Recommend.HDFS, conf);
        hdfs.rmr(output);

        Job job = new Job(conf);
        job.setJarByClass(Step3.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        job.setMapperClass(Step3.Step4_PartialMultiplyMapper.class);
        job.setReducerClass(Step3.Step4_AggregateAndRecommendReducer.class);

        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);

        FileInputFormat.setInputPaths(job, new Path(input1));
        FileOutputFormat.setOutputPath(job, new Path(output));

        do {
            job.waitForCompletion(false);
        } while (!job.isComplete());
    }
}
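Step 3 thus emits one "userId \t movieId,score" line per user and per known movie, with 0 as the score for movies that user never rated, for example (illustrative values):

1	661,3
1	2355,0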
// Step 4: combine the Step 2 and Step 3 results to compute each user's rating for every movie.
// The Step 3 output is scanned, and whenever a user's rating for a movie is 0, a prediction is
// computed from that user's baseline rating (kept in the static map) plus the item
// rating-difference matrix obtained in Step 2.
public class Step4 {

    public static class Step4Update_PartialMultiplyMapper extends Mapper<LongWritable, Text, Text, Text> {
        private String flag;  // "step2": difference matrix, "step3": rating matrix

        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            FileSplit split = (FileSplit) context.getInputSplit();
            flag = split.getPath().getParent().getName();  // determine the dataset from the parent directory
        }

        @Override
        public void map(LongWritable key, Text values, Context context)
                throws IOException, InterruptedException {
            String[] tokens = Recommend.DELIMITER.split(values.toString());

            if (flag.equals("step2")) {  // difference matrix: "movieA:movieB:" \t diff
                String[] v1 = tokens[0].split(":");
                Text k = new Text(v1[0]);
                Text v = new Text("M:" + v1[1] + "," + tokens[1]);
                context.write(k, v);
            } else if (flag.equals("step3")) {  // rating matrix: userId \t movieId,score
                // Remember each user's first non-zero rating as the prediction baseline.
                if (Double.parseDouble(tokens[2]) != 0 && !Recommend.userScore.containsKey(tokens[0])) {
                    Recommend.userScore.put(tokens[0], tokens[1] + "," + tokens[2]);
                }
                Text k = new Text(tokens[1]);
                Text v = new Text("U:" + tokens[0] + "," + tokens[2]);
                context.write(k, v);
            }
        }
    }

    public static class Step4Update_AggregateReducer extends Reducer<Text, Text, Text, Text> {
        @Override
        public void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            // movie: partner movieId -> average rating difference (from the "M:" records).
            Map movie = new HashMap();
            Text k;
            Text v;
            List list = new ArrayList();
            for (Text line : values) {
                list.add(line.toString());
                String[] tokens = Recommend.STRING.split(line.toString());
                if (tokens[0].equals("M")) {
                    movie.put(tokens[1], tokens[2]);
                }
            }

            for (int i = 0; i < list.size(); i++) {
                String[] tokens = Recommend.STRING.split((String) list.get(i));
                if (tokens[0].equals("U")) {
                    if (Double.parseDouble(tokens[2]) == 0) {
                        // Unrated: predict from the user's baseline rating plus the difference
                        // between this movie and the baseline movie.
                        String userScore = (String) Recommend.userScore.get(tokens[1]);
                        String[] temps = Recommend.STRING.split(userScore);
                        k = new Text(key);
                        double temp = 0;
                        if (movie.get(temps[0]) != null) {
                            temp = Double.parseDouble((String) movie.get(temps[0]));
                        }
                        double score = Double.parseDouble(temps[1]) + temp;
                        v = new Text(tokens[1] + "," + score);
                    } else {
                        // Already rated: keep the original rating.
                        k = new Text(key);
                        v = new Text(tokens[1] + "," + tokens[2]);
                    }
                    context.write(k, v);
                }
            }
        }
    }

    public static void run(Map<String, String> path)
            throws IOException, InterruptedException, ClassNotFoundException {
        JobConf conf = Recommend.config();

        String input1 = path.get("Step4Input1");
        String input2 = path.get("Step4Input2");
        String output = path.get("Step4Output");

        HdfsDAO hdfs = new HdfsDAO(Recommend.HDFS, conf);
        hdfs.rmr(output);

        Job job = new Job(conf);
        job.setJarByClass(Step4.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        job.setMapperClass(Step4.Step4Update_PartialMultiplyMapper.class);
        job.setReducerClass(Step4.Step4Update_AggregateReducer.class);

        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);

        FileInputFormat.setInputPaths(job, new Path(input1), new Path(input2));
        FileOutputFormat.setOutputPath(job, new Path(output));

        do {
            job.waitForCompletion(false);
        } while (!job.isComplete());
    }
}
// Step 5: normalize the final output format.
public class Step5 {

    public static class Step5_PartialMultiplyMapper extends Mapper<LongWritable, Text, Text, Text> {
        @Override
        public void map(LongWritable key, Text values, Context context)
                throws IOException, InterruptedException {
            // Step 4 output lines look like: movieId \t userId,score
            String[] tokens = Recommend.DELIMITER.split(values.toString());
            Text k = new Text(tokens[1]);
            Text v;
            if (Double.parseDouble(tokens[2]) == 0) {
                v = new Text(tokens[0] + "::");
            } else {
                v = new Text(tokens[0] + "::" + tokens[2]);
            }
            context.write(k, v);
        }
    }

    public static class Step5_AggregateReducer extends Reducer<Text, Text, Text, Text> {
        @Override
        public void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            for (Text line : values) {
                Text k = new Text(key.toString());
                context.write(k, line);
            }
        }
    }

    public static void run(Map<String, String> path)
            throws IOException, InterruptedException, ClassNotFoundException {
        JobConf conf = Recommend.config();

        String input = path.get("Step5Input");
        String output = path.get("Step5Output");

        HdfsDAO hdfs = new HdfsDAO(Recommend.HDFS, conf);
        hdfs.rmr(output);

        Job job = new Job(conf);
        job.setJarByClass(Step5.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        job.setMapperClass(Step5.Step5_PartialMultiplyMapper.class);
        job.setReducerClass(Step5.Step5_AggregateReducer.class);

        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(ParseTextOutputFormat.class);

        FileInputFormat.setInputPaths(job, new Path(input));
        FileOutputFormat.setOutputPath(job, new Path(output));

        do {
            job.waitForCompletion(false);
        } while (!job.isComplete());
        System.out.println("---------------------end--------------------");
    }
}
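Because Step 5 writes through ParseTextOutputFormat, whose default separator is ":", each final output line has the shape userId:movieId::predictedScore, for example (illustrative values):

1:2355::4.25

with an empty score field (userId:movieId::) in the degenerate case where the value carried over from Step 4 is still 0.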

That concludes this walkthrough of implementing WordCount and movie rating prediction with MapReduce in Hadoop. I hope the material above is helpful and that you learned something from it. If you found the article useful, feel free to share it so more people can see it.


