The NameNode appends every change to the file system to a journal file (edits) on its local file system. When a NameNode starts, it first reads the state of HDFS from an image file (fsimage) and then applies the operations recorded in the edits file. It then writes the new HDFS state back to the fsimage and begins normal operation with an empty edits file. Because the NameNode merges fsimage and edits only during startup, the edits file can grow very large over time, especially on large clusters. A side effect of a huge edits file is that the next NameNode restart takes a long time.
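The startup sequence described above boils down to: load fsimage, replay edits, write a new fsimage, continue with an empty edits file. The sketch below is purely conceptual; all class and method names are hypothetical stand-ins, not the real FSImage/FSEditLog code.

import java.io.IOException;
import java.util.List;
import java.util.Map;

// Conceptual sketch of the NameNode startup merge; names are hypothetical stand-ins.
class CheckpointSketch {
  Map<String, String> loadImage() throws IOException { /* read fsimage */ return new java.util.HashMap<>(); }
  List<String> loadEdits() throws IOException { /* read the edits log */ return new java.util.ArrayList<>(); }
  void apply(Map<String, String> ns, String op) { /* replay one logged mutation */ }
  void saveImage(Map<String, String> ns) throws IOException { /* write the merged state as the new fsimage */ }
  void truncateEdits() throws IOException { /* start over with an empty edits file */ }

  void startup() throws IOException {
    Map<String, String> namespace = loadImage();   // 1. last checkpointed namespace
    for (String op : loadEdits()) {                // 2. apply every edit in order
      apply(namespace, op);
    }
    saveImage(namespace);                          // 3. new fsimage = old fsimage + edits
    truncateEdits();                               // 4. continue with an empty edits file
  }
}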
In the figure above, the NameNode is the process running on the master node; it stores and manages the file system namespace and coordinates clients' I/O on the underlying files (MapReduce jobs are handled by the JobTracker, a separate process on the master). The DataNodes run on the slave machines and perform the actual low-level block I/O.
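For reference, a typical client read against HDFS looks like the following. This is standard FileSystem API usage (the path is an example value), and it is this call chain that the rest of the section traces through the source.

import java.io.InputStream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;

public class HdfsReadExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();             // picks up core-site.xml / hdfs-site.xml
    FileSystem fs = FileSystem.get(conf);                 // a DistributedFileSystem for hdfs:// URIs
    InputStream in = fs.open(new Path("/tmp/demo.txt"));  // FSDataInputStream wrapping a DFSInputStream
    try {
      IOUtils.copyBytes(in, System.out, 4096, false);     // stream the file to stdout
    } finally {
      IOUtils.closeStream(in);
      fs.close();
    }
  }
}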
On the client, the open() method of class DistributedFileSystem is invoked (decorator pattern); it opens the file and returns a DFSDataInputStream that wraps a DFSInputStream.
// DistributedFileSystem extends FileSystem --> its open() method is called
public FSDataInputStream open(Path f, int bufferSize) throws IOException {
  statistics.incrementReadOps(1);
  return new DFSClient.DFSDataInputStream(
      dfs.open(getPathName(f), bufferSize, verifyChecksum, statistics));
}
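The decorator pattern mentioned above refers to FSDataInputStream (and its subclass DFSDataInputStream) wrapping the underlying DFSInputStream and adding seek/positioned-read behaviour on top of plain stream reads. Below is a stripped-down, purely illustrative sketch of that wrapping relationship; none of these classes are the Hadoop ones.

// Minimal decorator sketch: the outer stream adds seek() but delegates all reads to the
// stream it wraps, just as FSDataInputStream delegates to the DFSInputStream inside it.
interface SeekableStream { void seek(long pos) throws java.io.IOException; }

class SeekableByteStream extends java.io.ByteArrayInputStream implements SeekableStream {
  SeekableByteStream(byte[] data) { super(data); }
  public void seek(long pos) { this.pos = (int) pos; }    // reposition within the buffer
}

class SketchDataInputStream extends java.io.DataInputStream {
  SketchDataInputStream(java.io.InputStream wrapped) { super(wrapped); } // reads are delegated
  public void seek(long pos) throws java.io.IOException {
    ((SeekableStream) in).seek(pos);    // 'in' is FilterInputStream's wrapped stream
  }
}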
// dfs is an instance of DFSClient.
public DFSInputStream open(String src, int buffersize, boolean verifyChecksum,
                           FileSystem.Statistics stats) throws IOException {
  checkOpen();
  // Get block info from namenode
  return new DFSInputStream(src, buffersize, verifyChecksum);
}
// DFSInputStream is an inner class of DFSClient and extends FSInputStream.
// The constructor invoked here (details omitted) calls openInfo(). The key step in openInfo() is
// fetchLocatedBlocks(), which asks the NameNode for the metadata of the requested data via
// callGetBlockLocations(); if the block locations are not found, it retries up to 3 times.
//
// callGetBlockLocations() queries the NameNode over RPC and obtains the LocatedBlocks information.
static LocatedBlocks callGetBlockLocations(ClientProtocol namenode,
    String src, long start, long length) throws IOException {
  … …
  return namenode.getBlockLocations(src, start, length);
  … …
}
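The "up to 3 attempts" in openInfo() is just a retry loop wrapped around this RPC. A generic, self-contained form of that pattern is sketched below; it is not the Hadoop code, only an illustration of the retry structure.

import java.io.IOException;
import java.util.concurrent.Callable;

// Generic form of the "try N times" pattern used by openInfo(); not the Hadoop code itself.
public class RetryCall {
  public static <T> T retry(Callable<T> call, int attempts) throws Exception {
    Exception last = null;
    for (int i = 0; i < attempts; i++) {
      try {
        return call.call();               // e.g. () -> namenode.getBlockLocations(src, 0, len)
      } catch (IOException e) {
        last = e;                         // remember the failure and try again
      }
    }
    throw last;                           // every attempt failed
  }
}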
// The namenode used here is created via a dynamic proxy. It is an implementation of the NameNode's ClientProtocol (interface ClientProtocol extends VersionedProtocol).
private static ClientProtocol createNamenode(ClientProtocol rpcNamenode,
    Configuration conf) throws IOException {
  … …
  final ClientProtocol cp = (ClientProtocol) RetryProxy.create(
      ClientProtocol.class, rpcNamenode, defaultPolicy, methodNameToPolicyMap);
  RPC.checkVersion(ClientProtocol.class, ClientProtocol.versionID, cp);
  … …
}
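RetryProxy.create() is built on java.lang.reflect.Proxy: the returned ClientProtocol is a dynamic proxy whose InvocationHandler re-issues the RPC according to the configured retry policies. Below is a minimal sketch of that mechanism, not the Hadoop RetryProxy implementation.

import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;

// Minimal dynamic-proxy sketch: wraps any interface so each method call is retried once on failure.
public class SimpleRetryProxy {
  @SuppressWarnings("unchecked")
  public static <T> T create(Class<T> iface, T target) {
    InvocationHandler handler = (proxy, method, args) -> {
      try {
        return method.invoke(target, args);      // first attempt
      } catch (Exception first) {
        return method.invoke(target, args);      // one blind retry; real policies are smarter
      }
    };
    return (T) Proxy.newProxyInstance(iface.getClassLoader(),
        new Class<?>[] { iface }, handler);
  }
}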
On the NameNode side (class NameNode), the block location information is retrieved and sorted.
public LocatedBlocks getBlockLocations(String src, long offset, long length) throws IOException {
  myMetrics.incrNumGetBlockLocations();
  // Retrieve the block information. namesystem is the FSNamesystem instance.
  // It holds the NameNode's namespace tree; its FSDirectory dir field references the FSImage fsimage,
  // and fsimage in turn references the FSEditLog editLog.
  return namesystem.getBlockLocations(getClientMachine(), src, offset, length);
}
// FSNamesystem.getBlockLocationsInternal() is the concrete implementation that retrieves the block information.
private synchronized LocatedBlocks getBlockLocationsInternal(String src,
    long offset, long length, int nrBlocksToReturn,
    boolean doAccessTime, boolean needBlockToken) throws IOException {
  … …
}
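Part of what getBlockLocations()/getBlockLocationsInternal() does is sort each block's replica locations so that the DataNode "closest" to the requesting client (same node, then same rack) comes first. The toy example below only illustrates the idea; distance() is a stand-in for the real network-topology comparison.

import java.util.Arrays;
import java.util.Comparator;

// Illustration only: order replica locations by "network distance" to the client.
public class ReplicaSorter {
  static int distance(String clientHost, String replicaHost) {
    return clientHost.equals(replicaHost) ? 0 : 1;   // toy metric: local node first
  }

  public static void sortByProximity(String clientHost, String[] replicaHosts) {
    Arrays.sort(replicaHosts, Comparator.comparingInt((String h) -> distance(clientHost, h)));
  }

  public static void main(String[] args) {
    String[] replicas = { "slave2", "slave1", "slave3" };
    sortByProximity("slave1", replicas);
    System.out.println(Arrays.toString(replicas));   // slave1 comes first
  }
}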
On the client, DFSClient performs the actual read of the file opened in step 1 by calling read(long position, byte[] buffer, int offset, int length) on the DFSInputStream object held inside the DFSDataInputStream.
// class DFSInputStream
public int read(long position, byte[] buffer, int offset, int length)
    throws IOException {
  // sanity checks
  checkOpen();
  if (closed) {
    throw new IOException("Stream closed");
  }
  failures = 0;
  long filelen = getFileLength();
  if ((position < 0) || (position >= filelen)) {
    return -1;
  }
  int realLen = length;
  if ((position + length) > filelen) {
    realLen = (int)(filelen - position);
  }

  // determine the block and byte range within the block
  // corresponding to position and realLen
  List<LocatedBlock> blockRange = getBlockRange(position, realLen);
  int remaining = realLen;
  for (LocatedBlock blk : blockRange) {
    long targetStart = position - blk.getStartOffset();
    long bytesToRead = Math.min(remaining, blk.getBlockSize() - targetStart);
    fetchBlockByteRange(blk, targetStart,
                        targetStart + bytesToRead - 1, buffer, offset);
    remaining -= bytesToRead;
    position += bytesToRead;
    offset += bytesToRead;
  }
  assert remaining == 0 : "Wrong number of bytes read.";
  if (stats != null) {
    stats.incrementBytesRead(realLen);
  }
  return realLen;
}
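This read(position, ...) overload is the positioned-read ("pread") path: it does not change the stream's current offset, so it can serve concurrent callers. Through the public API it is reached like this (the path and offsets are example values).

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

// Example of the positioned read that ends up in DFSInputStream.read(position, buffer, offset, length).
public class PositionedReadExample {
  public static void main(String[] args) throws Exception {
    FileSystem fs = FileSystem.get(new Configuration());
    FSDataInputStream in = fs.open(new Path("/tmp/demo.txt")); // example path
    byte[] buf = new byte[128];
    int n = in.read(1024L, buf, 0, buf.length);   // read up to 128 bytes starting at file offset 1024
    System.out.println("read " + n + " bytes");   // -1 if the offset is past the end of the file
    in.close();
    fs.close();
  }
}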
// fetchBlockByteRange() connects to the best DataNode over a socket and reads the data.
private void fetchBlockByteRange(LocatedBlock block, long start,
    long end, byte[] buf, int offset) throws IOException {
  //
  // Connect to best DataNode for desired Block, with potential offset
  //
  Socket dn = null;
  int refetchToken = 1; // only need to get a new access token once

  while (true) {
    // cached block locations may have been updated by chooseDataNode()
    // or fetchBlockAt(). Always get the latest list of locations at the
    // start of the loop.
    block = getBlockAt(block.getStartOffset(), false);
    DNAddrPair retval = chooseDataNode(block); // choose the best DataNode
    DatanodeInfo chosenNode = retval.info;
    InetSocketAddress targetAddr = retval.addr;
    BlockReader reader = null;
    try {
      Token<BlockTokenIdentifier> accessToken = block.getBlockToken();
      int len = (int) (end - start + 1);

      // first try reading the block locally (short-circuit read is preferred)
      if (shouldTryShortCircuitRead(targetAddr)) {
        try {
          reader = getLocalBlockReader(conf, src, block.getBlock(),
              accessToken, chosenNode, DFSClient.this.socketTimeout, start);
        } catch (AccessControlException ex) {
          LOG.warn("Short circuit access failed ", ex);
          // Disable short circuit reads
          shortCircuitLocalReads = false;
          continue;
        }
      } else {
        // otherwise go to the DataNode over a socket
        dn = socketFactory.createSocket();
        LOG.debug("Connecting to " + targetAddr);
        NetUtils.connect(dn, targetAddr, getRandomLocalInterfaceAddr(),
            socketTimeout);
        dn.setSoTimeout(socketTimeout);
        reader = RemoteBlockReader.newBlockReader(dn, src,
            block.getBlock().getBlockId(), accessToken,
            block.getBlock().getGenerationStamp(), start, len, buffersize,
            verifyChecksum, clientName);
      }
      int nread = reader.readAll(buf, offset, len); // BlockReader does the actual read
      return;
    }
    … …
    finally {
      IOUtils.closeStream(reader);
      IOUtils.closeSocket(dn);
    }
    // Put chosen node into dead list, continue
    addToDeadNodes(chosenNode); // mark this DataNode as dead and retry
  }
}
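Whether shouldTryShortCircuitRead() succeeds depends on configuration: the client must enable short-circuit reads, and the DataNode must whitelist the reading user via dfs.block.local-path-access.user (the DFS_BLOCK_LOCAL_PATH_ACCESS_USER_KEY read in the DataNode constructor below). The snippet below is a configuration sketch; key names can vary between Hadoop versions, so treat them as an assumption to verify against DFSConfigKeys.

import org.apache.hadoop.conf.Configuration;

// Example only: enabling short-circuit (local) reads; verify key names against DFSConfigKeys.
public class ShortCircuitConf {
  public static Configuration clientConf() {
    Configuration conf = new Configuration();
    conf.setBoolean("dfs.client.read.shortcircuit", true);   // client side: try local reads first
    return conf;
  }
  // DataNode side (hdfs-site.xml): dfs.block.local-path-access.user = <user allowed to read block files>
}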
The DataNode begins listening for client requests as soon as it is instantiated and started.
DataNode(final Configuration conf,
         final AbstractList<File> dataDirs, SecureResources resources) throws IOException {
  super(conf);
  SecurityUtil.login(conf, DFSConfigKeys.DFS_DATANODE_KEYTAB_FILE_KEY,
      DFSConfigKeys.DFS_DATANODE_USER_NAME_KEY);

  datanodeObject = this;
  durableSync = conf.getBoolean("dfs.durable.sync", true);
  this.userWithLocalPathAccess = conf
      .get(DFSConfigKeys.DFS_BLOCK_LOCAL_PATH_ACCESS_USER_KEY);
  try {
    startDataNode(conf, dataDirs, resources); // startDataNode
  } catch (IOException ie) {
    shutdown();
    throw ie;
  }
}
// startDataNode
void startDataNode(Configuration conf,
    AbstractList<File> dataDirs, SecureResources resources) throws IOException {
  … …
  // find free port or use privileged port provided
  ServerSocket ss;
  if (secureResources == null) {
    ss = (socketWriteTimeout > 0) ?
        ServerSocketChannel.open().socket() : new ServerSocket();
    Server.bind(ss, socAddr, 0);
  } else {
    ss = resources.getStreamingSocket();
  }
  ss.setReceiveBufferSize(DEFAULT_DATA_SOCKET_SIZE);
  // adjust machine name with the actual port
  tmpPort = ss.getLocalPort();
  selfAddr = new InetSocketAddress(ss.getInetAddress().getHostAddress(),
                                   tmpPort);
  this.dnRegistration.setName(machineName + ":" + tmpPort);
  LOG.info("Opened data transfer server at " + tmpPort);

  this.threadGroup = new ThreadGroup("dataXceiverServer");
  this.dataXceiverServer = new Daemon(threadGroup,
      new DataXceiverServer(ss, conf, this));
  this.threadGroup.setDaemon(true); // DataXceiverServer runs as a daemon thread, waiting for client connections
}
// class DataXceiverServer.run()
public void run() {
  while (datanode.shouldRun) {
    try {
      Socket s = ss.accept();
      s.setTcpNoDelay(true);
      new Daemon(datanode.threadGroup,
          new DataXceiver(s, datanode, this)).start();
    } catch (SocketTimeoutException ignored) {
    }
  }
}
// class DataXceiver.run()
// Read/write data from/to the DataXceiverServer.
// Operation types: OP_READ_BLOCK, OP_WRITE_BLOCK, OP_REPLACE_BLOCK,
// OP_COPY_BLOCK, OP_BLOCK_CHECKSUM
public void run() {
  DataInputStream in = null;
  try {
    in = new DataInputStream(
        new BufferedInputStream(NetUtils.getInputStream(s),
                                SMALL_BUFFER_SIZE));
    … …
    switch (op) {
    case DataTransferProtocol.OP_READ_BLOCK:
      readBlock(in); // read the data
      datanode.myMetrics.addReadBlockOp(DataNode.now() - startTime);
      if (local)
        datanode.myMetrics.incrReadsFromLocalClient();
      else
        datanode.myMetrics.incrReadsFromRemoteClient();
      break;
    … …
    default:
      throw new IOException("Unknown opcode " + op + " in data stream");
    }
  }
  … …
}
// class DataXceiver.readBlock()
// Read a block from the disk.
private void readBlock(DataInputStream in) throws IOException {
  //
  // Read in the header (the read request)
  //
  long blockId = in.readLong();
  Block block = new Block(blockId, 0, in.readLong());

  long startOffset = in.readLong();
  long length = in.readLong();
  String clientName = Text.readString(in);
  Token<BlockTokenIdentifier> accessToken = new Token<BlockTokenIdentifier>();
  accessToken.readFields(in);

  // output stream for sending data back to the client
  OutputStream baseStream = NetUtils.getOutputStream(s,
      datanode.socketWriteTimeout);
  DataOutputStream out = new DataOutputStream(
      new BufferedOutputStream(baseStream, SMALL_BUFFER_SIZE));
  … …
  // send the block: read the block data from the local disk and send it to the client
  BlockSender blockSender = null;
  final String clientTraceFmt =
      clientName.length() > 0 && ClientTraceLog.isInfoEnabled()
          ? String.format(DN_CLIENTTRACE_FORMAT, localAddress, remoteAddress,
              "%d", "HDFS_READ", clientName, "%d",
              datanode.dnRegistration.getStorageID(), block, "%d")
          : datanode.dnRegistration + " Served " + block + " to " +
              s.getInetAddress();
  try {
    try {
      blockSender = new BlockSender(block, startOffset, length,
          true, true, false, datanode, clientTraceFmt);
    } catch (IOException e) {
      out.writeShort(DataTransferProtocol.OP_STATUS_ERROR);
      throw e;
    }
    out.writeShort(DataTransferProtocol.OP_STATUS_SUCCESS); // send op status
    long read = blockSender.sendBlock(out, baseStream, null); // send the data
    … …
  } finally {
    IOUtils.closeStream(out);
    IOUtils.closeStream(blockSender);
  }
}
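For completeness, the request that readBlock() parses is framed by the client (RemoteBlockReader.newBlockReader() in the code above) in the same order the DataNode reads it. The sketch below is inferred from the read side, not copied from the client source, and assumes the Hadoop classes DataTransferProtocol, Text and Token are on the classpath; treat the exact framing as an assumption.

// Sketch of the OP_READ_BLOCK request framing as implied by readBlock() above.
// Inferred from the fields the DataNode reads; the real sender is RemoteBlockReader.newBlockReader().
void writeReadRequest(java.io.DataOutputStream out, long blockId, long genStamp,
                      long startOffset, long length, String clientName) throws java.io.IOException {
  out.writeShort(DataTransferProtocol.DATA_TRANSFER_VERSION); // protocol version
  out.writeByte(DataTransferProtocol.OP_READ_BLOCK);          // opcode dispatched in DataXceiver.run()
  out.writeLong(blockId);                                     // -> first readLong(): block id
  out.writeLong(genStamp);                                    // -> second readLong(): generation stamp
  out.writeLong(startOffset);                                 // -> startOffset
  out.writeLong(length);                                      // -> length
  Text.writeString(out, clientName);                          // -> Text.readString(in)
  // followed by the access token (accessToken.write(out) in the real code)
}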