
Modifying the SequenceFileInputFormat HDFS block size


Reading SequenceFiles with Spark can be very time-consuming. By default, SequenceFileInputFormat inherits its file-splitting logic from FileInputFormat, so a large file is cut into pieces of the HDFS block size. If you want smaller splits, to raise the parallelism of the Spark job, you can roll your own input format (the default sizing rule is sketched below, followed by the custom class):
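For context, here is a minimal sketch of the sizing rule in Hadoop 2.x's FileInputFormat.computeSplitSize: the split size is clamped between the configured minimum and maximum and otherwise falls back to the block size, which is exactly why substituting a smaller "block size" value shrinks the splits.

def computeSplitSize(blockSize: Long, minSize: Long, maxSize: Long): Long =
  // With the default maxSize (Long.MaxValue) and a small minSize,
  // this collapses to blockSize, i.e. the HDFS block size
  Math.max(minSize, Math.min(maxSize, blockSize))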

import java.io.IOException
import java.util.{ArrayList, List}

import com.google.common.base.Stopwatch // Guava Stopwatch, as used by Hadoop 2.x FileInputFormat
import org.apache.commons.logging.{Log, LogFactory}
import org.apache.hadoop.fs.{BlockLocation, FileStatus, FileSystem, LocatedFileStatus, Path}
import org.apache.hadoop.mapreduce.{InputSplit, JobContext, RecordReader, TaskAttemptContext}
import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat, SequenceFileRecordReader}

class MySequenceFileInputFormat[K, V] extends FileInputFormat[K, V] {
  private val LOG: Log = LogFactory.getLog(classOf[MySequenceFileInputFormat[_, _]])

  val sequenceFileBlockSize = 30000000L // manually set "block size": 30 MB instead of the HDFS block size

  @throws[IOException]
  override def createRecordReader(split: InputSplit, context: TaskAttemptContext): RecordReader[K, V] =
    new SequenceFileRecordReader[K, V]

  // Same value SequenceFileInputFormat uses: SequenceFile.SYNC_INTERVAL (2000 bytes)
  override protected def getFormatMinSplitSize: Long = 2000L

  @throws[IOException]
  override protected def listStatus(job: JobContext): List[FileStatus] = {
    val files: List[FileStatus] = super.listStatus(job)
    val len: Int = files.size
    var j: Int = 0
    var i: Int = 0
    while (i < len) {
      val file: FileStatus = files.get(i)
      // A MapFile is a directory; its records live in the inner "data" file
      if (file.isDirectory) {
        val p: Path = file.getPath
        val fs: FileSystem = p.getFileSystem(job.getConfiguration)
        files.set(i, fs.getFileStatus(new Path(p, "data")))
      }
      // Compact the list in place so that only non-empty files survive
      if (files.get(i).getLen != 0L) {
        files.set(j, files.get(i))
        j += 1
      }
      i += 1
    }
    files.subList(0, j)
  }

  @throws[IOException]
  override def getSplits(job: JobContext): List[InputSplit] = {
    val sw: Stopwatch = new Stopwatch().start()
    val minSize: Long = Math.max(getFormatMinSplitSize, FileInputFormat.getMinSplitSize(job))
    val maxSize: Long = FileInputFormat.getMaxSplitSize(job)
    val splits: ArrayList[InputSplit] = new ArrayList[InputSplit]
    val files: List[FileStatus] = listStatus(job)
    val it = files.iterator

    while (it.hasNext) {
      val file: FileStatus = it.next()
      val path: Path = file.getPath
      val length: Long = file.getLen

      if (length != 0L) {
        val blkLocations: Array[BlockLocation] = file match {
          case located: LocatedFileStatus => located.getBlockLocations
          case _ =>
            val fs: FileSystem = path.getFileSystem(job.getConfiguration)
            fs.getFileBlockLocations(file, 0L, length)
        }

        if (isSplitable(job, path)) {
          // The only change from FileInputFormat.getSplits: feed computeSplitSize
          // our sequenceFileBlockSize instead of file.getBlockSize()
          val blockSize: Long = sequenceFileBlockSize
          val splitSize: Long = computeSplitSize(blockSize, minSize, maxSize)
          var bytesRemaining: Long = length
          var blkIndex: Int = 0
          // Cut full-sized splits while more than 1.1x splitSize remains (the SPLIT_SLOP rule)
          while (bytesRemaining.toDouble / splitSize.toDouble > 1.1D) {
            blkIndex = getBlockIndex(blkLocations, length - bytesRemaining)
            splits.add(makeSplit(path, length - bytesRemaining, splitSize,
              blkLocations(blkIndex).getHosts, blkLocations(blkIndex).getCachedHosts))
            bytesRemaining -= splitSize
          }
          // Whatever is left becomes the last split
          if (bytesRemaining != 0L) {
            blkIndex = getBlockIndex(blkLocations, length - bytesRemaining)
            splits.add(makeSplit(path, length - bytesRemaining, bytesRemaining,
              blkLocations(blkIndex).getHosts, blkLocations(blkIndex).getCachedHosts))
          }
        } else {
          // Non-splittable file: one split for the whole file
          splits.add(makeSplit(path, 0L, length, blkLocations(0).getHosts, blkLocations(0).getCachedHosts))
        }
      } else {
        // Empty file: emit an empty split with no hosts
        splits.add(makeSplit(path, 0L, length, new Array[String](0)))
      }
    }

    // Record the number of input files, then return all splits
    job.getConfiguration.setLong("mapreduce.input.fileinputformat.numinputfiles", files.size.toLong)
    sw.stop()
    if (LOG.isDebugEnabled) {
      LOG.debug("Total # of splits generated by getSplits: " + splits.size + ", TimeTaken: " + sw.elapsedMillis)
    }
    splits
  }


}


Change sequenceFileBlockSize to whatever split size you want.
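If you'd rather not hardcode the value, one option is to read it from the job configuration inside getSplits. A sketch, where "my.sequencefile.block.size" is a made-up key, not a standard Hadoop property:

// Hypothetical key; 30000000L (30 MB) stays the default when it is unset
val blockSize: Long =
  job.getConfiguration.getLong("my.sequencefile.block.size", 30000000L)

and set it from the Spark side before reading:

sc.hadoopConfiguration.setLong("my.sequencefile.block.size", 16000000L) // e.g. 16 MB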


Usage:

val dd = sc.newAPIHadoopFile[BytesWritable, BytesWritable,
    MySequenceFileInputFormat[BytesWritable, BytesWritable]](sourceDir)
  .flatMap { case (_, value) =>
    // copyBytes() trims the padding that getBytes() can leave in the backing array
    function(new String(value.copyBytes()))
  }
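For comparison, a similar effect can often be achieved without a custom format: since the split size is max(minSize, min(maxSize, blockSize)), capping the standard maximum split size also produces splits smaller than the HDFS block. A sketch with the stock new-API SequenceFileInputFormat:

import org.apache.hadoop.io.BytesWritable
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat

// Cap the split size through the standard FileInputFormat knob
sc.hadoopConfiguration.setLong("mapreduce.input.fileinputformat.split.maxsize", 30000000L)
val rdd = sc.newAPIHadoopFile[BytesWritable, BytesWritable,
  SequenceFileInputFormat[BytesWritable, BytesWritable]](sourceDir)
println(rdd.partitions.length) // expect roughly totalBytes / 30 MB partitions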



Original: http://11670039.blog.51cto.com/11660039/1893871
