首页 > 其他 > 详细

DFSClient技术内幕(数据的写入--OutputStream的初始化)

时间:2014-02-15 16:38:20      阅读:701      评论:0      收藏:0      [点我收藏+]
  以下是本人研究源代码成果, 此文僅献给我和我的小伙伴们,不足之处,欢迎斧正-------------------------------------------------致谢道格等人!
注:hadoop版本0.20.2,有童鞋表示看代码头晕,所以本文采用纯文字描述,哥还特意为你们把字体调调颜色噢 ^ o ^
上一篇文章,我们一起讨论了DFSClient数据的校验和读取过程,下面我们一起讨论关于数据写入的详细过程,该过程对比前面,稍稍复杂一点点啦,
 
====================================================================================
--------------------------------------------------------------------------------------------------------------------------------------------
分析写入数据前,我们先弄清楚几个重量级的概念
Block数据块是数据写入的基本单位,Block大小默认为64MB
Block由若干个Packet数据包组成,Packet默认大小为64KB
Packet由若干个chunk组成,chunk是校验和的基本单元,一个chunk对应,chunk的数据=数据(512B)+校验和(4B)

-------------------------------------------------------------------------------------------------------------------------------------------------------------------
第一种创建OutputStream方式分析:DFClient.create();
public OutputStream create(String src, 
                             boolean overwrite
                             ) throws IOException {
    return create(src, overwrite, defaultReplication, defaultBlockSize, null);
  }
 ----------------------------------------------------------------------------------------------------------------------------------------------------------------
public OutputStream create(String src, 
                             boolean overwrite, 
                             short replication,
                             long blockSize,
                             Progressable progress
                             ) throws IOException {
    return create(src, overwrite, replication, blockSize, progress,
        conf.getInt("io.file.buffer.size", 4096));
  }
 ----------------------------------------------------------------------------------------------------------------------------------------------------------------
  public OutputStream create(String src,
      boolean overwrite,
      short replication,
      long blockSize,
      Progressable progress,
      int buffersize
      ) throws IOException {
    return create(src, FsPermission.getDefault(),
        overwrite, replication, blockSize, progress, buffersize);
  }
  ----------------------------------------------------------------------------------------------------------------------------------------------------------------
public OutputStream create(String src, 
                             FsPermission permission,
                             boolean overwrite, 
                             short replication,
                             long blockSize,
                             Progressable progress,
                             int buffersize
                             ) throws IOException {
    //检查HDFS文件系统是否已经打开 
    checkOpen();
    if (permission == null) {
    //如果创建写入文件夹时没有指定文件的权限,则采用系统默认的权限,可配置 
      permission = FsPermission.getDefault();
    }
     
    FsPermission masked = permission.applyUMask(FsPermission.getUMask(conf));
    LOG.debug(src + ": masked=" + masked);
    //创建文件输出流 ,很关键的一步,初始化DFSOutputStream
    OutputStream result = new DFSOutputStream(src, masked,
        overwrite, replication, blockSize, progress, buffersize,
        conf.getInt("io.bytes.per.checksum", 512));
    //将文件路径和输出流加入租约表中 
    leasechecker.put(src, result);
    return result;
  } 
create方法小结:创建输出流DFSOutputStream然后将文件路径和输出流加入到租约表
 

