kangfoo's blog

工作学习笔记,生活掠影。

Hadoop MapReduce RecordReader 组件

| 评论

由 RecordReader 决定每次读取以什么样的方式读取数据分片中的一条数据。Hadoop 默认的 RecordReader 是 LineRecordReader(TextInputFormat 的 getRecordReader() 方法返回即是 LineRecordReader。二进制输入 SequenceFileInputFormat 的 getRecordReader() 方法返回即是SequenceFileRecordReader。)。LineRecordReader是用每行的偏移量作为 map 的 key,每行的内容作为 map 的 value;

它可作用于,自定义读取每一条记录的方式;自定义读入 key 的类型,如希望读取的 key 是文件的路径或名字而不是该行在文件中的偏移量。

自定义RecordReader一般步骤

  1. 继承抽象类 RecordReader,实现 RecordReader 的实例;
  2. 实现自定义 InputFormat 类,重写 InputFormat 中 createRecordReader() 方法,返回值是自定义的 RecordReader 实例; (3)配置 job.setInputFormatClass() 设置自定义的 InputFormat 类型;

TextInputFormat类源代码理解

源码见 org.apache.mapreduce.lib.input.TextInputFormat 类(新API);

Hadoop 默认 TextInputFormat 使用 LineRecordReader。具体分析见注释。

  public RecordReader<LongWritable, Text> 
    createRecordReader(InputSplit split,
                       TaskAttemptContext context) {
    return new LineRecordReader();
  }
// --> LineRecordReader
 public void initialize(InputSplit genericSplit,
                         TaskAttemptContext context) throws IOException {
    FileSplit split = (FileSplit) genericSplit;
    Configuration job = context.getConfiguration();
    this.maxLineLength = job.getInt("mapred.linerecordreader.maxlength",
                                    Integer.MAX_VALUE);
    start = split.getStart();  // 当前分片在整个文件中的起始位置
    end = start + split.getLength(); // 当前分片,在整个文件的位置
    final Path file = split.getPath();
    compressionCodecs = new CompressionCodecFactory(job);// 压缩
    codec = compressionCodecs.getCodec(file);
//
    // open the file and seek to the start of the split
    FileSystem fs = file.getFileSystem(job);
    FSDataInputStream fileIn = fs.open(split.getPath()); // 获取 FSDataInputStream
//
    if (isCompressedInput()) {
      decompressor = CodecPool.getDecompressor(codec);
      if (codec instanceof SplittableCompressionCodec) {
        final SplitCompressionInputStream cIn =
          ((SplittableCompressionCodec)codec).createInputStream(
            fileIn, decompressor, start, end,
            SplittableCompressionCodec.READ_MODE.BYBLOCK);
        in = new LineReader(cIn, job); //一行行读取
        start = cIn.getAdjustedStart(); // 可能跨分区读取
        end = cIn.getAdjustedEnd();// 可能跨分区读取
        filePosition = cIn;
      } else {
        in = new LineReader(codec.createInputStream(fileIn, decompressor),
            job);
        filePosition = fileIn;
      }
    } else {
      fileIn.seek(start);//  调整到文件起始偏移量
      in = new LineReader(fileIn, job); 
      filePosition = fileIn;
    }
    // If this is not the first split, we always throw away first record
    // because we always (except the last split) read one extra line in
    // next() method.
    if (start != 0) {
      start += in.readLine(new Text(), 0, maxBytesToConsume(start));
    }
    this.pos = start; // 在当前分片的位置
  }
//  --> getFilePosition() 指针读取到哪个位置
// filePosition 为 Seekable 类型
  private long getFilePosition() throws IOException {
    long retVal;
    if (isCompressedInput() && null != filePosition) {
      retVal = filePosition.getPos();
    } else {
      retVal = pos;
    }
    return retVal;
  }
//
// --> nextKeyValue() 
public boolean nextKeyValue() throws IOException {
    if (key == null) {
      key = new LongWritable();
    }
    key.set(pos);
    if (value == null) {
      value = new Text();
    }
    int newSize = 0;
    // We always read one extra line, which lies outside the upper
    // split limit i.e. (end - 1)
    // 预读取下一条纪录
    while (getFilePosition() <= end) {
      newSize = in.readLine(value, maxLineLength,
          Math.max(maxBytesToConsume(pos), maxLineLength));
      if (newSize == 0) {
        break;
      }
      pos += newSize; // 下一行的偏移量
      if (newSize < maxLineLength) {
        break;
      }
//
      // line too long. try again
      LOG.info("Skipped line of size " + newSize + " at pos " + 
               (pos - newSize));
    }
    if (newSize == 0) {
      key = null;
      value = null;
      return false;
    } else {
      return true;
    }
  }

自定义 RecordReader 演示

假设,现有如下数据 10 ~ 70 需要利用自定义 RecordReader 组件分别计算数据奇数行和偶数行的数据之和。结果为:奇数行之和等于 160,偶数和等于 120。出自于 开源力量 LouisT 老师的开源力量培训课-Hadoop Development课件。

数据:
10
20
30
40
50
60
70

源代码

TestRecordReader.java

数据准备

$ ./bin/hadoop fs -mkdir /inputreader
$ ./bin/hadoop fs -put ./a.txt /inputreader
$ ./bin/hadoop fs -lsr /inputreader
-rw-r--r--   2 hadoop supergroup         21 2014-02-20 21:04 /inputreader/a.txt

执行

$ ./bin/hadoop jar study.hdfs-0.0.1-SNAPSHOT.jar TestRecordReader  /inputreader /inputreaderout1
##
$ ./bin/hadoop fs -lsr /inputreaderout1
-rw-r--r--   2 hadoop supergroup          0 2014-02-20 21:12 /inputreaderout1/_SUCCESS
drwxr-xr-x   - hadoop supergroup          0 2014-02-20 21:11 /inputreaderout1/_logs
drwxr-xr-x   - hadoop supergroup          0 2014-02-20 21:11 /inputreaderout1/_logs/history
-rw-r--r--   2 hadoop supergroup      16451 2014-02-20 21:11 /inputreaderout1/_logs/history/job_201402201934_0002_1392901901142_hadoop_TestRecordReader
-rw-r--r--   2 hadoop supergroup      48294 2014-02-20 21:11 /inputreaderout1/_logs/history/job_201402201934_0002_conf.xml
-rw-r--r--   2 hadoop supergroup         23 2014-02-20 21:12 /inputreaderout1/part-r-00000
-rw-r--r--   2 hadoop supergroup         23 2014-02-20 21:12 /inputreaderout1/part-r-00001
##
$ ./bin/hadoop fs -cat /inputreaderout1/part-r-00000
偶数行之和:  120
##
$ ./bin/hadoop fs -cat /inputreaderout1/part-r-00001
奇数行之和:  160

