kangfoo's blog

工作学习笔记,生活掠影。

Hadoop RPC

| 评论

Remote Procedure Call 远程方法调用。不需要了解网络细节,某一程序即可使用该协议请求来自网络内另一台及其程序的服务。它是一个 Client/Server 的结构,提供服务的一方称为Server,消费服务的一方称为Client。

Hadoop 底层的交互都是通过 rpc 进行的。例 如:datanode 和 namenode、tasktracker 和 jobtracker、secondary namenode 和 namenode 之间的通信都是通过 rpc 实现的。

TODO: 此文未写明了。明显需要画 4张图, rpc 原理图,Hadoop rpc 时序图, 客户端 流程图,服端流程图。最好帖几个包图+ 类图(组件图)。待完善。

要实现远程过程调用,需要有3要素: 1、server 必须发布服务 2、在 client 和 server 两端都需要有模块来处理协议和连接 3、server 发布的服务,需要将接口给到 client

Hadoop RPC

  1. 序列化层。 Client 与 Server 端通讯传递的信息采用实现自 Writable 类型
  2. 函数调用层。 Hadoop RPC 通过动态代理和 java 反射实现函数调用
  3. 网络传输层。Hadoop RPC 采用 TCP/IP socket 机制
  4. 服务器框架层。Hadoop RPC 采用 java NIO 事件驱动模型提高 RPC Server 吞吐量

TODO 缺个 RPC 图

Hadoop RPC 源代码主要在org.apache.hadoop.ipc包下。org.apache.hadoop.ipc.RPC 内部包含5个内部类。

  • Invocation :用于封装方法名和参数,作为数据传输层,相当于VO(Value Object)。
  • ClientCache :用于存储client对象,用 socket factory 作为 hash key,存储结构为 hashMap
  • Invoker :是动态代理中的调用实现类,继承了 java.lang.reflect.InvocationHandler。
  • Server :是ipc.Server的实现类。
  • VersionMismatch : 协议版本。

从客户端开始进行通讯源代码分析

org.apache.hadoop.ipc.Client 有5个内部类

  • Call: A call waiting for a value.
  • Connection: Thread that reads responses and notifies callers. Each connection owns a socket connected to a remote address. Calls are multiplexed through this socket: responses may be delivered out of order.
  • ConnectionId: This class holds the address and the user ticket. The client connections to servers are uniquely identified by
  • ParallelCall: Call implementation used for parallel calls.
  • ParallelResults: Result collector for parallel calls.

客户端和服务端建立连接的大致执行过程为

  1. 在 Object org.apache.hadoop.ipc.RPC.Invoker.invoke(Object proxy, Method method, Object[] args) 方法中调用
    client.call(new Invocation(method, args), remoteId);

  2. 上述的 new Invocation(method, args) 是 org.apache.hadoop.ipc.RPC 的内部类,它包含被调用的方法名称及其参数。此处主要是设置方法和参数。 client 为 org.apache.hadoop.ipc.Client 的实例对象。

  3. org.apache.hadoop.ipc.Client.call() 方法的具体源代码。在call()方法中 getConnection()内部获取一个 org.apache.hadoop.ipc.Client.Connection 对象并启动 io 流 setupIOstreams()。

    Writable org.apache.hadoop.ipc.Client.call(Writable param, ConnectionId remoteId) throwsInterruptedException, IOException {
    Call call = new Call(param); //A call waiting for a value.   
    // Get a connection from the pool, or create a new one and add it to the
    // pool.  Connections to a given ConnectionId are reused. 
    Connection connection = getConnection(remoteId, call);// 主要在 org.apache.hadoop.net 包下。
    connection.sendParam(call); //客户端发送数据过程
    boolean interrupted = false;
    synchronized (call) {
       while (!call.done) {
        try {
          call.wait();                           // wait for the result
        } catch (InterruptedException ie) {
          // save the fact that we were interrupted
          interrupted = true;
        }
      }
    … …
    }
    }
    // Get a connection from the pool, or create a new one and add it to the
    // pool.  Connections to a given ConnectionId are reused. 
    private Connection getConnection(ConnectionId remoteId,
                                   Call call)
                                   throws IOException, InterruptedException {
    if (!running.get()) {
      // the client is stopped
      throw new IOException("The client is stopped");
    }
    Connection connection;
    // we could avoid this allocation for each RPC by having a  
    // connectionsId object and with set() method. We need to manage the
    // refs for keys in HashMap properly. For now its ok.
    do {
      synchronized (connections) {
        connection = connections.get(remoteId);
        if (connection == null) {
          connection = new Connection(remoteId);
          connections.put(remoteId, connection);
        }
      }
    } while (!connection.addCall(call)); 
    //we don't invoke the method below inside "synchronized (connections)"
    //block above. The reason for that is if the server happens to be slow,
    //it will take longer to establish a connection and that will slow the
    //entire system down.
    connection.setupIOstreams(); // 向服务段发送一个 header 并等待结果
    return connection;
    }
    
  4. setupIOstreams() 方法。

    void org.apache.hadoop.ipc.Client.Connection.setupIOstreams() throws InterruptedException {
    // Connect to the server and set up the I/O streams. It then sends
    // a header to the server and starts
    // the connection thread that waits for responses.
    while (true) {
          setupConnection();//  建立连接
          InputStream inStream = NetUtils.getInputStream(socket); // 输入
          OutputStream outStream = NetUtils.getOutputStream(socket); // 输出
          writeRpcHeader(outStream);
          }
    … … 
    // update last activity time
      touch();
    // start the receiver thread after the socket connection has been set up            start(); 
    }        
    
  5. 启动org.apache.hadoop.ipc.Client.Connection 客户端获取服务器端放回数据过程

    void org.apache.hadoop.ipc.Client.Connection.run()
    while (waitForWork()) {//wait here for work - read or close connection
        receiveResponse();
      }
    

