博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
HDFS追本溯源:HDFS操作的逻辑流程与源码解析
阅读量:7288 次
发布时间:2019-06-30

本文共 13978 字,大约阅读时间需要 46 分钟。

转自:

 

本文主要介绍5个典型的HDFS流程,这些流程充分体现了HDFS实体间IPC接口和stream接口之间的配合。

1. Client和NN

Client到NN有大量的元数据操作,比如修改文件名,在给定目录下创建一个子目录,这些操作一般只涉及Client和NN的交互,通过IPC调用ClientProtocol进行。创建子目录的逻辑流程如下图:

\

从图中可见,创建子目录这种操作并没有涉及DN。因为元数据会被NN持久化到edits中,因此在持久化结束之后,这个调用就会被成功返回。复习一下:NN维护了HDFS的文件系统目录树和文件与数据块的对应关系,和数据块与DN的对应关系。因此,创建目录仅仅是在NN上也就很容易理解了。

一些更为复杂的操作,如使用

 

1.
DistributedFileSystem.setReplication()

来增加文件的副本数,再如通过

 

 

1.
DistributedFileSystem.delete()

来删除HDFS上的文件,都需要DN配合执行一些动作。其中DistributedFileSystem源码在hadoop-hdfs-projecthadoop-hdfssrcmainjavaorgapachehadoophdfsDistributedFileSystem.java

 

 

