当前位置: 首页 > >

hadooo 源代码分析

发布时间:

HDFS
HDFS client 分析

客户端的功能


今天开始分析HDFS源代码,首先从HDFS的client端分析。对于分布式文件系统,Client端的功能,就是接收用户的请求,通过网络,与 NameNode 和 DataNode交互。


首先确定的是,client端是一个hdfs提供的lib库,用户的应用程序需要包含该库,调用该库提供的函数来访问NameNode和DataNode


HDFS提供了一个shell程序,通过shell程序,可以通过一下命令比较简洁的访问HDFS


HDFS的文件系统抽象层

抽象类FileSystem提供了一个文件系统的抽象层,它包括了分布式文件系统和local文件系统的一个统一的抽象接口。它囊括了所有的文件系统的操作接口,包括元数据和数据接口。对于HDFS,实现该接口的类为DistributedFileSystem.


DirstrubtedFileSystem类是DFSClient 的wrap类。其主要的功能由DFSClient完成。


客户端shell程序的启动


Client 的的shell程序的main函数在类org.apache.hadoop.fs.FsShell类中。我们顺着main函数执行的流程,来逐步分析整个client端代码。


我们先看一些interface,我们知道,interface没有具体的实现,只是规定一些操作的规范给其实现的类,这样就可以实现要做什么(interface)和实际实现者的功能上的分离。


Interface? Configurable? 实现了两个操作,就是


public interface Configurable {


?? voidsetConf(Configuration conf);


??? ConfigurationgetConf();


}


然后是接口 Tool,实现了执行命令的接口。


public interface Tool extends Configurable {


? int run(String []args) throws Exception;


}


?


我们看到,具体实现以上两个接口的,就是FsShell类。


public class FsShell extends Configured implements Tool {


}


下面正式看一下类org.apache.hadoop.fs.FsShell的main函数来的运行过程。


其主要的过程如下: 分析命令行参数,调用FsShell 的run函数来处理相关的命令。


我们再看一下FsShell里的run函数,其就是匹配各种命令,调用FsShell里相关的处理函数。其对于的处理命令被最终由FileSystem处理。在FsShell类的init函数里,通过从配置文件获取具体的文件系统类(FileSystem)的实现,当client端起来后,其处理相关的命令的功能交给DistributedFileSystem类来实现。


Client 元数据的操作
Client的数据操作

我们重点关注一下读写出错时的错误处理,这是分布式系统的关键。


我们看到数据操作,无论数据操作,无论是写,还是读,在客户端都没有缓存,都是在写或者读的系统调用返回后,对于写,数据都flush都DataNode上,对于read,客户端的系统里是没有数据缓存的。


我们先一下block,packet,chunk之间的区别。


Packet类,一个Packet就是数据发送的基本单位,一个Packet由多个chunk组成,一个数据块就是数据校验的单位,默认为512字节,也就是说一个512字节的数据块加一个checksum,checksum的长度一般为4字节。一个Block有多个packet组成。一个block为64M,一个packet默认为64k,


?


Client数据的读

我们先看client的读,其主要有DFSInputStream类来完成。


首先看一下读操作的函数,其主要有三种方式:


public synchronized int read(byte buf[], int off, int len) ,这个read中,给出了读的数据存放的buf,buf内的开始位置off,长度len,并没有给出文件的position的位置,默认的使用内部的pos变量。


public synchronized int read(long position, byte[] buffer,int offset, int length),这是给定了文件的偏移量position的读,它不改变内部的pos值


public synchronized int read() ,默认从当前位置读一个字节,其值保持在int的最后一个字节中。


?


下面我们主要介绍一下第一种方式的read,同时重点介绍一下其read的错误处理流程。


?


首先看一下DFSInputStream的内部的全局的变量。


private String src;?? //文件的路径

private boolean closed = false;? //文件是否关闭

private boolean verifyChecksum;?

private long prefetchSize = 10 * defaultBlockSize;

private LocatedBlocks locatedBlocks = null;

private long pos = 0;


?


这几个变量和文件级的全局变量,src为文件的路径,closed表面这个文件流是否关闭,verifyChecksum表面文件的读是否需要checksum的验证,prefetchSize是预取的元数据的块数目,默认设置的是10个block,这个怎么理解呢?我们是当读到一个block的时候,就到namenode上去取该block的元数据信息,也就是LocatedBlock信息,为了加速,我一次去取当前block连续的10个block相关的信息。LocatedBlocks保持当前文件块的信息。pos为文件当前读的位置。


private Block currentBlock = null;


? ?privateDatanodeInfo currentNode = null;


? ? privateSocket s = null;


??? private BlockReader blockReader = null;


??? private long blockEnd = -1;


?


这几个是和当前的block相关的信,currentBlock为当前block,currentNode为当前块所在的DataNode的相关信息,包括副本的所在的DataNode相关的信息。blockReader为和块相关的BlockReader类,主要处理block内的读。BlockEnd该block最后位置在文件中的偏移。


