kangfoo's blog

工作学习笔记,生活掠影。

HDFS API 练习使用

| 评论

经过前几页博客的知识巩固,现在开始使用 Hadoop API 不是什么难事。此处不重点讲述。参考 Hadoop API 利用 FileSystem 实例对象操作 FSDataInputStream/FSDataOutputStream 基本不是问题。

HDFS API入门级别的使用

  1. 获取 FileSystem 对象
    get(Configuration conf)
    Configuration 对象封装了客户端或者服务器端的 conf/core-site.xml 配置

  2. 通过 FileSystem 对象进行文件操作
    读数据:open()获取FSDataInputStream(它支持随机访问),
    写数据:create()获取FSDataOutputStream

参考代码:HDFSTest.java

代码中主要利用 FileSystem 对象进行文件的 读、写、重命名、删除、文件信息获取等操作。

Hadoop 机架感知

| 评论

HDFS 和 Map/Reduce 的组件是能够感知机架的。

NameNode 和 JobTracker 通过调用管理员配置模块中的 API resolve 来获取集群里每个 slave 的机架id。该 API 将 slave 的 DNS 名称(或者IP地址)转换成机架id。使用哪个模块是通过配置项 topology.node.switch.mapping.impl 来指定的。模块的默认实现会调用 topology.script.file.name 配置项指定的一个的脚本/命令。 如果 topology.script.file.name 未被设置,对于所有传入的IP地址,模块会返回 /default-rack 作为机架 id。

在 Map/Reduce 部分还有一个额外的配置项 mapred.cache.task.levels ,该参数决定 cache 的级数(在网络拓扑中)。例如,如果默认值是2,会建立两级的 cache—— 一级针对主机(主机 -> 任务的映射)另一级针对机架(机架 -> 任务的映射)。

我目前没有模拟环境先纪录个参考博客 机架感知 以备后用。

模拟使用 SecondaryNameNode 恢复 NameNode

| 评论

SecondaryNameNode

在试验前先了解下什么是 SecondaryNameNode、它的原理、检查点等知识点。再依次从开始配置 SecondaryNameNode 检查点、准备测试环境、模拟正常的 NameNode 故障,并手动启动 NameNode 并从 SecondaryNameNode 中恢复 fsimage。

:此试验思路主要借鉴于开源力量LouisT 老师 Hadoop Development 课程中的SecondaryNameNode章节。

作用

主要是为了解决namenode单点故障。不是 namenode 的备份。它周期性的合并 fsimage ( namenode 的镜像)和 editslog(或者 edits——所有对 fsimage 镜像文件操作的步骤),并推送给 namenode 以辅助恢复namenode。

SecondaryNameNode 定期合并 fsimage 和 edits 日志,将 edits 日志文件大小控制在一个限度下。因为内存需求和 NameNode 在一个数量级上,所以通常 SecondaryNameNode 和 NameNode 运行在不同的机器上。SecondaryNameNode 通过bin/start-dfs.sh 在 conf/masters 中指定的节点上启动。

在hadoop 2.x 中它的作用可以被两个节点替换:checkpoint node(于 SecondaryNameNode 作用相同), backup node( namenode 的完全备份)

原理(具体可参见《Hadoop权威指南》第10章 管理Hadoop)

edits 文件纪录了所有对 fsimage 镜像文件的写操作的步骤。文件系统客户端执行写操作时,这些操作首先会被记录到 edits 文件中。Nodename 在内存中维护文件系统的元数据;当 edits 被修改时,相关元数据也同步更新。内存中的元数据可支持客户端的读请求。

在每次执行写操作之后,且在向客户端发送成功代码之前,edits 编辑日志都需要更新和同步。当 namedone 向多个目录写数据时,只有在所有写操作执行完毕之后方可返回成功代码,以保证任何操作都不会因为机器故障而丢失。

fsimage 文件是文件系统元数据的一个永久检查点。它包含文件系统中的所有目录和文件 inode 的序列化谢谢。每个 inode 都是一个文件或目录的元数据的内部描述方式。对于文件来说,包含的信息有“复制级别”、修改时间、访问时间、访问许可、块大小、组成一个文件的块等;对于目录来说,包含的信息有修改时间、访问许可和配额元数据等信息。

fsimage 是一个大型文件,频繁执行写操作,会使系统运行极慢。并非每一写操作都会更新到 fsimage 文件。 SecondaryNameNode 辅助 namenode,为 namenode 内存中的文件系统元数据创建检查点,并最终合并并更新 fsimage 镜像和减小 edits 文件。

SecondaryNameNode 的检查点

SecondaryNameNode 进程启动是由两个配置参数控制的。

  • fs.checkpoint.period,指定连续两次检查点的最大时间间隔, 默认值是1小时。
  • fs.checkpoint.size 定义了 edits 日志文件的最大值,一旦超过这个值会导致强制执行检查点(即使没到检查点的最大时间间隔)。默认值是64MB。

SecondaryNameNode 检查点的具体步骤

