kangfoo's blog

工作学习笔记,生活掠影。

Hadoop I/O

| 评论

HDFS 对网络IO, 磁盘IO 的操作是比较复杂且开销还比较高的。Hadoop 在设计中使用了内部的原子操作、压缩、随机读写、流式存储、数据完整性校验、序列化、基于文件的数据结构等方面进行 IO 操作。

数据完整性

保证数据在传输过程中不损坏,常见的保证数据完整性采用的技术

  • 奇偶校验技术
  • ECC 内存纠错校验技术
  • CRC-32 循环冗余校验技术

HDFS的数据完整性

HDFS 会对写入的所有数据计算校验和,并在读取数据时验证校验和。它针对每个由 io.bytes.per.checksum(默认512字节,开销低于1%)指定字节数据技术校验和。

DataNode 负责在验证收到的数据后存储数据及其校验和。从客户端和其它数据节点复制过来的数据。客户端写入数据并且将它发送到一个数据节点管线中,在管线的最后一个数据节点验证校验和。

客户端读取 DataNode 上的数据时,也会验证校验和。将其与 DataNode 上存储的校验和进行对比。每个 DataNode 维护一个连续的校验和验证日志,因此它知道每个数据块最后验证的时间。

每个 DataNode 还会在后台线程运行一个 DataBlockScanner(数据块检测程序),定期验证存储在数据节点上的所有块,以解决物理存储媒介上位损坏问题。

HDFS 通过复制完整的数据复本来修复损坏的数据块,进而得到一个新的、完好无损的复本。基本思路:如果客户端读取数据块时检测到错误,就向 NameNode 汇报已损坏的数据块及它试图从名称节点中要读取的 DataNode,并抛出 ChecksumException。 NameNode 将这个已损坏的数据块复本标记为已损坏,并不直接与 datanode 联系,或尝试将这个个复本复制到另一个 datanode。之后,namennode 安排这个数据块的一个复本复制到另一个 datanode。 至此,数据块复制因子恢复到期望水平。此后,并将已损坏的数据块复本删除。

LocalFileSystem

Hadoop的 LocalFileSystem 执行客户端校验。意味着,在写一个名filename的文件时,文件系统的客户端以透明的方式创建一个隐藏.filename.crc。在同一个文件夹下,包含每个文件块的校验和。

禁用校验和,使用底层文件系统原生支持校验和。这里通过 RawLocalFileSystem 来替代 LocalFileSystem 完成。要在一个应用中全局使用,只需要设置 fs.file.impl值为 org.apache.hadoop.fs.RawLocalFileSystem 来重新 map 执行文件的 URL。或者只想对某些读取禁用校验和校验。例:

Configuration conf = ...
FileSystem fs = new RawLocalFileSystem();
fs.initialize(null, conf);

ChecksumFileSystem

LocalFileSystem 继承自 ChecksumFileSystem(校验和文件系统),ChecksumFileSystem 继承自 FileSystem。ChecksumFileSystem 可以很容易的添加校验和功能到其他文件系统中。

压缩

将文件压缩有两大好处

  • 减少存储文件所需要的磁盘空间
  • 加速数据在网络和磁盘上的传输

编译native-hadoop

参见 《Native-hadoop 编译》

压缩算法

所有的压缩算法都需要权衡时间/空间比.压缩和解压缩速度越快,节省空间越少。gizp压缩空间/时间性能比较适中。bzip2比gzip更高效,但数度更慢; lzo 压缩速度比gzip比较快,但是压缩效率稍微低一点。

Hadoop支持的压缩格式

压缩格式 工具 算法 文件扩展名 多文件 可切分
DEFLATE DEFLATE .deflate
Gzip gzip DEFLATE .gz
bzip2 bzip2 bzip2 .bz
LZO Lzop LZO .lzo


编码/解码 用以执行压缩解压算法,是否有java/原生实现

压缩格式 codec java实现 原生实现
DEFLATE org.apache.hadoop.io.compress.DefaultCodec
gzip org.apache.hadoop.io.compress.GzipCodec
bzip2 org.apache.hadoop.io.compress.Bzip2Codec
LZO com.hadoop.compression.lzo.LzopCodec


压缩算法相关的 API

使用 CompressionCodecFactory.getCodec()方法来推断 CompressionCodec 具体实现。由 CompressionCodec 接口的实现对流进行进行压缩与解压缩。CodecPool 提供了重复利用压缩和解压缩的对象的机制。

… … 画个类图。## TOTO

