What follows is the result of my own study of the source code. This article is dedicated to me and my buddies; corrections are most welcome ------------------------------------- with thanks to Doug and company!
Note: the Hadoop version is 0.20.2. Some readers said staring at code makes their heads spin, so this article walks through the source with plain-text commentary throughout ^ o ^
In the previous article we discussed the preparation for writing data: creating the OutputStream. Now let's look at the detailed process of actually writing data, which is slightly more involved than what came before.
=========================================================================================================
DFSOutputStream(String src, FsPermission masked, boolean overwrite,
    short replication, long blockSize, Progressable progress,
    int buffersize, int bytesPerChecksum) throws IOException {
  // initialize the checksum-related state
  this(src, blockSize, progress, bytesPerChecksum);
  computePacketChunkSize(writePacketSize, bytesPerChecksum);
  try {
    namenode.create(
        src, masked, clientName, overwrite, replication, blockSize);
  } catch (RemoteException re) {
    throw re.unwrapRemoteException(AccessControlException.class,
                                   NSQuotaExceededException.class,
                                   DSQuotaExceededException.class);
  }
  streamer.start();
}
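Before diving into the streamer, it helps to see where this constructor sits from an application's point of view. The snippet below is a minimal usage sketch of my own, not from the article: the path /tmp/demo.txt and the default Configuration are assumptions. FileSystem.create ultimately builds the DFSOutputStream above, registers the file with the NameNode, and starts the DataStreamer.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class HdfsWriteDemo {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(conf);   // DistributedFileSystem for an hdfs:// URI
    // create() ends up in the DFSOutputStream constructor shown above
    FSDataOutputStream out = fs.create(new Path("/tmp/demo.txt"), true);
    out.write("hello hdfs".getBytes());     // buffered, chunked and checksummed on the way down
    out.close();                            // flushes remaining packets and waits for acks
    fs.close();
  }
}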
In the constructor above, streamer.start() launches the data sender. Before analyzing in detail how data is written, let's first introduce the DFSOutputStream inner classes and member methods involved in writing data.
-----------------------------------------------------------------------------------------------------------------------------------------------------------------------
The checksum writer: FSOutputSummer
------------------------------------------------------------------------------------------------------------------------------------------------------------------------
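DFSOutputStream extends FSOutputSummer, which slices the user's byte stream into fixed-size chunks (bytesPerChecksum, 512 bytes by default) and computes a checksum for each chunk before handing both to writeChunk. To make that concrete, here is a minimal sketch of the pattern. This is my own simplified code, not the Hadoop source: ChunkedChecksumWriter is a hypothetical name, and the real FSOutputSummer buffers partial chunks and uses the pluggable DataChecksum machinery rather than a raw CRC32.

import java.io.IOException;
import java.util.zip.CRC32;

abstract class ChunkedChecksumWriter {
  private final int bytesPerChecksum = 512;   // matches the io.bytes.per.checksum default
  private final CRC32 crc = new CRC32();

  // Split the caller's buffer into fixed-size chunks and checksum each one.
  void write(byte[] b, int off, int len) throws IOException {
    for (int pos = off; pos < off + len; pos += bytesPerChecksum) {
      int chunkLen = Math.min(bytesPerChecksum, off + len - pos);
      crc.reset();
      crc.update(b, pos, chunkLen);
      int v = (int) crc.getValue();
      byte[] sum = new byte[] {
        (byte) (v >>> 24), (byte) (v >>> 16), (byte) (v >>> 8), (byte) v
      };
      writeChunk(b, pos, chunkLen, sum);  // chunk + checksum travel together
    }
  }

  // In DFSClient the override is DFSOutputStream.writeChunk: it appends the chunk
  // to the current Packet and enqueues the Packet on dataQueue once it is full.
  abstract void writeChunk(byte[] b, int off, int len, byte[] checksum) throws IOException;
}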
With that background, let's analyze the DataStreamer in detail:
/**
 * DataStreamer is the daemon thread that sends the data.
 * Before sending a Packet it first obtains a block id and the block's
 * locations from the NameNode, then loops: it takes one Packet at a time
 * from dataQueue and writes it to the socket connected to the DataNode.
 * Once all Packets belonging to a block have been sent and the response
 * for each Packet has come back, the DataStreamer closes the current block.
 */
private class DataStreamer extends Daemon {
/** flag marking whether the DataStreamer thread has been shut down **/
private volatile boolean closed = false;
public void run() {
// while the thread is not closed and the client is running, loop and send Packets
while (!closed && clientRunning) {
// if an error occurred in the ResponseProcessor thread (which handles the DataNodes' responses) while Packets were being sent, the response thread must be closed
if (hasError && response != null) {
try {
response.close();
// join() makes this thread wait until the response thread has fully terminated before moving on
response.join();
response = null;
} catch (InterruptedException e) {
}
}
Packet one = null;
synchronized (dataQueue) {
// call processDatanodeError to handle any IO errors that may have occurred
boolean doSleep = processDatanodeError(hasError, false);
// the DataStreamer then waits on dataQueue until a Packet that needs sending appears
while ((!closed && !hasError && clientRunning
&& dataQueue.size() == 0) || doSleep) {
try {
// At this point the sender has to wait. Below, back in the main thread, we will
// see how user data is written, packed into Packets, and appended to dataQueue.
dataQueue.wait(1000);
} catch (InterruptedException e) {
}
doSleep = false;
}
// if, while waiting for a packet, the DataStreamer thread was closed, an IO error occurred,
// or the client stopped running, skip this round of the send loop
if (closed || hasError || dataQueue.size() == 0 || !clientRunning) {
continue;
}
try {
// take the first Packet awaiting transmission
one = dataQueue.getFirst();
// the packet's offset within the block
long offsetInBlock = one.offsetInBlock;
// if the blockStream to the DataNode has not been opened yet,
// first call nextBlockOutputStream to establish the connection to the DataNode,
// then start the ResponseProcessor thread
if (blockStream == null) {
LOG.debug("Allocating new block");
// the crucial step: building the data pipeline
nodes = nextBlockOutputStream(src);
this.setName("DataStreamer for file " + src +
" block " + block);
response = new ResponseProcessor(nodes);
// start the response thread, which makes sure every packet is successfully delivered
response.start();
}
// if the packet's offset within the block is beyond the block size, throw the corresponding exception
if (offsetInBlock >= blockSize) {
throw new IOException("BlockSize " + blockSize +
" is smaller than data size. " +
" Offset of packet in block " +
offsetInBlock +
" Aborting file " + src);
}
// grab the packet's entire payload
// key step: the data was packed, packet by packet, into this buffer
ByteBuffer buf = one.getBuffer();
// move the Packet from the data queue to the ack queue
// rationale: once the data is handed to the DataNode, the packet is removed from the
// queue of packets waiting to be sent and appended to the ack queue, where the response thread will confirm it
dataQueue.removeFirst();
dataQueue.notifyAll();
synchronized (ackQueue) {
ackQueue.addLast(one);
ackQueue.notifyAll();
}
// send everything in the ByteBuffer to the remote DataNode through blockStream
// key step: this is where the bytes actually leave for the DataNode
blockStream.write(buf.array(), buf.position(), buf.remaining());
// if this is the last packet of the block, write a 0 to tell the DataNode the transfer is complete
if (one.lastPacketInBlock) {
blockStream.writeInt(0); // indicate end-of-block
}
// flush blockStream so the data actually goes out on the wire
blockStream.flush();
if (LOG.isDebugEnabled()) {
LOG.debug("DataStreamer block " + block +
" wrote packet seqno:" + one.seqno +
" size:" + buf.remaining() +
" offsetInBlock:" + one.offsetInBlock +
" lastPacketInBlock:" + one.lastPacketInBlock);
}
} catch (Throwable e) {
LOG.warn("DataStreamer Exception: " +
StringUtils.stringifyException(e));
if (e instanceof IOException) {
setLastException((IOException)e);
}
hasError = true;
}
}
if (closed || hasError || !clientRunning) {
continue;
}
// once the last Packet of a block has been sent, the DataStreamer waits until ackQueue is empty,
// i.e. the responses for all Packets have been received, then performs cleanup (or moves on to the next block):
// it first shuts down the response thread, then closes the socket
if (one.lastPacketInBlock) {
synchronized (ackQueue) {
while (!hasError && ackQueue.size() != 0 && clientRunning) {
try {
ackQueue.wait(); // wait for the ack queue to drain as responses arrive from the DataNodes
} catch (InterruptedException e) {
}
}
}
LOG.debug("Closing old block " + block);
this.setName("DataStreamer for file " + src);
response.close(); // ignore all errors in Response
try {
response.join();
response = null;
} catch (InterruptedException e) {
}
if (closed || hasError || !clientRunning) {
continue;
}
synchronized (dataQueue) {
try {
blockStream.close();
blockReplyStream.close();
} catch (IOException e) {
}
nodes = null;
response = null;
blockStream = null;
blockReplyStream = null;
}
}
// report progress of the write
if (progress != null) { progress.progress(); }
// This is used by unit test to trigger race conditions.
if (artificialSlowdown != 0 && clientRunning) {
try {
Thread.sleep(artificialSlowdown);
} catch (InterruptedException e) {}
}
}
}
// close() shuts down the DataStreamer thread: it first sets the closed flag to true,
// then notifies all waiters on both the dataQueue and the ackQueue
void close() {
closed = true;
synchronized (dataQueue) {
dataQueue.notifyAll();
}
synchronized (ackQueue) {
ackQueue.notifyAll();
}
this.interrupt();
}
}
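The heart of run() above is a classic producer/consumer handoff across two queues guarded by separate monitors. The following is my own distilled sketch of just that mechanism (TwoQueueHandoff and the stripped-down Packet type are hypothetical names, not the Hadoop source), showing how a packet travels from dataQueue to ackQueue:

import java.util.LinkedList;

class TwoQueueHandoff {
  static class Packet { long seqno; boolean lastPacketInBlock; }

  private final LinkedList<Packet> dataQueue = new LinkedList<Packet>();
  private final LinkedList<Packet> ackQueue  = new LinkedList<Packet>();

  // producer side: what the write path effectively does with a full packet
  void enqueue(Packet p) {
    synchronized (dataQueue) {
      dataQueue.addLast(p);
      dataQueue.notifyAll();      // wake the streamer blocked in dataQueue.wait()
    }
  }

  // streamer side: take a packet to send and park it on the ack queue
  Packet takeForSending() throws InterruptedException {
    Packet one;
    synchronized (dataQueue) {
      while (dataQueue.isEmpty()) {
        dataQueue.wait(1000);
      }
      one = dataQueue.removeFirst();
      dataQueue.notifyAll();
    }
    synchronized (ackQueue) {
      ackQueue.addLast(one);      // now awaiting the DataNode's response
      ackQueue.notifyAll();
    }
    return one;
  }

  // response side: what ResponseProcessor effectively does when an ack arrives
  void ackReceived() {
    synchronized (ackQueue) {
      ackQueue.removeFirst();
      ackQueue.notifyAll();       // unblocks the streamer waiting for the queue to drain
    }
  }
}

Two monitors instead of one is deliberate: the writer thread filling dataQueue never contends with the response thread draining ackQueue.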
------------------------------------------------------------------------------------------------------------------------------------------------------------------------
Establishing the data pipeline:
/**
 * Allocate a new block to hold the data the user is writing, and establish
 * a network connection to the first of the DataNodes that will store it.
 */
private DatanodeInfo[] nextBlockOutputStream(String client) throws IOException {
LocatedBlock lb = null;
boolean retry = false;
DatanodeInfo[] nodes;
// number of attempts allowed for obtaining a new block to write
int count = conf.getInt("dfs.client.block.write.retries", 3);
boolean success;
do {
hasError = false;
lastException = null;
errorIndex = 0;
retry = false;
nodes = null;
success = false;
long startTime = System.currentTimeMillis();
// step 1: request a new data block from the NameNode
lb = locateFollowingBlock(startTime);
// the data block we were handed
block = lb.getBlock();
// the set of DataNodes chosen to hold that block
nodes = lb.getLocations();
//
// connect to the first DataNode of the pipeline via createBlockOutputStream below
//
// i.e. build the data stream pipeline by opening a socket to the first storage node
success = createBlockOutputStream(nodes, clientName, false);
/**
 * If the connection to the DataNode fails, the block we obtained must be handed
 * back to the NameNode's set of abandoned blocks, and retry is set to true so
 * that a brand-new block is requested on the next pass.
 */
if (!success) {
LOG.info("Abandoning block " + block);
// abandon this data block
namenode.abandonBlock(block, src, clientName);
// Connection failed. Let's wait a little bit and retry
retry = true;
try {
if (System.currentTimeMillis() - startTime > 5000) {
LOG.info("Waiting to find target node: " + nodes[0].getName());
}
Thread.sleep(6000);
} catch (InterruptedException iex) {
}
}
} while (retry && --count >= 0);
if (!success) {
throw new IOException("Unable to create new block.");
}
return nodes;
}
-------------------------------------------------------------------------------------------------------------------------------------------------------------------
Requesting a data block: the locateFollowingBlock method (fairly straightforward)
/**
* Request a new data block from the NameNode.
* @param start
* @return
* @throws IOException
*/
private LocatedBlock locateFollowingBlock(long start
) throws IOException {
int retries = conf.getInt("dfs.client.block.write.locateFollowingBlock.retries", 5);
long sleeptime = 400;
while (true) {
long localstart = System.currentTimeMillis();
while (true) {
try {
return namenode.addBlock(src, clientName);
} catch (RemoteException e) {
IOException ue =
e.unwrapRemoteException(FileNotFoundException.class,
AccessControlException.class,
NSQuotaExceededException.class,
DSQuotaExceededException.class);
if (ue != e) {
throw ue; // no need to retry these exceptions
}
if (NotReplicatedYetException.class.getName().
equals(e.getClassName())) {
if (retries == 0) {
throw e;
} else {
--retries;
LOG.info(StringUtils.stringifyException(e));
if (System.currentTimeMillis() - localstart > 5000) {
LOG.info("Waiting for replication for "
+ (System.currentTimeMillis() - localstart) / 1000
+ " seconds");
}
try {
LOG.warn("NotReplicatedYetException sleeping " + src
+ " retries left " + retries);
Thread.sleep(sleeptime);
sleeptime *= 2;
} catch (InterruptedException ie) {
}
}
} else {
throw e;
}
}
}
}
}
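Worth noting is the retry policy here: up to dfs.client.block.write.locateFollowingBlock.retries attempts (default 5), sleeping 400 ms at first and doubling the sleep after each NotReplicatedYetException. As a standalone illustration of that policy, here is a minimal generic sketch; ExponentialBackoff is a hypothetical helper of my own, not Hadoop code.

import java.io.IOException;
import java.util.concurrent.Callable;

final class ExponentialBackoff {
  // Retry `call` up to `retries` times, doubling the sleep after each failure,
  // mirroring the 400 ms start and sleeptime *= 2 policy in locateFollowingBlock.
  static <T> T retry(Callable<T> call, int retries, long initialSleepMs) throws Exception {
    long sleep = initialSleepMs;
    while (true) {
      try {
        return call.call();
      } catch (IOException e) {
        if (retries-- == 0) {
          throw e;                // out of attempts: surface the last failure
        }
        Thread.sleep(sleep);
        sleep *= 2;               // exponential backoff: 400, 800, 1600, ...
      }
    }
  }
}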
-------------------------------------------------------------------------------------------------------------------------------------------------------------------
/**
 * Establishes the socket connection to the first DataNode of the pipeline
 * and sends the block's header information.
 * @param nodes the set of DataNodes that will receive the data
 * @param client the client name
 * @param recoveryFlag whether this connection is being rebuilt to recover from an error
 * @return true on success, false on error
 */
private boolean createBlockOutputStream(DatanodeInfo[] nodes, String client,
boolean recoveryFlag) {
String firstBadLink = "";
if (LOG.isDebugEnabled()) {
for (int i = 0; i < nodes.length; i++) {
LOG.debug("pipeline = " + nodes[i].getName());
}
}
// flag: whether the blocks being written need to be persisted on the NameNode
persistBlocks = true;
try {
// open a socket connection to the first DataNode in the remote pipeline
LOG.debug("Connecting to " + nodes[0].getName());
InetSocketAddress target = NetUtils.createSocketAddr(nodes[0].getName());
s = socketFactory.createSocket();
// set the socket timeout and the send buffer size
int timeoutValue = 3000 * nodes.length + socketTimeout;
NetUtils.connect(s, target, timeoutValue);
s.setSoTimeout(timeoutValue);
s.setSendBufferSize(DEFAULT_DATA_SOCKET_SIZE);
LOG.debug("Send buf size " + s.getSendBufferSize());
long writeTimeout = HdfsConstants.WRITE_TIMEOUT_EXTENSION * nodes.length +
datanodeWriteTimeout;
//
// create the output stream used to write data to the remote DataNode
//
DataOutputStream out = new DataOutputStream(
new BufferedOutputStream(NetUtils.getOutputStream(s, writeTimeout),
DataNode.SMALL_BUFFER_SIZE));
// create the input stream used to receive the DataNode's response messages
blockReplyStream = new DataInputStream(NetUtils.getInputStream(s));
// the block header the client sends to the DataNode ahead of the data
out.writeShort( DataTransferProtocol.DATA_TRANSFER_VERSION );
out.write( DataTransferProtocol.OP_WRITE_BLOCK );
out.writeLong( block.getBlockId() );
out.writeLong( block.getGenerationStamp() );
out.writeInt( nodes.length );
out.writeBoolean( recoveryFlag ); // recovery flag
Text.writeString( out, client );
out.writeBoolean(false); // Not sending src node information
out.writeInt( nodes.length - 1 );
for (int i = 1; i < nodes.length; i++) {
nodes[i].write(out);
}
checksum.writeHeader( out );
out.flush();
// read the DataNode's connect ack
firstBadLink = Text.readString(blockReplyStream);
if (firstBadLink.length() != 0) {
throw new IOException("Bad connect ack with firstBadLink " + firstBadLink);
}
blockStream = out;
return true; // success
} catch (IOException ie) {
LOG.info("Exception in createBlockOutputStream " + ie);
// find the datanode that matches
if (firstBadLink.length() != 0) {
for (int i = 0; i < nodes.length; i++) {
if (nodes[i].getName().equals(firstBadLink)) {
errorIndex = i;
break;
}
}
}
hasError = true;
setLastException(ie);
blockReplyStream = null;
return false; // error
}
}
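For reference, the DataNode on the other end of this socket parses the header with reads that mirror the writes above. The following is a simplified sketch of that mirror; readWriteBlockHeader is a hypothetical method of my own, while the real parsing lives in DataXceiver and also validates the protocol version.

// Simplified mirror of the OP_WRITE_BLOCK header written above, as the DataNode
// side reads it. A sketch, not the actual Hadoop code.
void readWriteBlockHeader(java.net.Socket s) throws IOException {
  DataInputStream in = new DataInputStream(NetUtils.getInputStream(s));
  short version        = in.readShort();    // DATA_TRANSFER_VERSION
  byte op              = (byte) in.read();  // OP_WRITE_BLOCK
  long blockId         = in.readLong();
  long generationStamp = in.readLong();
  int pipelineSize     = in.readInt();      // nodes.length
  boolean isRecovery   = in.readBoolean();  // recovery flag
  String clientName    = Text.readString(in);
  boolean hasSrcNode   = in.readBoolean();  // false: no src node information sent
  int numTargets       = in.readInt();      // nodes.length - 1 downstream replicas
  DatanodeInfo[] targets = new DatanodeInfo[numTargets];
  for (int i = 0; i < numTargets; i++) {
    targets[i] = new DatanodeInfo();
    targets[i].readFields(in);              // counterpart of nodes[i].write(out)
  }
  // ... followed by the checksum header produced by checksum.writeHeader(out)
}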
-------------------------------------------------------------------------------------------------------------------------------------------------------------------
DFSClient Internals (Writing Data: Building the Pipeline)
Original post: http://blog.csdn.net/u013494310/article/details/19236947