image

  1. SecondaryNameNode 请求主 namenode 停止使用 edits 文件,暂时将新的操作记录到 edits.new 文件中;
  2. SecondaryNameNode 以 http get 复制 主 namenode 中的 fsimage, edits 文件;
  3. SecondaryNameNode 将 fsimage 载入到内存,并逐一执行 edits 文件中的操作,创建新的fsimage.ckpt 文件;
  4. SecondaryNameNode 以 http post 方式将新的fsimage.ckp 复制到主namenode.
  5. 主 namenode 将 fsimage 文件替换为 fsimage.ckpt,同时将 edits.new 文件重命名为 edits。并更新 fstime 文件来记录下次检查点时间。

SecondaryNameNode 保存最新检查点的目录与 NameNode 的目录结构相同。 所以 NameNode 可以在需要的时候读取 SecondaryNameNode上的检查点镜像。

模拟 NameNode 故障以从 SecondaryNameNode 恢复

场景假设:如果NameNode上除了最新的检查点以外,所有的其他的历史镜像和 edits 文件都丢失了,NameNode 可以引入这个最新的检查点以恢复。具体模拟步骤如下:

  1. 在配置参数 dfs.name.dir 指定的位置建立一个空文件夹;
  2. 把检查点目录的位置赋值给配置参数 fs.checkpoint.dir;
  3. 启动NameNode,并加上-importCheckpoint。

NameNode 会从 fs.checkpoint.dir 目录读取检查点,并把它保存在 dfs.name.dir 目录下。 如果 dfs.name.dir 目录下有合法的镜像文件,NameNode 会启动失败。 NameNode 会检查fs.checkpoint.dir 目录下镜像文件的一致性,但是不会去改动它。

试验从 SecondaryNameNode 中备份恢复 NameNode

注意:此步骤执行并不能将原的数据文件系统从物理磁盘上移除,同样也不能在新格式化的 namenode 中查看旧的文件系统文件。请确定无误再试验。

试验知识准备

命令的使用方法请参考 SecondaryNameNode 命令。在试验前,可先了解些 hadoop 的默认配置 core-site.xml-default, hdfs-site.xml-default, mapred-site.xml-default

SecondarynameNode 相关属性描述:

属性:fs.checkpoint.dir     
值:${hadoop.tmp.dir}/dfs/namesecondary
描述:Determines where on the local filesystem the DFS secondary name node should store the temporary images to merge. If this is a comma-delimited list of directories then the image is replicated in all of the directories for redundancy.
fs.checkpoint.edits.dir

属性:${fs.checkpoint.dir}     
值:Determines where on the local filesystem the DFS secondary name node should 
描述:store the temporary edits to merge. If this is a comma-delimited list of directoires then teh edits is replicated in all of the directoires for redundancy. Default value is same as fs.checkpoint.dir

属性:fs.checkpoint.period  
值:3600   
描述:The number of seconds between two periodic checkpoints.

属性:fs.checkpoint.size    
值:67108864   
描述:The size of the current edit log (in bytes) that triggers a periodic checkpoint even if the fs.checkpoint.period hasn't expired.

试验环境配置

  1. 首先修改 core-site.xml 文件中的配置,主要是调小了 checkpoint 的周期并指定 SSN 的目录。

    <property>
    <name>fs.checkpoint.period</name>
    <value>120</value>
    </property>
    <property>
    <name>fs.checkpoint.dir</name>
    <value>/home/${user.name}/env/data/snn</value>
    </property>
    

    vi hdfs-site.xml 查看 NameNode 数据文件存储路径

    <property>
     <name>dfs.name.dir</name>
     <value>/home/${user.name}/env/data/name</value>
    </property>
    <property>
     <name>dfs.data.dir</name>
     <value>/home/${user.name}/env/data/data</value>
    </property>
    
  2. 再次,format namenode 。 ./bin/hadoop namenode -format。查看当前的 master namenode namespaceID cat ./name/current/VERSION

    #Tue Jan 21 15:14:40 CST 2014
    namespaceID=1816120670 ## 文件系统的唯一标识符
    cTime=0 ## namenode的创建时间,刚格式化为0,升级之后为时间戳
    storageType=NAME_NODE ## 存储类型
    layoutVersion=-41 ## 负的整数。描述了hdfs永久性数据结构的版本。与Hadoop的版本无关。与升级有关。
    
  3. 查看 datanode 下的version。cat data/current/VERSION

    #Tue Jan 21 09:51:42 CST 2014
    namespaceID=80003531
    storageID=DS-949100596-192.168.56.12-50010-1387691685116
    cTime=0
    storageType=DATA_NODE
    layoutVersion=-41
    

    若 namespaceID 不相同,请将 datanode 中的id修改为 namenode 相同的 namespaceID。 同样的步骤修改其他的 datanode. 若是第一次format可以跳过此步骤。此步骤要注意避免如下错误(Incompatible namespaceID):

    2014-01-21 15:07:54,890 ERROR org.apache.hadoop.hdfs.server.datanode.DataNode: java.io.IOException: Incompatible namespaceIDs in /home/hadoop/env/data/data: namenode namespaceID = 2020545490; datanode namespaceID = 80003531
    

