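To customize the output of a MapReduce job, start by extending two abstract classes: OutputFormat (here through its subclass FileOutputFormat) and RecordWriter. The OutputFormat's main job is to create the RecordWriter, and the RecordWriter's main job is to implement the write method that writes each key/value pair to a file.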
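1. Requirement
Among the key/value pairs emitted by the reducer, write every pair whose key contains a specific string to one file, and write all remaining pairs to a separate file.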
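The routing rule in this requirement can be tried out without Hadoop. The following is a minimal sketch that uses only the JDK: the class name KeyRouterSketch and its two in-memory lists are hypothetical stand-ins for the two output files, and the marker string "itstar" is the one used by the code later in this article.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of the article's job: it only illustrates the routing rule.
class KeyRouterSketch {
    // Stand-ins for the two output files.
    final List<String> matched = new ArrayList<>();
    final List<String> others = new ArrayList<>();
    private final String marker;

    KeyRouterSketch(String marker) {
        this.marker = marker;
    }

    // Keys containing the marker go to one destination, everything else to the other.
    void route(String key) {
        if (key.contains(marker)) {
            matched.add(key);
        } else {
            others.add(key);
        }
    }

    public static void main(String[] args) {
        KeyRouterSketch router = new KeyRouterSketch("itstar");
        String[] urls = {
            "http://www.baidu.com",
            "http://www.itstar.com",
            "http://www.itstar1.com",
            "http://www.sohu.com"
        };
        for (String url : urls) {
            router.route(url);
        }
        System.out.println("matched: " + router.matched);
        System.out.println("others:  " + router.others);
    }
}
```

In the real job the same if/else sits inside RecordWriter.write, with two output streams in place of the two lists.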
2. Source code
Source data
http://cn.bing.com
http://www.baidu.com
http://www.google.com
http://www.itstar.com
http://www.itstar1.com
http://www.itstar2.com
http://www.itstar3.com
http://www.baidu.com
http://www.sin2a.com
http://www.sin2a.comw.google.com
http://www.sin2desa.com
http://www.sin2desa.comw.google.com
http://www.sina.com
http://www.sindsafa.com
http://www.sohu.com
outputFormat
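import java.io.IOException;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class MyOutputFormat extends FileOutputFormat<Text, NullWritable> {

    // The only responsibility of this class is to hand out the custom RecordWriter.
    @Override
    public RecordWriter<Text, NullWritable> getRecordWriter(TaskAttemptContext taskAttemptContext)
            throws IOException, InterruptedException {
        return new MyRecordWriter(taskAttemptContext);
    }
}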
RecordWriter
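import java.io.IOException;
import java.nio.charset.StandardCharsets;

import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;

public class MyRecordWriter extends RecordWriter<Text, NullWritable> {

    private FSDataOutputStream startOut;
    private FSDataOutputStream otherOut;

    public MyRecordWriter(TaskAttemptContext job) throws IOException {
        // Let an IOException propagate. Swallowing it here would leave the streams
        // null and only surface later as a NullPointerException in write().
        FileSystem fs = FileSystem.get(job.getConfiguration());
        startOut = fs.create(new Path("G:\\test\\date\\A\\itstarlog\\logdir\\startout.log"));
        otherOut = fs.create(new Path("G:\\test\\date\\A\\itstarlog\\logdir\\otherout.log"));
    }

    @Override
    public void write(Text key, NullWritable value) throws IOException, InterruptedException {
        String line = key.toString();
        // write(byte[]) emits only the text itself; writeUTF would prepend a
        // two-byte length to every record and corrupt a plain-text log.
        byte[] bytes = line.getBytes(StandardCharsets.UTF_8);
        // Keys containing "itstar" go to startout.log, all other keys to otherout.log.
        if (line.contains("itstar")) {
            this.startOut.write(bytes);
        } else {
            this.otherOut.write(bytes);
        }
    }

    @Override
    public void close(TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
        // Close the second stream even if closing the first one fails.
        try {
            this.startOut.close();
        } finally {
            this.otherOut.close();
        }
    }
}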
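One detail worth checking in a RecordWriter like this is how string data reaches the stream. FSDataOutputStream extends java.io.DataOutputStream, whose writeUTF method prepends a two-byte length to every string (and encodes it in modified UTF-8), whereas write(byte[]) emits only the bytes it is given. For a log that is meant to be read as plain text, the second form is usually the one wanted. The sketch below shows the difference using only the JDK; the class name WriteUtfSketch and its two helper methods are hypothetical.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

// Hypothetical demo class: compares the bytes produced by writeUTF and write(byte[]).
class WriteUtfSketch {

    // Bytes produced by DataOutputStream.writeUTF: two length bytes, then the string.
    static byte[] viaWriteUTF(String s) {
        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(buf)) {
            out.writeUTF(s);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return buf.toByteArray();
    }

    // Bytes produced by write(byte[]): exactly the UTF-8 encoding of the string.
    static byte[] viaWrite(String s) {
        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(buf)) {
            out.write(s.getBytes(StandardCharsets.UTF_8));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return buf.toByteArray();
    }

    public static void main(String[] args) {
        String line = "http://www.itstar.com\r\n";
        System.out.println("writeUTF bytes: " + viaWriteUTF(line).length);
        System.out.println("write bytes:    " + viaWrite(line).length);
    }
}
```

The two counts differ by exactly two for any string whose encoded form fits in 65535 bytes; those extra bytes appear as stray characters at the start of each record when the file is opened in a text editor.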
mapper
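import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class MyOutputMapper extends Mapper<LongWritable, Text, Text, NullWritable> {

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // Emit each input line as the key; the value carries no data.
        context.write(value, NullWritable.get());
    }
}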
reducer
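import java.io.IOException;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class MyOutputReducer extends Reducer<Text, NullWritable, Text, NullWritable> {

    // Reused across calls to avoid allocating a new Text per key.
    private final Text k = new Text();

    @Override
    protected void reduce(Text key, Iterable<NullWritable> values, Context context)
            throws IOException, InterruptedException {
        // Append a line break so that each key ends up on its own line in the output file.
        String line = key.toString();
        line = line + "\r\n";
        k.set(line);
        context.write(k, NullWritable.get());
    }
}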
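Because reduce is invoked once per distinct key and the keys arrive sorted, a reducer of this shape also deduplicates and orders the lines: http://www.baidu.com appears twice in the source data but is written only once. The following sketch imitates that effect with JDK collections only. The class name ReduceDedupSketch is hypothetical, and the TreeSet is merely a stand-in for the shuffle's sort and grouping on a single reducer, not how Hadoop implements it.

```java
import java.util.List;
import java.util.TreeSet;

// Hypothetical demo class: imitates "one reduce call per distinct, sorted key".
class ReduceDedupSketch {

    // Returns the text a single reducer would emit for the given map output keys.
    static String simulate(List<String> mapOutputKeys) {
        StringBuilder out = new StringBuilder();
        // TreeSet stands in for the shuffle: it sorts the keys and drops duplicates.
        for (String key : new TreeSet<>(mapOutputKeys)) {
            // Same step as the reducer: append a line break to every key.
            out.append(key).append("\r\n");
        }
        return out.toString();
    }

    public static void main(String[] args) {
        List<String> keys = List.of(
            "http://www.baidu.com",
            "http://cn.bing.com",
            "http://www.baidu.com");
        System.out.print(simulate(keys));
    }
}
```

String ordering is used here for simplicity; Hadoop's Text compares raw bytes, which gives the same order for ASCII-only URLs such as the ones in the source data.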
driver
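import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class MyDriver {

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        // Hard-coded input and output paths; these override any command-line arguments.
        args = new String[]{"G:\\test\\date\\A\\itstarlog\\A\\other.log", "G:\\test\\date\\A\\itstarlog\\logresult\\"};

        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        job.setJarByClass(MyDriver.class);

        job.setMapperClass(MyOutputMapper.class);
        job.setReducerClass(MyOutputReducer.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(NullWritable.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);

        // Register the custom output implementation, which itself extends FileOutputFormat.
        job.setOutputFormatClass(MyOutputFormat.class);

        FileInputFormat.setInputPaths(job, new Path(args[0]));
        // The job writes its _SUCCESS marker file to this path when it completes successfully;
        // the records themselves go to the paths opened in MyRecordWriter.
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        job.waitForCompletion(true);
    }
}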