01.
/****************************************************************
02.
* Implementation of the abstract FileSystem for the DFS system.
03.
* This object is the way end-user code interacts with a Hadoop
04.
* DistributedFileSystem.
05.
*
06.
*****************************************************************/
07.
@InterfaceAudience.LimitedPrivate({ "MapReduce""HBase" })
08.
@InterfaceStability.Unstable
09.
public class DistributedFileSystem extends FileSystem {
10.
private Path workingDir;
11.
private URI uri;
12.
 
13.
DFSClient dfs;
14.
private boolean verifyChecksum = true;
15.
 
16.
static{
17.
HdfsConfiguration.init();
18.
}

以客户端删除HDFS文件为例,操作在NN上执行完成后,DN存放的文件内容的数据块也必须删除。但是,NN在执行delete()方法时,它只标记需要删除的数据块(当然,delete的操作日志也会被持久化),而不会主动联系DN去立即删除这些数据。当保存着这些数据块的DN在向NN发送心跳时,NN会通过心跳应答携带DatanodeCommand命令来通知DN删除数据。也就是说,被删除的数据块在Client接到成功的响应后,会在一段时间后才能真正删除,NN和DN永远只维护简单的主从关系。NN永远不会主动发起向DN的调用。NN只能通过DN心跳应答中携带DatanodeCommand的指令对DN进行管理。

 

\

2. Client读文件

使用Java API读取文件的源码如下:

 

01.
FileSystem hdfs = FileSystem.get(new Configuration());
02.
Path path = new Path("/testfile");// reading
03.
FSDataInputStream dis = hdfs.open(path);
04.
byte[] writeBuf = new byte[1024];
05.
int len = dis.read(writeBuf);
06.
System.out.println(new String(writeBuf, 0, len, "UTF-8"));
07.
dis.close();
08.
 
09.
hdfs.close();

下图显示了HDFS在读取文件时,Client,NN和DN发生的事件和这些事件的顺序:

 

\

步骤1的源码:

 

01.
public FSDataInputStream open(Path f, final int bufferSize)
02.
throws IOException {
03.
statistics.incrementReadOps(1);
04.
Path absF = fixRelativePart(f);
05.
return new FileSystemLinkResolver<FSDataInputStream>() {
06.
@Override
07.
public FSDataInputStream doCall(final Path p)
08.
throws IOException, UnresolvedLinkException {
09.
return new HdfsDataInputStream(
10.
dfs.open(getPathName(p), bufferSize, verifyChecksum));
11.
}
12.
@Override
13.
public FSDataInputStream next(final FileSystem fs, final Path p)
14.
throws IOException {
15.
return fs.open(p, bufferSize);
16.
}
17.
}.resolve(this, absF);
18.
}

可见open返回的是HdfsDataInputStream。dfs为hadoop-hdfs-projecthadoop-hdfssrcmainjavaorgapachehadoophdfsDFSClient.java。HdfsDataInputStream继承自FSDataInputStream。构造是并没有额外的处理。

 

 

1.
public class HdfsDataInputStream extends FSDataInputStream {
2.
public HdfsDataInputStream(DFSInputStream in) throws IOException {
3.
super(in);
4.
}

FSDataInputStream继承自DFSInputStream。

01.
DFSInputStream(DFSClient dfsClient, String src, int buffersize, boolean verifyChecksum
02.
throws IOException, UnresolvedLinkException {
03.
this.dfsClient = dfsClient;
04.
this.verifyChecksum = verifyChecksum;
05.
this.buffersize = buffersize;
06.
this.src = src;
07.
this.cachingStrategy =
08.
dfsClient.getDefaultReadCachingStrategy();
09.
openInfo();
10.
}
11.
 
12.
/**
13.
* Grab the open-file info from namenode
14.
*/
15.
synchronized void openInfo() throws IOException, UnresolvedLinkException {
16.
lastBlockBeingWrittenLength = fetchLocatedBlocksAndGetLastBlockLength();
17.
int retriesForLastBlockLength = dfsClient.getConf().retryTimesForGetLastBlockLength;
18.
while (retriesForLastBlockLength > 0) {
19.
// Getting last block length as -1 is a special case. When cluster
20.
// restarts, DNs may not report immediately. At this time partial block
21.
// locations will not be available with NN for getting the length. Lets
22.
// retry for 3 times to get the length.
23.
if (lastBlockBeingWrittenLength == -1) {
24.
DFSClient.LOG.warn("Last block locations not available. "
25.
"Datanodes might not have reported blocks completely."
26.
" Will retry for " + retriesForLastBlockLength + " times");
27.
waitFor(dfsClient.getConf().retryIntervalForGetLastBlockLength);
28.
lastBlockBeingWrittenLength = fetchLocatedBlocksAndGetLastBlockLength();
29.
else {
30.
break;
31.
}
32.
retriesForLastBlockLength--;
33.
}
34.
if (retriesForLastBlockLength == 0) {
35.
throw new IOException("Could not obtain the last block locations.");
36.
}
37.
}

 

fetchLocatedBlocksAndGetLastBlockLength通过调用getLocatedBlocks实现了示意图中的步骤二:

 

01.
private long fetchLocatedBlocksAndGetLastBlockLength() throws IOException {
02.
final LocatedBlocks newInfo = dfsClient.getLocatedBlocks(src, 0);
03.
if (DFSClient.LOG.isDebugEnabled()) {
04.
DFSClient.LOG.debug("newInfo = " + newInfo);
05.
}
06.
if (newInfo == null) {
07.
throw new IOException("Cannot open filename " + src);
08.
}
09.
 
10.
if (locatedBlocks != null) {
11.
Iterator<LocatedBlock> oldIter = locatedBlocks.getLocatedBlocks().iterator();
12.
Iterator<LocatedBlock> newIter = newInfo.getLocatedBlocks().iterator();
13.
while (oldIter.hasNext() && newIter.hasNext()) {
14.
if (! oldIter.next().getBlock().equals(newIter.next().getBlock())) {
15.
throw new IOException("Blocklist for " + src + " has changed!");
16.
}
17.
}
18.
}
19.
locatedBlocks = newInfo;
20.
long lastBlockBeingWrittenLength = 0;
21.
if (!locatedBlocks.isLastBlockComplete()) {
22.
final LocatedBlock last = locatedBlocks.getLastLocatedBlock();
23.
if (last != null) {
24.
if (last.getLocations().length == 0) {
25.
if (last.getBlockSize() == 0) {
26.
// if the length is zero, then no data has been written to
27.
// datanode. So no need to wait for the locations.
28.
return 0;
29.
}
30.
return -1;
31.
}
32.
final long len = readBlockLength(last);
33.
last.getBlock().setNumBytes(len);
34.
lastBlockBeingWrittenLength = len;
35.
}
36.
}
37.
 
38.
currentNode = null;
39.
return lastBlockBeingWrittenLength;
40.
}

你可能会说步骤二调用的是getBlockLocations。看以下的代码:

 

 

01.
@VisibleForTesting
02.
public LocatedBlocks getLocatedBlocks(String src, long start, long length)
03.
throws IOException {
04.
return callGetBlockLocations(namenode, src, start, length);
05.
}
06.
 
07.
/**
08.
* @see ClientProtocol#getBlockLocations(String, long, long)
09.
*/
10.
static LocatedBlocks callGetBlockLocations(ClientProtocol namenode,
11.
String src, long start, long length)
12.
throws IOException {
13.
try {
14.
return namenode.getBlockLocations(src, start, length);
15.
catch(RemoteException re) {
16.
throw re.unwrapRemoteException(AccessControlException.class,
17.
FileNotFoundException.class,
18.
UnresolvedPathException.class);
19.
}
20.
}

 

然后就可以开始读文件的数据了。通过NameNode.getBlockLocations的远程调用接口获得了文件开始部分的数据块的保存位置。对于文件中的每个块,NN返回保存着该副本的DN的地址。注意,这些DN根据它们与Client的距离进行了简单的排序(利用了网络的拓扑信息)。

Client调用HdfsDataInputStream的read方法读取文件数据时,DFSInputStream对象会通过和DN间的读数据stream接口,和最近的DN建立连接。Client反复调用read方法,数据会通过DN和Client的连接上的数据包返回Client。当到达块的末端时,DFSInputStream会关闭和DN的连接。并通过getBlockLocations()远程方法获得保存着下一个数据块的DN信息,严格来说,在对象没有缓存该数据块的位置时,才会使用这个远程方法。这就是上图中的步骤五。然后重复上述过程。

另外,由于NameNode.getBlockLocations()不会一次返回文件的所有的数据块信息,DFSInputStream可能需要多次调用该远程方法,检索下一组数据块的位置信息。对于使用者来说,它读取的是一个连续的数据流,上面所讲的联系不同的DN,多次定位数据块的过程,都是透明的。当使用者完成数据读取任务后,通过FSDataInputStream.close()关系数据流。即图中的步骤六。

如果DN发生了错误,如节点停机或者网络出现故障,那么Client会尝试连接下一个Block的位置。同时它会记住出现故障的那个DN,不会再进行徒劳的尝试。在数据的应答中,不单包含了数据,还包含了数据的校验和,Client会检查数据的一致性,如果发现了校验错误,它会将这个信息报告给NN;同时,尝试从别的DN读取另外一个副本的内容。由Client在读取数据时进行数据完整性检查,可以降低DN的负载,均衡各个节点的计算能力。

这样的设计其实可以给我们一个很好的设计大型分布式系统的例子。通过一些有效的设计,将计算和网络等分散到各个节点上,这样可以最大程度的保证scalability。

3. Client写文件

即使不考虑出现错误的情况,写文件也是HDFS最复杂的流程。本节通过创建一个新文件并向文件写入数据,结束后关闭这个文件为例,分析文件写入时各个节点之间的配合。

\

 

Client调用DistributedFileSystem.create()创建文件(上图中的步骤一),这时DistributedFileSystem创建了DFSOutputStream,并由RPC,让NN执行同名的方法,在文件系统的命名空间创建一个新文件。NN创建新文件时,需要执行检查,包括NN是否处理正常工作状态,被创建的文件不存在,Client是否有在父目录中创建文件的权限等。通过检查后,NN会构建一个新文件,记录创建操作到编辑日志edits中。RPC结束后,DistributedFileSystem将该DFSOutputStream对象包装到FSDataOutputStream实例中,返回Client。

 

01.
@Override
02.
public FSDataOutputStream create(final Path f, final FsPermission permission,
03.
final EnumSet<CreateFlag> cflags, final int bufferSize,
04.
final short replication, final long blockSize, final Progressable progress,
05.
final ChecksumOpt checksumOpt) throws IOException {
06.
statistics.incrementWriteOps(1);
07.
Path absF = fixRelativePart(f);
08.
return new FileSystemLinkResolver<FSDataOutputStream>() {
09.
@Override
10.
public FSDataOutputStream doCall(final Path p)
11.
throws IOException, UnresolvedLinkException {
12.
return new HdfsDataOutputStream(dfs.create(getPathName(p), permission,
13.
cflags, replication, blockSize, progress, bufferSize, checksumOpt),
14.
statistics);
15.
}
16.
@Override
17.
public FSDataOutputStream next(final FileSystem fs, final Path p)
18.
throws IOException {
19.
return fs.create(p, permission, cflags, bufferSize,
20.
replication, blockSize, progress, checksumOpt);
21.
}
22.
}.resolve(this, absF);
23.
}

 

关键的调用点有DFSClient.create:

 

01.
/**
02.
* Same as {@link #create(String, FsPermission, EnumSet, boolean, short, long,
03.
* Progressable, int, ChecksumOpt)} with the addition of favoredNodes that is
04.
* a hint to where the namenode should place the file blocks.
05.
* The favored nodes hint is not persisted in HDFS. Hence it may be honored
06.
* at the creation time only. HDFS could move the blocks during balancing or
07.
* replication, to move the blocks from favored nodes. A value of null means
08.
* no favored nodes for this create
09.
*/
10.
public DFSOutputStream create(String src,
11.
FsPermission permission,
12.
EnumSet<CreateFlag> flag,
13.
boolean createParent,
14.
short replication,
15.
long blockSize,
16.
Progressable progress,
17.
int buffersize,
18.
ChecksumOpt checksumOpt,
19.
InetSocketAddress[] favoredNodes) throws IOException {
20.
checkOpen();
21.
if (permission == null) {
22.
permission = FsPermission.getFileDefault();
23.
}
24.
FsPermission masked = permission.applyUMask(dfsClientConf.uMask);
25.
if(LOG.isDebugEnabled()) {
26.
LOG.debug(src + ": masked=" + masked);
27.
}
28.
String[] favoredNodeStrs = null;
29.
if (favoredNodes != null) {
30.
favoredNodeStrs = new String[favoredNodes.length];
31.
for (int i = 0; i < favoredNodes.length; i++) {
32.
favoredNodeStrs[i] =
33.
favoredNodes[i].getHostName() + ":"
34.
+ favoredNodes[i].getPort();
35.
}
36.
}
37.
final DFSOutputStream result = DFSOutputStream.newStreamForCreate(this,
38.
src, masked, flag, createParent, replication, blockSize, progress,
39.
buffersize, dfsClientConf.createChecksum(checksumOpt), favoredNodeStrs);
40.
beginFileLease(src, result);
41.
return result;
42.
}

 

在步骤三Client写入数据时,由于create()调用创建了一个空文件,所以,DFSOutputStream实例首先需要想NN申请数据块,addBlock()方法成功执行后,返回一个LocatedBlock对象。该对象包含了新数据块的数据块标识和版本好,同时,它的成员变量locs提供了数据流管道的信息,通过上述信息,DFSOutputStream就可以和DN连接,通过些数据接口建立数据流管道。Client写入FSDataOutputStream流中的数据,被分成一个一个的文件包,放入DFSOutputStream对象的内部队列。该队列中的文件包最后打包成数据包,发往数据流管道,流经管道上的各个DN,并持久化,确认包逆流而上,从数据流管道依次发往Client,当Client收到应答时,它将对应的包从内部队列删除。

 

01.
public class LocatedBlock {
02.
 
03.
private ExtendedBlock b;
04.
private long offset;  // offset of the first byte of the block in the file
05.
private DatanodeInfo[] locs;
06.
/** Storage ID for each replica */
07.
private String[] storageIDs;
08.
// Storage type for each replica, if reported.
09.
private StorageType[] storageTypes;
10.
// corrupt flag is true if all of the replicas of a block are corrupt.
11.
// else false. If block has few corrupt replicas, they are filtered and
12.
// their locations are not part of this object
13.
private boolean corrupt;
14.
private Token<BlockTokenIdentifier> blockToken = new Token<BlockTokenIdentifier>();
15.
/**
16.
* List of cached datanode locations
17.
*/
18.
private DatanodeInfo[] cachedLocs;
19.
 
20.
// Used when there are no locations
21.
private static final DatanodeInfo[] EMPTY_LOCS = new DatanodeInfo[0];

 

DFSOutputStream在写完一个数据块后,数据流管道上的节点,会通过和NN的DatanodeProtocol远程接口的blockReceived()方法,向NN提交数据块。如果数据队列还有等到输出的数据,DFSOutputStream会再次调用addBlock(),为文件添加新的数据块。

Client完成数据的写入后,会调用close()方法关闭流,关闭流意味着Client不会再向流中写入数据。所以,当DFSOutputStream数据队列的文件包都收到应答后,就可以使用ClientProtocol.complete()方法通知NN关闭文件,完成一次正常的文件写入。

如果在文件写入期间DN发生故障,则会执行下面的操作(注意,这些操作对于写入数据的Client是透明的):

数据流管道会被关闭,已经发送到管道但是还没有收到确认的文件包,会被重新添加到DFSOutputStream的输出队列,这样就保证了无路数据流管道的哪个DN发生故障,都不会丢失数据。当前正常工作的DN的数据块会被赋予一个新的版本号,并通知NN。这样,失败的DN在从故障恢复过来以后,上面只有部分数据的Block会因为版本号和NN保存的版本号不匹配而被删除。在数据流管道中删除错误的DN并建立新的管道,继续写数据到正常工作的DN。文件关闭后,NN会发现该Block的副本数没有达到要求,会选择一个新的DN并复制Block,创建新的副本。DN的故障只会影响一个Block的写操作,后续Block的写入不会受到影响。

 

4. DataNode的启动与心跳机制

本节讨论DN的启动及其与NN之间的交互。包括DN从启动到进入正常工作状态的注册,Block上报,以及正常工作过程中的心跳等与NN相关的远程调用。这部分虽然只涉及DatanodeProtocol的接口,但是有助于进一步理解DN与NN的关系。

正常启动的DN或者为升级而启动的DN,都会向NN发送远程调用versionRequest(),进行必要的版本检查。这里的版本检查,只涉及构建版本号,保证它们间的HDFS版本是一致的。

\

在版本检查结束后,DN会接着通过远程调用register(),向NN注册。DatanodeProtocol.register()的主要工作也是检查,确认该DN是NN所管理集群的成员。也就是说,用户不能把某一个集群中的某个node直接注册到另外一个集群中去,保证了整个系统的数据一致性。

注册成功后,DN会将它所管理的所有Block的信息,通过blockRequest()方法上报到NN(步骤三),以帮助NN建立HDFS文件数据块到DN的映射关系。在此后,DN才正式的提供服务。

 由于NN和DN是简单的主从关系,DN需要每隔一段时间发送心跳到NN(步骤四和步骤五)。如果NN长时间收不到DN的心跳,它会认为DN已经失效。如果NN需要一些DN需要配合的动作,则会通过sendHeartbeat()的方法返回。该返回值是一个DatanodeCommand数组,它是NN的指令。

应该说,DN和NN的交互逻辑非常简单。大部分是通过DN到NN的心跳来完成的。但是考虑到一定规模的HDFS集群,一个NN会管理上千个DN,这样的设计也就非常自然了。

 

5. SNN节点的元数据合并

当Client对HDFS的文件目录进行修改时,NN都会在edits中留下记录,以保证在系统出现问题时,通过日志可以进行恢复。

fsimage是某一个时刻的检查点(checkpoint)。由于fsimage很大,因此不会在每次的元数据修改都写入到它里边,而只是存在到edits中。在系统启动时,会首先状态最近时刻的fsimage,然后在通过edits,恢复系统的最新状态。

当时如果edits太大,那么节点启动时将用很长的时间来执行日志的每一个操作,使得系统恢复最近的状态。在启动恢复的这段时间,服务是不可用的。为了避免edits多大,增加集群的可用时间,HDFS引入了第二名字节点,即SNN(Secondary NameNode)。SNN不是NN的standby,它只是辅助NN完成合并(merge)fsimage和edits。过程涉及NamenodeProtocol和NN与SNN之间的流式接口。

\

该过程由SNN发起,首先通过远程方法NamenodeProtocol.getEditLogSize()获得NN上edits的大小。如果日志很小,SNN就会在指定的时间后重新检查。否则,继续通过远程接口rollEditLog(),启动一次检查点的过程。这时,NN需要创建一个新的编辑日志edits.new,后续对元数据的改动,都会记录到这个新日志中。而原有的fsimage和edits,会由SNN通过HTTP下载到本地(步骤三和步骤四),在内存中进行merge。合并的结果就是fsimage.ckpt。然后SNN通过HTTP接口通知NN fsimage已经准备好。NN会通过HTTP get获取merge好的fsimage。在NN下载完成后,SNN会通过NamenodeProtocol.rollFsImage(),完成这次检查点。NN在处理这个远程方法时,会用fsimage.ckpt 覆盖原来的fsimage,并且将新的edits.new改名为edit。

转载地址:http://vbcjm.baihongyu.com/

你可能感兴趣的文章
七牛云王珂 直播分享 | 如何快速搭建智能化的统一日志管理系统
查看>>
BusyBox
查看>>
configure make make install in linux
查看>>
剑指offer:调整数组顺序使奇数位于偶数前面
查看>>
一步一步学Silverlight 2系列(3):界面布局
查看>>
本人部分博客导航(ing...)
查看>>
redis环境搭建笔记
查看>>
作业一:计算机是如何工作的进行
查看>>
使用Spring JdbcTemplate实现数据库操作
查看>>
上传文件前台后台必备的条件
查看>>
内存管理原则
查看>>
C++运算符重载详解
查看>>
Analyzing Storage Performance using the Windows Performance Analysis ToolKit (WPT)
查看>>
Android测试之Monkey
查看>>
Android实现二维码扫描登录网页
查看>>
关于一些Python的一些基础语法训练
查看>>
性能提升永远在路上 全闪存如何求突破?
查看>>
苹果计划在美组装计算机:面向数据中心
查看>>
无线医疗保卫战: 精准战略瞄准网络安全
查看>>
南京北斗与物联网研究院成立 推动融合发展应对智慧需求
查看>>