BlockReader类负责一个block的读取操作。


我们看一下其构造函数:


public static BlockReader newBlockReader( Socket sock,String file,


?????????????????????????????????????? longblockId,


?????????????????????????????????????? longgenStamp,


?????????????????????????????? ????????long startOffset,


long len,


????????????????????????????????? ?????intbufferSize,


?booleanverifyChecksum,


?????????????????????????????????????? StringclientName)


?????????????????????????????????????? throwsIOException ;


其参数为sock为该block的Socket,file为文件的路径,blockId为块号,genStamp为该块的时间戳。startOffset为要读的数据的块内偏移量,len为长度,bufferSize为块大小,verifyChecksum为是否要checksum,ClientName客户端的名字。


?


在该newBlockReader的构造函数里,其主要完成:


? 读请求的发送和接受,完成同步的请求。


文件的打开


Open函数


public FSDataInputStream open(Path f, int bufferSize) throws IOException {


??? returnnew DFSClient.DFSDataInputStream(


????????? dfs.open(getPathName(f), bufferSize, verifyChecksum, statistics));


? }


其构造了一个DFSDataInputStream类,通过dfs.open函数返回一个FSDataInputStream类,其特别的是,在其构造函数里调用了openInfo函数,去NameNode上预读了10个块的文件block信息。


我们看一下数据读的具体过程:DFSClient 的read函数:


?


public synchronized int read(byte buf[], int off, int len)throws IOException


其流程如下:


1)首先确保本输入流没有关闭,并且pos < getFileLength(),也就是文件的pos小于文件的长度。否则直接返回-1


2)blockEnd为当前block的末尾在文件中的偏移量。其初始化为-1,如何pos > blockEnd, 则需要调用blockSeekTo定位到下一个数据块。其具体的作用我们下面回详细介绍。


3)调用readBuffer去读数据。


我们看一下blokcSeekTo函数


? privatesynchronized DatanodeInfo blockSeekTo(long target) throws IOException


1)首先检查blockReader和 s 是否为空,如果不为空,则关闭,清空。


2)调用getBlockat获取当前偏移对已的block信息。


3)调用chooseDataNode在block的副本中选择一个Datanode信息。


4)构建一个blockReader类,其构成函数里,已经发出了读请求。


?


?


?


1)首先调用blockSeekTo函数来获取该block所有的Datanode


Client数据的写

当create一个文件后,其返回DFSDataStream类,我们通过这个类来完成读写。


?


DFSDataStream类并没实现自己的write接口,而是继承了FSOutSummer接口的write操作。FSOutSummer主要完成计算chunk的checksum操作。


?


我们看一下FSOutSummer的write操作接口。


publicsynchronizedvoid write(byte b[], int off, int len)


? throws IOException;


?


其首先对参数检查后,循环调用write1操作接口


?


private int write1(byte b[],int off, int len) throws IOException


在这个函数里以每次写一个chunk,计算一个chunk的checksum,调用writeChecksumChunk函数。


?


其首先查看一个len 是否够buf.length,也就是一个chunk的长度,如果够,就直接调用writeChecksumChunk函数,如果不够一个chunk,就保存在buf中,当buf满后,调用flushBuffer函数发送出去。如果最后没有满一个chunk,该如何处理?


?


我们看一下DFSOutputStream类的writeChunk函数


protectedsynchronizedvoid writeChunk(byte[] b, int offset, int len, byte[] checksum) throws IOException;


?


writeChunk函数就比较简单,其主要就是把当前的chunk添加的currentPacke中,如果packet满,就是把该packet添加到dataQueue链表中。


接下了我们看DataStream类,其为一个独立的线程。其在创建文件完成之后,构造DFSOutputStream时启动。其完成把dataQueue中的packe取出并发送给相应的dataNode


?


下面介绍DataStreamer类和ResponserProcessor类,其分别为两个线程,一个处理发送请求,和一个处理响应的ack。


?


我们看到DataStreamer类里最重要的函数为run方法。其主要功能分析如下:


1)? 首先从dataQueue中获取一个packet


2)? 当blockStream为Null时,就要建立和该block所在的datanode建立socket的链接,其调用nextBlockOutputStream函数来实现,其产生的两个socket相关的stream保存在blockStream和blockReplyStream,同时启动ResponserProcessor


3)? 从dataQueue中把该packet删除,添加到ackQueue中


4)? 调用BlockStream发送该packet


5)? 如果是最好一个packet,写一个空packet为结束标志


6)? 如果是最后一个packet,一直等到ackQueue队列空时候,关闭response,关闭blockStream流等。


?


我们看一下nextBlockOutputStream函数


private DatanodeInfo[]nextBlockOutputStream(String client) throws IOException;


?


1) 首先调用locateFollowingBlock函数获取最新的block的信息,其直接调用nameNode的addBlock函数来完成。该函数里处理了各种异常情况,其主要的策略就是retry多次。