Hadoop MapReduce Partitioner 组件

| 评论

Partitioner 过程发生在循环缓冲区发生溢写文件之后,merge 之前。可以让 Map 对 Key 进行分区,从而可以根据不同的 key 来分发到不同的 reducer 中去处理;

Hadoop默认的提供的是HashPartitioner。

可以自定义 key 的分发规则,自定义Partitioner:

  • 继承抽象类Partitioner,实现自定义的getPartition()方法;
  • 通过job.setPartitionerClass()来设置自定义的Partitioner;

Partitioner 类

旧api

public interface Partitioner<K2, V2> extends JobConfigurable {
  int getPartition(K2 key, V2 value, int numPartitions);
}

新api

public abstract class Partitioner<KEY, VALUE> {
  public abstract int getPartition(KEY key, VALUE value, int numPartitions);  
}

Partitioner应用场景演示

需求:利用 Hadoop MapReduce 作业 Partitioner 组件分别统计每种商品的周销售情况。源代码 TestPartitioner.java出自于 开源力量 LouisT 老师的开源力量培训课-Hadoop Development课件。 (可使用 PM2.5 数据代替此演示程序)

  • site1的周销售清单(a.txt,以空格分开):

    shoes   20
    hat 10
    stockings   30
    clothes 40
    
  • site2的周销售清单(b.txt,以空格分开):

    shoes   15
    hat 1
    stockings   90
    clothes 80
    
  • 汇总结果:

    shoes     35
    hat       11
    stockings 120
    clothes   120
    
  • 准备测试数据

    $ ./bin/hadoop fs -mkdir /testPartitioner/input
    $ ./bin/hadoop fs -put a.txt /testPartitioner/input
    $ ./bin/hadoop fs -put b.txt /testPartitioner/input
    $ ./bin/hadoop fs -lsr /testPartitioner/input
    -rw-r--r--   2 hadoop supergroup         52 2014-02-18 22:53 /testPartitioner/input/a.txt
    -rw-r--r--   2 hadoop supergroup         50 2014-02-18 22:53 /testPartitioner/input/b.txt
    
  • 执行 MapReduce 作业 此处使用 hadoop jar 命令执行,eclipse 插件方式有一定的缺陷。(hadoop eclipse 执行出现java.io.IOException: Illegal partition for hat (1))

    $ ./bin/hadoop jar study.hdfs-0.0.1-SNAPSHOT.jar TestPartitioner /testPartitioner/input /testPartitioner/output10
    
  • 结果。 四个分区,分别存储上述四种产品的总销量的统计结果值。

    -rw-r--r--   2 hadoop supergroup          9 2014-02-19 00:18 /testPartitioner/output10/part-r-00000
    -rw-r--r--   2 hadoop supergroup          7 2014-02-19 00:18 /testPartitioner/output10/part-r-00001
    -rw-r--r--   2 hadoop supergroup         14 2014-02-19 00:18 /testPartitioner/output10/part-r-00002
    -rw-r--r--   2 hadoop supergroup         12 2014-02-19 00:18 /testPartitioner/output10/part-r-00003
    

Hadoop MapReduce Combiner 组件

| 评论

combiner 作用是把一个 map 产生的多个 合并成一个新的 ,然后再将新的 作为 reduce 的输入;

combiner 函数在 map 函数与 reduce 函数之间,目的是为了减少 map 输出的中间结果,减少 reduce 复制 map 输出的数据,减少网络传输负载;

并不是所有情况下都能使用 Combiner 组件,它适用于对记录汇总的场景(如求和,平均数不适用)

什么时候运行 Combiner

  • 当 job 设置了 Combiner,并且 spill 的个数达到 min.num.spill.for.combine (默认是3)的时候,那么 combiner 就会 Merge 之前执行;
  • 但是有的情况下,Merge 开始执行,但 spill 文件的个数没有达到需求,这个时候 Combiner 可能会在Merge 之后执行;
  • Combiner 也有可能不运行,Combiner 会考虑当时集群的一个负载情况。

测试 Combinner 过程

代码 TestCombiner

  1. 以 wordcount.txt 为输入的词频统计

    $ ./bin/hadoop fs -lsr /test3/input
    drwxr-xr-x   - hadoop supergroup          0 2014-02-18 00:28 /test3/input/test
    -rw-r--r--   2 hadoop supergroup        983 2014-02-18 00:28 /test3/input/test/wordcount.txt
    -rw-r--r--   2 hadoop supergroup        626 2014-02-18 00:28 /test3/input/test/wordcount2.txt
    
  2. 不启用 Reducer (输出,字节变大)

    drwxr-xr-x   - kangfoo-mac supergroup          0 2014-02-18 00:29 /test3/output1
    -rw-r--r--   3 kangfoo-mac supergroup          0 2014-02-18 00:29 /test3/output1/_SUCCESS
    -rw-r--r--   3 kangfoo-mac supergroup       1031 2014-02-18 00:29 /test3/output1/part-m-00000 (-m 没有 reduce 过程的中间结果,每个数据文件对应一个数据分片,每个分片对应一个map任务)
    -rw-r--r--   3 kangfoo-mac supergroup        703 2014-02-18 00:29 /test3/output1/part-m-00001
    

    结果如下(map过程并不合并相同key的value值):

    drwxr-xr-x  1
    -  1
    hadoop  1
    supergroup  1
    0   1
    2014-02-17  1
    21:03   1
    /home/hadoop/env/mapreduce  1
    drwxr-xr-x  1
    -  1
    hadoop  1
    
  3. 启用 Reducer

    drwxr-xr-x   - kangfoo-mac supergroup          0 2014-02-18 00:29 /test3/output1
    -rw-r--r--   3 kangfoo-mac supergroup          0 2014-02-18 00:29 /test3/output1/_SUCCESS
    -rw-r--r--   3 kangfoo-mac supergroup       1031 2014-02-18 00:29 /test3/output1/part-m-00000
    -rw-r--r--   3 kangfoo-mac supergroup        703 2014-02-18 00:29 /test3/output1/part-m-00001
    drwxr-xr-x   - kangfoo-mac supergroup          0 2014-02-18 00:31 /test3/output2
    -rw-r--r--   3 kangfoo-mac supergroup          0 2014-02-18 00:31 /test3/output2/_SUCCESS
    -rw-r--r--   3 kangfoo-mac supergroup        705 2014-02-18 00:31 /test3/output2/part-r-00000
    

    结果:

    0:17:31,680 6
    014-02-18   1
    2014-02-17  11
    2014-02-18  5
    21:02   7
    
  4. 在日志或者 http://master11:50030/jobtracker.jsp 页面查找是否执行过 Combine 过程。 日志截取如下:

    2014-02-18 00:31:29,894 INFO  SPLIT_RAW_BYTES=233
    2014-02-18 00:31:29,894 INFO  Combine input records=140
    2014-02-18 00:31:29,894 INFO  Reduce input records=43
    2014-02-18 00:31:29,894 INFO  Reduce input groups=42
    2014-02-18 00:31:29,894 INFO  Combine output records=43
    2014-02-18 00:31:29,894 INFO  Reduce output records=42
    2014-02-18 00:31:29,894 INFO  Map output records=140
    

