Map-Reduce Programming & Best Practices
Apache Hadoop India Summit 2011
Basant Verma, Yahoo! India R&D
February 16, 2011
Hadoop Components
(Diagram: Client 1, Client 2, Processing Framework, DFS.)
HDFS (Hadoop Distributed File System): modeled on GFS; a reliable, high-bandwidth file system that can store terabytes and petabytes of data.
Map-Reduce: borrows the map/reduce metaphor from the Lisp language; a distributed processing framework that processes the data stored on HDFS as key-value pairs.
Word Count DataFlow
Word Count
$ cat ~/wikipedia.txt | \
    sed -e 's/ /\n/g' | grep . | \
    sort | \
    uniq -c > \
    ~/frequencies.txt
MR for Word-Count
mapper(filename, file-contents):
    for each word in file-contents:
        emit(word, 1)

reducer(word, values[]):
    sum = 0
    for each value in values:
        sum = sum + value
    emit(word, sum)
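Below is a minimal Java sketch of the same logic using the Hadoop MapReduce (new) API; the class names and wiring are illustrative, not part of the original slides.

    import java.io.IOException;
    import java.util.StringTokenizer;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;

    public class WordCount {
      // Mapper: emit (word, 1) for every token in the input line.
      public static class TokenizerMapper
          extends Mapper<LongWritable, Text, Text, IntWritable> {
        private static final IntWritable ONE = new IntWritable(1);
        private final Text word = new Text();
        @Override
        protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
          StringTokenizer itr = new StringTokenizer(value.toString());
          while (itr.hasMoreTokens()) {
            word.set(itr.nextToken());
            context.write(word, ONE);
          }
        }
      }

      // Reducer: sum the counts for each word.
      public static class IntSumReducer
          extends Reducer<Text, IntWritable, Text, IntWritable> {
        private final IntWritable result = new IntWritable();
        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
          int sum = 0;
          for (IntWritable v : values) {
            sum += v.get();
          }
          result.set(sum);
          context.write(key, result);
        }
      }
    }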
MR Dataflow
MapReduce Pipeline
Pipeline Details
Available tweaks and optimizations!
Input to Maps
Map only jobs
Combiner
Compression
Speculation
Fault Tolerance
Buffer Size
Parallelism (threads)
Partitioner
Reporter
DistributedCache
Task child environment settings
Input to Maps
Maps should process a significant amount of data to minimize the effect of per-task overhead.
Process multiple files per map for jobs with a very large number of small input files.
Process large chunks of data for large-scale processing.
Use as few maps as possible to process the data in parallel, without creating bad failure-recovery cases.
Unless the application's maps are heavily CPU bound, there is almost no reason to ever require more than 60,000-70,000 maps for a single application.
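One common way to get fewer, larger maps over many small files is a combining input format. A driver-side sketch with the newer MapReduce API; CombineTextInputFormat is available in later Hadoop releases, and the 256 MB target split size is an assumed example value:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.CombineTextInputFormat;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

    public class SmallFilesDriver {
      public static void main(String[] args) throws Exception {
        Job job = Job.getInstance(new Configuration(), "combine-small-files");
        job.setJarByClass(SmallFilesDriver.class);
        // Pack many small files into each split so one map processes a
        // meaningful amount of data instead of a single tiny file.
        job.setInputFormatClass(CombineTextInputFormat.class);
        FileInputFormat.setMaxInputSplitSize(job, 256L * 1024 * 1024); // ~2 HDFS blocks
        FileInputFormat.addInputPath(job, new Path(args[0]));
        // ... mapper, reducer and output settings elided ...
        System.exit(job.waitForCompletion(true) ? 0 : 1);
      }
    }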
Map only jobs
Run a map-only job once for generating data.
Run multiple jobs with different reduce implementations.
Map-only jobs write their output directly to HDFS.
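A map-only job is configured by setting the reduce count to zero; a minimal driver sketch (job name is illustrative):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.Job;

    public class MapOnlyDriver {
      public static void main(String[] args) throws Exception {
        Job job = Job.getInstance(new Configuration(), "map-only-generator");
        job.setJarByClass(MapOnlyDriver.class);
        // job.setMapperClass(...) would name the data-generating mapper here.
        // Zero reduces: no shuffle or sort; each map writes directly to HDFS.
        job.setNumReduceTasks(0);
        // ... input/output paths and key/value classes elided ...
        System.exit(job.waitForCompletion(true) ? 0 : 1);
      }
    }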
Combiner
Provides map-side aggregation of data: each and every record emitted by the Mapper need not be shipped to the reducers.
Reduce code can often be used as the combiner. Example: Word count!
Helps reduce network traffic for the shuffle and results in less disk space usage.
However, it is important to ensure that the Combiner really does provide sufficient aggregation.
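For word count, the reducer doubles as the combiner. A driver sketch that wires it up, reusing the illustrative WordCount classes sketched earlier:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

    public class WordCountDriver {
      public static void main(String[] args) throws Exception {
        Job job = Job.getInstance(new Configuration(), "wordcount");
        job.setJarByClass(WordCountDriver.class);
        job.setMapperClass(WordCount.TokenizerMapper.class);
        // The reducer is re-used as the combiner: partial sums are computed
        // on the map side, so far fewer (word, 1) records cross the network.
        job.setCombinerClass(WordCount.IntSumReducer.class);
        job.setReducerClass(WordCount.IntSumReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
      }
    }

Reusing the reducer as the combiner works here because summing counts is associative and commutative, and the reducer's input and output types match.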
Compression
Map and Reduce outputs can be compressed.
Compressing intermediate data will help reduce the amount of disk usage and network I/O.
Compression helps reduce the total data size on the DFS.
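A configuration sketch for both intermediate and final output compression. The property names shown are the newer ones (older releases use mapred.compress.map.output and mapred.map.output.compression.codec), and the Snappy codec is an illustrative choice that needs the native libraries installed:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.io.compress.CompressionCodec;
    import org.apache.hadoop.io.compress.SnappyCodec;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

    public class CompressionSettings {
      static Job newCompressedJob() throws Exception {
        Configuration conf = new Configuration();
        // Compress intermediate map output before the shuffle.
        conf.setBoolean("mapreduce.map.output.compress", true);
        conf.setClass("mapreduce.map.output.compress.codec",
            SnappyCodec.class, CompressionCodec.class);

        Job job = Job.getInstance(conf, "compressed-job");
        // Compress the final job output written to the DFS.
        FileOutputFormat.setCompressOutput(job, true);
        FileOutputFormat.setOutputCompressorClass(job, SnappyCodec.class);
        return job;
      }
    }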
Shuffle
Shuffle phase performance depends on the crossbar between the map tasks and the reduce tasks, which must be minimized.
Compression of intermediate output.
Use of a Combiner.
Reduces
Configure an appropriate number of reduces:
Too few hurt the nodes.
Too many hurt the cross-bar.
All reduces should complete in a single wave.
Each reduce should process at least 1-2 GB of data, and at most 5-10 GB of data, in most scenarios.
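As a rough illustration of that sizing rule (the helper and its numbers are assumptions, not from the slides):

    public class ReduceSizing {
      // Illustrative helper: choose a reduce count so each reduce processes
      // roughly 2 GB and the whole set still fits into one wave of slots.
      static int chooseNumReduces(long estimatedShuffleBytes, int availableReduceSlots) {
        final long targetBytesPerReduce = 2L * 1024 * 1024 * 1024; // ~2 GB per reduce
        long byDataSize = Math.max(1, estimatedShuffleBytes / targetBytesPerReduce);
        return (int) Math.min(availableReduceSlots, byDataSize);   // cap at a single wave
      }
      // Usage in a driver: job.setNumReduceTasks(chooseNumReduces(shuffleBytes, slots));
    }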
Partitioner
Distributes data evenly across reduces; uneven distribution will hurt the whole job runtime.
Default is the hash partitioner: hash(key) % num-reducers.
Why is a custom partitioner needed? Examples: Sort, WordCount.
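A hypothetical custom partitioner, not from the slides: suppose keys look like "userId#timestamp" and all records for one user must land on the same reduce, so the partition is computed from the user part only.

    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Partitioner;

    public class UserPartitioner extends Partitioner<Text, IntWritable> {
      @Override
      public int getPartition(Text key, IntWritable value, int numPartitions) {
        // Partition on the "natural" part of the composite key.
        String user = key.toString().split("#", 2)[0];
        // Mask off the sign bit so the modulo result is never negative.
        return (user.hashCode() & Integer.MAX_VALUE) % numPartitions;
      }
    }
    // Enabled in the driver with: job.setPartitionerClass(UserPartitioner.class);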
Output
Output to a few large files, with each file spanning multiple HDFS blocks and appropriately compressed.
The number of output artifacts is linearly proportional to the number of configured reduces.
Compress outputs.
Use appropriate file formats for output. E.g., a compressed text file is not a great idea if not using a splittable codec.
Consider using Hadoop ARchive (HAR) files to reduce namespace usage.
Think of the consumers of your data-set.
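One splittable, compressed output option (an illustrative choice, not the slides' recommendation) is block-compressed SequenceFiles rather than gzipped text:

    import org.apache.hadoop.io.SequenceFile;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;

    public class OutputFormatSettings {
      // Driver-side wiring for block-compressed SequenceFile output.
      static void useCompressedSequenceFiles(Job job) {
        job.setOutputFormatClass(SequenceFileOutputFormat.class);
        FileOutputFormat.setCompressOutput(job, true);
        SequenceFileOutputFormat.setOutputCompressionType(job, SequenceFile.CompressionType.BLOCK);
      }
    }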
Speculation
Slow running tasks can be speculated.
Slowness is determined by the expected time the task will take to complete.
Speculation will kick in only when there are no pending tasks.
The total number of tasks that can be speculated for a job is capped to reduce wastage.
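Speculation can be toggled per job; a sketch using the Hadoop 1.x property names (newer releases call them mapreduce.map.speculative and mapreduce.reduce.speculative), with example values:

    import org.apache.hadoop.conf.Configuration;

    public class SpeculationSettings {
      static void configure(Configuration conf) {
        conf.setBoolean("mapred.map.tasks.speculative.execution", true);
        // Reduce-side speculation may be turned off when duplicate reduce
        // attempts would be too expensive (e.g. very large reduce outputs).
        conf.setBoolean("mapred.reduce.tasks.speculative.execution", false);
      }
    }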
Fault Tolerance
Data is stored as blocks on separate nodes.
Nodes are composed of cheap commodity hardware.
Tasks are independent of each other.
New tasks can be scheduled on new nodes.
The JobTracker retries a failed task 4 times (by default) before giving up.
A job can be configured to tolerate task failures up to N% of the total tasks.
Reporter
Used to report progress to the parent process. Commonly used when the tasks:
 - connect to a remote application like a web-service or database
 - do some disk-intensive computation
 - get blocked on some event
One can also spawn a thread and make it report the progress periodically.
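A sketch of progress reporting with the newer API, where the Mapper's Context plays the Reporter role; the class, the slow lookup call, and the key/value types are all illustrative:

    import java.io.IOException;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;

    // Mapper that calls out to a slow external service and pings the
    // framework so the task is not declared dead while it waits.
    public class SlowLookupMapper extends Mapper<LongWritable, Text, Text, NullWritable> {
      @Override
      protected void map(LongWritable key, Text value, Context context)
          throws IOException, InterruptedException {
        String enriched = lookupRemoteService(value.toString()); // hypothetical slow call
        context.setStatus("looked up record " + key);  // optional human-readable status
        context.progress();                            // tell the framework we are alive
        context.write(new Text(enriched), NullWritable.get());
      }

      private String lookupRemoteService(String record) {
        // Placeholder for a web-service or database call.
        return record;
      }
    }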
Distributed Cache
Efficient distribution of read-only files for applications.
Localized automatically once the task is scheduled on the slave node.
Cleaned up once no task running on the slave needs the cached files.
Designed for a small number of mid-size files.
Artifacts in the distributed cache should not require more I/O than the actual input to the application tasks.
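A usage sketch with the DistributedCache API of that era; the lookup-file path is illustrative:

    import java.io.IOException;
    import java.net.URI;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.filecache.DistributedCache;
    import org.apache.hadoop.fs.Path;

    public class CacheExample {
      // Driver side: register a small read-only file to ship to every task.
      static void addLookupFile(Configuration conf) {
        DistributedCache.addCacheFile(URI.create("/apps/lookup/cities.txt"), conf);
      }

      // Task side (e.g. from Mapper.setup()): the file has already been
      // localized on the slave node; open it like any local file.
      static Path firstCachedFile(Configuration conf) throws IOException {
        Path[] localFiles = DistributedCache.getLocalCacheFiles(conf);
        return localFiles[0];
      }
    }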
Few tips for better performance
Increase the memory/buffer allocated to the tasks (io.sort.mb).
Increase the number of tasks that can be run in parallel.
Increase the number of threads that serve the map outputs.
Disable unnecessary logging.
Find the optimal value of the DFS block size.
Share the cluster between the DFS and MR for data locality.
Turn on speculation.
Run reducers in one wave, as they can be really costly.
Make proper use of DistributedCache.
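A sketch of a few of these knobs with Hadoop 1.x-era property names; the values are examples only, and the TaskTracker-level settings normally belong in mapred-site.xml rather than job code:

    import org.apache.hadoop.conf.Configuration;

    public class TuningSettings {
      static void apply(Configuration conf) {
        conf.setInt("io.sort.mb", 256);                             // bigger map-side sort buffer, fewer spills
        conf.setInt("mapred.tasktracker.map.tasks.maximum", 8);     // map slots per node
        conf.setInt("mapred.tasktracker.reduce.tasks.maximum", 4);  // reduce slots per node
        conf.setInt("tasktracker.http.threads", 60);                // threads serving map outputs to reduces
      }
    }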
Anti-Patterns
Processing thousands of small files (sized less than 1 HDFS block, typically 128MB) with one map processing a single small file.
Processing very large data-sets with a small HDFS block size, i.e. 128MB, resulting in tens of thousands of maps.
Applications with a large number (thousands) of maps with a very small runtime (e.g. 5s).
Straight-forward aggregations without the use of the Combiner.
Applications with greater than 60,000-70,000 maps.
Applications processing large data-sets with very few reduces (such as 1).
Applications using a single reduce for total order among the output records.
Pig scripts processing large data-sets without using the PARALLEL keyword.
Anti-Patterns (Cont…)
Applications processing data with a very large number of reduces, such that each reduce processes less than 1-2GB of data.
Applications writing out multiple, small output files from each reduce.
Applications using the DistributedCache to distribute a large number of artifacts and/or very large artifacts (hundreds of MBs each).
Applications using more than 25 counters per task.
Applications performing metadata operations (e.g. listStatus) on the file-system from the map/reduce tasks.
Applications doing screen-scraping of the JobTracker web-UI for status of queues/jobs or, worse, job-history of completed jobs.
Workflows comprising hundreds of small jobs processing small amounts of data with a very high job submission rate.
Debugging
Side-effect files: write to external files from M/R code.
Web UI: shows stdout/stderr.
IsolationRunner: run the task on the tracker where the task failed. Switch to the workspace of the task and run IsolationRunner.
Debug scripts: upload the script to the DFS, create a symlink and pass the script in the conf file. One common use is to filter out exceptions from the logs/stderr/stdout.
LocalJobRunner: runs a MapReduce job on the local node; can be used for faster debugging and proof-of-concept work.
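A sketch of forcing a job onto the LocalJobRunner, using the Hadoop 1.x property names (newer releases use mapreduce.framework.name=local); paths and job wiring are left out:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.Job;

    public class LocalDebugDriver {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.set("mapred.job.tracker", "local");  // run all tasks in a single local process
        conf.set("fs.default.name", "file:///");  // read and write the local file system
        Job job = Job.getInstance(conf, "local-debug-run");
        // ... set mapper/reducer and local input/output paths, then waitForCompletion ...
      }
    }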
Task child environment settings
The child task inherits the environment of the parent TaskTracker. The user can specify additional options to the child JVM via mapred.child.java.opts.
The example below shows multiple arguments and substitutions: it turns on JVM GC logging, starts a passwordless JVM JMX agent so that jconsole can connect and take thread dumps, sets the maximum heap size of the child JVM to 512MB, and adds an additional path to the java.library.path of the child JVM.

<property>
  <name>mapred.child.java.opts</name>
  <value>
    -Xmx512M -Djava.library.path=/home/mycompany/lib
    -verbose:gc -Xloggc:/tmp/@taskid@.gc
    -Dcom.sun.management.jmxremote.authenticate=false
    -Dcom.sun.management.jmxremote.ssl=false
  </value>
</property>
Checklist
Are your partitions uniform?
Can you combine records at the map side?
Are maps reading off a DFS block worth of data?
Are you running a single reduce wave (unless the data size per reducer is too big)?
Have you tried compressing intermediate data & final data?
Are your buffer sizes large enough to minimize spills but small enough to stay clear of swapping?
Do you see unexplained “long tails”? (can be mitigated via speculative execution)
Are you keeping your cores busy? (via slot configuration)
Is at least one system resource being loaded?
Editor's Notes

  • #10: >90% of map tasks are data local; 10X gain with the use of a Combiner.
  • #19: Check the speculation formula and update.
  • #26: IsolationRunner is intended to facilitate debugging by re-running a specific task, given the left-over task files of a (typically failed) past job. Currently, it is limited to re-running map tasks. See mapreduce.task.files.preserve.failedtasks.