2) 调用createBlockOutputStream函数来建立该block上Datanode的链接,并发送了一个包测试该pipeline是否通。


3) 如果链接建立不成功,就调用namenode.abandonBlock函数来放弃分配的block,重新分配block。


?


在函数createBlockOutputStream里,建立和各个Datanode的pipeline。并把blockheader发送出去。


?


接下来我们看一下ResponseProcessor类,其处理ack应答。它也是一个独立的线程处理。其主要的处理流程就在run函数中:


首先从BlockReplyStream中读取ack应答消息,其格式比较简单:


|seqno(long)|short|short|short|


开始一个seqno开始,后面分别为该packet的副本的应答消息,成功是,其都为DataTransferProtocol.OP_STATUS_SUCCESS


?


我们看一下具体流程:


1)?首先从ack.ReadFileds来读取该ack字段


2)?根据seqno判断,如果是hearbeat应答,我们忽略,如何使-2,则该packet出错,则我们也忽略。


3)?否则,从ack队列中取出一个packet,比较seqno是否正确。(这里的疑问是,ack都是顺序回来的吗?)


4)?检查所有ack返回的所有datanode的状态,如果都是DataTransferProtocol.OP_STATUS_SUCCESS,则成功,否则抛出异常。


5)?从ack队列中删除该packet,并调用ackQueue.notifyAll()通知。


?


client错误处理

对于client,我们主要考察一下read和write的错误处理的情况。


?


Write操作可以说分成三步骤:


1)? write操作中,把数据打包为packet,添加到dataQueue中,这都是本地操作,产生的异常都是IOException,其自己抛出的异常。


2)? ?


DataNode

我们先看一下


DataNode介绍

?


DataNode作为文件系统的数据服务器,对外提供数据服务的功能。其主要的功能如下:


1)? 提供客户端数据的读写功能


2)? 提供其他datanode的数据拷贝功能


3)? 给NameNode上报其相关的数据。


?


DataNode的本地磁盘结构

?


本地存储目录结构

Datanode把数据保存在data server的本地磁盘结构中。系统可以有多个存储目录,其存储目录可以通过配置文件conf/hdfs-site.xml文件的配置项dfs.data.dir 保存了其放置数据块block的目录,用“,”隔开,可以设置多项。每一项称为存储目录,在Datanode上,其对应的空间为一个Volume,用FSVolume来管理。所有的存储目录用FSVolumeSet来管理。


?


例如配置了两个目录 /dfs/data1 和 /dfs/data2


|??


?????????dfs.data.dir


?????????/dfs/data1,/dfs/data2



?


其目录结构如下图:



以dfs/data1为例:


其下有三个目录分别为current目录,其下存放数据block数据,例如blk_-2248160896468466026 就是一个数据块文件,blk_-2248160896468466026_1002.meta为其对应的数据块验证文件。detach目录存储snapshort信息,本版本中没有实现。tmp目录保存临时文件的地方。还有in_use.lock为本存储目录的锁,实现对本存储目录的加锁操作,其在后面我们会详细介绍。Storage为一个文件,保存了一下版本验证信息。


再次强调一下比较容易混淆的概念,一个存储目录是指 完整的本地存储结构:例如/data1, 其下面的空间对应一个FSVolume,多个存储目录由FSVolumeSet管理,一个存储目录其目录信息保存在StroageDir类中。


对于存储目录下的current目录,是真正存储数据块的地方,其对于的数据块的管理有FSDir类来管理。


知道上述基本的结构后,我们就可以轻松的看一下Datanode是如何管理本地数据块的。其基本的类为DataStorage 和 类FSDataset,类FSDataSet包括了三个我们提到过的子类FSDir, FSVolume, FSVolumeSet,已经类ActiveFile共同完成管理工作。


数据块的管理 FSdataSet

下面我们具体逐个类来介绍:


FSDir来完成current目录下block的管理工作。我们看一下其数据成员:


File dir;? //存储目录,这里就是current对于的文件结构


??? intnumBlocks = 0; //current目录下存放的数据块,不包括子目录下存放的数据块


??? FSDirchildren[];?? //子目录


int lastChildIdx =0;? //最后一次存储数据块的子目录


首先我们要知道,所有的数据块文件(block文件)并不是都直接存储在current目录下,当存储的block文件达到一定的数目的限制后,需要保存在子目录中,这就是children数据成员用来保存该子目录。为什么要限制一个目录存储的文件数目,我们知道,对于不同的local文件系统,例如ext2,ext3,其目录下的文件达到一定的数目后,性能会急剧下降。


下面我们分析一下其重要的函数:


private FileaddBlock(Block b, File src, boolean createOk,


??????????????????? ??????boolean resetIdx) throws IOException


这个在存储目录下添加一个block。这里需要注意是src参数一般传入的是和current在同一存储目录下的temp目录,我们在创建blokc时,首先在temp中创建,当其写数据的操作完成后,我们在mv到current目录下。其主要工作流程如下:


