kangfoo's blog

工作学习笔记,生活掠影。

Hadoop RPC

| 评论

Remote Procedure Call 远程方法调用。不需要了解网络细节,某一程序即可使用该协议请求来自网络内另一台及其程序的服务。它是一个 Client/Server 的结构,提供服务的一方称为Server,消费服务的一方称为Client。

Hadoop 底层的交互都是通过 rpc 进行的。例 如:datanode 和 namenode、tasktracker 和 jobtracker、secondary namenode 和 namenode 之间的通信都是通过 rpc 实现的。

TODO: 此文未写明了。明显需要画 4张图, rpc 原理图,Hadoop rpc 时序图, 客户端 流程图,服端流程图。最好帖几个包图+ 类图(组件图)。待完善。

要实现远程过程调用,需要有3要素: 1、server 必须发布服务 2、在 client 和 server 两端都需要有模块来处理协议和连接 3、server 发布的服务,需要将接口给到 client

Hadoop RPC

  1. 序列化层。 Client 与 Server 端通讯传递的信息采用实现自 Writable 类型
  2. 函数调用层。 Hadoop RPC 通过动态代理和 java 反射实现函数调用
  3. 网络传输层。Hadoop RPC 采用 TCP/IP socket 机制
  4. 服务器框架层。Hadoop RPC 采用 java NIO 事件驱动模型提高 RPC Server 吞吐量

TODO 缺个 RPC 图

Hadoop RPC 源代码主要在org.apache.hadoop.ipc包下。org.apache.hadoop.ipc.RPC 内部包含5个内部类。

  • Invocation :用于封装方法名和参数,作为数据传输层,相当于VO(Value Object)。
  • ClientCache :用于存储client对象,用 socket factory 作为 hash key,存储结构为 hashMap
  • Invoker :是动态代理中的调用实现类,继承了 java.lang.reflect.InvocationHandler。
  • Server :是ipc.Server的实现类。
  • VersionMismatch : 协议版本。

从客户端开始进行通讯源代码分析

org.apache.hadoop.ipc.Client 有5个内部类

  • Call: A call waiting for a value.
  • Connection: Thread that reads responses and notifies callers. Each connection owns a socket connected to a remote address. Calls are multiplexed through this socket: responses may be delivered out of order.
  • ConnectionId: This class holds the address and the user ticket. The client connections to servers are uniquely identified by
  • ParallelCall: Call implementation used for parallel calls.
  • ParallelResults: Result collector for parallel calls.

客户端和服务端建立连接的大致执行过程为

  1. 在 Object org.apache.hadoop.ipc.RPC.Invoker.invoke(Object proxy, Method method, Object[] args) 方法中调用
    client.call(new Invocation(method, args), remoteId);

  2. 上述的 new Invocation(method, args) 是 org.apache.hadoop.ipc.RPC 的内部类,它包含被调用的方法名称及其参数。此处主要是设置方法和参数。 client 为 org.apache.hadoop.ipc.Client 的实例对象。

  3. org.apache.hadoop.ipc.Client.call() 方法的具体源代码。在call()方法中 getConnection()内部获取一个 org.apache.hadoop.ipc.Client.Connection 对象并启动 io 流 setupIOstreams()。

    Writable org.apache.hadoop.ipc.Client.call(Writable param, ConnectionId remoteId) throwsInterruptedException, IOException {
    Call call = new Call(param); //A call waiting for a value.   
    // Get a connection from the pool, or create a new one and add it to the
    // pool.  Connections to a given ConnectionId are reused. 
    Connection connection = getConnection(remoteId, call);// 主要在 org.apache.hadoop.net 包下。
    connection.sendParam(call); //客户端发送数据过程
    boolean interrupted = false;
    synchronized (call) {
       while (!call.done) {
        try {
          call.wait();                           // wait for the result
        } catch (InterruptedException ie) {
          // save the fact that we were interrupted
          interrupted = true;
        }
      }
    … …
    }
    }
    // Get a connection from the pool, or create a new one and add it to the
    // pool.  Connections to a given ConnectionId are reused. 
    private Connection getConnection(ConnectionId remoteId,
                                   Call call)
                                   throws IOException, InterruptedException {
    if (!running.get()) {
      // the client is stopped
      throw new IOException("The client is stopped");
    }
    Connection connection;
    // we could avoid this allocation for each RPC by having a  
    // connectionsId object and with set() method. We need to manage the
    // refs for keys in HashMap properly. For now its ok.
    do {
      synchronized (connections) {
        connection = connections.get(remoteId);
        if (connection == null) {
          connection = new Connection(remoteId);
          connections.put(remoteId, connection);
        }
      }
    } while (!connection.addCall(call)); 
    //we don't invoke the method below inside "synchronized (connections)"
    //block above. The reason for that is if the server happens to be slow,
    //it will take longer to establish a connection and that will slow the
    //entire system down.
    connection.setupIOstreams(); // 向服务段发送一个 header 并等待结果
    return connection;
    }
    
  4. setupIOstreams() 方法。

    void org.apache.hadoop.ipc.Client.Connection.setupIOstreams() throws InterruptedException {
    // Connect to the server and set up the I/O streams. It then sends
    // a header to the server and starts
    // the connection thread that waits for responses.
    while (true) {
          setupConnection();//  建立连接
          InputStream inStream = NetUtils.getInputStream(socket); // 输入
          OutputStream outStream = NetUtils.getOutputStream(socket); // 输出
          writeRpcHeader(outStream);
          }
    … … 
    // update last activity time
      touch();
    // start the receiver thread after the socket connection has been set up            start(); 
    }        
    
  5. 启动org.apache.hadoop.ipc.Client.Connection 客户端获取服务器端放回数据过程

    void org.apache.hadoop.ipc.Client.Connection.run()
    while (waitForWork()) {//wait here for work - read or close connection
        receiveResponse();
      }
    