Hadoop MapReduce 类型与格式

| 评论

MapReduce 的 map和reduce函数的输入和输出是键/值对(key/value pair) 形式的数据处理模型。

MapReduce 的类型

Hadoop1.x MapReduce 有2套API.旧api偏向与接口,新api偏向与抽象类,如无特殊默认列举为旧的api作讨论.

在Hadoop的MapReduce中,map和reduce函数遵循如下格式:

  • map(K1, V1) –> list (K2, V2) // map:对输入分片数据进行过滤数据,组织 key/value 对等操作
  • combine(K2, list(V2)) –> list(K2, V2) // 在map端对输出进行预处理,类似 reduce。combine 不一定适用任何情况,如:对总和求平均数。选用。
  • partition(K2, V2) –> integer // 将中间键值对划分到一个 reduce 分区,返回分区索引号。实际上,分区单独由键决定(值是被忽略的),分区内的键会排序,相同的键的所有值会合成一个组(list(V2))
  • reduce(K2, list(V2)) –> list(K3, V3) // 每个 reduce 会处理具有某些特性的键,每个键上都有值的序列,是通过对所有 map 输出的值进行统计得来的,reduce 根据所有map传来的结果,最后进行统计合并操作,并输出结果。

旧api类代码

public interface Mapper<K1, V1, K2, V2> extends JobConfigurable, Closeable {  
  void map(K1 key, V1 value, OutputCollector<K2, V2> output, Reporter reporter) throws IOException;
}
//
public interface Reducer<K2, V2, K3, V3> extends JobConfigurable, Closeable {
  void reduce(K2 key, Iterator<V2> values, OutputCollector<K3, V3> output, Reporter reporter) throws IOException;
}
//
public interface Partitioner<K2, V2> extends JobConfigurable {
   int getPartition(K2 key, V2 value, int numPartitions);
}

新api类代码

public class Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {
… …
  protected void map(KEYIN key, VALUEIN value, 
                     Context context) throws IOException, InterruptedException {
    context.write((KEYOUT) key, (VALUEOUT) value);
  }
… …
}
//
public class Reducer<KEYIN,VALUEIN,KEYOUT,VALUEOUT> {
… …
 protected void reduce(KEYIN key, Iterable<VALUEIN> values, Context context
                        ) throws IOException, InterruptedException {
    for(VALUEIN value: values) {
      context.write((KEYOUT) key, (VALUEOUT) value);
    }
  }
… …
}
//
public interface Partitioner<K2, V2> extends JobConfigurable {
  int getPartition(K2 key, V2 value, int numPartitions);
}

默认的 partitioner 是 HashPartitioner,对键进行哈希操作以决定该记录属于哪个分区让 reduce 处理,每个分区对应一个 reducer 任务。总槽数 solt=集群中节点数 * 每个节点的任务槽。实际值应该比理论值要小,以空闲一部分在错误容忍是备用。

HashPartitioner的实现

public class HashPartitioner<K, V> extends Partitioner<K, V> {
    public int getPartition(K key, V value, int numReduceTasks) {
        return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
    }
}

hadooop1.x 版本中

  • 旧的api,map 默认的 IdentityMapper, reduce 默认的是 IdentityReducer
  • 新的api,map 默认的 Mapper, reduce 默认的是 Reducer

默认MapReduce函数实例程序

public class MinimalMapReduceWithDefaults extends Configured implements Tool {
    @Override
    public int run(String[] args) throws Exception {
        Job job = JobBuilder.parseInputAndOutput(this, getConf(), args);
        if (job == null) {
            return -1;
            }
        //
        job.setInputFormatClass(TextInputFormat.class);
        job.setMapperClass(Mapper.class);
        job.setMapOutputKeyClass(LongWritable.class);
        job.setMapOutputValueClass(Text.class);
        job.setPartitionerClass(HashPartitioner.class);
        job.setNumReduceTasks(1);
        job.setReducerClass(Reducer.class);
        job.setOutputKeyClass(LongWritable.class);
        job.setOutputValueClass(Text.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        return job.waitForCompletion(true) ? 0 : 1;
        }
    //
    public static void main(String[] args) throws Exception {
        int exitCode = ToolRunner.run(new MinimalMapReduceWithDefaults(), args);
        System.exit(exitCode);
        }
}

输入格式

输入分片与记录

一个输入分片(input split)是由单个 map 处理的输入块,即每一个 map 只处理一个输入分片,每个分片被划分为若干个记录( records ),每条记录就是一个 key/value 对,map 一个接一个的处理每条记录,输入分片和记录都是逻辑的,不必将他们对应到文件上。数据分片由数据块大小决定的。

注意,一个分片不包含数据本身,而是指向数据的引用( reference )。

输入分片在Java中被表示为InputSplit抽象类

public interface InputSplit extends Writable {
  long getLength() throws IOException;
  String[] getLocations() throws IOException;
}

InputFormat负责创建输入分片并将它们分割成记录,抽象类如下:

public interface InputFormat<K, V> {
  InputSplit[] getSplits(JobConf job, int numSplits) throws IOException;
  RecordReader<K, V> getRecordReader(InputSplit split,
                                     JobConf job, 
                                     Reporter reporter) throws IOException;
}

客户端通过调用 getSpilts() 方法获得分片数目(怎么调到的?),在 TaskTracker 或 NodeManager上,MapTask 会将分片信息传给 InputFormat 的 createRecordReader() 方法,进而这个方法来获得这个分片的 RecordReader,RecordReader 基本就是记录上的迭代器,MapTask 用一个 RecordReader 来生成记录的 key/value 对,然后再传递给 map 函数,如下步骤:

  1. jobClient调用getSpilts()方法获得分片数目,将numSplits作为参数传入,以参考。InputFomat实现有自己的getSplits()方法。
  2. 客户端将他们发送到jobtracker
  3. jobtracker使用其存储位置信息来调度map任务从而在tasktracker上处理分片数据
  4. 在tasktracker上,map任务把输入分片传给InputFormat上的getRecordReader()方法,来获取分片的RecordReader。
  5. map 用一个RecordReader来生成纪录的键值对。
  6. RecordReader的next()方法被调用,知道返回false。map任务结束。

MapRunner 类部分代码(旧api)

public class MapRunner<K1, V1, K2, V2>
    implements MapRunnable<K1, V1, K2, V2> {
… … 
 public void run(RecordReader<K1, V1> input, OutputCollector<K2, V2> output,
                  Reporter reporter)
    throws IOException {
    try {
      // allocate key & value instances that are re-used for all entries
      K1 key = input.createKey();
      V1 value = input.createValue();
      //
      while (input.next(key, value)) {
        // map pair to output
        mapper.map(key, value, output, reporter);
        if(incrProcCount) {
          reporter.incrCounter(SkipBadRecords.COUNTER_GROUP, 
              SkipBadRecords.COUNTER_MAP_PROCESSED_RECORDS, 1);
        }
      }
    } finally {
      mapper.close();
    }
  }
……
}

FileInputFormat类

FileInputFormat是所有使用文件为数据源的InputFormat实现的基类,它提供了两个功能:一个定义哪些文件包含在一个作业的输入中;一个为输入文件生成分片的实现,把分片割成记录的作业由其子类来完成。

下图为InputFormat类的层次结构image

FileInputFormat 类输入路径

FileInputFormat 提供四种静态方法来设定 Job 的输入路径,其中下面的 addInputPath() 方法 addInputPaths() 方法可以将一个或多个路径加入路径列表,setInputPaths() 方法一次设定完整的路径列表(可以替换前面所设路 径)

public static void addInputPath(Job job, Path path);
public static void addInputPaths(Job job, String commaSeparatedPaths);
public static void setInputPaths(Job job, Path... inputPaths);
public static void setInputPaths(Job job, String commaSeparatedPaths);

如果需要排除特定文件,可以使用 FileInputFormat 的 setInputPathFilter() 设置一个过滤器: public static void setInputPathFilter(Job job, Class<? extends PathFilter> filter); 它默认过滤隐藏文件中以”_“和”.“开头的文件

  private static final PathFilter hiddenFileFilter = new PathFilter(){
      public boolean accept(Path p){
        String name = p.getName(); 
        return !name.startsWith("_") && !name.startsWith("."); 
      }
    }; 

FileInputFormat 类的输入分片

FileInputFormat 类一般分割超过 HDFS 块大小的文件。通常分片与 HDFS 块大小一样,然后分片大小也可以改变的,下面展示了控制分片大小的属性:

待补。 TODO

FileInputFormat computeSplitSize(long goalSize, long minSize,long blockSize) {
    return Math.max(minSize, Math.min(goalSize, blockSize));
}

即: minimumSize < blockSize < maximumSize 分片的大小即为块大小。

重载 FileInputFormat 的 isSplitable() =false 可以避免 mapreduce 输入文件被分割。

小文件与CombineFileInputFormat

  1. CombineFileInputFormat 是针对小文件设计的,CombineFileInputFormat 会把多个文件打包到一个分片中,以便每个 mapper 可以处理更多的数据;减少大量小文件的另一种方法可以使用 SequenceFile 将这些小文件合并成一个或者多个大文件。

  2. CombineFileInputFormat 不仅对于处理小文件实际上对于处理大文件也有好处,本质上,CombineFileInputFormat 使 map 操作中处理的数据量与 HDFS 中文件的块大小之间的耦合度降低了

  3. CombineFileInputFormat 是一个抽象类,没有提供实体类,所以需要实现一个CombineFileInputFormat 具体 类和 getRecordReader() 方法(旧的接口是这个方法,新的接口InputFormat中则是createRecordReader())

把整个文件作为一条记录处理

有时,mapper 需要访问问一个文件中的全部内容。即使不分割文件,仍然需要一个 RecordReader 来读取文件内容为 record 的值,下面给出实现这个功能的完整程序,详细解释见《Hadoop权威指南》。

文本处理

  1. TextInputFileFormat 是默认的 InputFormat,每一行就是一个纪录

  2. TextInputFileFormat 的 key 是 LongWritable 类型,存储该行在整个文件的偏移量,value 是每行的数据内容,不包括任何终止符(换行符和回车符),它是Text类型. 如下例 On the top of the Crumpetty Tree