查看 NameNode 试验前正常环境状况

  1. 启动hdfs./bin/start-dfs.sh

  2. jps 检查所有的进程(当前NameNode进程正常)

    5832 SecondaryNameNode
    6293 Jps
    5681 NameNode
    2212 DataNode
    2198 DataNode
    
  3. 创建测试数据

    $ ./bin/hadoop fs -mkdir /test
    $ ./bin/hadoop fs -lsr /
    drwxr-xr-x   - hadoop supergroup          0 2014-01-21 15:21 /test
    [hadoop@master11 hadoop]$ ./bin/hadoop fs -put ivy.xml /test
    [hadoop@master11 hadoop]$ ./bin/hadoop fs -lsr /
    drwxr-xr-x   - hadoop supergroup          0 2014-01-21 15:22 /test
    -rw-r--r--   2 hadoop supergroup      10525 2014-01-21 15:22 /test/ivy.xml
    
  4. 查看 SecondaryNameNode 文件目录

    watch ls ./data/snn/ 
    current
    image
    in_use.l
    
  5. namenode 对应日志

    2014-01-21 15:54:02,654 INFO org.apache.hadoop.hdfs.server.namenode.FSNamesystem: Roll Edit Log from 192.168.56.11
    2014-01-21 15:54:02,654 INFO org.apache.hadoop.hdfs.server.namenode.FSEditLog: Number of transactions: 0 Total time for transactions(ms): 0 Number of transactions batched in Syncs: 0 Number of syncs: 0 SyncTimes(ms): 0
    2014-01-21 15:54:02,655 INFO org.apache.hadoop.hdfs.server.namenode.FSEditLog: closing edit log: position=4, editlog=/home/hadoop/env/data/name/current/edits
    2014-01-21 15:54:02,655 INFO org.apache.hadoop.hdfs.server.namenode.FSEditLog: close success: truncate to 4, editlog=/home/hadoop/env/data/name/current/edits
    2014-01-21 15:54:02,778 INFO org.apache.hadoop.hdfs.server.namenode.TransferFsImage: Opening connection to http://0.0.0.0:50090/getimage?getimage=1
    2014-01-21 15:54:02,781 INFO org.apache.hadoop.hdfs.server.namenode.GetImageServlet: Downloaded new fsimage with checksum: 4a75545e83f108e21ef321fb0066ede4
    2014-01-21 15:54:02,781 INFO org.apache.hadoop.hdfs.server.namenode.FSNamesystem: Roll FSImage from 192.168.56.11
    2014-01-21 15:54:02,781 INFO org.apache.hadoop.hdfs.server.namenode.FSEditLog: Number of transactions: 0 Total time for transactions(ms): 0 Number of transactions batched in Syncs: 0 Number of syncs: 1 SyncTimes(ms): 56
    2014-01-21 15:54:02,784 INFO org.apache.hadoop.hdfs.server.namenode.FSEditLog: closing edit log: position=4, editlog=/home/hadoop/env/data/name/current/edits.new
    2014-01-21 15:54:02,784 INFO org.apache.hadoop.hdfs.server.namenode.FSEditLog: close success: truncate to 4, editlog=/home/hadoop/env/data/name/current/edits.new
    
  6. namenode 文件目录

    $ cd name/
    $ tree
    .
    ├── current
    │   ├── edits
    │   ├── fsimage
    │   ├── fstime
    │   └── VERSION
    ├── image
    │   └── fsimage
    ├── in_use.lock
    └── previous.checkpoint
    ├── edits
    ├── fsimage
    ├── fstime
    └── VERSION
    