NativeCodeLoader 加载 native-hadoop library 若想使用 snappycode 首先加载 snappy.so,再判断加载 native hadoop–>hadoop.so。native hadoop 中包含了 java 中申明的native 方法,由 native 方法去调用第三方的 natvie library。native_libraries官方参考文档

在 Hadoop core-site.xml 配置文件中可以设置是否使用本地库,默认以启用。

<property>
  <name>hadoop.native.lib</name>
  <value>true</value>
  <description>Should native hadoop libraries, if present, be used.</description>
</property>

编写使用压缩的测试程序

  1. 首先下载并编译 snappy,zlib
  2. 编写 java 代码 CompressionTest.java, DeCompressionTest.java。 程序是由 maven test 进行。
  3. 执行
    $ export LD_LIBRARY_PATH=$LD_LIBRARY_PATH:/home/hadoop/env/hadoop/lib/native/Linux-amd64-64:/usr/local/lib
    $ mvn test -Dtest=com.kangfoo.study.hadoop1.io.CompressionTest
    $ mvn test -Dtest=com.kangfoo.study.hadoop1.io.DeCompressionTest
    ## 结果:
    rw-rw-r--. 1 hadoop hadoop 531859 7月  23 2013 releasenotes.html
    -rw-rw-r--. 1 hadoop hadoop 140903 1月  22 15:13 releasenotes.html.deflate
    -rw-rw-r--. 1 hadoop hadoop 531859 1月  22 15:22 releasenotes.html.deflate.decp
    -rw-rw-r--. 1 hadoop hadoop 140915 1月  22 15:13 releasenotes.html.gz
    -rw-rw-r--. 1 hadoop hadoop 531859 1月  22 15:22 releasenotes.html.gz.decp
    -rw-rw-r--. 1 hadoop hadoop 224661 1月  22 15:13 releasenotes.html.snappy
    -rw-rw-r--. 1 hadoop hadoop 531859 1月  22 15:22 releasenotes.html.snappy.decp
    ## 日志:
    Running com.kangfoo.study.hadoop1.io.CompressionTest
    2014-01-22 15:13:31,312 WARN  snappy.LoadSnappy (LoadSnappy.java:<clinit>(36)) - Snappy native library is available
    2014-01-22 15:13:31,357 INFO  util.NativeCodeLoader (NativeCodeLoader.java:<clinit>(43)) - Loaded the native-hadoop library
    2014-01-22 15:13:31,357 INFO  snappy.LoadSnappy (LoadSnappy.java:<clinit>(44)) - Snappy native library loaded
    2014-01-22 15:13:31,617 INFO  zlib.ZlibFactory (ZlibFactory.java:<clinit>(47)) - Successfully loaded & initialized native-zlib library
    

启用压缩

出于性能考虑,使用原生的压缩库要比同时提供 java 实现的开销更小。可以修改 Hadoop core-site.xml 配置文件 io.compression.codecs 以启用压缩,前提是必须安装好对应的原生压缩库依赖,并配置正确的 Codec。

  • 属性名: io.compression.codecs
  • 默认值:org.apache.hadoop.io.compress.DefaultCodec, org.apache.hadoop.io.compress.GzipCodec, org.apache.hadoop.io.ompress.Bzip2Codec

压缩与输入分割

考虑如何压缩将由 MapReduce 处理的数据时,是否支持分割很重要。

案例假设,一个gzip压缩的文件的为1GB。HDFS 将其分为16块(64mb 块大小),其中每一数据块最为一个 map 任务输入。那么在 map 任务中,每一个分块是无法独立工作的( gzip 是使用的 DEFLATE 算法,它将数据存储在一系列的压缩块中。无法实现从数据流的任意位置读取数据,那么这些分块必须全部读取并与整个数据流进行同步才能从任意位置进行读取数据)。这样就失去了本地化的优势。一个 map 要处理其他15个分块的数据,而大多数据并不存储在当前 map 节点上。Map的任务数越少,作业的粒度就较大,运行的时间可能会更长。

具体应该选择哪种压缩形式,还要经过测试,才可以决定。大文件选择支持分割的压缩形式,目前只有 bzip2 支持分片,但没有原生库的实现。或者使用 SequenceFile, MapFile 数据格式进行小文件的合并再存储,这样可以满足分片。

在 MapReduce 中使用压缩

如果文件是压缩过的,那么在被 MapReduce 读取时,它们会被解压,根据文件扩展名选择对应的解码器。可参考 MapReduce 块压缩相关知识。