    The Quangle Wangle sat,
    But his face you could not see,
    On account of his Beaver Hat.
    每条记录表示以下key/value对
    (0, On the top of the Crumpetty Tree)
    (33, The Quangle Wangle sat,)
    (57, But his face you could not see,)
    (89, On account of his Beaver Hat.

  3. 输入分片与 HDFS 块之间的关系:TextInputFormat 每一条纪录就是一行,很可能某一行跨数据库存放。

image

  1. KeyValueTextInputFormat。对下面的文本,KeyValueTextInputFormat 比较适合处理,其中可以通过 mapreduce.input.keyvaluelinerecordreader.key.value.separator 属性设置指定分隔符,默认 值为制表符,以下指定”→“为分隔符
    line1→On the top of the Crumpetty Tree
    line2→The Quangle Wangle sat,
    line3→But his face you could not see,
    line4→On account of his Beaver Hat.

  2. NLineInputFormat。如果希望 mapper 收到固定行数的输入,需要使用 NLineInputFormat 作为 InputFormat 。与 TextInputFormat 一样,key是文件中行的字节偏移量,值是行本身。

N 是每个 mapper 收到的输入行数,默认时 N=1,每个 mapper 会正好收到一行输入,mapreduce.input.lineinputformat.linespermap 属性控制 N 的值。以刚才的文本为例。 如果N=2,则每个输入分片包括两行。第一个 mapper 会收到前两行 key/value 对:

(0, On the top of the Crumpetty Tree)
(33, The Quangle Wangle sat,)
另一个mapper则收到:
(57, But his face you could not see,)
(89, On account of his Beaver Hat.)

二进制输入

SequenceFileInputFormat 如果要用顺序文件数据作为 MapReduce 的输入,应用 SequenceFileInputFormat。key 和 value 顺序文件,所以要保证map输入的类型匹配

SequenceFileInputFormat 可以读 MapFile 和 SequenceFile,如果在处理顺序文件时遇到目录,SequenceFileInputFormat 类会认为值正在读 MapFile 数据文件。

SequenceFileAsTextInputFormat 是 SequenceFileInputFormat 的变体。将顺序文件(其实就是SequenceFile)的 key 和 value 转成 Text 对象

SequenceFileAsBinaryInputFormat是 SequenceFileInputFormat 的变体。将顺序文件的key和value作为二进制对象

多种输入

对于不同格式,不同表示的文本文件输出的处理,可以用 MultipleInputs 类里处理,它允许为每条输入路径指定 InputFormat 和 Mapper。

MultipleInputs 类有一个重载版本的 addInputPath()方法:

  • 旧api列举
    public static void addInputPath(JobConf conf, Path path, Class<? extends InputFormat> inputFormatClass) 
    
  • 新api列举
    public static void addInputPath(Job job, Path path, Class<? extends InputFormat> inputFormatClass) 
    
    在有多种输入格式只有一个mapper时候(调用Job的setMapperClass()方法),这个方法会很有用。

DBInputFormat

JDBC从关系数据库中读取数据的输入格式(参见权威指南)

输出格式

OutputFormat类的层次结构

image

文本输出

默认输出格式是 TextOutputFormat,它本每条记录写成文本行,key/value 任意,这里 key和value 可以用制表符分割,用 mapreduce.output.textoutputformat.separator 书信可以改变制表符,与TextOutputFormat 对应的输入格式是 KeyValueTextInputFormat。

可以使用 NullWritable 来省略输出的 key 和 value。

二进制输出

  • SequenceFileOutputFormat 将它的输出写为一个顺序文件,因为它的格式紧凑,很容易被压缩,所以易于作为 MapReduce 的输入
  • 把key/value对作为二进制格式写到一个 SequenceFile 容器中
  • MapFileOutputFormat 把 MapFile 作为输出,MapFile 中的 key 必需顺序添加,所以必须确保 reducer 输出的 key 已经排好序。

多个输出

  • MultipleOutputFormat 类可以将数据写到多个文件中,这些文件名称源于输出的键和值。MultipleOutputFormat是个抽象类,它有两个子类:MultipleTextOutputFormatMultipleSequenceFileOutputFormat 。它们是 TextOutputFormat 的和 SequenceOutputFormat 的多版本。

  • MultipleOutputs 类 用于生成多个输出的库,可以为不同的输出产生不同的类型,无法控制输出的命名。它用于在原有输出基础上附加输出。输出是制定名称的。

MultipleOutputFormat和MultipleOutputs的区别

这两个类库的功能几乎相同。MultipleOutputs 功能更齐全,但 MultipleOutputFormat 对 目录结构和文件命令更多de控制。

特征 MultipleOutputFormat MultipleOutputs
完全控制文件名和目录名
不同输出有不同的键和值类型
从同一作业的map和reduce使用
每个纪录多个输出
与任意OutputFormat一起使用 否,需要子类

延时输出

有些文件应用倾向于不创建空文件,此时就可以利用 LazyOutputFormat (Hadoop 0.21.0版本之后开始提供),它是一个封装输出格式,可以保证指定分区第一条记录输出时才真正的创建文件,要使用它,用JobConf和相关输出格式作为参数来调用 setOutputFormatClass() 方法.

Streaming 和 Pigs 支持 -LazyOutput 选项来启用 LazyOutputFormat功能。

数据库输出

参见 关系数据和 HBase的输出格式。

练习代码

代码路径 https://github.com/kangfoo/hadoop1.study/blob/master/kangfoo/study.hdfs/src/main/java/com/kangfoo/study/hadoop1/mp/typeformat
使用 maven 打包之后用 hadoop jar 命令执行
步骤同 Hadoop example jar 类

  1. 使用 TextInputFormat 类型测试 wordcount TestMapreduceInputFormat 上传一个文件

    $ ./bin/hadoop fs -mkdir /test/input1
    $ ./bin/hadoop fs -put ./wordcount.txt /test/input1
    

    使用maven 打包 或者用eclipse hadoop 插件, 执行主函数时设置如下参数

    hdfs://master11:9000/test/input1/wordcount.txt hdfs://master11:9000/numbers.seq hdfs://master11:9000/test/output5
    

    没改过端口默认 namenode RPC 交互端口 8020 将上述的 9000 改成你自己的端口即可。
    部分日志

    ## 准备运行程序和测试数据
    lrwxrwxrwx.  1 hadoop hadoop      86 2月  17 21:02 study.hdfs-0.0.1-SNAPSHOT.jar -> /home/hadoop/env/kangfoo.study/kangfoo/study.hdfs/target/study.hdfs-0.0.1-SNAPSHOT.jar
    -rw-rw-r--.  1 hadoop hadoop    1983 2月  17 20:18 wordcount.txt
    ##执行
    $ ./bin/hadoop jar study.hdfs-0.0.1-SNAPSHOT.jar TestMapreduceInputFormat /test/input1/wordcount.txt /test/output1
    
  2. 使用SequenceInputFormat类型测试wordcound 使用Hadoop权威指南中的示例创建 /numbers.seq 文件

    $ ./bin/hadoop fs -text /numbers.seq
    $ ./bin/hadoop jar study.hdfs-0.0.1-SNAPSHOT.jar TestMapreduceSequenceInputFormat /numbers.seq /test/output2
    
  3. 多文件输入

    $  ./bin/hadoop jar study.hdfs-0.0.1-SNAPSHOT.jar TestMapreduceMultipleInputs /test/input1/wordcount.txt /numbers.seq /test/output3
    

博客参考

淘宝博客

Hadoop MapReduce 工作机制

| 评论

工作流程

  1. 作业配置
  2. 作业提交
  3. 作业初始化
  4. 作业分配
  5. 作业执行
  6. 进度和状态更新
  7. 作业完成
  8. 错误处理
  9. 作业调度
  10. shule(mapreduce核心)和sort

作业配置

相对不难理解。 具体略。

作业提交

image

首先熟悉上图,4个实例对象: client jvm、jobTracker、TaskTracker、SharedFileSystem

MapReduce 作业可以使用 JobClient.runJob(conf) 进行 job 的提交。如上图,这个执行过程主要包含了4个独立的实例。

  • 客户端。提交MapReduce作业。
  • jobtracker:协调作业的运行。jobtracker一个java应用程序。
  • tasktracker:运行作业划分后的任务。tasktracker一个java应用程序。
  • shared filesystem(分布式文件系统,如:HDFS)

以下是Hadoop1.x 中旧版本的 MapReduce JobClient API. org.apache.hadoop.mapred.JobClient

/** JobClient is the primary interface for the user-job to interact with the JobTracker. JobClient provides facilities to submit jobs, track their progress, access component-tasks' reports/logs, get the Map-Reduce cluster status information etc.
The job submission process involves:
Checking the input and output specifications of the job.
Computing the InputSplits for the job.
Setup the requisite accounting information for the DistributedCache of the job, if necessary.
Copying the job's jar and configuration to the map-reduce system directory on the distributed file-system.
Submitting the job to the JobTracker and optionally monitoring it's status.
Normally the user creates the application, describes various facets of the job via JobConf and then uses the JobClient to submit the job and monitor its progress. */ 
Here is an example on how to use JobClient:
     // Create a new JobConf
     JobConf job = new JobConf(new Configuration(), MyJob.class);
     // Specify various job-specific parameters     
     job.setJobName("myjob");
     job.setInputPath(new Path("in"));
     job.setOutputPath(new Path("out"));
     job.setMapperClass(MyJob.MyMapper.class);
     job.setReducerClass(MyJob.MyReducer.class);
     // Submit the job, then poll for progress until the job is complete
     JobClient.runJob(job);   
// JobClient.runJob(job) --> JobClient. submitJob(job) -->  submitJobInternal(job) 

新API放在 org.apache.hadoop.mapreduce.* 包下. 使用 Job 类代替 JobClient。又由job.waitForCompletion(true) 内部进行 JobClient.submitJobInternal() 封装。

新旧API请参考博文 Hadoop编程笔记(二):Hadoop新旧编程API的区别

hadoop1.x 旧 API JobClient.runJob(job) 调用submitJob() 之后,便每秒轮询作业进度monitorAndPrintJob。并将其进度、执行结果信息打印到控制台上。

接着再看看 JobClient 的 submitJob() 方法的实现基本过程。上图步骤 2,3,4.

  1. 向 jobtracker 请求一个新的 jobId. (JobID jobId = jobSubmitClient.getNewJobId(); void org.apache.hadoop.mapred.JobClient.init(JobConf conf) throws IOException , 集群环境下是 RPC JobSubmissionProtocol 代理。本地环境使用 LocalJobRunner。

  2. 检查作业的相关的输出路径并提交 job 以及相关的 jar 到 job tracker, 相关的 libjar 通过distributedCache 传递给 jobtracker.

    submitJobInternal(… …); 
    // -->
    copyAndConfigureFiles(jobCopy, submitJobDir); 
    // --> 
    copyAndConfigureFiles(job, jobSubmitDir, replication); 
    … 
    // --> 
    output.checkOutputSpecs(context);
    
  3. 计算作业的分片。将 SplitMetaInfo 信息写入 JobSplit。 Maptask 的个数 = 输入的文件大小除以块的大小。

    int maps = writeSplits(context, submitJobDir);
    (JobConf)jobCopy.setNumMapTasks(maps);
    // --> 
    maps = writeNewSplits(job, jobSubmitDir); 
    // --> (重写,要详细)
    JobSplitWriter.createSplitFiles(jobSubmitDir, conf,
        jobSubmitDir.getFileSystem(conf), array); // List<InputSplit> splits = input.getSplits(job); 
    // -->
    SplitMetaInfo[] info = writeNewSplits(conf, splits, out);
    
  4. 写JobConf信息到配置文件 job.xml。 jobCopy.writeXml(out);

  5. 准备提交job。 RPC 通讯到 JobTracker 或者 LocalJobRunner.

    jobSubmitClient.submitJob(jobId, submitJobDir.toString(), jobCopy.getCredentials());
    

作业初始化

  1. 当 JobTracker 接收到了 submitJob() 方法的调用后,会把此调用放入一个内部队列中,交由作业调度器(job scheduler)进行调度。

    submitJob(jobId, jobSubmitDir, null, ts, false);
    // -->
    jobInfo = new JobInfo(jobId, new Text(ugi.getShortUserName()),
          new Path(jobSubmitDir));
    
  2. 作业调度器并对job进行初始化。初始化包括创建一个表示正在运行作业的对象——封装任务和纪录信息,以便跟踪任务的状态和进程(步骤5)。

    job = new JobInProgress(this, this.conf, jobInfo, 0, ts);
    // -->
    status = addJob(jobId, job);
    // -->
    synchronized (jobs) {
      synchronized (taskScheduler) {
        jobs.put(job.getProfile().getJobID(), job);
        for (JobInProgressListener listener : jobInProgressListeners) {
          listener.jobAdded(job);
        }
      }
    }
    
  3. 创建任务列表。在 JobInProgress的 initTask()方法中

  4. 从共享文件系统中获取 JobClient 已计算好的输入分片信息(步骤6)

  5. 创建 Map 任务和 Reduce 任务,为每个 MapTask 和 ReduceTask 生成 TaskProgress 对象。

  6. 创建的 reduce 任务的数量由 JobConf 的 mapred.reduce.task 属性决定,可用 setNumReduceTasks() 方法设置,然后调度器创建相应数量的要运行的 reduce 任务。任务被分配了 id。

    JobInProgress initTasks() 
    … …
    TaskSplitMetaInfo[] splits = createSplits(jobId); // read input splits and create a map per a split
    // -->
    allSplitMetaInfo[i] = new JobSplit.TaskSplitMetaInfo(splitIndex, 
          splitMetaInfo.getLocations(), 
          splitMetaInfo.getInputDataLength());
    maps = new TaskInProgress[numMapTasks]; // 每个分片创建一个map任务
    this.reduces = new TaskInProgress[numReduceTasks]; // 创建reduce任务
    

任务分配

Tasktracker 和 JobTracker 通过心跳通信分配一个任务

  1. TaskTracker 定期发送心跳,告知 JobTracker, tasktracker 是否还存活,并充当两者之间的消息通道。

  2. TaskTracker 主动向 JobTracker 询问是否有作业。若自己有空闲的 solt,就可在心跳阶段得到 JobTracker 发送过来的 Map 任务或 Reduce 任务。对于 map 任务和 task 任务,TaskTracker 有固定数量的任务槽,准确数量由 tasktracker 核的个数核内存的大小来确定。默认调度器在处理 reduce 任务槽之前,会填充满空闲的 map 任务槽,因此,如果 tasktracker 至少有一个空闲的 map 任务槽,tasktracker 会为它选择一个 map 任务,否则选择一个 reduce 任务。选择 map 任务时,jobTracker 会考虑数据本地化(任务运行在输入分片所在的节点),而 reduce 任务不考虑数据本地化。任务还可能是机架本地化。

  3. TaskTracker 和 JobTracker heartbeat代码

    TaskTracker.transmitHeartBeat()
    // -->
    //
    // Check if we should ask for a new Task
    //
    if (askForNewTask) {
      askForNewTask = enoughFreeSpace(localMinSpaceStart);
      long freeDiskSpace = getFreeSpace();
      long totVmem = getTotalVirtualMemoryOnTT();
      long totPmem = getTotalPhysicalMemoryOnTT();
      long availableVmem = getAvailableVirtualMemoryOnTT();
      long availablePmem = getAvailablePhysicalMemoryOnTT();
      long cumuCpuTime = getCumulativeCpuTimeOnTT();
      long cpuFreq = getCpuFrequencyOnTT();
      int numCpu = getNumProcessorsOnTT();
      float cpuUsage = getCpuUsageOnTT();
    // -->
    // Xmit the heartbeat
    HeartbeatResponse heartbeatResponse = jobClient.heartbeat(status, 
                                                              justStarted,
                                                              justInited,
                                                              askForNewTask, 
                                                              heartbeatResponseId);
    注: InterTrackerProtocol jobClient RPC 到 JobTracker.heartbeat() 
    JobTracker.heartbeat()
    // -->
    // Process this heartbeat 
    short newResponseId = (short)(responseId + 1);
    status.setLastSeen(now);
    if (!processHeartbeat(status, initialContact, now)) {
      if (prevHeartbeatResponse != null) {
        trackerToHeartbeatResponseMap.remove(trackerName);
      }
      return new HeartbeatResponse(newResponseId, 
                   new TaskTrackerAction[] {new ReinitTrackerAction()});
    }
    

任务执行

tasktracker 执行任务大致步骤:

  1. 被分配到一个任务后,从共享文件中把作业的jar复制到本地,并将程序执行需要的全部文件(配置信息、数据分片)复制到本地
  2. 为任务新建一个本地工作目录
  3. 内部类TaskRunner实例启动一个新的jvm运行任务

Tasktracker.TaskRunner.startNewTask()代码

// -->
RunningJob rjob = localizeJob(tip);
// -->
launchTaskForJob(tip, new JobConf(rjob.getJobConf()), rjob); 
// -->
tip.launchTask(rjob);
// -->
setTaskRunner(task.createRunner(TaskTracker.this, this, rjob));
this.runner.start(); // MapTaskRunner 或者 ReduceTaskRunner
//
//startNewTask 方法完整代码:
void startNewTask(final TaskInProgress tip) throws InterruptedException {
    Thread launchThread = new Thread(new Runnable() {
      @Override
      public void run() {
        try {
          RunningJob rjob = localizeJob(tip);//初始化job工作目录
          tip.getTask().setJobFile(rjob.getLocalizedJobConf().toString());
          // Localization is done. Neither rjob.jobConf nor rjob.ugi can be null
          launchTaskForJob(tip, new JobConf(rjob.getJobConf()), rjob); // 启动taskrunner执行task
        } catch (Throwable e) {
          String msg = ("Error initializing " + tip.getTask().getTaskID() + 
                        ":\n" + StringUtils.stringifyException(e));
          LOG.warn(msg);
          tip.reportDiagnosticInfo(msg);
          try {
            tip.kill(true);
            tip.cleanup(false, true);
          } catch (IOException ie2) {
            LOG.info("Error cleaning up " + tip.getTask().getTaskID(), ie2);
          } catch (InterruptedException ie2) {
            LOG.info("Error cleaning up " + tip.getTask().getTaskID(), ie2);
          }
          if (e instanceof Error) {
            LOG.error("TaskLauncher error " + 
                StringUtils.stringifyException(e));
          }
        }
      }
    });
    launchThread.start();
  }

进度和状态更新

  1. 状态包括:作业或认为的状态(成功,失败,运行中)、map 和 reduce 的进度、作业计数器的值、状态消息或描述
  2. task 运行时,将自己的状态发送给 TaskTracker,由 TaskTracker 心跳机制向 JobTracker 汇报
  3. 状态进度由计数器实现

如图: image

作业完成

  1. jobtracker收到最后一个任务完成通知后,便把作业任务状态置为成功
  2. 同时jobtracker,tasktracker清理作业的工作状态

错误处理

task 失败

  1. map 或者 reduce 任务中的用户代码运行异常,子 jvm 在进程退出之前向其父 tasktracker 发送报告, 并打印日志。tasktracker 会将此 task attempt 标记为 failed,释放一个任务槽 slot,以运行另一个任务。streaming 任务以非零退出代码,则标记为 failed.
  2. 子进程jvm突然退出(jvm bug)。tasktracker 注意到会将其标记为 failed。
  3. 任务挂起。tasktracker 注意到一段时间没有收到进度的更新,便将任务标记为 failed。此 jvm 子进程将被自动杀死。任务超时时间间隔通常为10分钟,使用 mapred.task.timeout 属性进行配置。以毫秒为单位。超时设置为0表示将关闭超时判定,长时间运行不会被标记为 failed,也不会释放任务槽。
  4. tasktracker 通过心跳将子任务标记为失败后,自身计数器减一,以便向 jobtracker 申请新的任务
  5. jobtracker 通过心跳知道一个 task attempt 失败之后,便重新调度该任务的执行(避开将失败的任务分配给执行失败的tasktracker)。默认执行失败尝试4次,若仍没有执行成功,整个作业就执行失败。

tasktracker 失败

  1. 一个 tasktracker 由于崩溃或者运行过于缓慢而失败,就会停止将 jobtracker 心跳。默认间隔可由 mapred.tasktracker.expriy.interval 设置,毫秒为单位。
  2. 同时 jobtracker 将从等待任务调度的 tasktracker 池将此 tasktracker 移除。jobtracker 重新安排此 tasktracker 上已运行并成功完成的 map 任务重新运行。
  3. 若 tasktracker 上面的失败任务数远远高于集群的平均失败数,tasktracker 将被列入黑名单。重启后失效。

jobtracker失败

Hadoop jobtracker 失败是一个单点故障。作业失败。可在后续版本中启动多个 jobtracker,使用zookeeper协调控制(YARN)。

作业调度

  1. hadoop默认使用先进先出调度器(FIFO) 先遵循优先级优先,在按作业到来顺序调度。缺点:高优先级别的长时间运行的task占用资源,低级优先级,短作业得不到调度。
  2. 公平调度器(FairScheduler) 目标:让每个用户公平的共享集群的能力.默认情况下,每个用户都有自己的池。支持抢占,若一个池在特定的时间内未得到公平的资源分配共享,调度器将终止运行池中得到过多资源的任务,以便将任务槽让给资源不足的池。 详细文档参见:http://hadoop.apache.org/docs/r1.2.1/fair_scheduler.html
  3. 容量调度器(CapacityScheduler) 支持多队列,每个队列配置一定的资源,采用FIFO调度策略。对每个用户提交的作业所占的资源进行限定。 详细文档参见:http://hadoop.apache.org/docs/r1.2.1/capacity_scheduler.html

shuffle和sort

mapreduce 执行排序,将 map 输出作为输入传递给 reduce 称为 shuffle。其确保每个 reduce 的输入都时按键排序。shuffle 是调优 mapreduce 重要的阶段。

mapreduce 的 shuffle 和排序如下图: image

map端

  1. map端并不是简单的将中间结果输出到磁盘。而是先用缓冲的方式写到内存,并预排序。
  2. 每个map任务都有一个环形缓冲区,用于存储任务的输出。默认100mb,由 io.sort.mb 设置。 io.sort.spill.percent 设置阀值,默认80%。
  3. 一旦内存缓冲区到达阀值,由一个后台线程将内存中内容 spill 到磁盘中。在写磁盘前,线程会根据数据最终要传送的 reducer 数目划分成相应的分区。每一个分区中,后台线程按键进行内排序,如果有一个 combiner 它会在排序后的输出上运行。
  4. 在任务完成之前,多个溢出写文件会被合并成一个已分区已排序的输出文件。最终成为 reduce 的输入文件。属性 io.sort.factor 控制一次最多能合并多少流(分区),默认10.
  5. 如果已指定 combiner,并且溢出写文件次数至少为3(min.num.spills.for.combiner 属性),则 combiner 就会在输出文件写到磁盘之前运行。目的时 map 输出更紧凑,写到磁盘上的数据更少。combiner 在输入上反复运行并不影响最终结果。
  6. 压缩 map 输出。写磁盘速度更快、节省磁盘空间、减少传给 reduce 数据量。默认不压缩。可使 mapred.compress.map.output=true 启用压缩,并指定压缩库, mapred.map.output.compression.codec。
  7. reducer 通过HTTP方式获取输出文件的分区。由于文件分区的工作线程数量任务的 tracker.http.threads 属性控制。

MapTask代码,内部类MapOutputBuffer.collect()方法在收集key/value到容器中,一旦满足预值,则开始溢出写文件由sortAndSpill() 执行。

// sufficient acct space
          kvfull = kvnext == kvstart;
          final boolean kvsoftlimit = ((kvnext > kvend)
              ? kvnext - kvend > softRecordLimit
              : kvend - kvnext <= kvoffsets.length - softRecordLimit);
          if (kvstart == kvend && kvsoftlimit) {
            LOG.info("Spilling map output: record full = " + kvsoftlimit);
            startSpill();
          }
// --> startSpill();
 spillReady.signal(); //    private final Condition spillReady = spillLock.newCondition();
// --> 溢出写文件主要由内部类 SpillThread(Thread) 执行
    try {
              spillLock.unlock();
              sortAndSpill(); // 排序并溢出
            } 
// --> sortAndSpill()
 // create spill file
        final SpillRecord spillRec = new SpillRecord(partitions);
 // sorter = ReflectionUtils.newInstance(job.getClass("map.sort.class", QuickSort.class, IndexedSorter.class), job);
… …
 sorter.sort(MapOutputBuffer.this, kvstart, endPosition, reporter);
// -->
 if (combinerRunner == null) {
… …
 // Note: we would like to avoid the combiner if we've fewer
              // than some threshold of records for a partition
              if (spstart != spindex) {
                combineCollector.setWriter(writer);
                RawKeyValueIterator kvIter =
                  new MRResultIterator(spstart, spindex);
                combinerRunner.combine(kvIter, combineCollector);
              }
}

reduce 端

  1. reduce 端 shuffle 过程分为三个阶段:复制 map 输出、排序合并、reduce 处理
  2. reduce 可以接收多个 map 的输出。若 map 相当小,则会复制到 reduce tasktracker 的内存中(mapred.job.shuffle.input.buffer.pecent控制百分比)。一旦内存缓冲区达到阀值大小(由 mapped.iob.shuffle.merge.percent 决定)或者达到map输出阀值( mapred.inmem.merge.threshold 控制),则合并后溢出写到磁盘
  3. map任务在不同时间完成,tasktracker 通过心跳从 jobtracker 获取 map 输出位置。并开始复制 map 输出文件。
  4. reduce 任务由少量复制线程,可并行复制 map 输出文件。由属性 mapred.reduce.parallel.copies 控制。
  5. reduce 阶段不会等待所有输入合并成一个大文件后在进行处理,而是把部分合并的结果直接进行处理。

ReduceTask源代码,run()方法

// --> 3个阶段
 if (isMapOrReduce()) {
      copyPhase = getProgress().addPhase("copy");
      sortPhase  = getProgress().addPhase("sort");
      reducePhase = getProgress().addPhase("reduce");
    }
// --> copy 阶段
if (!isLocal) {
      reduceCopier = new ReduceCopier(umbilical, job, reporter);
      if (!reduceCopier.fetchOutputs()) {
        if(reduceCopier.mergeThrowable instanceof FSError) {
          throw (FSError)reduceCopier.mergeThrowable;
        }
        throw new IOException("Task: " + getTaskID() + 
            " - The reduce copier failed", reduceCopier.mergeThrowable);
      }
    }
    copyPhase.complete();                         // copy is already complete
// --> sort 阶段
setPhase(TaskStatus.Phase.SORT);
    statusUpdate(umbilical);
    final FileSystem rfs = FileSystem.getLocal(job).getRaw();
    RawKeyValueIterator rIter = isLocal
      ? Merger.merge(job, rfs, job.getMapOutputKeyClass(),
          job.getMapOutputValueClass(), codec, getMapFiles(rfs, true),
          !conf.getKeepFailedTaskFiles(), job.getInt("io.sort.factor", 100),
          new Path(getTaskID().toString()), job.getOutputKeyComparator(),
          reporter, spilledRecordsCounter, null)
      : reduceCopier.createKVIterator(job, rfs, reporter);
    // free up the data structures
    mapOutputFilesOnDisk.clear();
    sortPhase.complete();                         // sort is complete
// --> reduce 阶段
setPhase(TaskStatus.Phase.REDUCE); 
    statusUpdate(umbilical);
    Class keyClass = job.getMapOutputKeyClass();
    Class valueClass = job.getMapOutputValueClass();
    RawComparator comparator = job.getOutputValueGroupingComparator();
    if (useNewApi) {
      runNewReducer(job, umbilical, reporter, rIter, comparator, 
                    keyClass, valueClass);
    } else {
      runOldReducer(job, umbilical, reporter, rIter, comparator, 
                    keyClass, valueClass);
    }
// --> done 执行结果
    done(umbilical, reporter);

有关mapreduce shuffle和sort 原理、过程和调优

hadoop作业调优参数整理及原理, MapReduce:详解Shuffle过程 介绍的非常详尽。