1)如果numBlocks 小于 maxBlocksPerDir,既没有达到目录的上限,就在current目录创建。


2)创建remame 有点问题?


3)如果numBlocks>= maxBlocksPerDir, 那么就需要创建子目录来保存,如果子目录满,则继续在子目录中创建新的子目录保存。


?


public voidgetBlockInfo(TreeSet blockSet)该函数递归的扫描整个存储目录,获取所有的block信息,block对应的信息包括blockId,genStamp,numBytes都可以从block文件名和文件获取。


?


下面我们看一下FSVolume类:其基本上没有太多的东西,添加了磁盘空间的统计信息。其他和数据块相关的操作直接调用FSDir的操作。FSVolumeSet的也没有太多的东西,这里就不详细讲述了。


ActiveFile类表示一个block正在创建的过程中。


FSDataSet

下面我们看一下FSDataSet类


?


FSVolumeSet volumes;


private HashMap ongoingCreates = new HashMap();


privateintmaxBlocksPerDir = 0;


private HashMap volumeMap = null;


static? Random random = newRandom();


其重要的数据结构保存了 volumeMap,保存了block->DatanodeBlockInfo的映射关系。DatanodeBlockInfo的结构为:FSVolume信息和File结构,也就是说,在内存中保存了block 和该block文件打开的句柄,相对于缓存了所有的block的文件打开句柄?


有了上述信息后,我们看到,FSDataset就可以完成了block->block file的任何的关联操作。


Block getStoredBlock(longblkid) 通过blkid获取block,


已经其他从block可以获取block 文件的输入或者输出流,已经block 的metafile的输入和输出流。


这个函数比较重要


public BlockWriteStreamswriteToBlock(Block b, boolean isRecovery) throws IOException ;


当一个要写数据块的时候,在/tmp ,目录下创建一个临时的块文件。并返回读写流。


public synchronized voidfinalizeBlock(Block b) throws IOException;


当完成一个数据块时,调用本函数,其主要是完成数据块文件从temp文件mv到current目录中。


DataStorage 状态的管理?数据升级

类Storage中保存的数据为:


private NodeType storageType;??? // Type of the node using this storage


protected List storageDirs = new ArrayList();


一个是NoteType其为NAME_NODE,DATA_NODE两种,我们后面看到namenode中也会用到该类。其次就是保存StorageDirectory的一个列表。类StorageDirectory保存了数据存储的目录信息:根目录root,文件锁FileLock,存储类型的接口dirType。


Storage的主要的功能,就是管理这些StorageDirectory,这里不在详细讲述了。


?


DataStorage 主要完成了一些文件系统的升级,回滚,提交的操作。


软件的升级是软件生命周期的重要的组成部分。软件的升级,只要指软件从低版本向高版本升级的过程。这个过程可能伴随着内存数据结构的变换,磁盘数据结构的变换,版本信息的变化,这些都需要从老的版本中迁移到新的版本中,需要保证在数据比丢失的情况下,迁移正常完成。


对于HDFS这样的大型分布式系统,升级可能成功,也可能不成功,在不成功的情况下,需要rollback到就的版本中,如果升级后,在一定的时间内运行良好,我们就提交Finalize这次升级。


?


DataStorage主要完成这样的升级的功能。其doUpgrade, doRollBack,doFinallize来完成上述功能。


其主要流程在函数recoverTransitionRead


DataNode的 启动过程

我们主要看一下Namenode的main函数的启动过程:


1)?首先调用startupShutdownMessage函数,其基本没有做什么工作,在启动时打印一条日志信息,其通过添加addShutdownHook函数,在数据服务器退出时,打印相应的退出日志信息。


2)?调用createDataNode函数,其调用instatiateDataNode来初始化一个Datanode,之后调用runDatanodeDaemon来启动器Dameno线程。


3)?函数instantiateDataNode里的基本工作,就是产生一个conf对象来获取配置文件相关信息,同时调用parseArgument函数分析命令行参数args,从配置文件里获取dataDirs,也就是数据存储的目录。调用makeInstance函数。


4)?makeInstance 对数据存储目录做了check之后,调用Datanode的构成函数返回一个DataNode的对象。


5)?Datanode的构成函数直接调用startDataNode来初始化Datanode,这个函数比较复杂,我们来重点介绍。


6)?StartDataNode函数里,其首先获取相关的配置参数,依次为:machineName本DataNode的机器名,调用NameNode的静态函数,获取getAddress获取namenode的nameNodeaddr,获取建立socket的超时时间socketTimeout,获取socket写数据的超时时间socketWriteTimeout,获取该Datanode是否允许transfer数据的配置,获取写数据的packet的大小writePacketSize,获取本datanode的address.


7)?其次,该函数创建了一个DataStorage对象storage,以及DatanodeRegistration对象。最重要的是建立了Datanode的连接,并通过handshake函数调用了namenode.versionRequest()操作获取了namespaceInfo