------------------------------------------------------------------------------------------------------------------------------------------------------------------------

    /**
     * 该构造方法会被DFSClient的create方法调用,
     * 然后调用computePacketChunkSize方法来计算出Packet和Chunk的大小,
     * 之后调用ClientProtocol的create方法来在NameNode的命令空间中创建了一个用于
     * 接收DFSOutputStream中的数据文件,最后启动用于向DataNode写入数据的Streamer线程
     * 
     */
    DFSOutputStream(String src, FsPermission masked, boolean overwrite,
        short replication, long blockSize, Progressable progress,
        int buffersize, int bytesPerChecksum) throws IOException {
    //初始化校验和相关对象
      this(src, blockSize, progress, bytesPerChecksum);
    //计算发往数据节点的数据包能够包含多少个检验块,以及包的长度
      computePacketChunkSize(writePacketSize, bytesPerChecksum);
    
      try {
       //执行远程过程在namenode上创建一个新的处于构建状态的文件 
        namenode.create(
            src, masked, clientName, overwrite, replication, blockSize);
      } catch(RemoteException re) {
        throw re.unwrapRemoteException(AccessControlException.class,
                                       NSQuotaExceededException.class,
                                       DSQuotaExceededException.class);
      }
      //非常重要的一步:启动数据发送器。开始写入数据 
      streamer.start();
    } 
-------------------------------------------------------------------------------------------------------------------------------------------------------------------------
    /**
     * 私有的构造方法,如果指定的bytesPerChecksum小于1或者blockSize不能整除bytesPerChecksum,
     * 则会抛出相应的异常,而且在方法中完成了对DataChecksum校验和对象的初始化工作
     * 
     * @param src
     * @param blockSize
     * @param progress
     * @param bytesPerChecksum
     * @throws IOException
     */
    private DFSOutputStream(String src, long blockSize, Progressable progress,
        int bytesPerChecksum) throws IOException {
       //调用父类的FSOutputSummer的构造函数,提供校验方法,校验块大小等参数 
      super(new CRC32(), bytesPerChecksum, 4);
       //初始化 
      this.src = src;
      this.blockSize = blockSize;
      this.progress = progress;
      if (progress != null) {
        LOG.debug("Set non-null progress callback on DFSOutputStream "+src);
      }
      
      if ( bytesPerChecksum < 1 || blockSize % bytesPerChecksum != 0) {
        throw new IOException("io.bytes.per.checksum(" + bytesPerChecksum +
                              ") and blockSize(" + blockSize + 
                              ") do not match. " + "blockSize should be a " +
                              "multiple of io.bytes.per.checksum");
                              
      }
       // 根据bytesPerChecksum取得数据的校验和
      checksum = DataChecksum.newDataChecksum(DataChecksum.CHECKSUM_CRC32, bytesPerChecksum);    } 
------------------------------------------------------------------------------------------------------------------------------------------------------------------------
// FSOutputSummer构造函数 
protected FSOutputSummer(Checksum sum, int maxChunkSize, int checksumSize) {
    this.sum = sum;
    this.buf = new byte[maxChunkSize]; //存储数据的缓存区
    this.checksum = new byte[checksumSize];//存储校验和缓冲区
    this.count = 0;
  } 
--------------------------------------------------------------------------------------------------------------------------------------------------------------- 
computePacketChunkSize(int psize, int csize) 方法: 
/**
     * 方法来计算出接下来的需要的Packet大小,和数据包能包含多少校验块
     * @param psize 64KB
     * @param csize 512B
     */
    private void computePacketChunkSize(int psize, int csize) {
    //chunk的大小为原生数据的大小加上与数据对应的校验和的大小
      int chunkSize = csize + checksum.getChecksumSize();
      int n = DataNode.PKT_HEADER_LEN + SIZE_OF_INTEGER;
      chunksPerPacket = Math.max((psize - n + chunkSize-1)/chunkSize, 1);
      
      //packet的大小为Packet的头部信息的大小加上packet中所有chunk的总大小
      packetSize = n + chunkSize*chunksPerPacket;
      if (LOG.isDebugEnabled()) {
        LOG.debug("computePacketChunkSize: src=" + src +
                  ", chunkSize=" + chunkSize +
                  ", chunksPerPacket=" + chunksPerPacket +
                  ", packetSize=" + packetSize);
      }
    }
=========================================================================================================
 第二种创建OutputStream方式分析:DFClient.create();
