Map-Reduce Programming with Hadoop
CS5225 Parallel and Concurrent Programming
Dilum Bandara, Dilum.Bandara@uom.lk
Some slides adapted from Dr. Srinath Perera
HDFS
- HDFS – Hadoop Distributed File System
- File system supported by Hadoop
- Based on ideas presented in "The Google File System" paper
- Highly scalable file system for handling large data
HDFS Architecture
[Figure: HDFS architecture diagram]
HDFS Architecture (Cont.)
- HDFS has a master-slave architecture
- Name Node – master node
  - Manages the file system namespace
  - Regulates access to files by clients
- Data Node – slave node
  - Manages storage attached to the node it runs on
  - Serves read & write requests from the file system's clients
  - Performs block creation, deletion, & replication upon instruction from the Name Node
HDFS Architecture (Cont.)
[Figure: HDFS architecture diagram (cont.)]
HDFS in Production
- Yahoo! Search Webmap is a Hadoop application
  - Webmap starts with every web page crawled by Yahoo! & produces a database of all known web pages
  - This derived data feeds into Machine Learned Ranking algorithms
- Runs on 10,000+ core Linux clusters & produces data that is used in every Yahoo! web search query
  - 1 trillion links
  - Produces over 300 TB of output, compressed!
  - Over 5 petabytes of raw disk used in the production cluster
HDFS Java Client

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

// Load the cluster settings from the Hadoop configuration files
Configuration conf = new Configuration(false);
conf.addResource(new Path("/works/fsaas/hadoop-0.20.2/conf/core-site.xml"));
conf.addResource(new Path("/works/fsaas/hadoop-0.20.2/conf/hdfs-site.xml"));
FileSystem fs = FileSystem.get(conf);

// Remove the file first if it already exists
Path filenamePath = new Path(filename);
if (fs.exists(filenamePath)) {
    fs.delete(filenamePath, false);
}

// Write a value to the file & close the output stream
FSDataOutputStream out = fs.create(filenamePath);
out.writeUTF(String.valueOf(currentSystemTime));
out.close();

// Read the value back & print it
FSDataInputStream in = fs.open(filenamePath);
String messageIn = in.readUTF();
System.out.print(messageIn);
in.close();

// Print a summary (length, file & directory counts) of the path
System.out.println(fs.getContentSummary(filenamePath).toString());
Install Hadoop
- 3 different options:
  1. Local
     - Single-JVM installation
     - Just unzip
  2. Pseudo-distributed
     - Single JVM, but behaves like a distributed installation
  3. Distributed installation
More General Map/Reduce
- Typical Map-Reduce implementations are a bit more general, with five pluggable parts (a wiring sketch follows this list):
  1. Formatters
  2. Partition function
     - Splits the map output across the many reduce function instances
  3. Map function
  4. Combine function
     - When there are many map tasks, combines their results locally before handing them to Reduce
  5. Reduce function
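As a rough illustration of where each pluggable part attaches to a Hadoop job, consider the driver fragment below. Job, TextInputFormat, FileInputFormat, & FileOutputFormat come from the org.apache.hadoop.mapreduce & org.apache.hadoop.mapreduce.lib.* packages; MyMapper, MyCombiner, MyReducer, & MyPartitioner are hypothetical classes, not part of the original slides.

// Hedged sketch of a driver's main() body; classes prefixed "My" are placeholders
Job job = new Job(new Configuration(), "my-job");
job.setInputFormatClass(TextInputFormat.class);  // 1. formatter: how raw input is parsed into records
job.setPartitionerClass(MyPartitioner.class);    // 2. partition function
job.setMapperClass(MyMapper.class);              // 3. map function
job.setCombinerClass(MyCombiner.class);          // 4. combine function
job.setReducerClass(MyReducer.class);            // 5. reduce function
FileInputFormat.setInputPaths(job, new Path("/input"));
FileOutputFormat.setOutputPath(job, new Path("/output"));
System.exit(job.waitForCompletion(true) ? 0 : 1);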
Example – Word Count
- Find words in a collection of documents & their frequency of occurrence

Map(docId, text):
    for all terms t in text
        emit(t, 1);

Reduce(t, values[]):
    int sum = 0;
    for all values v
        sum += v;
    emit(t, sum);
Example – Mean
- Compute the mean value associated with each key

Map(k, value):
    emit(k, value);

Reduce(k, values[]):
    int sum = 0;
    int count = 0;
    for all values v
        sum += v;
        count += 1;
    emit(k, sum/count);
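One caveat worth noting: unlike Word Count, this reduce function cannot be reused as a combiner, because an average of partial averages is not the overall average. A minimal Java sketch of the reducer, assuming Text keys & DoubleWritable values (an assumption, since the slide does not fix the types), could look like this:

import java.io.IOException;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

// Sketch of the mean reducer: sum & count all values of a key, emit their ratio
public class MeanReducer extends Reducer<Text, DoubleWritable, Text, DoubleWritable> {
    public void reduce(Text key, Iterable<DoubleWritable> values, Context context)
            throws IOException, InterruptedException {
        double sum = 0;
        long count = 0;
        for (DoubleWritable v : values) {
            sum += v.get();
            count++;
        }
        context.write(key, new DoubleWritable(sum / count));
    }
}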
Example – Sorting
- How to sort an array of 1 million integers using Map-Reduce?
- Partial sorts at the mappers & a final sort by the reducers
- Use a locality-preserving hash function to partition the keys
  - If k1 < k2 then hash(k1) < hash(k2)

Map(k, v):
    int val = read value from v
    emit(val, val);

Reduce(k, values[]):
    emit(k, k);
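A hedged sketch of such a locality-preserving partition function is shown below. It is an illustrative range partitioner (not Hadoop's built-in TotalOrderPartitioner), & the assumed key range [0, MAX_KEY] is hypothetical:

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Partitioner;

// Sketch: keys in lower ranges go to lower-numbered reducers, so concatenating
// the reducers' sorted outputs in order yields a globally sorted result
public class RangePartitioner extends Partitioner<IntWritable, IntWritable> {
    private static final int MAX_KEY = 1000000;  // assumed upper bound on key values

    @Override
    public int getPartition(IntWritable key, IntWritable value, int numPartitions) {
        int bucket = (int) ((long) key.get() * numPartitions / (MAX_KEY + 1L));
        return Math.min(bucket, numPartitions - 1);
    }
}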
Example – Inverted Index
- A normal index is a mapping from documents to terms
- An inverted index is a mapping from terms to documents
- If we have a million documents, how do we build an inverted index using Map-Reduce?

Map(docId, text):
    for all words w in text
        emit(w, docId);

Reduce(w, docIds[]):
    emit(w, docIds[]);
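A minimal Java sketch of this pair, assuming the input file name serves as the document id & whitespace tokenisation (both assumptions; the slide leaves them open). The two classes are meant to be nested in a job driver class, as in the WordCountSample later in the deck:

import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

// Mapper: emit (word, docId) for every word in the line
public static class IndexMapper extends Mapper<LongWritable, Text, Text, Text> {
    public void map(LongWritable offset, Text line, Context context)
            throws IOException, InterruptedException {
        String docId = ((FileSplit) context.getInputSplit()).getPath().getName();
        for (String w : line.toString().split("\\s+")) {
            if (!w.isEmpty()) {
                context.write(new Text(w), new Text(docId));
            }
        }
    }
}

// Reducer: collect all document ids seen for a word into one posting list
public static class IndexReducer extends Reducer<Text, Text, Text, Text> {
    public void reduce(Text word, Iterable<Text> docIds, Context context)
            throws IOException, InterruptedException {
        StringBuilder postings = new StringBuilder();
        for (Text id : docIds) {
            if (postings.length() > 0) postings.append(",");
            postings.append(id.toString());
        }
        context.write(word, new Text(postings.toString()));
    }
}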
Example – Distributed Grep

Map(k, v):
    docId = read the input file name
    if v matches the grep pattern
        emit(k, (pattern, docId));

Reduce(k, values[]):
    emit(k, values);
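A rough Java sketch of the grep mapper is below. The configuration key "grep.pattern" is a name chosen for this example (not from the slide), & matching lines are simply re-emitted; with an identity reducer, or no reducer at all, this already behaves like grep:

import java.io.IOException;
import java.util.regex.Pattern;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// Mapper: keep only the lines that match the pattern supplied via the job configuration
public static class GrepMapper extends Mapper<LongWritable, Text, LongWritable, Text> {
    private Pattern pattern;

    @Override
    protected void setup(Context context) {
        pattern = Pattern.compile(context.getConfiguration().get("grep.pattern"));
    }

    public void map(LongWritable offset, Text line, Context context)
            throws IOException, InterruptedException {
        if (pattern.matcher(line.toString()).find()) {
            context.write(offset, line);
        }
    }
}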
Composition with Map-Reduce
- Map/Reduce is not a tool to be used as a fixed template
- It should be combined with Fork/Join, etc., to build solutions
- A solution may have more than one Map/Reduce step
Composition with Map-Reduce – Example
- Calculate the following for a list of a million integers
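The concrete quantities from the original slide are not reproduced here, so as a hedged illustration of composing more than one Map/Reduce step, the driver fragment below chains two jobs by feeding the first job's output directory to the second. The job names, paths, & *PassMapper/*PassReducer classes are hypothetical placeholders; FileInputFormat & FileOutputFormat are the org.apache.hadoop.mapreduce.lib variants.

// Hedged sketch of chaining two Map/Reduce jobs inside one driver's main()
Configuration conf = new Configuration();

Job first = new Job(conf, "pass-1");
first.setMapperClass(FirstPassMapper.class);
first.setReducerClass(FirstPassReducer.class);
FileInputFormat.setInputPaths(first, new Path("/input"));
FileOutputFormat.setOutputPath(first, new Path("/tmp/pass-1"));
if (!first.waitForCompletion(true)) System.exit(1);   // stop if the first pass fails

Job second = new Job(conf, "pass-2");
second.setMapperClass(SecondPassMapper.class);
second.setReducerClass(SecondPassReducer.class);
FileInputFormat.setInputPaths(second, new Path("/tmp/pass-1"));  // consume the first pass's output
FileOutputFormat.setOutputPath(second, new Path("/output"));
System.exit(second.waitForCompletion(true) ? 0 : 1);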
Map Reduce Client

public class WordCountSample {

    public static class Map extends MapReduceBase
            implements Mapper<LongWritable, Text, Text, IntWritable> {
        private final static IntWritable one = new IntWritable(1);
        private Text word = new Text();

        public void map(LongWritable key, Text value,
                        OutputCollector<Text, IntWritable> output, Reporter reporter)
                throws IOException { ….. }
    }

    public static class Reduce extends MapReduceBase
            implements Reducer<Text, IntWritable, Text, IntWritable> {
        public void reduce(Text key, Iterator<IntWritable> values,
                           OutputCollector<Text, IntWritable> output, Reporter reporter)
                throws IOException { .. }
    }

    public static void main(String[] args) throws Exception {
        JobConf conf = new JobConf(WordCountSample.class);
        conf.setJobName("wordcount");

        conf.setOutputKeyClass(Text.class);
        conf.setOutputValueClass(IntWritable.class);

        conf.setMapperClass(Map.class);
        conf.setCombinerClass(Reduce.class);
        conf.setReducerClass(Reduce.class);

        conf.setInputFormat(TextInputFormat.class);
        conf.setOutputFormat(TextOutputFormat.class);

        FileInputFormat.setInputPaths(conf, new Path("/input"));
        FileOutputFormat.setOutputPath(conf, new Path("/output/" + System.currentTimeMillis()));

        JobClient.runJob(conf);
    }
}

Example: http://wiki.apache.org/hadoop/WordCount
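The map() & reduce() bodies are elided above; going by the WordCount example linked from the slide, they would look roughly as follows (StringTokenizer is java.util.StringTokenizer, & word/one are the fields declared in the Map class):

// Elided map() body: tokenise the input line & emit (word, 1) for every token
public void map(LongWritable key, Text value,
                OutputCollector<Text, IntWritable> output, Reporter reporter)
        throws IOException {
    StringTokenizer tokenizer = new StringTokenizer(value.toString());
    while (tokenizer.hasMoreTokens()) {
        word.set(tokenizer.nextToken());
        output.collect(word, one);
    }
}

// Elided reduce() body: sum the counts collected for each word
public void reduce(Text key, Iterator<IntWritable> values,
                   OutputCollector<Text, IntWritable> output, Reporter reporter)
        throws IOException {
    int sum = 0;
    while (values.hasNext()) {
        sum += values.next().get();
    }
    output.collect(key, new IntWritable(sum));
}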
Format to Parse Custom Data

// Add the following to the main method
Job job = new Job(conf, "LogProcessingHitsByLink");
….
job.setInputFormatClass(MboxFileFormat.class);
..
System.exit(job.waitForCompletion(true) ? 0 : 1);

// Write a formatter
public class MboxFileFormat extends FileInputFormat<Text, Text> {
    private MBoxFileReader boxFileReader = null;

    public RecordReader<Text, Text> createRecordReader(
            InputSplit inputSplit, TaskAttemptContext attempt)
            throws IOException, InterruptedException {
        boxFileReader = new MBoxFileReader();
        boxFileReader.initialize(inputSplit, attempt);
        return boxFileReader;
    }
}

// Write a reader
public class MBoxFileReader extends RecordReader<Text, Text> {
    public void initialize(InputSplit inputSplit, TaskAttemptContext attempt)
            throws IOException, InterruptedException { .. }

    public boolean nextKeyValue() throws IOException, InterruptedException { .. }

    // getCurrentKey(), getCurrentValue(), getProgress(), & close() must also be overridden
}
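Those remaining methods are not shown on the slide. The sketch below is a hedged, simplistic reader that treats each split as a single (file name, contents) record, an assumption made purely for illustration; a real mbox reader would instead step through the messages:

import java.io.IOException;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

public class WholeSplitReader extends RecordReader<Text, Text> {
    private final Text key = new Text();
    private final Text value = new Text();
    private boolean consumed = false;

    public void initialize(InputSplit split, TaskAttemptContext context)
            throws IOException, InterruptedException {
        FileSplit fileSplit = (FileSplit) split;
        key.set(fileSplit.getPath().getName());                    // key = file name
        FileSystem fs = fileSplit.getPath().getFileSystem(context.getConfiguration());
        FSDataInputStream in = fs.open(fileSplit.getPath());
        byte[] contents = new byte[(int) fileSplit.getLength()];
        IOUtils.readFully(in, contents, 0, contents.length);       // value = file contents
        value.set(contents, 0, contents.length);
        in.close();
    }

    public boolean nextKeyValue() {          // the single record is returned exactly once
        if (consumed) return false;
        consumed = true;
        return true;
    }

    public Text getCurrentKey()   { return key; }
    public Text getCurrentValue() { return value; }
    public float getProgress()    { return consumed ? 1.0f : 0.0f; }
    public void close()           { }
}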
Your Own Partitioner

public class IPBasedPartitioner extends Partitioner<Text, IntWritable> {
    public int getPartition(Text ipAddress, IntWritable value, int numPartitions) {
        String region = getGeoLocation(ipAddress);
        if (region != null) {
            return (region.hashCode() & Integer.MAX_VALUE) % numPartitions;
        }
        return 0;
    }
}

Set the Partitioner class parameter in the job object:

Job job = new Job(getConf(), "log-analysis");
……
job.setPartitionerClass(IPBasedPartitioner.class);
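getGeoLocation() is left undefined on the slide; a purely hypothetical stub, added inside IPBasedPartitioner only to make the sketch self-contained, might be:

// Hypothetical stub; a real implementation would look the address up in a GeoIP database
private static String getGeoLocation(Text ipAddress) {
    String ip = ipAddress.toString();
    return ip.startsWith("192.168.") ? "local" : "unknown";  // placeholder logic
}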
Using Distributed File Cache
- Gives a job access to a static file

Job job = new Job(conf, "word count");

// Copy the file into HDFS so every node can reach it
FileSystem fs = FileSystem.get(conf);
fs.copyFromLocalFile(new Path(scriptFileLocation), new Path("/debug/fail-script"));

// Register the file's URI (mapUri) with the distributed cache & have a symlink
// to it created in each task's working directory
DistributedCache.addCacheFile(mapUri, conf);
DistributedCache.createSymlink(conf);
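On the task side the cached file can then be opened like a local file. A minimal sketch, assuming the org.apache.hadoop.mapreduce API & that the symlink ends up being named fail-script (an assumption, since mapUri is not shown on the slide):

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// Mapper that loads the cached file once, before any map() calls
public static class CacheAwareMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    private final List<String> scriptLines = new ArrayList<String>();

    @Override
    protected void setup(Context context) throws IOException {
        // With createSymlink(), the cached file appears in the task's working directory
        BufferedReader reader = new BufferedReader(new FileReader("fail-script"));
        String line;
        while ((line = reader.readLine()) != null) {
            scriptLines.add(line);
        }
        reader.close();
    }
}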