8)?之后,调用storage的recoverTransitionRead完成DataStorage的初始化工作。初始化了FSDataset对象。


9)?最后,建立Datanode端的监听socket,启动线程组,设置相关的参数。启动serverlet服务。


当在NameNode的过程中,其东了DataXeciverServer类,其run函数的主要的工作就是监听socket,如果有新的socket,就启动Deamon一个线程,运行DataXceiver的run函数来处理该请求。


try {


??????? Socket s = ss.accept();


??????? s.setTcpNoDelay(true);


??????? new Daemon(datanode.threadGroup,


??????????? new DataXceiver(s, datanode, this)).start();


????? }catch (SocketTimeoutException ignored)


?


DataXceiver来的run函数里,从accpet到的输入流里,获取版本号和op操作码,其根据操作码,分别调用相应的处理函数。


其操作码和对应的操作如下:


操作码

相应的 操作

备注:

?

OP_READ_BLOCK

readBlock

读一个数据块

?

OP_WRITE_BLOCK

writeBlock

写一个数据块

?

OP_READ_METADATA

readMetadata

获取数据块的元数据

?

OP_COPY_BLOCK

copyBlock

拷贝数据块

?

OP_BLOCK_CHECKSUM

getBlockChecksum

获取block的checksum

?


?


接下来具体介绍各种操作。


数据块的读取
readBlock

我们看一下读数据块的入户函数:DataXceiver里的readBlock方法,其就一个类型为DataInputStream的in参数。


void readBlock(DataInputStream in)


?


|version(short)|? 2 bytes???????????? 版本号

|op (bytes)|??????? 1 bytes???????????? 操作码

|blockId (long)|??? 8 bytes???????????? block的唯一ID

|generationStamp (long)| 8 bytes??????? GeneratonStamp

|startOffset (long)|??? 8 bytes???????? 要读的block的开始偏移

|length? (long)??? |??? 8 bytes?? ??????要读的数据块的长度

|clientName(String)|?? int(len) + len?? 客户端的名字

……data……


??????????????????????????????? 数据块的block 头信息


我们看一readBlock函数的主要工作:


1)从输入流in中读取block的header信息,包括了要读取的Block的ID,GenerationStamp,开始偏移和读取的长度,最后是客户端的名字(只在写日志的时候用到了)其格式如上图所示。


2)根据socket信息,构建输出流out,用于发送数据。


3)根据block的header信息构建BlockSender


blockSender = new BlockSender(block, startOffset, length,


???????????true, true, false, datanode, clientTraceFmt);


4)调用sendBlock发送数据。


?long read = blockSender.sendBlock(out,baseStream, null);


?


BlockSender

接下来我们就主要分析BlockSender类,BlockSender主要完成一个block的读请求。核心的就是上述用到的构成函数和sendBlock函数。


我们先看一下一些成员变量


private Block block; 这是一个对应要读的块,其三个数据成员保存该块的信息,分别为blockId,块内的字节数numBytes,以及一个stamp值:generationStamp


private long offset; //要读的block的起始位置,主要这个是修正后的按照chunk对象的偏移


private long length?//要读的长度


private long endOffset; // 要读的block的结束位置


?


我们看一下blockSender的构造函数


BlockSender(Block block, long startOffset, long length,


????????????? booleancorruptChecksumOk, boolean chunkOffsetOK,


????????????? booleanverifyChecksum, DataNode datanode, String clientTraceFmt)


?


其主要的功能为:


1)如果需要验证checksum,则读取该block的checksum


2)计算offset 和 endOffset的值。 offset 和 startOffset的区别在于startOffset为原始的要读的数据在block的偏移量, 而offset是根据startOffset按照chunk的大小对齐后的偏移量。我们知道,发送数据是按照packet来发的,packet是按照chunk整数倍发送的。


3)获取block对于的本地文件的输入流blockIn


blockIn= datanode.data.getBlockInputStream(block, offset);


?


下面我们看SendBlock函数


?

字段

类型

长度

描述

Packet的头

packetLen

int

4 bytes

包的长度,包括(length,chucksum,data三个的总长度)

Offset

Long

8 bytes

该packet在block中的偏移量

Seqno

Long

8 bytes

Packet在block的序列号

tail

Byte

1 byte

是否是block中的最后一个包

Packet的数据

length

Int

4 bytes

数据的长度,注意着这一项表示下面data的长度

Checksum

一个512bytes一个checksum,16k就是

32个checksum,一个checksum 4字节,总共 占32*4 个字节

data

一个packet一般16k


?表1.Packet的结构


SendBlock函数完成的工作为:


1)向out发送block头信息:调用checksum.writeHeader写 type,和bytesPerChecksum两个字段。同时写offset。


2)计算packet的长度pktSize


int pktSize =DataNode.PKT_HEADER_LEN + SIZE_OF_INTEGER;


其格式如表1.所示