ipc.Server源码分析

ipc.Server 有6个内部类:

  • Call :用于存储客户端发来的请求
  • Listener : 监听类,用于监听客户端发来的请求,同时Listener内部还有一个静态类,Listener.Reader,当监听器监听到用户请求,便让Reader读取用户请求。
  • ExceptionsHandler: 异常管理
  • Responder :响应RPC请求类,请求处理完毕,由Responder发送给请求客户端。
  • Connection :连接类,真正的客户端请求读取逻辑在这个类中。
  • Handler :请求处理类,会循环阻塞读取callQueue中的call对象,并对其进行操作。

大致过程为:

  1. Namenode的初始化时,RPC的server对象是通过ipc.RPC类的getServer()方法获得的。

    void org.apache.hadoop.hdfs.server.namenode.NameNode.initialize(Configuration conf) throwsIOException
    // create rpc server
    InetSocketAddress dnSocketAddr = getServiceRpcServerAddress(conf);
    if (dnSocketAddr != null) {
      int serviceHandlerCount =
        conf.getInt(DFSConfigKeys.DFS_NAMENODE_SERVICE_HANDLER_COUNT_KEY,
                    DFSConfigKeys.DFS_NAMENODE_SERVICE_HANDLER_COUNT_DEFAULT);
      this.serviceRpcServer = RPC.getServer(this, dnSocketAddr.getHostName(), 
          dnSocketAddr.getPort(), serviceHandlerCount,
          false, conf, namesystem.getDelegationTokenSecretManager());
      this.serviceRPCAddress = this.serviceRpcServer.getListenerAddress();
      setRpcServiceServerAddress(conf);
    }
    … …
    this.server.start();  //start RPC server  
    
  2. 启动 server

    void org.apache.hadoop.ipc.Server.start()
    // Starts the service.  Must be called before any calls will be handled.
    public synchronized void start() {
    responder.start();
    listener.start();
    handlers = new Handler[handlerCount];
    for (int i = 0; i < handlerCount; i++) {
      handlers[i] = new Handler(i);
      handlers[i].start(); //处理call
    }
    }
    
  3. Server处理请求, server 同样使用非阻塞 nio 以提高吞吐量

    org.apache.hadoop.ipc.Server.Listener.Listener(Server) throws IOException
    public Listener() throws IOException {
      address = new InetSocketAddress(bindAddress, port);
      // Create a new server socket and set to non blocking mode
      acceptChannel = ServerSocketChannel.open();
      acceptChannel.configureBlocking(false);
    … … }     
    
  4. 真正建立连接

    void org.apache.hadoop.ipc.Server.Listener.doAccept(SelectionKey key) throws IOException,OutOfMemoryError
    

    Reader 读数据接收请求

    void org.apache.hadoop.ipc.Server.Listener.doRead(SelectionKey key) throws InterruptedException
    try {
        count = c.readAndProcess();
      } catch (InterruptedException ieo) {
        LOG.info(getName() + ": readAndProcess caught InterruptedException", ieo);
        throw ieo;
      }
    
    int org.apache.hadoop.ipc.Server.Connection.readAndProcess() throws IOException,InterruptedException
    if (!rpcHeaderRead) {
          //Every connection is expected to send the header.
          if (rpcHeaderBuffer == null) {
            rpcHeaderBuffer = ByteBuffer.allocate(2);
          }
          count = channelRead(channel, rpcHeaderBuffer);
          if (count < 0 || rpcHeaderBuffer.remaining() > 0) {
            return count;
          }
          int version = rpcHeaderBuffer.get(0);
    … … 
    processOneRpc(data.array()); // 数据处理
    
  5. 下面贴出Server.Connection类中的processOneRpc()方法和processData()方法的源码。

    void org.apache.hadoop.ipc.Server.Connection.processOneRpc(byte[] buf) throws IOException,InterruptedException
    private void processOneRpc(byte[] buf) throws IOException,
        InterruptedException {
      if (headerRead) {
        processData(buf);
      } else {
        processHeader(buf);
        headerRead = true;
        if (!authorizeConnection()) {
          throw new AccessControlException("Connection from " + this
              + " for protocol " + header.getProtocol()
              + " is unauthorized for user " + user);
        }
      }
    }
    
  6. 处理call

    void org.apache.hadoop.ipc.Server.Handler.run()
    while (running) {
        try {
          final Call call = callQueue.take(); // pop the queue; maybe blocked here
          … … 
          CurCall.set(call);
          try {
            // Make the call as the user via Subject.doAs, thus associating
            // the call with the Subject
            if (call.connection.user == null) {
              value = call(call.connection.protocol, call.param, 
                           call.timestamp);
            } else {
    … …}
    
  7. 返回请求

下面贴出Server.Responder类中的doRespond()方法源码:

void org.apache.hadoop.ipc.Server.Responder.doRespond(Call call) throws IOException
    //
    // Enqueue a response from the application.
    //
    void doRespond(Call call) throws IOException {
      synchronized (call.connection.responseQueue) {
        call.connection.responseQueue.addLast(call);
        if (call.connection.responseQueue.size() == 1) {
          processResponse(call.connection.responseQueue, true);
        }
      }
    }

补充: notify()让因wait()进入阻塞队列里的线程(blocked状态)变为runnable,然后发出notify()动作的线程继续执行完,待其完成后,进行调度时,调用wait()的线程可能会被再次调度而进入running状态。

参考资源:

Hadoop I/O

| 评论

HDFS 对网络IO, 磁盘IO 的操作是比较复杂且开销还比较高的。Hadoop 在设计中使用了内部的原子操作、压缩、随机读写、流式存储、数据完整性校验、序列化、基于文件的数据结构等方面进行 IO 操作。

数据完整性

保证数据在传输过程中不损坏,常见的保证数据完整性采用的技术

  • 奇偶校验技术
  • ECC 内存纠错校验技术
  • CRC-32 循环冗余校验技术

HDFS的数据完整性

HDFS 会对写入的所有数据计算校验和,并在读取数据时验证校验和。它针对每个由 io.bytes.per.checksum(默认512字节,开销低于1%)指定字节数据技术校验和。

DataNode 负责在验证收到的数据后存储数据及其校验和。从客户端和其它数据节点复制过来的数据。客户端写入数据并且将它发送到一个数据节点管线中,在管线的最后一个数据节点验证校验和。

客户端读取 DataNode 上的数据时,也会验证校验和。将其与 DataNode 上存储的校验和进行对比。每个 DataNode 维护一个连续的校验和验证日志,因此它知道每个数据块最后验证的时间。

每个 DataNode 还会在后台线程运行一个 DataBlockScanner(数据块检测程序),定期验证存储在数据节点上的所有块,以解决物理存储媒介上位损坏问题。

HDFS 通过复制完整的数据复本来修复损坏的数据块,进而得到一个新的、完好无损的复本。基本思路:如果客户端读取数据块时检测到错误,就向 NameNode 汇报已损坏的数据块及它试图从名称节点中要读取的 DataNode,并抛出 ChecksumException。 NameNode 将这个已损坏的数据块复本标记为已损坏,并不直接与 datanode 联系,或尝试将这个个复本复制到另一个 datanode。之后,namennode 安排这个数据块的一个复本复制到另一个 datanode。 至此,数据块复制因子恢复到期望水平。此后,并将已损坏的数据块复本删除。

LocalFileSystem

Hadoop的 LocalFileSystem 执行客户端校验。意味着,在写一个名filename的文件时,文件系统的客户端以透明的方式创建一个隐藏.filename.crc。在同一个文件夹下,包含每个文件块的校验和。

禁用校验和,使用底层文件系统原生支持校验和。这里通过 RawLocalFileSystem 来替代 LocalFileSystem 完成。要在一个应用中全局使用,只需要设置 fs.file.impl值为 org.apache.hadoop.fs.RawLocalFileSystem 来重新 map 执行文件的 URL。或者只想对某些读取禁用校验和校验。例:

Configuration conf = ...
FileSystem fs = new RawLocalFileSystem();
fs.initialize(null, conf);

ChecksumFileSystem

LocalFileSystem 继承自 ChecksumFileSystem(校验和文件系统),ChecksumFileSystem 继承自 FileSystem。ChecksumFileSystem 可以很容易的添加校验和功能到其他文件系统中。

压缩

将文件压缩有两大好处

  • 减少存储文件所需要的磁盘空间
  • 加速数据在网络和磁盘上的传输

编译native-hadoop

参见 《Native-hadoop 编译》

压缩算法

所有的压缩算法都需要权衡时间/空间比.压缩和解压缩速度越快,节省空间越少。gizp压缩空间/时间性能比较适中。bzip2比gzip更高效,但数度更慢; lzo 压缩速度比gzip比较快,但是压缩效率稍微低一点。

Hadoop支持的压缩格式

压缩格式 工具 算法 文件扩展名 多文件 可切分
DEFLATE DEFLATE .deflate
Gzip gzip DEFLATE .gz
bzip2 bzip2 bzip2 .bz
LZO Lzop LZO .lzo


编码/解码 用以执行压缩解压算法,是否有java/原生实现

压缩格式 codec java实现 原生实现
DEFLATE org.apache.hadoop.io.compress.DefaultCodec
gzip org.apache.hadoop.io.compress.GzipCodec
bzip2 org.apache.hadoop.io.compress.Bzip2Codec
LZO com.hadoop.compression.lzo.LzopCodec


压缩算法相关的 API

使用 CompressionCodecFactory.getCodec()方法来推断 CompressionCodec 具体实现。由 CompressionCodec 接口的实现对流进行进行压缩与解压缩。CodecPool 提供了重复利用压缩和解压缩的对象的机制。

… … 画个类图。## TOTO

NativeCodeLoader 加载 native-hadoop library 若想使用 snappycode 首先加载 snappy.so,再判断加载 native hadoop–>hadoop.so。native hadoop 中包含了 java 中申明的native 方法,由 native 方法去调用第三方的 natvie library。native_libraries官方参考文档

在 Hadoop core-site.xml 配置文件中可以设置是否使用本地库,默认以启用。

<property>
  <name>hadoop.native.lib</name>
  <value>true</value>
  <description>Should native hadoop libraries, if present, be used.</description>
</property>

编写使用压缩的测试程序

  1. 首先下载并编译 snappy,zlib
  2. 编写 java 代码 CompressionTest.java, DeCompressionTest.java。 程序是由 maven test 进行。
  3. 执行
    $ export LD_LIBRARY_PATH=$LD_LIBRARY_PATH:/home/hadoop/env/hadoop/lib/native/Linux-amd64-64:/usr/local/lib
    $ mvn test -Dtest=com.kangfoo.study.hadoop1.io.CompressionTest
    $ mvn test -Dtest=com.kangfoo.study.hadoop1.io.DeCompressionTest
    ## 结果:
    rw-rw-r--. 1 hadoop hadoop 531859 7月  23 2013 releasenotes.html
    -rw-rw-r--. 1 hadoop hadoop 140903 1月  22 15:13 releasenotes.html.deflate
    -rw-rw-r--. 1 hadoop hadoop 531859 1月  22 15:22 releasenotes.html.deflate.decp
    -rw-rw-r--. 1 hadoop hadoop 140915 1月  22 15:13 releasenotes.html.gz
    -rw-rw-r--. 1 hadoop hadoop 531859 1月  22 15:22 releasenotes.html.gz.decp
    -rw-rw-r--. 1 hadoop hadoop 224661 1月  22 15:13 releasenotes.html.snappy
    -rw-rw-r--. 1 hadoop hadoop 531859 1月  22 15:22 releasenotes.html.snappy.decp
    ## 日志:
    Running com.kangfoo.study.hadoop1.io.CompressionTest
    2014-01-22 15:13:31,312 WARN  snappy.LoadSnappy (LoadSnappy.java:<clinit>(36)) - Snappy native library is available
    2014-01-22 15:13:31,357 INFO  util.NativeCodeLoader (NativeCodeLoader.java:<clinit>(43)) - Loaded the native-hadoop library
    2014-01-22 15:13:31,357 INFO  snappy.LoadSnappy (LoadSnappy.java:<clinit>(44)) - Snappy native library loaded
    2014-01-22 15:13:31,617 INFO  zlib.ZlibFactory (ZlibFactory.java:<clinit>(47)) - Successfully loaded & initialized native-zlib library
    

启用压缩

出于性能考虑,使用原生的压缩库要比同时提供 java 实现的开销更小。可以修改 Hadoop core-site.xml 配置文件 io.compression.codecs 以启用压缩,前提是必须安装好对应的原生压缩库依赖,并配置正确的 Codec。

  • 属性名: io.compression.codecs
  • 默认值:org.apache.hadoop.io.compress.DefaultCodec, org.apache.hadoop.io.compress.GzipCodec, org.apache.hadoop.io.ompress.Bzip2Codec

压缩与输入分割

考虑如何压缩将由 MapReduce 处理的数据时,是否支持分割很重要。

案例假设,一个gzip压缩的文件的为1GB。HDFS 将其分为16块(64mb 块大小),其中每一数据块最为一个 map 任务输入。那么在 map 任务中,每一个分块是无法独立工作的( gzip 是使用的 DEFLATE 算法,它将数据存储在一系列的压缩块中。无法实现从数据流的任意位置读取数据,那么这些分块必须全部读取并与整个数据流进行同步才能从任意位置进行读取数据)。这样就失去了本地化的优势。一个 map 要处理其他15个分块的数据,而大多数据并不存储在当前 map 节点上。Map的任务数越少,作业的粒度就较大,运行的时间可能会更长。

具体应该选择哪种压缩形式,还要经过测试,才可以决定。大文件选择支持分割的压缩形式,目前只有 bzip2 支持分片,但没有原生库的实现。或者使用 SequenceFile, MapFile 数据格式进行小文件的合并再存储,这样可以满足分片。

在 MapReduce 中使用压缩

如果文件是压缩过的,那么在被 MapReduce 读取时,它们会被解压,根据文件扩展名选择对应的解码器。可参考 MapReduce 块压缩相关知识。

压缩 MapReduce 的作业输出

  1. 在作业配置中将 mapred.output.compress 属性设置为 true
  2. 将 mapred.output.compression.codec 属性设置为自己需要使用的压缩解码/编码器的类名。

代码示例

conf.setBoolean(“mapred.output.compress”,true)
Conf.setClass(“mapred.output.compression.codec”,GizpCodec.class,CompressionCodec.class);

对 Map 任务输出结果的压缩

压缩 Map 作业的中间结果以减少网络传输。

Map输出压缩属性
属性名称: mapred.compress.map.output
类型: boolean
默认值: false
描述: 对 map 任务输出是否进行压缩

属性名称: mapred.map.output.compression.codec
类型: Class
默认值: org.apache.hadoop.io.compress.DefaultCodec
描述: map 输出所用的压缩 codec

代码示例

conf.setCompressMapOutput(true);
conf.setMapOutputCompressorClass(GzipCodec.classs)


序列化和反序列化

什么是Hadoop的序列化? 序列化,将结构化对象转换为字节流,以便于在网络传输和磁盘存储的过程。反序列化,将字节流转化为结构化的对象的逆过程。可用于进程间的通讯和永久存储,数据拷贝

序列化特点:

  • 紧凑:可充分利用网络带宽(哈夫曼编码)
  • 快速:尽量减少序列化和反序列化的开销
  • 可扩展:通讯协议升级向下兼容
  • 互操作:支持不同语言间的通讯

Hadoop1.x 仅满足了紧凑和快速两个特性。 java 自身提供的序列化并不精简。Java Serializaiton 是序列化图对象的通讯机制,它有序列化和反序列化的开销。 java 序列化比较复杂,不能很精简的控制对象的读写。连接/延迟/缓冲。java 序列化不能满足: 精简,快速,可扩展,可互操作。

Hadoop1.x 使用 Writable 实现自己的序列化格式。它结构紧凑,快速。但难以用 java 以外的语言进行扩展。

Writable 接口

Writeable 接口定义了2个方法:

void write(DataOutput out) throws IOException; // 将其状态写入二进制格式的 DataOutput 流;
void readFields(DataInput in) throws IOException; // 从二进制格式的 DataInput 流读取其状态

画个类图 ## TODO

writable
writableComparable(interface WritableComparable<T> extends Writable, Comparable<T> )
comparator(int compare(T o1, T o2);)
comparable(public int compareTo(T o);)
rawcomparator(public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2);)
writablecomparator(ReflectionUtils.newInstance(keyClass, null);)

Writable 类的层次结构 image

部分类型列举

  • NullWritable 是一种特殊的Writable类型,单例的, 序列化的长度是零。可以做占位符。
  • Text 是针对UTF-8序列化的Writable类。一般可等价于 java.lang.String 的 Writable。Text是可变的。
  • BytesWritable 是一个对二进制的封装,序列化为一个格式为一个用于制定后面数据字节数的整数域(4字节),后跟字节本身。它是可变的。如:
    BytesWritable b = new BytesWritable(new byte[]{2,5,127}); // 3个长度
    byte[] bytes = serialize(b);
    assertThat(StringUtils.byteToHexString(bytes), is("0000000302057f"));
    
  • ObjectWritable 适用于java基本类型(String,enum,Writable,null或者这些类型组成的数组)的一个封装。
  • Writable集合。ArrayWritable和TwoDArrayWritable针对于数组和二维数组,它们中所有的元素必须是同一个类的实例。MapWritable和SortedMapWritable是针对于 Map 和 SorMap。

自定义Writable •实现WritableComparable •实现

write(); // 将对象转换为字节流并写入到输出流 out 中
readFields(); // 从输入流 in 中读取字节流并反序列化为对象
compareTo()方法。 // 将 this 对像与对象 O 比较

示例程序代码

序列化框架

  • apache avro 旨在解决Hadoop中Writable类型的不足:缺乏语言的可移植性。
  • apache thrift 可伸缩的跨语言, 提供了 PRC 实现层。
  • Protocol Buffers 是一种轻便高效的结构化数据存储格式,可以用于结构化数据串行化,很适合做数据存储或 RPC 数据交换格式。它可用于通讯协议、数据存储等领域的语言无关、平台无关、可扩展的序列化结构数据格式。目前提供了 C++、Java、Python 三种语言的 API。

参考

基于文件的数据结构

使用 SequenceFile, MapFile 主要解决的问题是:支持分片的数据压缩格式的比较有限,对于某些应用而言,需要处理的数据格式来存储自己的格式,MapRedurce 需要更高级的容器。

SequenceFile

  1. 文件是以二进制键/值对形式存储的平面文件
  2. 可以作为小文件的容器,它将小文件包装起来,以获取更高效率的存储和处理
  3. 存储在 SequenceFile 中的 key/valu e并不一定是 Writable 类型
  4. 可使用 append()方法在文件末位附加 key/value 对

好处

  1. 支持纪录或者块压缩
  2. 支持splittable, 可作为mapreduce输入分片
  3. 修改简单(har是不可以修改的)

SequenceFile 压缩

SequenceFile 文件格式内部结构与是否启用压缩有关。启用压缩又分两类:纪录压缩;数据块压缩。

  1. 无压缩。 默认是不启用压缩,则每个纪录就是它的纪录长度(字节数)、键长度、键和值组成。长度字段为4字节的整数。

  2. 纪录压缩。其格式与无压缩情况相同,不同在于纪录压缩的值需要通过文件头中定义的压缩codec进行压缩。键不压缩。
    无压缩和纪录压缩的示意图: image

  3. 块压缩。一次压缩多条纪录,比单条纪录压缩效率高。可以不断的向数据块中压缩纪录,直到字节数不小于io.seqfile.compress.blocksize属性中设置的字节数。默认1MB.每个新的块的开始处都需要插入同步标识。数据块的格式如下:首先是一个指示数据块中字节数的字段;紧跟是4个压缩字段(键长度、键、值长度、值)。块压缩示意图如下: image

实例程序代码

运行结果

## 查看 sequence file
$ ./bin/hadoop fs -text /numbers.seq
## 排序
$ ./bin/hadoop jar ./hadoop-examples-1.2.1.jar sort -r 1 -inFormat org.apache.hadoop.mapred.SequenceFileInputFormat -outFormat org.apache.hadoop.mapred.SequenceFileOutputFormat -outKey org.apache.hadoop.io.IntWritable -outValue org.apache.hadoop.io.Text /numbers.seq sorted
## 查看排序后的结果(原键降序排列为从1到100升序排列)
$ ./bin/hadoop fs -text /user/hadoop/sorted/part-00000

博客参考

MapFile

MapFile 是已经排序的 SequenceFile,可以视为 java.util.Map 持久化形式。它已加入了搜索键的索引,可以根据 key 进行查找。它的键必须是 WritableComparable 类型的实例,值必须是 Writable 类型的实例,而 SequenceFile 无此要求。使用 MapFile.fix() 方法进行索引重建,把 SequenceFile 转换为 MapFile。

MapFile java 源代码

org.apache.hadoop.io.MapFile.Writer{ 
// 类的内部结构(MapFile是已经排序的SequenceFile):
private SequenceFile.Writer data;
private SequenceFile.Writer index;
… … 
}
org.apache.hadoop.io.MapFile.Reader{
// 二分法查找。一次磁盘寻址 + 一次最多顺序128(默认值等于每128下一个索引)个条目顺序扫瞄
public synchronized Writable get(WritableComparable key, Writable val){
… … 
}

实例程序代码

运行结果

$ ./bin/hadoop fs -text /numbers.map/data 
$ ./bin/hadoop fs -text /numbers.map/index

SequenceFile合并为MapFile

  1. 新建SequenceFile文件
    $ ./bin/hadoop jar ./hadoop-examples-1.2.1.jar sort -r 1 -inFormat org.apache.hadoop.mapred.SequenceFileInputFormat -outFormat org.apache.hadoop.mapred.SequenceFileOutputFormat -outKey org.apache.hadoop.io.IntWritable -outValue org.apache.hadoop.io.Text /numbers.seq /numbers2.map
    
  2. 重命名文件夹
    $ ./bin/hadoop fs -mv /numbers2.map/part-00000 /numbers2.map/data
    
  3. 运行测试用例
  4. 验证结果
    Created MapFile hdfs://master11:9000/numbers2.map with 100 entries
    rw-r--r--   2 hadoop      supergroup       4005 2014-02-09 20:06 /numbers2.map/data
    -rw-r--r--   3 kangfoo-mac supergroup        136 2014-02-09 20:13 /numbers2.map/index
    

Hadoop Pipes 编译

| 评论

我在编译 Hadoop Pipes 的时候,出现了些小问题。主要是我没有安装 openssl-devel。本以为安装 openssl 就差不多了,可这个就是问题的根源, 我现在是自己动手编译 pipes, 而 Hadoop 的 pipes 编译需要 openssl 的依赖,那么在编译的时候最好还是将 openssl-devel 开发支持的依赖补上比较省事。在解决问题的时发现网上向我一样的同学还是有的。在此我就贴下我编译时的部分日志。

  1. 在Hadoop 根目录下执行

    ant -Dcompile.c++=yes examples
    ##错误
    [exec] checking for HMAC_Init in -lssl... no
    BUILD FAILED
    /home/hadoop/env/hadoop-1.2.1/build.xml:2164: exec returned: 255
    … … 
    ./configure: line 5234: exit: please: numeric argument required
    ##具体日志:
    … … 
     [exec] configure: error: Cannot find libssl.so ## 没有 libssl.so
     [exec] /home/hadoop/env/hadoop-1.2.1/src/c++/pipes/configure: line 5234: exit: please: numeric argument required
     [exec] /home/hadoop/env/hadoop-1.2.1/src/c++/pipes/configure: line 5234: exit: please: numeric argument required
     [exec] checking for HMAC_Init in -lssl... no 
    
  2. 检查 ssl

    $ yum info openssl
    $ ll /usr/lib64/libssl*
    -rwxr-xr-x. 1 root root 221568 2月  23 2013 /usr/lib64/libssl3.so
    lrwxrwxrwx. 1 root root     16 12月  8 18:14 /usr/lib64/libssl.so.10 -> libssl.so.1.0.1e
    -rwxr-xr-x. 1 root root 436984 12月  4 04:21 /usr/lib64/libssl.so.1.0.1e
    ## 缺个 libssl.so 的文件, 于是添加软链接:
    sudo ln -s /usr/lib64/libssl.so.1.0.1e /usr/lib64/libssl.so
    
  3. 切换目录到 pipes 下再次编译

    $cd /home/hadoop/env/hadoop/src/c++/pipes
    执行
    $ make distclean
    $ ./configure 
    [hadoop@master11 pipes]$ ./configure 
    checking for a BSD-compatible install... /usr/bin/install -c
    … … 
    checking whether it is safe to define __EXTENSIONS__... yes
    checking for special C compiler options needed for large files... no
    checking for _FILE_OFFSET_BITS value needed for large files... no
    checking pthread.h usability... yes
    checking pthread.h presence... yes
    checking for pthread.h... yes
    checking for pthread_create in -lpthread... yes
    checking for HMAC_Init in -lssl... no
    configure: error: Cannot find libssl.so ## 还是没找到
    ./configure: line 5234: exit: please: numeric argument required
    ./configure: line 5234: exit: please: numeric argument required
    
  4. 安装openssl-devel, sudo yum install openssl-devel

  5. 再切换到Hadoop根目录下执行

    ant -Dcompile.c++=yes examples
    ##搞定,编译通过
    compile-examples:
    [javac] /home/hadoop/env/hadoop-1.2.1/build.xml:742: warning: 'includeantruntime' was not set, defaulting to build.sysclasspath=last; set to false for repeatable builds
    [javac] Compiling 24 source files to /home/hadoop/env/hadoop-1.2.1/build/examples
    [javac] 警告: [options] 未与 -source 1.6 一起设置引导类路径
    [javac] 注: /home/hadoop/env/hadoop-1.2.1/src/examples/org/apache/hadoop/examples/MultiFileWordCount.java使用或覆盖了已过时的 API。
    [javac] 注: 有关详细信息, 请使用 -Xlint:deprecation 重新编译。
    [javac] 1 个警告
    examples:
      [jar] Building jar: /home/hadoop/env/hadoop-1.2.1/build/hadoop-examples-1.2.2-SNAPSHOT.jar
    BUILD SUCCESSFUL
    Total time: 1 minute 11 seconds
    

Native-hadoop 编译

| 评论

对我来讲编译 native hadoop 并不是很顺利。现将问题纪录在案。

主要问题

  1. ivy 联网获取资源并不稳定
  2. hadoop-1.2.1/build.xml:62: Execute failed: java.io.IOException: Cannot run program “autoreconf” (in directory “/home/userxxx/hadoop/hadoop-1.2.1/src/native”): java.io.IOException: error=2, No such file or directory
  3. [exec] configure: error: Zlib headers were not found… native-hadoop library needs zlib to build. Please install the requisite zlib development package.
  4. 多次编译失败之后要记得执行 make distclean 清理一下。
  5. 编译完 ant compile-native 之后,启动 hadoop 使用 http 访问 /dfshealth.jsp /jobtracker.jsp HTTP ERROR 404
  6. 在 Linux 平台下编译 native hadoop 是不可以的,目前。错误:/hadoop-1.2.1/build.xml:694: exec returned: 1

解决方案

  1. 第一个问题只能多次尝试。
  2. 第二,第三个问题主要是是没有安装 zlib。顺便请保证 gcc c++, autoconf, automake, libtool, openssl,openssl-devel 也安装。安装 zlib 请参考 http://www.zlib.net/ 。
  3. 第四个问题就是 基本的 make 三部曲的步骤。
  4. 第五个问题原因是在 build native 库的同时,生成了 webapps 目录(在当前的 target 这个目录是个基本的结构,没有任何 jsp 等资源,404找不到很正常)。那么当我们编译过build之后,hadoop启动时又指向了这个目录,就导致这个错误。我们就可以直接将这个 build 文件夹删除了或者改脚本。问题搞定了。
  5. 第六个问题,援引官方
    Supported Platforms
    The native hadoop library is supported on *nix platforms only. The library does not to work with Cygwin or the Mac OS X platform.
    
    那就老实点用 *nix platforms,就没事了。

Java JNI练习

| 评论

在 Hadoop 中大量使用了 JNI 技术以提高性能并利用其他语言已有的成熟算法简化开发难度。如压缩算法、pipes 等。那么在具体学习 native hadoop, hadoop io 前先简单复习下相关知识。在此我主要是参见了 Oracle 官方网站 + IBM developerworks + csdn 论坛 写了个简单的 Hello World 程序。

此处主要是将我参考的资源进行了列举,已备具体深入学习参考。

JNI 编程主要步骤

  1. 编写一个.java
  2. javac *.java
  3. javah -jni className -> *.h
  4. 创建一个.so/.dll 动态链接库文件

编程注意事项

  1. 不要直接使用从java里面传递过来的value.(在java里面的对象在本地调用前可能被jvm析构函数了)
  2. 一旦不使用某对象或者变量,要去ReleaseXXX()。
  3. 不要在 native code 里面去申请内存
  4. 使用 javap -s 查看java 签名