- 在数据采集的时候,就将小文件或小批数据合成大文件再上传 HDFS
- 在业务处理之前,在 HDFS 上使用 MapReduce 程序对小文件进行合并
- 在 MapReduce 处理时,可采用 CombineFileInputFormat 提高效率
- 编写自定义的InoputFormat
- 改写 RecordReader,实现一次 maptask 读取一个小文件的完整内容封装到一个 KV 对
- 在Driver 类中一定要设置使用自定义的 InputFormat: job.setInputFormatClass(WholeFileInputFormat.class)
代码实现:
public class MergeDriver { //job public static void main(String[] args) { Configuration conf = new Configuration(); conf.set("fs.defaultFS", "hdfs://hadoop01:9000"); Job job = null; try { job = Job.getInstance(conf, "combine small files to bigfile"); job.setJarByClass(MergeDriver.class); job.setMapperClass(MyMapper.class); job.setReducerClass(MyReducer.class); job.setMapOutputKeyClass(NullWritable.class); job.setMapOutputValueClass(Text.class); job.setOutputKeyClass(NullWritable.class); job.setOutputValueClass(Text.class); //设置自定义输入的类 job.setInputFormatClass(MyMyFileInputForamt.class); Path input = new Path("/hadoop/input/num_add"); Path output = new Path("/hadoop/output/merge_output1"); //这里使用自定义得我FileInputForamt去格式化input MyMyFileInputForamt.addInputPath(job,input); FileSystem fs = FileSystem.get(conf); if (fs.exists(output)) { fs.delete(output, true); } FileOutputFormat.setOutputPath(job, output); int status = job.waitForCompletion(true) ? 0 : 1; System.exit(status); } catch (Exception e) { e.printStackTrace(); } } //Mapper static private class MyMapper extends Mapper<NullWritable, Text, NullWritable, Text> { /* 这里的map方法就是每读取一个文件调用一次 */ @Override protected void map(NullWritable key, Text value, Mapper<NullWritable, Text, NullWritable, Text>.Context context) throws IOException, InterruptedException { context.write(key, value); } } //Reducer private static class MyReducer extends Reducer<NullWritable, Text, NullWritable, Text> { @Override protected void reduce(NullWritable key, Iterable<Text> values, Reducer<NullWritable, Text, NullWritable, Text>.Context context) throws IOException, InterruptedException { for (Text v : values) { context.write(key, v); } } } //RecordReader ,这种这个两个泛型,是map端输入的key和value的类型 private static class MyRecordReader extends RecordReader<NullWritable, Text> { // 输出的value对象 Text map_value = new Text(); // 文件系统对象,用于获取文件的输入流 FileSystem fs; // 判断当前文件是否已经读完 Boolean isReader = false; //文件的切片信息 FileSplit fileSplit; //初始化方法,类似于Mapper中的setup,整个类最开始运行 @Override public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException { //初始化文件系统对象 fs = FileSystem.get(context.getConfiguration()); //获取文件路径 fileSplit = (FileSplit) split; } //这个方法,在每次调用map中传入的K-V中,就是在这个方法中给K-V赋值的 @Override public boolean nextKeyValue() throws IOException, InterruptedException { //先读取一次 if (!isReader) { FSDataInputStream input = fs.open(fileSplit.getPath()); //一次性将整个小文件内容都读取出来 byte flush[] = new byte[(int) fileSplit.getLength()]; //将文件内容读取到这个byte数组中 /** * 参数一:读取的字节数组 * 参数二:开始读取的偏移量 * 参数三:读取的长度 */ input.readFully(flush, 0, (int) fileSplit.getLength()); isReader = true; map_value.set(flush); //将读取的内容,放置在map的value中 //保证能正好读一次,nextKeyValue()第一次返回true正好可以调用一次map,第二次返回false return isReader; } return false; } @Override public NullWritable getCurrentKey() throws IOException, InterruptedException { return NullWritable.get(); } @Override public Text getCurrentValue() throws IOException, InterruptedException { return map_value; } @Override public float getProgress() throws IOException, InterruptedException { return 0; } @Override public void close() throws IOException { fs.close(); } } //FileInputFormat private static class MyMyFileInputForamt extends FileInputFormat<NullWritable, Text> { @Override public RecordReader<NullWritable, Text> createRecordReader(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException { MyRecordReader mr = new MyRecordReader(); //先调用初始化方法 mr.initialize(split, context); return mr; } } }
- 从原始日志文件中读取数据
- 根据业务获取业务数据库中的数据
- 根据某个连接条件获取相应的连接结果
- 在 MapReduce 中访问外部资源
- 在业务处理之前,在 HDFS 上使用 MapReduce 程序对小文件进行合并
- 自定义 OutputFormat,改写其中的 RecordWriter,改写具体输出数据的方法 write() CombineFileInputFormat 提高效率
代码实现
//这里以一个简单的案例为例,将文件按照不同的等级输出的不同的文件中
public class Score_DiffDic { //job public static void main(String[] args) { Configuration conf = new Configuration(); conf.set("fs.defaultFS", "hdfs://hadoop01:9000"); Job job = null; try { job = Job.getInstance(conf, "Score_DiffDic"); job.setJarByClass(Score_DiffDic.class); job.setMapperClass(MyMapper.class); job.setReducerClass(MyReducer.class); //设置自定义输出类型 job.setOutputFormatClass(MyOutputFormat.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(DoubleWritable.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(DoubleWritable.class); Path input = new Path("/hadoop/input/num_add"); FileInputFormat.addInputPath(job,input); Path output = new Path("/hadoop/output/merge_output1"); //这是自定义输出类型 MyOutputFormat.setOutputPath(job,output); FileSystem fs = FileSystem.get(conf); if (fs.exists(output)) { fs.delete(output, true); } FileOutputFormat.setOutputPath(job, output); int status = job.waitForCompletion(true) ? 0 : 1; System.exit(status); } catch (Exception e) { e.printStackTrace(); } } //Mapper private static class MyMapper extends Mapper<LongWritable,Text,Text,DoubleWritable>{ Text mk=new Text(); DoubleWritable mv=new DoubleWritable(); @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { String[] fields = value.toString().split("\\s+"); //computer,huangxiaoming,85 if(fields.length==3){ mk.set(fields[1]); mv.set(Double.parseDouble(fields[2])); context.write(mk, mv); } } } //Reducer private static class MyReducer extends Reducer<Text,DoubleWritable,Text,DoubleWritable>{ DoubleWritable mv=new DoubleWritable(); @Override protected void reduce(Text key, Iterable<DoubleWritable> values, Context context) throws IOException, InterruptedException { double sum=0; int count=0; for(DoubleWritable value:values){ sum+=value.get(); count++; } mv.set(sum/count); context.write(key,mv); } } //FileOutputFormat private static class MyOutputFormat extends FileOutputFormat<Text, DoubleWritable> { @Override public RecordWriter<Text, DoubleWritable> getRecordWriter(TaskAttemptContext job) throws IOException, InterruptedException { FileSystem fs =FileSystem.get(job.getConfiguration()); return new MyRecordWrite(fs); } } //RecordWriter,这里的两个泛型是Reudcer输出K-V的类型 private static class MyRecordWrite extends RecordWriter<Text, DoubleWritable> { FileSystem fs; //输出的文件的路径 Path path2 = new Path("/hadoop/output/score_out1"); Path path3 = new Path("/hadoop/output/score_out2"); FSDataOutputStream output1; FSDataOutputStream output2; public MyRecordWrite() { } //初始化参数 public MyRecordWrite(FileSystem fs) { this.fs = fs; try { output1=fs.create(path2); output2=fs.create(path3); } catch (IOException e) { e.printStackTrace(); } } @Override public void write(Text key, DoubleWritable value) throws IOException, InterruptedException { //业务逻辑操作,平均分数大于80的在path2中,其他的在path3中 if(value.get()>80){ output1.write((key.toString()+":"+value.get()+"\n").getBytes()); }else{ output2.write((key.toString()+":"+value.get()+"\n").getBytes()); } } @Override public void close(TaskAttemptContext context) throws IOException, InterruptedException { fs.close(); output1.close(); output2.close(); } } }
免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。