压缩 MapReduce 的作业输出

  1. 在作业配置中将 mapred.output.compress 属性设置为 true
  2. 将 mapred.output.compression.codec 属性设置为自己需要使用的压缩解码/编码器的类名。

代码示例

conf.setBoolean(“mapred.output.compress”,true)
Conf.setClass(“mapred.output.compression.codec”,GizpCodec.class,CompressionCodec.class);

对 Map 任务输出结果的压缩

压缩 Map 作业的中间结果以减少网络传输。

Map输出压缩属性
属性名称: mapred.compress.map.output
类型: boolean
默认值: false
描述: 对 map 任务输出是否进行压缩

属性名称: mapred.map.output.compression.codec
类型: Class
默认值: org.apache.hadoop.io.compress.DefaultCodec
描述: map 输出所用的压缩 codec

代码示例

conf.setCompressMapOutput(true);
conf.setMapOutputCompressorClass(GzipCodec.classs)


序列化和反序列化

什么是Hadoop的序列化? 序列化,将结构化对象转换为字节流,以便于在网络传输和磁盘存储的过程。反序列化,将字节流转化为结构化的对象的逆过程。可用于进程间的通讯和永久存储,数据拷贝

序列化特点:

  • 紧凑:可充分利用网络带宽(哈夫曼编码)
  • 快速:尽量减少序列化和反序列化的开销
  • 可扩展:通讯协议升级向下兼容
  • 互操作:支持不同语言间的通讯

Hadoop1.x 仅满足了紧凑和快速两个特性。 java 自身提供的序列化并不精简。Java Serializaiton 是序列化图对象的通讯机制,它有序列化和反序列化的开销。 java 序列化比较复杂,不能很精简的控制对象的读写。连接/延迟/缓冲。java 序列化不能满足: 精简,快速,可扩展,可互操作。

Hadoop1.x 使用 Writable 实现自己的序列化格式。它结构紧凑,快速。但难以用 java 以外的语言进行扩展。

Writable 接口

Writeable 接口定义了2个方法:

void write(DataOutput out) throws IOException; // 将其状态写入二进制格式的 DataOutput 流;
void readFields(DataInput in) throws IOException; // 从二进制格式的 DataInput 流读取其状态

画个类图 ## TODO

writable
writableComparable(interface WritableComparable<T> extends Writable, Comparable<T> )
comparator(int compare(T o1, T o2);)
comparable(public int compareTo(T o);)
rawcomparator(public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2);)
writablecomparator(ReflectionUtils.newInstance(keyClass, null);)

Writable 类的层次结构 image

部分类型列举

  • NullWritable 是一种特殊的Writable类型,单例的, 序列化的长度是零。可以做占位符。
  • Text 是针对UTF-8序列化的Writable类。一般可等价于 java.lang.String 的 Writable。Text是可变的。
  • BytesWritable 是一个对二进制的封装,序列化为一个格式为一个用于制定后面数据字节数的整数域(4字节),后跟字节本身。它是可变的。如:
    BytesWritable b = new BytesWritable(new byte[]{2,5,127}); // 3个长度
    byte[] bytes = serialize(b);
    assertThat(StringUtils.byteToHexString(bytes), is("0000000302057f"));
    
  • ObjectWritable 适用于java基本类型(String,enum,Writable,null或者这些类型组成的数组)的一个封装。
  • Writable集合。ArrayWritable和TwoDArrayWritable针对于数组和二维数组,它们中所有的元素必须是同一个类的实例。MapWritable和SortedMapWritable是针对于 Map 和 SorMap。

自定义Writable •实现WritableComparable •实现

write(); // 将对象转换为字节流并写入到输出流 out 中
readFields(); // 从输入流 in 中读取字节流并反序列化为对象
compareTo()方法。 // 将 this 对像与对象 O 比较

示例程序代码

序列化框架

  • apache avro 旨在解决Hadoop中Writable类型的不足:缺乏语言的可移植性。
  • apache thrift 可伸缩的跨语言, 提供了 PRC 实现层。
  • Protocol Buffers 是一种轻便高效的结构化数据存储格式,可以用于结构化数据串行化,很适合做数据存储或 RPC 数据交换格式。它可用于通讯协议、数据存储等领域的语言无关、平台无关、可扩展的序列化结构数据格式。目前提供了 C++、Java、Python 三种语言的 API。

参考

基于文件的数据结构

