kangfoo's blog

工作学习笔记,生活掠影。

Hadoop MapReduce RecordReader 组件

| 评论

由 RecordReader 决定每次读取以什么样的方式读取数据分片中的一条数据。Hadoop 默认的 RecordReader 是 LineRecordReader(TextInputFormat 的 getRecordReader() 方法返回即是 LineRecordReader。二进制输入 SequenceFileInputFormat 的 getRecordReader() 方法返回即是SequenceFileRecordReader。)。LineRecordReader是用每行的偏移量作为 map 的 key,每行的内容作为 map 的 value;

它可作用于,自定义读取每一条记录的方式;自定义读入 key 的类型,如希望读取的 key 是文件的路径或名字而不是该行在文件中的偏移量。

自定义RecordReader一般步骤

  1. 继承抽象类 RecordReader,实现 RecordReader 的实例;
  2. 实现自定义 InputFormat 类,重写 InputFormat 中 createRecordReader() 方法,返回值是自定义的 RecordReader 实例; (3)配置 job.setInputFormatClass() 设置自定义的 InputFormat 类型;

TextInputFormat类源代码理解

源码见 org.apache.mapreduce.lib.input.TextInputFormat 类(新API);

Hadoop 默认 TextInputFormat 使用 LineRecordReader。具体分析见注释。

  public RecordReader<LongWritable, Text> 
    createRecordReader(InputSplit split,
                       TaskAttemptContext context) {
    return new LineRecordReader();
  }
// --> LineRecordReader
 public void initialize(InputSplit genericSplit,
                         TaskAttemptContext context) throws IOException {
    FileSplit split = (FileSplit) genericSplit;
    Configuration job = context.getConfiguration();
    this.maxLineLength = job.getInt("mapred.linerecordreader.maxlength",
                                    Integer.MAX_VALUE);
    start = split.getStart();  // 当前分片在整个文件中的起始位置
    end = start + split.getLength(); // 当前分片,在整个文件的位置
    final Path file = split.getPath();
    compressionCodecs = new CompressionCodecFactory(job);// 压缩
    codec = compressionCodecs.getCodec(file);
//
    // open the file and seek to the start of the split
    FileSystem fs = file.getFileSystem(job);
    FSDataInputStream fileIn = fs.open(split.getPath()); // 获取 FSDataInputStream
//
    if (isCompressedInput()) {
      decompressor = CodecPool.getDecompressor(codec);
      if (codec instanceof SplittableCompressionCodec) {
        final SplitCompressionInputStream cIn =
          ((SplittableCompressionCodec)codec).createInputStream(
            fileIn, decompressor, start, end,
            SplittableCompressionCodec.READ_MODE.BYBLOCK);
        in = new LineReader(cIn, job); //一行行读取
        start = cIn.getAdjustedStart(); // 可能跨分区读取
        end = cIn.getAdjustedEnd();// 可能跨分区读取
        filePosition = cIn;
      } else {
        in = new LineReader(codec.createInputStream(fileIn, decompressor),
            job);
        filePosition = fileIn;
      }
    } else {
      fileIn.seek(start);//  调整到文件起始偏移量
      in = new LineReader(fileIn, job); 
      filePosition = fileIn;
    }
    // If this is not the first split, we always throw away first record
    // because we always (except the last split) read one extra line in
    // next() method.
    if (start != 0) {
      start += in.readLine(new Text(), 0, maxBytesToConsume(start));
    }
    this.pos = start; // 在当前分片的位置
  }
//  --> getFilePosition() 指针读取到哪个位置
// filePosition 为 Seekable 类型
  private long getFilePosition() throws IOException {
    long retVal;
    if (isCompressedInput() && null != filePosition) {
      retVal = filePosition.getPos();
    } else {
      retVal = pos;
    }
    return retVal;
  }
//
// --> nextKeyValue() 
public boolean nextKeyValue() throws IOException {
    if (key == null) {
      key = new LongWritable();
    }
    key.set(pos);
    if (value == null) {
      value = new Text();
    }
    int newSize = 0;
    // We always read one extra line, which lies outside the upper
    // split limit i.e. (end - 1)
    // 预读取下一条纪录
    while (getFilePosition() <= end) {
      newSize = in.readLine(value, maxLineLength,
          Math.max(maxBytesToConsume(pos), maxLineLength));
      if (newSize == 0) {
        break;
      }
      pos += newSize; // 下一行的偏移量
      if (newSize < maxLineLength) {
        break;
      }
//
      // line too long. try again
      LOG.info("Skipped line of size " + newSize + " at pos " + 
               (pos - newSize));
    }
    if (newSize == 0) {
      key = null;
      value = null;
      return false;
    } else {
      return true;
    }
  }

自定义 RecordReader 演示

假设,现有如下数据 10 ~ 70 需要利用自定义 RecordReader 组件分别计算数据奇数行和偶数行的数据之和。结果为:奇数行之和等于 160,偶数和等于 120。出自于 开源力量 LouisT 老师的开源力量培训课-Hadoop Development课件。

数据:
10
20
30
40
50
60
70

源代码

TestRecordReader.java

数据准备

$ ./bin/hadoop fs -mkdir /inputreader
$ ./bin/hadoop fs -put ./a.txt /inputreader
$ ./bin/hadoop fs -lsr /inputreader
-rw-r--r--   2 hadoop supergroup         21 2014-02-20 21:04 /inputreader/a.txt

执行

$ ./bin/hadoop jar study.hdfs-0.0.1-SNAPSHOT.jar TestRecordReader  /inputreader /inputreaderout1
##
$ ./bin/hadoop fs -lsr /inputreaderout1
-rw-r--r--   2 hadoop supergroup          0 2014-02-20 21:12 /inputreaderout1/_SUCCESS
drwxr-xr-x   - hadoop supergroup          0 2014-02-20 21:11 /inputreaderout1/_logs
drwxr-xr-x   - hadoop supergroup          0 2014-02-20 21:11 /inputreaderout1/_logs/history
-rw-r--r--   2 hadoop supergroup      16451 2014-02-20 21:11 /inputreaderout1/_logs/history/job_201402201934_0002_1392901901142_hadoop_TestRecordReader
-rw-r--r--   2 hadoop supergroup      48294 2014-02-20 21:11 /inputreaderout1/_logs/history/job_201402201934_0002_conf.xml
-rw-r--r--   2 hadoop supergroup         23 2014-02-20 21:12 /inputreaderout1/part-r-00000
-rw-r--r--   2 hadoop supergroup         23 2014-02-20 21:12 /inputreaderout1/part-r-00001
##
$ ./bin/hadoop fs -cat /inputreaderout1/part-r-00000
偶数行之和:  120
##
$ ./bin/hadoop fs -cat /inputreaderout1/part-r-00001
奇数行之和:  160

评论