/**
   * 该方法用于向一个已经存在的文件的末尾添加内容,
   * 首先通过namenode的append方法来定位文件的最后一个Block的位置,
   * 之后创建DFSOutputStream对象来向文件的末尾添加内容,
   * 同时将创建的DFSOutputSream添加到客户端的租约中
   * @param src
   * @param buffersize
   * @param progress
   * @return
   * @throws IOException
   */
  OutputStream append(String src, int buffersize, Progressable progress
      ) throws IOException {
    checkOpen();
    FileStatus stat = null;
    LocatedBlock lastBlock = null;
    try {
      stat = getFileInfo(src);
      lastBlock = namenode.append(src, clientName);
    } catch(RemoteException re) {
      throw re.unwrapRemoteException(FileNotFoundException.class,
                                     AccessControlException.class,
                                     NSQuotaExceededException.class,
                                     DSQuotaExceededException.class);
    }
     //很关键的一步,初始化 
DFSOutputStream
    OutputStream result = new DFSOutputStream(src, buffersize, progress,
        lastBlock, stat, conf.getInt("io.bytes.per.checksum", 512));
    leasechecker.put(src, result);
    return result;
  } 
-------------------------------------------------------------------------------------------------------------------------------------------------------- 
  /**
     * 
     * 该构造器被DFSClient的append方法来调用
     */
    DFSOutputStream(String src, int buffersize, Progressable progress,
        LocatedBlock lastBlock, FileStatus stat,
        int bytesPerChecksum) throws IOException {
      //首先调用私有的DFSDataOutputStream的构造方法,完成了对DataChecksum校验和对象的初始化工作
      this(src, stat.getBlockSize(), progress, bytesPerChecksum);
      
      //create方法里没有这步。。为了获得文件的长度
      initialFileSize = stat.getLen(); // length of file when opened

    //由于是追加写入的方式,所以上次写入可能会有未被填满的数据块,所以继续利用
      if (lastBlock != null) {
    //取得最后一个数据Block对象
        block = lastBlock.getBlock();
        
        //计算最后一个数据Block已经被使用的空间的大小
        long usedInLastBlock = stat.getLen() % blockSize;
        //计算最后一个数据Block剩余的空间
        int freeInLastBlock = (int)(blockSize - usedInLastBlock);

      
        
        // 计算出最后一个crc chunk数据块中,已经使用的空间大小
        int usedInCksum = (int)(stat.getLen() % bytesPerChecksum);
        
        //计算出最后一个crc chunk数据块中空闲的空间的大小
        int freeInCksum = bytesPerChecksum - usedInCksum;

        // if there is space in the last block, then we have to 
        // append to that block
        if (freeInLastBlock > blockSize) {
          throw new IOException("The last block for file " + 
                                src + " is full.");
        }

        // 取得最后一个数据Block的大小
        bytesCurBlock = lastBlock.getBlockSize();
       
      
        /**
         * 如果usedInCksum > 0 && freeInCksum > 0
         * 则最后一个Chunk还有空闲的空间,
         * 那么需要使用将要发送的Packet来填充还有空闲空间的Chunk
         */
        if (usedInCksum > 0 && freeInCksum > 0) {
         
          computePacketChunkSize(0, freeInCksum);
          resetChecksumChunk(freeInCksum);
          this.appendChunk = true;
        } else {
          // if the remaining space in the block is smaller than 
          // that expected size of of a packet, then create 
          // smaller size packet.
          //
          computePacketChunkSize(Math.min(writePacketSize, freeInLastBlock), 
                                 bytesPerChecksum);
        }

        //取得最后一个数据Block所在的DataNode集合
        //很关键的一步:初始化
正在被写入的Block的DataNode集合
        nodes = lastBlock.getLocations();//lastBlock=namenode.append(src, clientName);
        
        //用于标识写入数据出错的DataNode的索引
        errorIndex = -1;   // 
        
        if (nodes.length < 1) {
          throw new IOException("Unable to retrieve blocks locations " +
                                " for last block " + block +
                                "of file " + src);
                        
        }
        
        //尝试建立起用于写入数据的DataNode管道
        processDatanodeError(true, true);
        
        //开启写数据线程
        streamer.start();
      }
      else {
        computePacketChunkSize(writePacketSize, bytesPerChecksum);
        streamer.start();
      }
    }