如果transferToAllowed这个标志设置,则其其bufSize一次为MIN_BUFFER_WITH_TRANSFERTO 和BUFFER_SIZE 中最大的一个。这代表了一次从文件中读取的大小。


3)不断调用send_chunks函数按发送


4)最后要write一个0 表示该packet结束。


?


SendChunks 函数的流程:


1)? 计算要读数据的长度len


2)? 就是numChunks 和 packetlen


int numChunks = (len + bytesPerChecksum - 1)/bytesPerChecksum;


?? int packetLen = len +numChunks*checksumSize + 4;


3)? 写packe 的头


4)? 读取checksum


checksumIn.readFully(buf, checksumOff, checksumLen);


5)? 读取data


DataCheckSum

DataCheckSum 类完成计算checksum的功能


两种类型的checksum


private final int type; chusm的类型


? private final intsize;? checksum的大小


? private finalChecksum summer;??


? private final intbytesPerChecksum; 一个checksum对于的要校验的数据的长度


? private int inSum =0; 实际数据的长度


我们看到又两种构造的方法:CRC32 和 NULL两种。


?


写数据块

写数据要比读数据的不同之处在于,写要以流水线的方式写多个副本。客户端写一个block的过程如下:


writeBlock

写操作的入口函数为:


private voidwriteBlock(DataInputStream in) throws IOException;


其输入参数为:DataInputStream in


1)从流in中获取block的头相关的信息。


2)?


如上图所示:verson和操作码81已经在之前处理命令时consume掉了,结下了的就是block_id, generstionStamp, startOffset, length, clientName,hasSrcDataNode, srcDataNode, numTargets, targets等相关的信息的获取。


?


2)接下来我们看到一些stream


DataOutputStream mirrorOut = null;? // stream to next target


??? DataInputStream mirrorIn = null;??? // replyfrom next target


??? DataOutputStream replyOut = null;?? // streamto prev target


??? Socket mirrorSock = null;?????????? // socket to next target


??? BlockReceiver blockReceiver = null; // responsible for data handling


??? String mirrorNode = null;?????????? // the name:port of next target


???String firstBadLink = "";?????????? // first datanodethat failed in connection setup



其流的创建过程如上图所示。


3)看一下BlockReceiver的构造函数的功能:


??其处理初始化一写变量之外,最重要的功能是建立里两个流:out,checksumOut,其分别对应本地磁盘的输出流和本地磁盘的checksum文件,也就是是该block对应该Datanode磁盘上的block文件流和blcck的checksum流,用来写本地磁盘和本地checksum文件。


?


4) 建立replyOut流


5)如果targets.length> 0,说明后面还有datanode,则建立mirrorOut 和 mirrorIn流。同时通过mirrorOut流把blcok的 header发送出去。


6)调用blockReceiver.receiveBlock(mirrorOut,mirrorIn, replyOut,


???????????????????????????????? mirrorAddr,null, targets.length);


?函数,把该block的数据按照packet发送出去。后面我们回详细分析该函数。


7)如果是副本上的block,我们需要调用datanode.notifyNamenodeReceivedBlock向nameNode报告。并把它添加到datanode.blockScanner.addBlock(block);里面。


?


?


我们看一下receiveBlock函数的功能:


1) 首先把block的header写入磁盘中的checksum中。


2) 创建了一个Daemon程序,对于的PacketResponder的类,其发出去的packet的ack应答。


3) 通过ReceivePacket函数从流in,读出的block数据,其通过一次系统调用先把数据放在ByteBuffer中,后面不断的通过循环按照packet大小多次发送给下一个datanode,也就是通过流 mirrorOut.write来实现。同时该函数还把数据写入本地磁盘中并flush到磁盘中。同时把该packet加入ack应答队列。


4) 关闭相关的资源


5) Finalize block


?


?


数据信息的的上报和命令的处理

DataNode定期向NameNode报告其block的信息。


有两种信息:


1)? sendHeartbeat


定期向NameNode发送消息,附带Datanode的容量信息,表面自己还活着。同时返回NameNode给DataNode的一些命令。


2)? blockReport


定期向NameNode报告自己的所有的数据块信息


3)? blockReceived


向NameNode报告自己的最*写的块信息。


4)? errorReport


?


?


?


我们看一下写数据异常情况下的处理:


?


?


DataBlockScanner

?实现了DataNode之间磁盘空间的负载均衡。


NameNode

?


我们知道,namenode的功能主要有两部分:1)对文件系统的目录进行管理 2)对文件的数据块进行管理和映射。


?


我们先看一下基本的NameNode的概念


?


熟悉linux下文件系统的人都知道,inode 和 dentry 项是其基本的数据结构,在HDFS里面,并没有dentry项的概念。其dentry和inode一起保存在INode的结构中,所以HDFS不支持link的操作。


?


