由 RecordReader 决定每次读取以什么样的方式读取数据分片中的一条数据。Hadoop 默认的 RecordReader 是 LineRecordReader(TextInputFormat 的 getRecordReader() 方法返回即是 LineRecordReader。二进制输入 SequenceFileInputFormat 的 getRecordReader() 方法返回即是SequenceFileRecordReader。)。LineRecordReader是用每行的偏移量作为 map 的 key,每行的内容作为 map 的 value;
它可作用于,自定义读取每一条记录的方式;自定义读入 key 的类型,如希望读取的 key 是文件的路径或名字而不是该行在文件中的偏移量。
自定义RecordReader一般步骤
- 继承抽象类 RecordReader,实现 RecordReader 的实例;
- 实现自定义 InputFormat 类,重写 InputFormat 中 createRecordReader() 方法,返回值是自定义的 RecordReader 实例; (3)配置 job.setInputFormatClass() 设置自定义的 InputFormat 类型;
TextInputFormat类源代码理解
源码见 org.apache.mapreduce.lib.input.TextInputFormat 类(新API);
Hadoop 默认 TextInputFormat 使用 LineRecordReader。具体分析见注释。
public RecordReader<LongWritable, Text>
createRecordReader(InputSplit split,
TaskAttemptContext context) {
return new LineRecordReader();
}
// --> LineRecordReader
public void initialize(InputSplit genericSplit,
TaskAttemptContext context) throws IOException {
FileSplit split = (FileSplit) genericSplit;
Configuration job = context.getConfiguration();
this.maxLineLength = job.getInt("mapred.linerecordreader.maxlength",
Integer.MAX_VALUE);
start = split.getStart(); // 当前分片在整个文件中的起始位置
end = start + split.getLength(); // 当前分片,在整个文件的位置
final Path file = split.getPath();
compressionCodecs = new CompressionCodecFactory(job);// 压缩
codec = compressionCodecs.getCodec(file);
//
// open the file and seek to the start of the split
FileSystem fs = file.getFileSystem(job);
FSDataInputStream fileIn = fs.open(split.getPath()); // 获取 FSDataInputStream
//
if (isCompressedInput()) {
decompressor = CodecPool.getDecompressor(codec);
if (codec instanceof SplittableCompressionCodec) {
final SplitCompressionInputStream cIn =
((SplittableCompressionCodec)codec).createInputStream(
fileIn, decompressor, start, end,
SplittableCompressionCodec.READ_MODE.BYBLOCK);
in = new LineReader(cIn, job); //一行行读取
start = cIn.getAdjustedStart(); // 可能跨分区读取
end = cIn.getAdjustedEnd();// 可能跨分区读取
filePosition = cIn;
} else {
in = new LineReader(codec.createInputStream(fileIn, decompressor),
job);
filePosition = fileIn;
}
} else {
fileIn.seek(start);// 调整到文件起始偏移量
in = new LineReader(fileIn, job);
filePosition = fileIn;
}
// If this is not the first split, we always throw away first record
// because we always (except the last split) read one extra line in
// next() method.
if (start != 0) {
start += in.readLine(new Text(), 0, maxBytesToConsume(start));
}
this.pos = start; // 在当前分片的位置
}
// --> getFilePosition() 指针读取到哪个位置
// filePosition 为 Seekable 类型
private long getFilePosition() throws IOException {
long retVal;
if (isCompressedInput() && null != filePosition) {
retVal = filePosition.getPos();
} else {
retVal = pos;
}
return retVal;
}
//
// --> nextKeyValue()
public boolean nextKeyValue() throws IOException {
if (key == null) {
key = new LongWritable();
}
key.set(pos);
if (value == null) {
value = new Text();
}
int newSize = 0;
// We always read one extra line, which lies outside the upper
// split limit i.e. (end - 1)
// 预读取下一条纪录
while (getFilePosition() <= end) {
newSize = in.readLine(value, maxLineLength,
Math.max(maxBytesToConsume(pos), maxLineLength));
if (newSize == 0) {
break;
}
pos += newSize; // 下一行的偏移量
if (newSize < maxLineLength) {
break;
}
//
// line too long. try again
LOG.info("Skipped line of size " + newSize + " at pos " +
(pos - newSize));
}
if (newSize == 0) {
key = null;
value = null;
return false;
} else {
return true;
}
}
自定义 RecordReader 演示
假设,现有如下数据 10 ~ 70 需要利用自定义 RecordReader 组件分别计算数据奇数行和偶数行的数据之和。结果为:奇数行之和等于 160,偶数和等于 120。出自于 开源力量 LouisT 老师的开源力量培训课-Hadoop Development课件。
数据: 10 20 30 40 50 60 70
源代码
数据准备
$ ./bin/hadoop fs -mkdir /inputreader $ ./bin/hadoop fs -put ./a.txt /inputreader $ ./bin/hadoop fs -lsr /inputreader -rw-r--r-- 2 hadoop supergroup 21 2014-02-20 21:04 /inputreader/a.txt
执行
$ ./bin/hadoop jar study.hdfs-0.0.1-SNAPSHOT.jar TestRecordReader /inputreader /inputreaderout1 ## $ ./bin/hadoop fs -lsr /inputreaderout1 -rw-r--r-- 2 hadoop supergroup 0 2014-02-20 21:12 /inputreaderout1/_SUCCESS drwxr-xr-x - hadoop supergroup 0 2014-02-20 21:11 /inputreaderout1/_logs drwxr-xr-x - hadoop supergroup 0 2014-02-20 21:11 /inputreaderout1/_logs/history -rw-r--r-- 2 hadoop supergroup 16451 2014-02-20 21:11 /inputreaderout1/_logs/history/job_201402201934_0002_1392901901142_hadoop_TestRecordReader -rw-r--r-- 2 hadoop supergroup 48294 2014-02-20 21:11 /inputreaderout1/_logs/history/job_201402201934_0002_conf.xml -rw-r--r-- 2 hadoop supergroup 23 2014-02-20 21:12 /inputreaderout1/part-r-00000 -rw-r--r-- 2 hadoop supergroup 23 2014-02-20 21:12 /inputreaderout1/part-r-00001 ## $ ./bin/hadoop fs -cat /inputreaderout1/part-r-00000 偶数行之和: 120 ## $ ./bin/hadoop fs -cat /inputreaderout1/part-r-00001 奇数行之和: 160