模拟 NameNode 故障

  1. 人为的杀掉 namenode 进程
    kill -9 6690 ## 6690 NameNode
    删除 namenode 元数据
    $ rm -rf ./data/name/*
    删除 Secondary NameNode in_use.lock 文件 
    $ rm -rf ./snn/in_use.lock
    

从 SecondaryNameNode 中恢复 NameNode

  1. 启动以 importCheckpoint 方式启动 NameNode。$ ./bin/hadoop namenode -importCheckpoint

  2. 验证是否恢复成功

    ## HDFS 文件系统正常
    $ ./bin/hadoop fsck /
    The filesystem under path '/' is HEALTHY
    $ ./bin/hadoop fs -lsr /
    ## 元文件信息已恢复
    drwxr-xr-x   - hadoop supergroup          0 2014-01-21 15:22 /test
    -rw-r--r--   2 hadoop supergroup      10525 2014-01-21 15:22 /test/ivy.xml
    $ tree
    .
    ├── data
    ├── name(已恢复)
    │   ├── current
    │   │   ├── edits
    │   │   ├── fsimage
    │   │   ├── fstime
    │   │   └── VERSION
    │   ├── image
    │   │   └── fsimage
    │   ├── in_use.lock
    │   └── previous.checkpoint
    │       ├── edits
    │       ├── fsimage
    │       ├── fstime
    │       └── VERSION
    ├── snn
    │   ├── current
    │   │   ├── edits
    │   │   ├── fsimage
    │   │   ├── fstime
    │   │   └── VERSION
    │   ├── image
    │   │   └── fsimage
    │   └── in_use.lock
    └── tmp
    9 directories, 16 files
    
  3. 查看恢复日志信息(截取部分信息)

    ## copy fsimage
    14/01/21 16:57:52 INFO common.Storage: Storage directory /home/hadoop/env/data/name is not formatted.
    14/01/21 16:57:52 INFO common.Storage: Formatting ...
    14/01/21 16:57:52 INFO common.Storage: Start loading image file /home/hadoop/env/data/snn/current/fsimage
    14/01/21 16:57:52 INFO common.Storage: Number of files = 3
    14/01/21 16:57:52 INFO common.Storage: Number of files under construction = 0
    14/01/21 16:57:52 INFO common.Storage: Image file /home/hadoop/env/data/snn/current/fsimage of size 274 bytes loaded in 0 seconds.
    ##copy edits
    4/01/21 16:57:52 INFO namenode.FSEditLog: Start loading edits file /home/hadoop/env/data/snn/current/edits
    14/01/21 16:57:52 INFO namenode.FSEditLog: EOF of /home/hadoop/env/data/snn/current/edits, reached end of edit log Number of transactions found: 0.  Bytes read: 4
    14/01/21 16:57:52 INFO namenode.FSEditLog: Start checking end of edit log (/home/hadoop/env/data/snn/current/edits) ...
    14/01/21 16:57:52 INFO namenode.FSEditLog: Checked the bytes after the end of edit log (/home/hadoop/env/data/snn/current/edits):
    14/01/21 16:57:52 INFO namenode.FSEditLog:   Padding position  = -1 (-1 means padding not found)
    14/01/21 16:57:52 INFO namenode.FSEditLog:   Edit log length   = 4
    14/01/21 16:57:52 INFO namenode.FSEditLog:   Read length       = 4
    14/01/21 16:57:52 INFO namenode.FSEditLog:   Corruption length = 0
    14/01/21 16:57:52 INFO namenode.FSEditLog:   Toleration length = 0 (= dfs.namenode.edits.toleration.length)
    14/01/21 16:57:52 INFO namenode.FSEditLog: Summary: |---------- Read=4 ----------|-- Corrupt=0 --|-- Pad=0 --|
    14/01/21 16:57:52 INFO namenode.FSEditLog: Edits file /home/hadoop/env/data/snn/current/edits of size 4 edits # 0 loaded in 0 seconds.
    14/01/21 16:57:52 INFO common.Storage: Image file /home/hadoop/env/data/name/current/fsimage of size 274 bytes saved in 0 seconds.
    14/01/21 16:57:54 INFO namenode.FSEditLog: closing edit log: position=4, editlog=/home/hadoop/env/data/name/current/edits
    14/01/21 16:57:54 INFO namenode.FSEditLog: close success: truncate to 4, editlog=/home/hadoop/env/data/name/current/edits
    14/01/21 16:57:54 INFO namenode.FSEditLog: Number of transactions: 0 Total time for transactions(ms): 0 Number of transactions batched in Syncs: 0 Number of syncs: 0 SyncTimes(ms): 0 
    ## 恢复 fsimage
    14/01/21 16:57:54 INFO namenode.FSNamesystem: Finished loading FSImage in 1971 msecs
    14/01/21 16:57:54 INFO hdfs.StateChange: STATE* Safe mode ON
    ... ...
    14/01/21 16:58:26 INFO hdfs.StateChange: STATE* Safe mode termination scan for invalid, over- and under-replicated blocks completed in 15 msec
    14/01/21 16:58:26 INFO hdfs.StateChange: STATE* Leaving safe mode after 33 secs
    ## 离开安全模式 Safe mode is OFF
    14/01/21 16:58:26 INFO hdfs.StateChange: STATE* Safe mode is OFF
    14/01/21 16:58:26 INFO hdfs.StateChange: STATE* Network topology has 1 racks and 2 datanodes
    14/01/21 16:58:26 INFO hdfs.StateChange: STATE* UnderReplicatedBlocks has 0 blocks
    

Hadoop 分布式文件系统

| 评论

管理网络跨多台计算机存储的文件系统称为分布式文件系统。当数据的大小超过单台物理计算机存储能力,就需要对它进行分区存储。Hadoop提供了一个综合性的文件系统抽象, Hadoop Distributed FileSystem 简称 HDFS或者DFS,Hadoop 分布式文件系统。它是Hadoop 3大组件之一。其他两大组件为 Hadoop-common 和 Hadoop-mapreduce。

传统以文件为基本单位的存储缺点:首先它很难实现并行化处理某个文件。单个节点一次只能处理一个文件,无法同时处理其他文件;再者,文件大小不同很难实现负载均衡。

HDFS的设计

  • HDFS以流式数据访问模式来存储超大文件,部署运行于廉价的机器上。
  • 可存储超大文件;流式访问,一次写入,多次读取;商用廉价PC,并不需要高昂的高可用的硬件。
  • 但不适用于,低时间延迟的访问;大小文件处理(浪费namenode内存,浪费磁盘空间。);多用户写入,任意修改文件(不支持并发写入。 同一时刻只能一个进程写入,不支持随机修改。)。

数据块

块是磁盘进行数据读写的最小单位,默认是512字节,构建单个磁盘之上的文件系统通过磁盘块来管理文件系统的来管理该文件系统中的块。HDFS的块默认是64MB,HDFS上的文件也被划分为块大小的多个分块(chunk),作为独立的存储单元。HDFS块默认64MB的好处是为了简化磁盘寻址的开销。

HDFS块的抽象好处

  • 一个文件的大小,可以大于网络中任意一个硬盘的大小。文件的块并不需要存储在同一个硬盘上可以存储在分布式文件系统集群中任意一个硬盘上。
  • 大大简化系统设计。这点对于故障种类繁多的分布式系统来说尤为重要。以块为单位,不仅简化存储管理(块大小是固定的,很容易计算一个硬盘放多少个块);而且,消除了元数据的顾虑(因为Block仅仅是存储的一块数据,其文件的元数据,例如权限等就不需要跟数据块一起存储,可以交由另外的其他系来处理)。适合批处理。支持离线的批量数据处理,支持高吞吐量。
  • 块更适合于数据备份。进而提供数据容错能力和系统可用性(将每个块复制至少几个独立的机器上,可以确保在发生块、磁盘或机器故障后数据不丢失。一旦发生某个块不可用,系统将从其他地方复制一份复本。以保证复本的数量恢复到正常水平)。容错性高,容易实现负载均衡。

Namenode 和 Datanode

HDFS采用master/slave架构。一个HDFS集群是由一个Namenode和一定数目的 Datanodes 组成。Namenode是一个中心服务器,负责管理文件系统的名字空间(namespace)以及客户端对文件的访问。集群中的Datanode一般是一个节点一个,负责管理它所在节点上的存储。HDFS暴露了文件系统的名字空间,用户能够以文件的形式在上面存储数据。从内部看,一个文件其实被分成一个或多个数据块,这些块存储在一组Datanode上。Namenode执行文件系统的namespace操作。比如打开、关闭、重命名文件或目录。它也负责确定数据块到具体Datanode节点的 映射。Datanode负责处理文件系统客户端的读写请求。在Namenode的统一调度下进行数据块的创建、删除和复制。

NameNode上维护文件系统树及整棵树内所有的文件和目录,并永久保存在本地磁盘,fsimage和editslog。

NameNode 将对文件系统的改动追加保存到本地文件系统上的一个日志文件(edits)。当一个 NameNode 启动时,它首先从一个映像文件(fsimage)中读取 HDFS 的状态,接着应用日志文件中的 edits 操作。然后它将新的 HDFS 状态写入(fsimage)中,并使用一个空的 edits 文件开始正常操作。因为 NameNode 只有在启动阶段才合并 fsimage 和 edits,所以久而久之日志文件可能会变得非常庞大,特别是对大型的集群。日志文件太大的副作用是下一次 NameNode 启动会花很长时间。

Hadoop HDFS 架构图

image

在上图中NameNode是Master上的进程,复制控制底层文件的io操作,处理mapreduce任务等。 DataNode运行在slave机器上,负责实际的地层文件io读写。由NameNode存储管理文件系统的命名空间。

客户端代表用户通过与NameNode和DataNode交互来访问整个文件系统。

HDFS 读过程

image

  1. 客户端通过调用 FileSystem 对象的 open() 方法来打开要读取的文件。(步骤 1)在HDFS中是个 DistributedFileSystem 中的一个实例对象。
  2. (步骤 2)DistributedFileSystem 通过PRC[需要提供一个外链接介绍RPC技术]调用 namenode ,获取文件起始块的位置。对于每个块,namenode 返回存有该块复本的 datanode 地址。Datanode 根据它们于该客户端的距离排序,如果该客户端就是一个 datanode 并保存有该数据块的一个复本,该节点就直接从本地 datanode 中读取数据。反之取网路拓扑最短路径。
  3. DistributedFileSystem 返回 FSDataInputStream 给客户端,用来读取数据。FSDataInputStream 类封装 DFSInputStream 对象。由 DFSInputStream 负责管理 DataNode 和 NameNode 的 I/O。
  4. (步骤 3)客户端调用 stream 的 read() 函数开始读取数据。
  5. 存储着文件起始块的 DataNode 地址的 DFSInputStream 随即连接距离最近的 DataNode。反复 read() 方法,将数据从 datanode 传输到客户端。(网络拓扑与Hadoop[机架感知] ? 连接待补。)
  6. 到达块的末端时,DFSInputStream 关闭和此数据节点的连接,然后连接此文件下一个数据块的最佳DataNode。
  7. 客户端读取数据时,块也是按照打开 DFSInputStream 与 datanode 建立连接的顺序读取的。以通过询问NameNode 来检索下一批所需块的 datanode。当客户端读取完毕数据的时候,调用 FSDataInputStream 的 close() 函数。
  8. 在读取数据的过程中,如果客户端在与数据节点通信出现错误,则尝试连接包含此数据块的下一个数据节点。失败的数据节点将被记录,以后不再连接。
源代码理解
  1. 在客户端执行 class DistributedFileSystem open() 方法(装饰模式),打开文件并返回 DFSInputStream。

    // DistributedFileSystem extends FileSystem --> 调用 open() 方法
    public FSDataInputStream open(Path f, int bufferSize) throws IOException {
        statistics.incrementReadOps(1);
        return new DFSClient.DFSDataInputStream(
            dfs.open(getPathName(f), bufferSize, verifyChecksum, statistics));
    }
    
    // dfs 是 DFSClient 的实例对象。
    public DFSInputStream open(String src, int buffersize, boolean verifyChecksum,
                   FileSystem.Statistics stats
    ) throws IOException {
        checkOpen(); 
        //    Get block info from namenode
        return new DFSInputStream(src, buffersize, verifyChecksum);
    }
    
    // DFSInputStream 是 DFSClient 的内部类,继承自 FSInputStream。
    // 调用的构造函数(具体略)中调用了 openInfo() 方法。在 openInfo() 中 重要的是 fetchLocatedBlocks() 向 NameNode 询问所需要的数据的元信息,通过 callGetBlockLocations() 实现。 此过程若没有找到将尝试3次。 
    //
    // 由 callGetBlockLocations()通过 RPC 方式询问 NameNode 获取到 LocatedBlocks 信息。
    static LocatedBlocks callGetBlockLocations(ClientProtocol namenode,
      String src, long start, long length) throws IOException {
    … … 
        return namenode.getBlockLocations(src, start, length);
    … … 
    }    
    
    // 此处的 namenode 是通过代理模式创建的。它是 namenode  ClientProtocol 的实现(interface ClientProtocol extends VersionedProtocol)。
    private static ClientProtocol createNamenode(ClientProtocol rpcNamenode,
    Configuration conf) throws IOException {
    … … 
        final ClientProtocol cp = (ClientProtocol)RetryProxy.create(ClientProtocol.class, rpcNamenode, defaultPolicy, methodNameToPolicyMap);
        RPC.checkVersion(ClientProtocol.class, ClientProtocol.versionID, cp);
    … … 
    }  
    
  2. 在 class NameNode 端获取数据块位置信息并排序

    public LocatedBlocks   getBlockLocations(String src, long offset, long length) throws IOException {
        myMetrics.incrNumGetBlockLocations();
        // 获取数据块信息。namenode 为 FSNamesystem 实例。
        // 保存的是NameNode的name space树,其属性 FSDirectory dir 关联着 FSImage fsimage 信息,
        // fsimage 关联 FSEditLog editLog。
        return namesystem.getBlockLocations(getClientMachine(), src, offset, length);
    }
    
    // 类 FSNamesystem.getBlockLocationsInternal() 是具体获得块信息的实现。
    private synchronized LocatedBlocks getBlockLocationsInternal(String src,
    long offset, long length, int nrBlocksToReturn, 
    boolean doAccessTime,  boolean needBlockToken) throws IOException {
        … … 
    }  
    
  3. 在客户端DFSClient将步骤1中打开的读文件, DFSDataInputStream 对象内部的 DFSInputStream 对象的 read(long position, byte[] buffer, int offset, int length)方法进行实际的文件读取

    // class DFSInputStream
    public int read(long position, byte[] buffer, int offset, int length)
      throws IOException {
      // sanity checks
      checkOpen();
      if (closed) {
        throw new IOException("Stream closed");
      }
      failures = 0;
      long filelen = getFileLength();
      if ((position < 0) || (position >= filelen)) {
        return -1;
      }
      int realLen = length;
      if ((position + length) > filelen) {
        realLen = (int)(filelen - position);
      }
      //
      // determine the block and byte range within the block
      // corresponding to position and realLen
      // 判断块内的块和字节范围,位置和实际的长度
      List<LocatedBlock> blockRange = getBlockRange(position, realLen);
      int remaining = realLen;
      for (LocatedBlock blk : blockRange) {
        long targetStart = position - blk.getStartOffset();
        long bytesToRead = Math.min(remaining, blk.getBlockSize() - targetStart);
        fetchBlockByteRange(blk, targetStart, 
                            targetStart + bytesToRead - 1, buffer, offset);
        remaining -= bytesToRead;
        position += bytesToRead;
        offset += bytesToRead;
      }
      assert remaining == 0 : "Wrong number of bytes read.";
      if (stats != null) {
        stats.incrementBytesRead(realLen);
      }
      return realLen;
    } 
    
    // fetchBlockByteRange() 通过 socket 连接一个最优的 DataNode 来读取数据
    private void fetchBlockByteRange(LocatedBlock block, long start,
                                     long end, byte[] buf, int offset) throws IOException {
      //
      // Connect to best DataNode for desired Block, with potential offset
      //
      Socket dn = null;
      int refetchToken = 1; // only need to get a new access token once
      //      
      while (true) {
        // cached block locations may have been updated by chooseDataNode()
        // or fetchBlockAt(). Always get the latest list of locations at the 
        // start of the loop.
        block = getBlockAt(block.getStartOffset(), false);
        DNAddrPair retval = chooseDataNode(block); // 选者最DataNode
        DatanodeInfo chosenNode = retval.info;
        InetSocketAddress targetAddr = retval.addr;
        BlockReader reader = null;
        try {
          Token<BlockTokenIdentifier> accessToken = block.getBlockToken();
          int len = (int) (end - start + 1);
      //
          // first try reading the block locally.
          if (shouldTryShortCircuitRead(targetAddr)) {// 本地优先
            try {
              reader = getLocalBlockReader(conf, src, block.getBlock(),
                  accessToken, chosenNode, DFSClient.this.socketTimeout, start);
            } catch (AccessControlException ex) {
              LOG.warn("Short circuit access failed ", ex);
              //Disable short circuit reads
              shortCircuitLocalReads = false;
              continue;
            }
          } else {
            // go to the datanode
            dn = socketFactory.createSocket(); // socke datanode
            LOG.debug("Connecting to " + targetAddr);
            NetUtils.connect(dn, targetAddr, getRandomLocalInterfaceAddr(),
                socketTimeout);
            dn.setSoTimeout(socketTimeout);
            reader = RemoteBlockReader.newBlockReader(dn, src, 
                block.getBlock().getBlockId(), accessToken,
                block.getBlock().getGenerationStamp(), start, len, buffersize, 
                verifyChecksum, clientName);
          }
          int nread = reader.readAll(buf, offset, len); // BlockReader 负责读取数据
          return;
        }
        … … 
        finally {
          IOUtils.closeStream(reader);
          IOUtils.closeSocket(dn);
        }
        // Put chosen node into dead list, continue
        addToDeadNodes(chosenNode); // dead datanode
      }
    }
    
  4. NameNode 实例化启动时便监听客户端请求

    DataNode(final Configuration conf,
           final AbstractList<File> dataDirs, SecureResources resources) throws IOException {
    super(conf);
    SecurityUtil.login(conf, DFSConfigKeys.DFS_DATANODE_KEYTAB_FILE_KEY, 
        DFSConfigKeys.DFS_DATANODE_USER_NAME_KEY);
    //
    datanodeObject = this;
    durableSync = conf.getBoolean("dfs.durable.sync", true);
    this.userWithLocalPathAccess = conf
        .get(DFSConfigKeys.DFS_BLOCK_LOCAL_PATH_ACCESS_USER_KEY);
    try {
      startDataNode(conf, dataDirs, resources);// startDataNode
    } catch (IOException ie) {
      shutdown();
      throw ie;
    }   
    }
    
    // startDataNode
    void startDataNode(Configuration conf, 
                     AbstractList<File> dataDirs, SecureResources resources
                     ) throws IOException {
    … …                      
    // find free port or use privileged port provide
    ServerSocket ss;
    if(secureResources == null) {
      ss = (socketWriteTimeout > 0) ? 
        ServerSocketChannel.open().socket() : new ServerSocket();
      Server.bind(ss, socAddr, 0);
    } else {
      ss = resources.getStreamingSocket();
    }
    ss.setReceiveBufferSize(DEFAULT_DATA_SOCKET_SIZE); 
    // adjust machine name with the actual port
    tmpPort = ss.getLocalPort();
    selfAddr = new InetSocketAddress(ss.getInetAddress().getHostAddress(),
    //                                     tmpPort);
    this.dnRegistration.setName(machineName + ":" + tmpPort);
    LOG.info("Opened data transfer server at " + tmpPort);
    //
    this.threadGroup = new ThreadGroup("dataXceiverServer");
    this.dataXceiverServer = new Daemon(threadGroup, 
        new DataXceiverServer(ss, conf, this));
    this.threadGroup.setDaemon(true); // DataXceiverServer为守护线程监控客户端连接
    }
    
    // class DataXceiverServer.run()
    public void run() {
    while (datanode.shouldRun) {
      try {
        Socket s = ss.accept();
        s.setTcpNoDelay(true);
        new Daemon(datanode.threadGroup, 
            new DataXceiver(s, datanode, this)).start();
      } catch (SocketTimeoutException ignored) {
      }
    }
    }   
    
    // class DataXceiver.run()
    // Read/write data from/to the DataXceiveServer.
    // 操作类型:OP_READ_BLOCK,OP_WRITE_BLOCK,OP_REPLACE_BLOCK,
    // OP_COPY_BLOCK,OP_BLOCK_CHECKSUM
    public void run() {
    DataInputStream in=null; 
    try {
      in = new DataInputStream(
          new BufferedInputStream(NetUtils.getInputStream(s), 
                                  SMALL_BUFFER_SIZE));
    … … 
      switch ( op ) {
      case DataTransferProtocol.OP_READ_BLOCK:
        readBlock( in );// 读数据
        datanode.myMetrics.addReadBlockOp(DataNode.now() - startTime);
        if (local)
          datanode.myMetrics.incrReadsFromLocalClient();
        else
          datanode.myMetrics.incrReadsFromRemoteClient();
        break;
    … … 
      default:
        throw new IOException("Unknown opcode " + op + " in data stream");
      }
    }  
    
    // class DataXceiver.readBlock()
    // Read a block from the disk.
    private void readBlock(DataInputStream in) throws IOException {
    //
    // Read in the header,读指令
    //
    long blockId = in.readLong();          
    Block block = new Block( blockId, 0 , in.readLong());
    // 
    long startOffset = in.readLong();
    long length = in.readLong();
    String clientName = Text.readString(in);
    Token<BlockTokenIdentifier> accessToken = new Token<BlockTokenIdentifier>();
    accessToken.readFields(in);
    // 向客户端写数据
    OutputStream baseStream = NetUtils.getOutputStream(s, 
        datanode.socketWriteTimeout);
    DataOutputStream out = new DataOutputStream(
                 new BufferedOutputStream(baseStream, SMALL_BUFFER_SIZE));
    … … 
    // send the block,读取本地的block的数据,并发送给客户端
    BlockSender blockSender = null;
    final String clientTraceFmt =
      clientName.length() > 0 && ClientTraceLog.isInfoEnabled()
        ? String.format(DN_CLIENTTRACE_FORMAT, localAddress, remoteAddress,
            "%d", "HDFS_READ", clientName, "%d", 
            datanode.dnRegistration.getStorageID(), block, "%d")
        : datanode.dnRegistration + " Served " + block + " to " +
            s.getInetAddress();
    try {
      try {
        blockSender = new BlockSender(block, startOffset, length,
            true, true, false, datanode, clientTraceFmt);
      } catch(IOException e) {
        out.writeShort(DataTransferProtocol.OP_STATUS_ERROR);
        throw e;
      }
      out.writeShort(DataTransferProtocol.OP_STATUS_SUCCESS); // send op status
      long read = blockSender.sendBlock(out, baseStream, null); // send data,发送数据
      … … 
    } finally {
      IOUtils.closeStream(out);
      IOUtils.closeStream(blockSender);
    }
    }
    

HDFS 写过程

image

  1. 客户端通过对 DistributedFileSystem 对象调用 create() 方法来创建文件(步骤1)。
  2. DistributedFileSystem 通过 PRC 对 namenode 调用create() 方法,在文件系统的命名空间中创建一个新的没有数据块文件(步骤2)。
  3. namenode 检查并确保此文件不存在,并且客户端由创建该文件的权限。通过 namenode 即为创建的新文件创建一条纪录;否则,创建失败,并向客户端抛出 IOException 异常。
  4. DistributedFileSystem 向客户端返回一个 FSDataOutputStream 对象。客户端可以开始写数据。FSDataOutputStream 同样封装一个 DFSoutPutstream 对象。由 DFSInputStream 负责处理 DataNode 和 NameNode 的 I/O。
  5. (步骤3)在客户端写数据时,DFSoutPutstream 将它分成一个个的数据包,并写入内部队列(数据队列)。
  6. 由 DataStreamer(DFSClient 内部类) 处理数据队列。它根据 datanode 列表要求 namenode 分配适合的新块来存储数据备份。这组 datanode 构成一个管线。假设当前复制数为3,那么管线中将有3个节点。DataStreamer 将数据包流式传输到管线(pipeline)的第一个 datanode 节点。该 datanode 存储数据包并将它发送到管线中的第2个 datanode。 同样地,第二个 datanode 存储该数据包并发哦少年宫到管线中的第3个 datanode(步骤4)。
  7. DFSOutputStream 内部维护一个对应的数据包队列等待 datanode 收到确认确认回执(ack queue),当 DFSOutputStream 收到所有的 datanode 确认信息之后,该数据包才从确认队列中删除。
  8. 若在写数据时,datanode 发生故障。则先关闭管线,确认把队列中任何数据包都添加回数据队列的最前端,以确保故障节点下游的 datanode 不会漏掉任何一个数据包。并为存储在另一个 datanode 的当前数据块指定一个新的标志,并将该标志发送个 namenode,以便故障的 datanode 在恢复后可以删除存储的部分数据块。从管线中删除故障 datanode 节点并把余下的数据块写入管线中的2个 datanode。Namenode 注意到块复本量不足时,会在另一个节点上创建一个新的复本。后续数据块继续正常处理。一个块在写入期间发生多个 datanode 故障的概率不高,只要写入了最小复本数(dfs.replication.min默认为1),写入即为成功。此块由异步执行复制以达到目标复本数,默认为3。
  9. 当客户端结束写入数据,则调用 stream 的 close()函数。此操作将剩余所有的数据包写入 datanode pipeline 中,并等待 ack queue 返回成功。最后通知元数据节点写入完毕。namenode 是通过 Datastreamer 询问的数据块的分配,它在返回成功前只需要等待数据块进行最小量的复制。
源代码理解

TODO,原笔记已丢失,待补。 hdfs 架构一页。且读过程和写过程各独立一页。

NadeNode 和 DataNode 实现的协议

TODO ,独立 一页

补充

详细介绍HDFS读写过程解析

Hadoop1.x 学习准备

| 评论

开始学习Hadoop之前先了解下Hadoop之父Doug Cutting , 膜拜是必须的。路子是一步不走出来了。各位前辈给我们留下了宝贝的资源, 不加以学习利用有些说不过去。

我个人是从2013年夏天开始拿到 《Hadoop权威指南》第二版的,但由于各种原因,也可以直接说我比较懒,从夏天到冬季中途到是翻过那书,感觉每次都没由什么实际的记忆和理解过程。在12月初,发现开源力量提供了的开源力量培训课-Hadoop Development网上视频在线教学课程。选择适合自己的就是对的。 ,我毅然成为了其中的一员。 目前我没有从事相关的工作,出于在工作之外寻找乐趣,就来了。

学习之前还是多看些相关的资源比较容易找到赶脚。

资源列举:

部分网站可能是比较难打开的。经常遇到问题谷歌之后发现很多问题还是跳转到这些权威论坛上面了。先登记在案,以备使用。在此仅以纪录我学习《Hadoop权威指南》一书的笔记。 笔记中图片资源大部分出自于原书英文版,章节内容来自于原书中/英文版 + 开源力量 LouisT 老师ppt/课堂示例及扩展,当然不乏在网站上找到各位博主的精品博客参考。

我自己的练习代码存放在 github。