使用 SequenceFile, MapFile 主要解决的问题是:支持分片的数据压缩格式的比较有限,对于某些应用而言,需要处理的数据格式来存储自己的格式,MapRedurce 需要更高级的容器。

SequenceFile

  1. 文件是以二进制键/值对形式存储的平面文件
  2. 可以作为小文件的容器,它将小文件包装起来,以获取更高效率的存储和处理
  3. 存储在 SequenceFile 中的 key/valu e并不一定是 Writable 类型
  4. 可使用 append()方法在文件末位附加 key/value 对

好处

  1. 支持纪录或者块压缩
  2. 支持splittable, 可作为mapreduce输入分片
  3. 修改简单(har是不可以修改的)

SequenceFile 压缩

SequenceFile 文件格式内部结构与是否启用压缩有关。启用压缩又分两类:纪录压缩;数据块压缩。

  1. 无压缩。 默认是不启用压缩,则每个纪录就是它的纪录长度(字节数)、键长度、键和值组成。长度字段为4字节的整数。

  2. 纪录压缩。其格式与无压缩情况相同,不同在于纪录压缩的值需要通过文件头中定义的压缩codec进行压缩。键不压缩。
    无压缩和纪录压缩的示意图: image

  3. 块压缩。一次压缩多条纪录,比单条纪录压缩效率高。可以不断的向数据块中压缩纪录,直到字节数不小于io.seqfile.compress.blocksize属性中设置的字节数。默认1MB.每个新的块的开始处都需要插入同步标识。数据块的格式如下:首先是一个指示数据块中字节数的字段;紧跟是4个压缩字段(键长度、键、值长度、值)。块压缩示意图如下: image

实例程序代码

运行结果

## 查看 sequence file
$ ./bin/hadoop fs -text /numbers.seq
## 排序
$ ./bin/hadoop jar ./hadoop-examples-1.2.1.jar sort -r 1 -inFormat org.apache.hadoop.mapred.SequenceFileInputFormat -outFormat org.apache.hadoop.mapred.SequenceFileOutputFormat -outKey org.apache.hadoop.io.IntWritable -outValue org.apache.hadoop.io.Text /numbers.seq sorted
## 查看排序后的结果(原键降序排列为从1到100升序排列)
$ ./bin/hadoop fs -text /user/hadoop/sorted/part-00000

博客参考

MapFile

MapFile 是已经排序的 SequenceFile,可以视为 java.util.Map 持久化形式。它已加入了搜索键的索引,可以根据 key 进行查找。它的键必须是 WritableComparable 类型的实例,值必须是 Writable 类型的实例,而 SequenceFile 无此要求。使用 MapFile.fix() 方法进行索引重建,把 SequenceFile 转换为 MapFile。

MapFile java 源代码

org.apache.hadoop.io.MapFile.Writer{ 
// 类的内部结构(MapFile是已经排序的SequenceFile):
private SequenceFile.Writer data;
private SequenceFile.Writer index;
… … 
}
org.apache.hadoop.io.MapFile.Reader{
// 二分法查找。一次磁盘寻址 + 一次最多顺序128(默认值等于每128下一个索引)个条目顺序扫瞄
public synchronized Writable get(WritableComparable key, Writable val){
… … 
}

实例程序代码

运行结果

$ ./bin/hadoop fs -text /numbers.map/data 
$ ./bin/hadoop fs -text /numbers.map/index

SequenceFile合并为MapFile

  1. 新建SequenceFile文件
    $ ./bin/hadoop jar ./hadoop-examples-1.2.1.jar sort -r 1 -inFormat org.apache.hadoop.mapred.SequenceFileInputFormat -outFormat org.apache.hadoop.mapred.SequenceFileOutputFormat -outKey org.apache.hadoop.io.IntWritable -outValue org.apache.hadoop.io.Text /numbers.seq /numbers2.map
    
  2. 重命名文件夹
    $ ./bin/hadoop fs -mv /numbers2.map/part-00000 /numbers2.map/data
    
  3. 运行测试用例
  4. 验证结果
    Created MapFile hdfs://master11:9000/numbers2.map with 100 entries
    rw-r--r--   2 hadoop      supergroup       4005 2014-02-09 20:06 /numbers2.map/data
    -rw-r--r--   3 kangfoo-mac supergroup        136 2014-02-09 20:13 /numbers2.map/index
    

属于 hadoop 分类 被贴了 hadoop1 标签

相关文章

« Hadoop pipes 编译 Hadoop RPC »

评论