另外,HDFS的所有的元数据都保存在内存中,这就减少了许多磁盘的操作。HDFS增加了日志操作和定期的把NameNode在内存中的image保存在磁盘中,用来当NameNode意外崩溃时,可以从image和journal中恢复出NameNode 的内存数据。


?


我们把NameNode的实现主要分成分个模块来讲解。1)Inode的相关结构,这是所有NameNode的基础结构。2)其次时以FSDirectroy为主的有关文件目录的相关的操作。3)之后是数文件数据块管理相关的操作,主要以FSNamesytem类为主。4)日志和checkpoint相关的操作。5)整个NameNode的启动和初始化。


Inode 文件和目录的基本结构

?



?????????????????????????? 图1 Inode结构类图


?


我们先看一下inode的数据结构:class Inode是一个abstractclass,提供了所有inode的基本的数据成员:


protectedbyte[] name;? // Name,文件或目录的名字


? protected INodeDirectory parent;? //父母录的inode


? protectedlongmodificationTime;? //最后修改时间修改时间


? protectedlongaccessTime;???????? //最后访问时间


private long permission;???? //访问权限:需要注意的是:权限为long类型的64位数据中:


MODE(0, 16), 前16为mode


??? GROUP(MODE.OFFSET + MODE.LENGTH, 25), 后面是goupname的一个id,对应该id,可以获得groupname,其映射关系保存在 SerialManger里面。


USER(GROUP.OFFSET + GROUP.LENGTH, 23);


?


下面我们看一下INodeFile类,其extend类INode,增加了如下的数据成员:


protected BlockInfo blocks[] = null;? //文件的block数组


? protectedshortblockReplication;????? //该文件的副本数


? protectedlongpreferredBlockSize;???? //该文件的blockSize


?


?


INodeFileUnderConstruction来表示该file正在被某一个client打开并正在进行操作。我们看一下其成员变量可能就比较清楚了。


final String clientName;???????? // lease holder


? privatefinal String clientMachine;


? privatefinal DatanodeDescriptor clientNode; // if client is a cluster node too.


?


? privateintprimaryNodeIndex = -1; //the nodeworking on lease recovery


? private DatanodeDescriptor[] targets = null;?? //locations for last block


? privatelonglastRecoveryTime = 0;


?


我们看到上面的成员变量为clientName为客户端的名字,也就是该文件锁的持有者lease holder,这用来查找其相关的leaser。 ClientNode为额外的client的信息。primaryNodeIndex是锁恢复使用,targets最后一个块所在的Datanode的信息。我们看到INodeFileUnderConstruction类和InodeFile的最大的不同之处,在于保存了lease这些动态的信息。这些都是一个open的文件需要保存的信息。


?


INodeDirectory是目录的Inode信息,其extend了INode类, 再起基础之上,增加了


privateList children; 其要保存该目录下的所以的文件和子目录的inode


?


我们看一下该类比较重要的一个函数:


INodegetNode(String path)


其给定一个绝对路径path,来查找相应的INode结构。其主要流程如下:


1)? 首先调用父类的getPathComponents函数来获取路径的各个component,例如


/root/test/file1? 获得各个 bytes[][]={‘’,’root’,’test’,’file1’}


2)?调用getExistingPathINodes来从当前目录获取相关的inode


3)?其首先检查当前目录的name和component[0]是一致的。


4)??


?


?


类INodeDirectoryWithQuota继承了InodeDirectory,添加了quota的控制,相对比较简单。


?


至于类的函数,都是相关Inode的上的操作,大多都是set和get操作,一些比较复杂的操作我们在后面用到时,再详细讲述。


Permission权限管理

以上就是INode类的基本的数据成员。下面我们看一下比较重要的权限管理:


?


?


?


?


?


Lease

HDFS的lease锁实现的比较简单。通过lease,我们就能保证系统中只能有一个客户端对文件进行操作。当一个客户端需要写文件时,需要到NameNode上去申请lease锁,其它客户端需要等待该操作完成后获取该锁后才能执行。


?


我们看到,这个锁分了两次映射:


?


Lease??clientName -à path的映射


???????Path???? à? lease 的映射


?


我们先看一下lease的数据结构


privatefinal String holder;


??? privatelonglastUpdate;


??? privatefinalCollection paths = new TreeSet();


其三个数据成员为: holder,为客户端的clientName,最后一次更新时间lastUpdate,还有一个树实现的集合TreeSet,保存了该lease,也就是该clientName上已经获取的文件上的锁。其上面的操作都比较简单。比较重要的是renew操作,更新lastUpdate的时间。需要注意的是,整个一个客户端上的锁只有一个更新时间。


?


下面我们看一下leaseManager


?


? privatelongsoftLimit = FSConstants.LEASE_SOFTLIMIT_PERIOD;


? privatelonghardLimit = FSConstants.LEASE_HARDLIMIT_PERIOD;


?


? //


? // Usedfor handling lock-leases


? //Mapping: leaseHolder -> Lease


? //


? private SortedMap leases = new TreeMap();


? // Set of:Lease


