What follows are the results of my own study of the source code. This article is dedicated to me and my buddies; corrections for any shortcomings are welcome ------------------------------------------------- with thanks to Doug (Cutting) and company!
Note: the Hadoop version is 0.20.2. Some readers said that staring at raw code made them dizzy, so this article walks through everything in plain prose.
^ o ^
In the previous article we went through how DFSClient reads data and verifies its checksums. Here we look at the detailed process of writing data, which is a little more involved than the read path.
====================================================================================
--------------------------------------------------------------------------------------------------------------------------------------------
Before analyzing the write path, let's get a few heavyweight concepts straight:
A Block is the basic unit of data writing; the default Block size is 64 MB.
A Block is written as a series of Packets; the default Packet size is 64 KB.
A Packet is made up of a number of chunks. The chunk is the basic unit of checksumming: each chunk carries 512 B of data plus a 4 B checksum.
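To make these three sizes concrete, here is a tiny standalone sketch (not HDFS code) that simply prints how the default values relate to each other:
// Standalone sketch (not HDFS code): the default size relationships in 0.20.2.
public class WriteUnitSizes {
    public static void main(String[] args) {
        long blockSize  = 64L * 1024 * 1024;  // dfs.block.size, 64 MB
        int  packetSize = 64 * 1024;          // dfs.write.packet.size, 64 KB
        int  chunkData  = 512;                // io.bytes.per.checksum, 512 B of data per chunk
        int  chunkCrc   = 4;                  // one 4 B CRC32 value per chunk

        int chunkOnWire = chunkData + chunkCrc;  // 516 B per chunk on the wire
        System.out.println("chunks per packet (approx.): " + packetSize / chunkOnWire);
        System.out.println("packets per block (approx.): " + blockSize / packetSize);
    }
}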
-------------------------------------------------------------------------------------------------------------------------------------------------------------------
Analysis of the first way to create an OutputStream: DFSClient.create()
public OutputStream create(String src,
boolean overwrite
) throws IOException {
return create(src, overwrite, defaultReplication, defaultBlockSize, null);
}
----------------------------------------------------------------------------------------------------------------------------------------------------------------
public OutputStream create(String src,
boolean overwrite,
short replication,
long blockSize,
Progressable progress
) throws IOException {
return create(src, overwrite, replication, blockSize, progress,
conf.getInt("io.file.buffer.size", 4096));
}
----------------------------------------------------------------------------------------------------------------------------------------------------------------
public OutputStream create(String src,
boolean overwrite,
short replication,
long blockSize,
Progressable progress,
int buffersize
) throws IOException {
return create(src, FsPermission.getDefault(),
overwrite, replication, blockSize, progress, buffersize);
}
----------------------------------------------------------------------------------------------------------------------------------------------------------------
public OutputStream create(String src,
FsPermission permission,
boolean overwrite,
short replication,
long blockSize,
Progressable progress,
int buffersize
) throws IOException {
//check that the HDFS client is still open
checkOpen();
if (permission == null) {
//if no permission was specified for the file being created, use the (configurable) system default
permission = FsPermission.getDefault();
}
FsPermission masked = permission.applyUMask(FsPermission.getUMask(conf));
LOG.debug(src + ": masked=" + masked);
//create the file output stream -- the key step: construct the DFSOutputStream
OutputStream result = new DFSOutputStream(src, masked,
overwrite, replication, blockSize, progress, buffersize,
conf.getInt("io.bytes.per.checksum", 512));
//record the file path and the output stream in the lease table
leasechecker.put(src, result);
return result;
}
Summary of the create() method: it constructs the DFSOutputStream and then registers the file path and the output stream in the lease table.
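For reference, application code normally ends up in DFSClient.create() through the FileSystem API rather than by calling DFSClient directly. A minimal usage sketch (the path and data are made up, and fs.default.name is assumed to point at an HDFS cluster):
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class CreateExample {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(conf);   // DistributedFileSystem for an HDFS URI
        // DistributedFileSystem.create() calls DFSClient.create(), which builds the
        // DFSOutputStream analyzed above and registers it in the lease table.
        FSDataOutputStream out = fs.create(new Path("/tmp/demo.txt"), true);
        out.write("hello hdfs".getBytes());
        out.close();                             // flushes the remaining packets
        fs.close();
    }
}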
------------------------------------------------------------------------------------------------------------------------------------------------------------------------
/**
* This constructor is called by DFSClient's create() method.
* It first calls computePacketChunkSize() to work out the Packet and chunk sizes,
* then calls ClientProtocol's create() to create, in the NameNode's namespace, the file
* that will receive the data written through this DFSOutputStream,
* and finally starts the Streamer thread that writes the data to the DataNodes.
*/
DFSOutputStream(String src, FsPermission masked, boolean overwrite,
short replication, long blockSize, Progressable progress,
int buffersize, int bytesPerChecksum) throws IOException {
//initialize the checksum-related state
this(src, blockSize, progress, bytesPerChecksum);
//work out how many checksum chunks fit into a packet sent to the DataNodes, and the packet length
computePacketChunkSize(writePacketSize, bytesPerChecksum);
try {
//RPC to the namenode to create a new file in the under-construction state
namenode.create(
src, masked, clientName, overwrite, replication, blockSize);
} catch(RemoteException re) {
throw re.unwrapRemoteException(AccessControlException.class,
NSQuotaExceededException.class,
DSQuotaExceededException.class);
}
//a crucial step: start the data streamer so that writing can begin
streamer.start();
}
-------------------------------------------------------------------------------------------------------------------------------------------------------------------------
/**
* Private constructor. If the given bytesPerChecksum is less than 1, or blockSize is not
* a multiple of bytesPerChecksum, an exception is thrown. The constructor also initializes
* the DataChecksum object.
*
* @param src
* @param blockSize
* @param progress
* @param bytesPerChecksum
* @throws IOException
*/
private DFSOutputStream(String src, long blockSize, Progressable progress,
int bytesPerChecksum) throws IOException {
//call the parent FSOutputSummer constructor, supplying the checksum algorithm, chunk size and checksum size
super(new CRC32(), bytesPerChecksum, 4);
//initialize the fields
this.src = src;
this.blockSize = blockSize;
this.progress = progress;
if (progress != null) {
LOG.debug("Set non-null progress callback on DFSOutputStream "+src);
}
if ( bytesPerChecksum < 1 || blockSize % bytesPerChecksum != 0) {
throw new IOException("io.bytes.per.checksum(" + bytesPerChecksum +
") and blockSize(" + blockSize +
") do not match. " + "blockSize should be a " +
"multiple of io.bytes.per.checksum");
}
// create the CRC32 DataChecksum that covers bytesPerChecksum bytes of data at a time
checksum = DataChecksum.newDataChecksum(DataChecksum.CHECKSUM_CRC32, bytesPerChecksum);
}
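As a side note, the per-chunk checksum set up here is simply a CRC32 over every bytesPerChecksum bytes of data. A minimal sketch using plain java.util.zip.CRC32 (not the actual DataChecksum code) shows what one chunk's 4-byte checksum amounts to:
import java.util.zip.CRC32;

public class ChunkChecksumSketch {
    public static void main(String[] args) {
        byte[] chunk = new byte[512];          // one chunk of raw data (io.bytes.per.checksum)
        CRC32 crc = new CRC32();
        crc.update(chunk, 0, chunk.length);
        int checksum = (int) crc.getValue();   // stored as 4 bytes alongside the chunk
        System.out.printf("chunk of %d bytes -> crc32 0x%08x%n", chunk.length, checksum);
    }
}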
------------------------------------------------------------------------------------------------------------------------------------------------------------------------
// FSOutputSummer constructor
protected FSOutputSummer(Checksum sum, int maxChunkSize, int checksumSize) {
this.sum = sum;
this.buf = new byte[maxChunkSize]; //buffer holding the data of the current chunk
this.checksum = new byte[checksumSize];//buffer holding the checksum of the current chunk
this.count = 0;
}
---------------------------------------------------------------------------------------------------------------------------------------------------------------
The computePacketChunkSize(int psize, int csize) method:
/**
* Computes the size of the next Packet and how many checksum chunks a packet can hold.
* @param psize 64KB
* @param csize 512B
*/
private void computePacketChunkSize(int psize, int csize) {
//the on-wire chunk size is the raw data size plus the size of its checksum
int chunkSize = csize + checksum.getChecksumSize();
int n = DataNode.PKT_HEADER_LEN + SIZE_OF_INTEGER;
chunksPerPacket = Math.max((psize - n + chunkSize-1)/chunkSize, 1);
//the packet size is the size of the packet header plus the total size of all chunks in the packet
packetSize = n + chunkSize*chunksPerPacket;
if (LOG.isDebugEnabled()) {
LOG.debug("computePacketChunkSize: src=" + src +
", chunkSize=" + chunkSize +
", chunksPerPacket=" + chunksPerPacket +
", packetSize=" + packetSize);
}
}
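To see what this produces with the defaults, here is a worked example. The header constants (PKT_HEADER_LEN = 21, SIZE_OF_INTEGER = 4) are what I read in the 0.20.2 source; treat them as assumptions if your version differs. Because the division rounds up, packetSize ends up slightly larger than 64 KB:
// Worked example of computePacketChunkSize() with the default values (header sizes assumed).
public class PacketSizeExample {
    public static void main(String[] args) {
        int psize = 64 * 1024;        // writePacketSize, 64 KB
        int csize = 512;              // bytesPerChecksum
        int checksumSize = 4;         // CRC32
        int chunkSize = csize + checksumSize;                 // 516
        int n = 21 + 4;               // PKT_HEADER_LEN + SIZE_OF_INTEGER = 25 (assumed)
        int chunksPerPacket = Math.max((psize - n + chunkSize - 1) / chunkSize, 1); // 127
        int packetSize = n + chunkSize * chunksPerPacket;      // 25 + 516*127 = 65557
        System.out.println("chunksPerPacket = " + chunksPerPacket +
                           ", packetSize = " + packetSize);
    }
}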
=========================================================================================================
Analysis of the second way to create an OutputStream: DFSClient.append()
/**
* Appends data to the end of an existing file.
* It first calls the namenode's append() to locate the last Block of the file,
* then constructs a DFSOutputStream that writes to the end of the file,
* and finally adds the new DFSOutputStream to the client's lease table.
* @param src
* @param buffersize
* @param progress
* @return
* @throws IOException
*/
OutputStream append(String src, int buffersize, Progressable progress
) throws IOException {
checkOpen();
FileStatus stat = null;
LocatedBlock lastBlock = null;
try {
stat = getFileInfo(src);
lastBlock = namenode.append(src, clientName);
} catch(RemoteException re) {
throw re.unwrapRemoteException(FileNotFoundException.class,
AccessControlException.class,
NSQuotaExceededException.class,
DSQuotaExceededException.class);
}
//the key step: construct the DFSOutputStream
OutputStream result = new DFSOutputStream(src, buffersize, progress,
lastBlock, stat, conf.getInt("io.bytes.per.checksum", 512));
leasechecker.put(src, result);
return result;
}
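Again for reference, application code reaches DFSClient.append() through the FileSystem API. A minimal usage sketch (path made up); note that in 0.20.2 append is only usable when dfs.support.append is enabled:
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class AppendExample {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.setBoolean("dfs.support.append", true);   // required in this version
        FileSystem fs = FileSystem.get(conf);           // assumes an HDFS fs.default.name
        // DistributedFileSystem.append() calls DFSClient.append(), which locates the
        // last block via namenode.append() and builds the append-mode DFSOutputStream.
        FSDataOutputStream out = fs.append(new Path("/tmp/demo.txt"));
        out.write(" more data".getBytes());
        out.close();
        fs.close();
    }
}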
--------------------------------------------------------------------------------------------------------------------------------------------------------
/**
*
* This constructor is called by DFSClient's append() method.
*/
DFSOutputStream(String src, int buffersize, Progressable progress,
LocatedBlock lastBlock, FileStatus stat,
int bytesPerChecksum) throws IOException {
//first call the private DFSOutputStream constructor, which initializes the DataChecksum object
this(src, stat.getBlockSize(), progress, bytesPerChecksum);
//not present in the create path: record the length of the file at the time it was opened
initialFileSize = stat.getLen(); // length of file when opened
//since this is an append, the previous writes may have left a partially filled block, which we keep using
if (lastBlock != null) {
//get the last data Block object
block = lastBlock.getBlock();
//compute how much of the last data Block is already used
long usedInLastBlock = stat.getLen() % blockSize;
//compute how much space remains in the last data Block
int freeInLastBlock = (int)(blockSize - usedInLastBlock);
// compute how much of the last crc chunk is already used
int usedInCksum = (int)(stat.getLen() % bytesPerChecksum);
//compute how much space remains in the last crc chunk
int freeInCksum = bytesPerChecksum - usedInCksum;
// if there is space in the last block, then we have to
// append to that block
if (freeInLastBlock > blockSize) {
throw new IOException("The last block for file " +
src + " is full.");
}
// get the current size of the last data Block
bytesCurBlock = lastBlock.getBlockSize();
/**
* If usedInCksum > 0 && freeInCksum > 0, the last chunk still has free space,
* so the next Packet to be sent must be sized to fill up that partially written chunk.
*/
if (usedInCksum > 0 && freeInCksum > 0) {
computePacketChunkSize(0, freeInCksum);
resetChecksumChunk(freeInCksum);
this.appendChunk = true;
} else {
// if the remaining space in the block is smaller than
// that expected size of of a packet, then create
// smaller size packet.
//
computePacketChunkSize(Math.min(writePacketSize, freeInLastBlock),
bytesPerChecksum);
}
//get the set of DataNodes that hold the last Block
//a key step: this initializes the DataNode pipeline for the Block being written
nodes = lastBlock.getLocations();//lastBlock=namenode.append(src, clientName);
//index of the DataNode on which a write error occurred
errorIndex = -1; //
if (nodes.length < 1) {
throw new IOException("Unable to retrieve blocks locations " +
" for last block " + block +
"of file " + src);
}
//try to set up the DataNode pipeline used for writing
processDatanodeError(true, true);
//start the data-writing thread
streamer.start();
}
else {
computePacketChunkSize(writePacketSize, bytesPerChecksum);
streamer.start();
}
}
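A small worked example (not HDFS code) of the partial-chunk branch above, assuming the existing file is 1,000 bytes long, blockSize is 64 MB and bytesPerChecksum is 512:
public class AppendChunkExample {
    public static void main(String[] args) {
        long fileLen = 1000L;                  // assumed length of the file being appended to
        long blockSize = 64L * 1024 * 1024;
        int bytesPerChecksum = 512;

        long usedInLastBlock = fileLen % blockSize;            // 1000
        int usedInCksum = (int) (fileLen % bytesPerChecksum);  // 488
        int freeInCksum = bytesPerChecksum - usedInCksum;      // 24

        // usedInCksum > 0 && freeInCksum > 0, so the first packet of the append is
        // sized to hold exactly one 24-byte chunk that tops up the partial chunk.
        System.out.println("first chunk of the append carries " + freeInCksum + " bytes");
    }
}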
----------------------------------------------------------------------------------------------------------------------------------------------------------------------
The method dedicated to handling I/O errors:
/**
* Handles any possible I/O error.
* @param hasError
* @param isAppend
* @return
*/
private boolean processDatanodeError(boolean hasError, boolean isAppend) {
//if there is no error, return false immediately
if (!hasError) {
return false;
}
//if the responder thread (which waits for DataNode acks) is still running, return true and wait for it to exit
if (response != null) {
LOG.info("Error Recovery for block " + block +
" waiting for responder to exit. ");
return true;
}
if (errorIndex >= 0) {
LOG.warn("Error Recovery for block " + block
+ " bad datanode[" + errorIndex + "] "
+ (nodes == null? "nodes == null": nodes[errorIndex].getName()));
}
//close the open blockStream and blockReplyStream
if (blockStream != null) {
try {
blockStream.close();
blockReplyStream.close();
} catch (IOException e) {
}
}
blockStream = null;
blockReplyStream = null;
// move all Packets in the ack queue that have not yet been acknowledged by the DataNodes back to the head of the data queue
synchronized (ackQueue) {
dataQueue.addAll(0, ackQueue);
ackQueue.clear();
}
boolean success = false;
//enter the loop that recovers the Block whose write failed
while (!success && clientRunning) {
DatanodeInfo[] newnodes = null;
//if the set of DataNodes to write to is empty, record an IOException, close the streamer thread, and return false
if (nodes == null) {
String msg = "Could not get block locations. " +
"Source file \"" + src
+ "\" - Aborting...";
LOG.warn(msg);
//TODO step3
setLastException(new IOException(msg));
closed = true;
if (streamer != null) streamer.close();
return false;
}
//build a string listing the names of all DataNodes in the pipeline
StringBuilder pipelineMsg = new StringBuilder();
for (int j = 0; j < nodes.length; j++) {
pipelineMsg.append(nodes[j].getName());
if (j < nodes.length - 1) {
pipelineMsg.append(", ");
}
}
/**
* If errorIndex < 0, the current pipeline is used as the new DataNode set as-is;
* otherwise the failed DataNode is removed from the pipeline and the remaining
* DataNodes form the new set.
*/
if (errorIndex < 0) {
newnodes = nodes;
} else {
if (nodes.length <= 1) {
lastException = new IOException("All datanodes " + pipelineMsg +
" are bad. Aborting...");
closed = true;
if (streamer != null) streamer.close();
return false;
}
LOG.warn("Error Recovery for block " + block +
" in pipeline " + pipelineMsg +
": bad datanode " + nodes[errorIndex].getName());
newnodes = new DatanodeInfo[nodes.length-1];
//TODO
System.arraycopy(nodes, 0, newnodes, 0, errorIndex);
System.arraycopy(nodes, errorIndex+1, newnodes, errorIndex,
newnodes.length-errorIndex);
}
// Tell the primary datanode to do error recovery
// by stamping appropriate generation stamps.
//
LocatedBlock newBlock = null;
ClientDatanodeProtocol primary = null;
DatanodeInfo primaryNode = null;
try {
// pick the "smallest" DataNode from the new set as the primary node that carries out the recovery
primaryNode = Collections.min(Arrays.asList(newnodes));
//TODO step4
//open an RPC connection to the primary DataNode
primary = createClientDatanodeProtocolProxy(primaryNode, conf);
//start block recovery on the DataNodes through the recoverBlock RPC, which returns the updated Block
newBlock = primary.recoverBlock(block, isAppend, newnodes);
} catch (IOException e) {
recoveryErrorCount++;
if (recoveryErrorCount > maxRecoveryErrorCount) {
if (nodes.length > 1) {
// if the primary datanode failed, remove it from the list.
// The original bad datanode is left in the list because it is
// conservative to remove only one datanode in one iteration.
for (int j = 0; j < nodes.length; j++) {
if (nodes[j].equals(primaryNode)) {
//locate the failed primary datanode in the list and record its position as errorIndex
errorIndex = j;
}
}
// remove the failed primary DataNode from the set
newnodes = new DatanodeInfo[nodes.length-1];
System.arraycopy(nodes, 0, newnodes, 0, errorIndex);
System.arraycopy(nodes, errorIndex+1, newnodes, errorIndex,
newnodes.length-errorIndex);
nodes = newnodes;
LOG.warn("Error Recovery for block " + block + " failed " +
" because recovery from primary datanode " +
primaryNode + " failed " + recoveryErrorCount +
" times. " + " Pipeline was " + pipelineMsg +
". Marking primary datanode as bad.");
//reset the recovery error count to 0
recoveryErrorCount = 0;
errorIndex = -1;
return true; // sleep when we return from here
}
/**
* If the failure is because every DataNode has died, recovery is impossible.
*/
String emsg = "Error Recovery for block " + block + " failed " +
" because recovery from primary datanode " +
primaryNode + " failed " + recoveryErrorCount +
" times. " + " Pipeline was " + pipelineMsg +
". Aborting...";
LOG.warn(emsg);
lastException = new IOException(emsg);
closed = true;
if (streamer != null) streamer.close();
return false; // abort with IOexception
}
LOG.warn("Error Recovery for block " + block + " failed " +
" because recovery from primary datanode " +
primaryNode + " failed " + recoveryErrorCount +
" times. " + " Pipeline was " + pipelineMsg +
". Will retry...");
return true; // sleep when we return from here
} finally {
RPC.stopProxy(primary);
}
/**
* At this point the block has been recovered successfully.
*/
recoveryErrorCount = 0; // block recovery successful
// If the block recovery generated a new generation stamp, use that
// from now on. Also, setup new pipeline
if (newBlock != null) {
//update the Block object with the post-recovery information
block = newBlock.getBlock();
nodes = newBlock.getLocations();
}
this.hasError = false;
lastException = null;
errorIndex = 0;
//open a socket connection to the first DataNode in the pipeline
success = createBlockOutputStream(nodes, clientName, true);
}
//after the failed Block has been recovered, restart the ResponseProcessor thread
response = new ResponseProcessor(nodes);
response.start();
return false; // do not sleep, continue processing
}
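The two System.arraycopy calls above simply drop the element at errorIndex from the pipeline array. A standalone sketch with made-up node names:
import java.util.Arrays;

public class RemoveBadNodeExample {
    public static void main(String[] args) {
        String[] nodes = {"dn1:50010", "dn2:50010", "dn3:50010"};
        int errorIndex = 1;                          // dn2 is the bad node

        String[] newnodes = new String[nodes.length - 1];
        System.arraycopy(nodes, 0, newnodes, 0, errorIndex);
        System.arraycopy(nodes, errorIndex + 1, newnodes, errorIndex,
                         newnodes.length - errorIndex);
        System.out.println(Arrays.toString(newnodes));  // [dn1:50010, dn3:50010]
    }
}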
DFSClient Internals (Data Writes -- Initialization of the OutputStream)
Original article: http://blog.csdn.net/u013494310/article/details/19236919