温馨提示×

温馨提示×

您好,登录后才能下订单哦!

密码登录×
登录注册×
其他方式登录
点击 登录注册 即表示同意《亿速云用户服务条款》

Hadoop中如何分区

发布时间:2021-12-09 15:44:33 来源:亿速云 阅读:201 作者:小新 栏目:云计算

小编给大家分享一下Hadoop中如何分区,相信大部分人都还不怎么了解,因此分享这篇文章给大家参考一下,希望大家阅读完这篇文章后大有收获,下面让我们一起去了解一下吧!

package partition; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; import java.net.URI; import java.net.URISyntaxException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Writable; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Partitioner; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; public class KpiApp {	public static final String INPUT_PATH = "hdfs://hadoop:9000/files/HTTP_20130313143750.dat";	public static final String OUTPUT_PATH = "hdfs://hadoop:9000/files/format";	public static void main(String[] args)throws Exception {	Configuration conf = new Configuration();	existsFile(conf);	Job job = new Job(conf, KpiApp.class.getName());	//打成Jar在Linux运行	job.setJarByClass(KpiApp.class);	//1.1	FileInputFormat.setInputPaths(job, INPUT_PATH);	job.setInputFormatClass(TextInputFormat.class);	//1.2	job.setMapperClass(MyMapper.class);	job.setOutputKeyClass(Text.class);	job.setOutputValueClass(KpiWritable.class);	//1.3 自定义分区	job.setPartitionerClass(KpiPartition.class);	job.setNumReduceTasks(2);	//1.4 排序分组	//1.5 聚合	//2.1	//2.2	job.setReducerClass(MyReducer.class);	job.setOutputKeyClass(Text.class);	job.setOutputValueClass(KpiWritable.class);	//2.3	FileOutputFormat.setOutputPath(job, new Path(OUTPUT_PATH));	job.setOutputFormatClass(TextOutputFormat.class);	job.waitForCompletion(true);	}	private static void existsFile(Configuration conf) throws IOException,	URISyntaxException {	FileSystem fs = FileSystem.get(new URI(OUTPUT_PATH),conf);	if(fs.exists(new Path(OUTPUT_PATH))){	fs.delete(new Path(OUTPUT_PATH), true);	}	}	static class MyMapper extends Mapper<LongWritable, Text, Text, KpiWritable>{	@Override	protected void map(LongWritable key, Text value, Context context)	throws IOException, InterruptedException {	String string = value.toString();	String[] split = string.split("\t");	String phone = split[1];	Text key2 = new Text();	key2.set(phone);	KpiWritable v2= new KpiWritable();	v2.set(split[6],split[7],split[8],split[9]);	context.write(key2, v2);	}	}	static class MyReducer extends Reducer<Text, KpiWritable, Text, KpiWritable>{	@Override	protected void reduce(Text key2, Iterable<KpiWritable> values,Context context)	throws IOException, InterruptedException {	long upPackNum = 0L;	long downPackNum = 0L;	long upPayLoad = 0L;	long downPayLoad = 0L;	for(KpiWritable writable : values){	upPackNum += writable.upPackNum;	downPackNum += writable.downPackNum;	upPayLoad += writable.upPayLoad;	downPayLoad += writable.downPayLoad;	}	KpiWritable value3 = new KpiWritable();	value3.set(String.valueOf(upPackNum), String.valueOf(downPackNum), String.valueOf(upPayLoad), String.valueOf(downPayLoad));	context.write(key2, value3);	}	} } class KpiWritable implements Writable{	long upPackNum;	long downPackNum;	long upPayLoad;	long downPayLoad;	@Override	public void write(DataOutput out) throws IOException {	out.writeLong(this.upPackNum);	out.writeLong(this.downPackNum);	out.writeLong(this.upPayLoad);	out.writeLong(this.downPayLoad);	}	public void set(String string, String string2, String string3,	String string4) {	this.upPackNum = Long.parseLong(string);	this.downPackNum = Long.parseLong(string2);	this.upPayLoad = Long.parseLong(string3);	this.downPayLoad = Long.parseLong(string4);	}	@Override	public void readFields(DataInput in) throws IOException {	this.upPackNum = in.readLong();	this.downPackNum = in.readLong();	this.upPayLoad = in.readLong();	this.downPayLoad = in.readLong();	}	@Override	public String toString() {	return  upPackNum + "\t" + downPackNum + "\t" + upPayLoad + "\t" + downPayLoad;	} } class KpiPartition extends Partitioner<Text, KpiWritable>{	@Override	public int getPartition(Text key, KpiWritable value, int numPartitions) {	String string = key.toString();	return string.length()==11?0:1;	} }

  Paritioner是Hashpartitioner的基类,如果需要定制Partitioner也需要继承该类。

  HashPartitioner是MapReduce的默认Partitioner。

以上是“Hadoop中如何分区”这篇文章的所有内容,感谢各位的阅读!相信大家都有了一定的了解,希望分享的内容对大家有所帮助,如果还想学习更多知识,欢迎关注亿速云行业资讯频道!

向AI问一下细节

免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。

AI