---------------------------------------------------------------------------------------------------------------------------------------------------------------------- 
处理IO异常的专属方法: 
/**
     * 处理任何可能的IO错误
     * @param hasError
     * @param isAppend
     * @return
     */
    private boolean processDatanodeError(boolean hasError, boolean isAppend) {
    //如果不包含错误,则直接返回false
      if (!hasError) {
        return false;
      }
      
      //如果等待DataNode返回信息的线程已经关闭,则方法直接返回true
      if (response != null) {
        LOG.info("Error Recovery for block " + block +
                 " waiting for responder to exit. ");
        return true;
      }
      if (errorIndex >= 0) {
        LOG.warn("Error Recovery for block " + block
            + " bad datanode[" + errorIndex + "] "
            + (nodes == null? "nodes == null": nodes[errorIndex].getName()));
      }
      
      //关闭打开的blockStream 和 blockReplyStream
      if (blockStream != null) {
        try {
          blockStream.close();
          blockReplyStream.close();
        } catch (IOException e) {
        }
      }
      blockStream = null;
      blockReplyStream = null;

      // 将确认队列 中的未收到DataNode返回的响应信息的所有数据Packet添加到 数据队列的头部
      synchronized (ackQueue) {
        dataQueue.addAll(0, ackQueue);
        ackQueue.clear();
      }

      boolean success = false;
      //开始进入到恢复写入出错的Block的循环中
      while (!success && clientRunning) {
        DatanodeInfo[] newnodes = null;
        //如果需要进行写入操作的DataNode集合为空,则抛出I/O异常,而且需要将Streamer线程关闭,方法直接返回false
        if (nodes == null) {
          String msg = "Could not get block locations. " +
                                          "Source file \"" + src
                                          + "\" - Aborting...";
          LOG.warn(msg);
          //TODO step3
          setLastException(new IOException(msg));
          closed = true;
          if (streamer != null) streamer.close();
          return false;
        }
        
        //取得DataNode管道中的所有DataNode的名称字符串
        StringBuilder pipelineMsg = new StringBuilder();
        for (int j = 0; j < nodes.length; j++) {
          pipelineMsg.append(nodes[j].getName());
          if (j < nodes.length - 1) {
            pipelineMsg.append(", ");
          }
        }
        
        /**
         * 如果 errorIndex<0,则将DataNode管道集合直接赋值给新的DataNode集合,
         * 否则需要将出错的DataNode从DataNode管道中移除,然后将剩余的DataNode添加到新的DataNode集合中
         */
        if (errorIndex < 0) {
          newnodes = nodes;
        } else {
          if (nodes.length <= 1) {
            lastException = new IOException("All datanodes " + pipelineMsg + 
                                            " are bad. Aborting...");
            closed = true;
            if (streamer != null) streamer.close();
            return false;
          }
          LOG.warn("Error Recovery for block " + block +
                   " in pipeline " + pipelineMsg + 
                   ": bad datanode " + nodes[errorIndex].getName());
          newnodes =  new DatanodeInfo[nodes.length-1];
          //TODO
          System.arraycopy(nodes, 0, newnodes, 0, errorIndex);
          System.arraycopy(nodes, errorIndex+1, newnodes, errorIndex,
              newnodes.length-errorIndex);
        }

        // Tell the primary datanode to do error recovery 
        // by stamping appropriate generation stamps.
        //
        LocatedBlock newBlock = null;
        ClientDatanodeProtocol primary =  null;
        DatanodeInfo primaryNode = null;
        try {
          // 从新的DataNode集合中选择最小的DataNode节点作为执行恢复操作的主节点
          primaryNode = Collections.min(Arrays.asList(newnodes));
          //TODO step4
          
          //建立到主DataNode的RPC连接
          primary = createClientDatanodeProtocolProxy(primaryNode, conf);
          
          //通过建立的RPC的recoverBlock方法来启动DataNode上Block的恢复过程,并返回最新Block
          newBlock = primary.recoverBlock(block, isAppend, newnodes);
        } catch (IOException e) {
          recoveryErrorCount++;
          if (recoveryErrorCount > maxRecoveryErrorCount) {
            if (nodes.length > 1) {
              // if the primary datanode failed, remove it from the list.
              // The original bad datanode is left in the list because it is
              // conservative to remove only one datanode in one iteration.
              for (int j = 0; j < nodes.length; j++) {
                if (nodes[j].equals(primaryNode)) {
                //从所有节点中找出最小的,并将其所在的位置标识为errorIndex
                  errorIndex = j; 
                }
              }
              // 从DataNode集合中移除最小DadaNode
              newnodes =  new DatanodeInfo[nodes.length-1];
              System.arraycopy(nodes, 0, newnodes, 0, errorIndex);
              System.arraycopy(nodes, errorIndex+1, newnodes, errorIndex,
                               newnodes.length-errorIndex);
              nodes = newnodes;
              LOG.warn("Error Recovery for block " + block + " failed " +
                       " because recovery from primary datanode " +
                       primaryNode + " failed " + recoveryErrorCount +
                       " times. " + " Pipeline was " + pipelineMsg +
                       ". Marking primary datanode as bad.");
              //将恢复次数设置为0
              recoveryErrorCount = 0; 
              errorIndex = -1;
              return true;          // sleep when we return from here
            }
            /**
             * 如果是DataNode全部死亡造成,则无法恢复
             */
            String emsg = "Error Recovery for block " + block + " failed " +
                          " because recovery from primary datanode " +
                          primaryNode + " failed " + recoveryErrorCount + 
                          " times. "  + " Pipeline was " + pipelineMsg +
                          ". Aborting...";
            LOG.warn(emsg);
            lastException = new IOException(emsg);
            closed = true;
            if (streamer != null) streamer.close();
            return false;       // abort with IOexception
          } 
          LOG.warn("Error Recovery for block " + block + " failed " +
                   " because recovery from primary datanode " +
                   primaryNode + " failed " + recoveryErrorCount +
                   " times. "  + " Pipeline was " + pipelineMsg +
                   ". Will retry...");
          return true;          // sleep when we return from here
        } finally {
          RPC.stopProxy(primary);
        }
        
        /**
         * 至此,数据块恢复成功
         */
        recoveryErrorCount = 0; // block recovery successful

        // If the block recovery generated a new generation stamp, use that
        // from now on.  Also, setup new pipeline
        
        if (newBlock != null) {
        //更新恢复之后的Block对象的信息
          block = newBlock.getBlock();
          nodes = newBlock.getLocations();
        }

        this.hasError = false;
        lastException = null;
        errorIndex = 0;
        //建立起到DataNode管道中的第一个DataNode的Socket连接,
        success = createBlockOutputStream(nodes, clientName, true);
      }
      //出错的Block恢复之后,重新启动ResponseProcessor线程处理
      response = new ResponseProcessor(nodes);
      response.start();
      return false; // do not sleep, continue processing
    }


DFSClient技术内幕(数据的写入--OutputStream的初始化)

原文:http://blog.csdn.net/u013494310/article/details/19236919

(0)
(0)
   
举报
评论 一句话评论(0
关于我们 - 联系我们 - 留言反馈 - 联系我们:wmxa8@hotmail.com
© 2014 bubuko.com 版权所有
打开技术之扣,分享程序人生!