ipc.Server源码分析

ipc.Server 有6个内部类:

  • Call :用于存储客户端发来的请求
  • Listener : 监听类,用于监听客户端发来的请求,同时Listener内部还有一个静态类,Listener.Reader,当监听器监听到用户请求,便让Reader读取用户请求。
  • ExceptionsHandler: 异常管理
  • Responder :响应RPC请求类,请求处理完毕,由Responder发送给请求客户端。
  • Connection :连接类,真正的客户端请求读取逻辑在这个类中。
  • Handler :请求处理类,会循环阻塞读取callQueue中的call对象,并对其进行操作。

大致过程为:

  1. Namenode的初始化时,RPC的server对象是通过ipc.RPC类的getServer()方法获得的。

    void org.apache.hadoop.hdfs.server.namenode.NameNode.initialize(Configuration conf) throwsIOException
    // create rpc server
    InetSocketAddress dnSocketAddr = getServiceRpcServerAddress(conf);
    if (dnSocketAddr != null) {
      int serviceHandlerCount =
        conf.getInt(DFSConfigKeys.DFS_NAMENODE_SERVICE_HANDLER_COUNT_KEY,
                    DFSConfigKeys.DFS_NAMENODE_SERVICE_HANDLER_COUNT_DEFAULT);
      this.serviceRpcServer = RPC.getServer(this, dnSocketAddr.getHostName(), 
          dnSocketAddr.getPort(), serviceHandlerCount,
          false, conf, namesystem.getDelegationTokenSecretManager());
      this.serviceRPCAddress = this.serviceRpcServer.getListenerAddress();
      setRpcServiceServerAddress(conf);
    }
    … …
    this.server.start();  //start RPC server  
    
  2. 启动 server

    void org.apache.hadoop.ipc.Server.start()
    // Starts the service.  Must be called before any calls will be handled.
    public synchronized void start() {
    responder.start();
    listener.start();
    handlers = new Handler[handlerCount];
    for (int i = 0; i < handlerCount; i++) {
      handlers[i] = new Handler(i);
      handlers[i].start(); //处理call
    }
    }
    
  3. Server处理请求, server 同样使用非阻塞 nio 以提高吞吐量

    org.apache.hadoop.ipc.Server.Listener.Listener(Server) throws IOException
    public Listener() throws IOException {
      address = new InetSocketAddress(bindAddress, port);
      // Create a new server socket and set to non blocking mode
      acceptChannel = ServerSocketChannel.open();
      acceptChannel.configureBlocking(false);
    … … }     
    
  4. 真正建立连接

    void org.apache.hadoop.ipc.Server.Listener.doAccept(SelectionKey key) throws IOException,OutOfMemoryError
    

    Reader 读数据接收请求

    void org.apache.hadoop.ipc.Server.Listener.doRead(SelectionKey key) throws InterruptedException
    try {
        count = c.readAndProcess();
      } catch (InterruptedException ieo) {
        LOG.info(getName() + ": readAndProcess caught InterruptedException", ieo);
        throw ieo;
      }
    
    int org.apache.hadoop.ipc.Server.Connection.readAndProcess() throws IOException,InterruptedException
    if (!rpcHeaderRead) {
          //Every connection is expected to send the header.
          if (rpcHeaderBuffer == null) {
            rpcHeaderBuffer = ByteBuffer.allocate(2);
          }
          count = channelRead(channel, rpcHeaderBuffer);
          if (count < 0 || rpcHeaderBuffer.remaining() > 0) {
            return count;
          }
          int version = rpcHeaderBuffer.get(0);
    … … 
    processOneRpc(data.array()); // 数据处理
    
  5. 下面贴出Server.Connection类中的processOneRpc()方法和processData()方法的源码。

    void org.apache.hadoop.ipc.Server.Connection.processOneRpc(byte[] buf) throws IOException,InterruptedException
    private void processOneRpc(byte[] buf) throws IOException,
        InterruptedException {
      if (headerRead) {
        processData(buf);
      } else {
        processHeader(buf);
        headerRead = true;
        if (!authorizeConnection()) {
          throw new AccessControlException("Connection from " + this
              + " for protocol " + header.getProtocol()
              + " is unauthorized for user " + user);
        }
      }
    }
    
  6. 处理call

    void org.apache.hadoop.ipc.Server.Handler.run()
    while (running) {
        try {
          final Call call = callQueue.take(); // pop the queue; maybe blocked here
          … … 
          CurCall.set(call);
          try {
            // Make the call as the user via Subject.doAs, thus associating
            // the call with the Subject
            if (call.connection.user == null) {
              value = call(call.connection.protocol, call.param, 
                           call.timestamp);
            } else {
    … …}
    
  7. 返回请求

下面贴出Server.Responder类中的doRespond()方法源码:

void org.apache.hadoop.ipc.Server.Responder.doRespond(Call call) throws IOException
    //
    // Enqueue a response from the application.
    //
    void doRespond(Call call) throws IOException {
      synchronized (call.connection.responseQueue) {
        call.connection.responseQueue.addLast(call);
        if (call.connection.responseQueue.size() == 1) {
          processResponse(call.connection.responseQueue, true);
        }
      }
    }

补充: notify()让因wait()进入阻塞队列里的线程(blocked状态)变为runnable,然后发出notify()动作的线程继续执行完,待其完成后,进行调度时,调用wait()的线程可能会被再次调度而进入running状态。

参考资源:

评论