? private SortedSet sortedLeases = new TreeSet();


?


? //


? // Mappath names to leases. It is protected by the sortedLeases lock.


? // The mapstores pathnames in lexicographical order.


? //


? privateSortedMap sortedLeasesByPath = new TreeMap();


?


其主要保存了两个常量softlimit 和 hardlimit,其次是一个lease集合和两个map,一个是文件到leaseHolder ?>lease的map,一个是 path 到lease的 Map.


?


BlockMap的管理

Block为基本的信息,其数据成员包括3个:


privatelongblockId;


? privatelongnumBytes;


? privatelonggenerationStamp;


?


BlockInfo为 extend类block,同时增加了两个数据成员:


private INodeFile?????????inode; // 用来和该block对于的文件关联


private Object[3*replication]triplets;?????? // 用来和DataNode关联


?


其实类似于:


?


Struct{


?? DatanodeDescriptor dn;


?? BlockInfo pre;?? 其保存了一个该Datanode上所有blockInfo组成的双向链表


?? BlockInfo next;


}Triplesp[replication];??


?


其实 triplets可以更简单的定义为一下数组:


0

?

?

1

?

?

2

?

?

BlockInfo pre

DatanodeDescriptor

dn

BlockInfo

next

?

?

?

?

?

?


?


其blockInfo链表的头保存在DatanodeDescriptor的


privatevolatile BlockInfo blockList = null;


该DatanodeDescriptor的所以的BlockInfo都连接在blocklist链表中。


?


我们看到BlockInfo保存了完整的该block的信息,获得一个block的blockInfo信息,就可以获取该block所在的文件的INode,同时获取该bloc所有的副本的DataNodeDescription信息。


?


其比较重要的函数就是 numNodes 返回该block对于的Datanode的数量,也就是副本的数量。


?


BlocksMap类里数据成员为:


privateMap map; 需要注意的其,其定义为BlockInfo->BlockInfo的map关系,但实际上是Block->BlockInfo的映射关系。


?


其为全局的映射,其初始化为一个HashMap. ?我们在INodeFile里保存文件所在的block信息,知道了block,我们就可以在上述的map中获得BlockInfo信息,获取BlockInfo信息,我们就可以获得其相关的DataNode的信息。


?


我们看到了整个NameNode的数据块保存了两层映射:


InodeFile 保存了??file(offset, length)? à block 的映射


?BlocksMap保存了 block?àBlockInfo (DataNode)的映射


?


前者会保存在磁盘上,以NameNode的image的形式,定期checkpoint到磁盘上。当问及Namenode启动或恢复是,从image和editlog中获得。后者的映射关系仅保存在内存中,其启动或恢复时,各个Datanode需要把其映射关系上报给Namenode.


?


我们看一下DataNodeDescription


?


DatanodeID定义了如下的信息:


public String name;????? /// hostname:portNumber


? public String storageID; /// unique percluster storageID


? protectedintinfoPort;???? /// the port where the infoserver is running


? publicintipcPort;????


?


DatanodeInfo定义了如下信息:


? protectedlongcapacity;


? protectedlongdfsUsed;


? protectedlongremaining;


? protectedlonglastUpdate;


? protectedintxceiverCount;


? protected String location = NetworkTopology.DEFAULT_RACK;


?


下面我们看一下DatanodeDescriptor类:


DatanodeDescription类里面定义了该Datanode上的block的一下集合和操作。


?


privatevolatile BlockInfo blockList = null; //这个该DataNode上的所有的BlockInfo组成的链表的头


?


? /**Aqueueofblockstobereplicatedbythisdatanode*/


? private BlockQueue replicateBlocks = new BlockQueue();


需要复制的块


? /**Aqueueofblockstoberecoveredbythisdatanode*/


? private BlockQueue recoverBlocks = new BlockQueue();


需要回复的块


? /**Asetofblockstobeinvalidatedbythisdatanode*/


? private Set invalidateBlocks = newTreeSet();


无效的块的集合


?


在FSNameSystem里面定义了一些管理结构:


?private Map> recentInvalidateSets =


??? new TreeMap>();


?最*无效的块的集合?storageID->Collection 的集合


?


Map> excessReplicateMap =


new TreeMap>();


多余的块的集合


?


Datanode 和 NameNode的通信有三种:


?


?


blockReport


FSDirectory
FSEdit
FSNamesystem

ReplicationTarget


我们看一下FSNameSystem的成员变量


?


?


?


NameNode

NameNode 上对文件的基本操作。


我们看一下NameNode实现ClientProtocol上的接口。


public LocatedBlocks? getBlockLocations(Stringsrc,


????????????????????????????????????????? long offset,


????????????????????????????????????????? long length) throws IOException;


这是获取LocatedBlocks上的操作。


?


我们看一下这个操作,其用户获取一个文件的 offset开始的length长度的数据块信息。


?


BlockInfo?map


DataNodeDescription dn;


?


FsName



友情链接: