Reading SequenceFiles with Spark can be very slow. By default, SequenceFileInputFormat splits input files the same way FileInputFormat does, so a large file is cut into pieces of the HDFS block size. If you want smaller splits, and with them more Spark tasks running in parallel, you can write your own InputFormat:
import java.io.IOException
import java.util.{ArrayList, List}

import com.google.common.base.Stopwatch // Guava Stopwatch, as used by Hadoop 2.x FileInputFormat
import org.apache.commons.logging.{Log, LogFactory}
import org.apache.hadoop.fs.{BlockLocation, FileStatus, FileSystem, LocatedFileStatus, Path}
import org.apache.hadoop.mapreduce.{InputSplit, JobContext, RecordReader, TaskAttemptContext}
import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat, SequenceFileRecordReader}

class MySequenceFileInputFormat[K, V] extends FileInputFormat[K, V] {

  private val LOG: Log = LogFactory.getLog(classOf[MySequenceFileInputFormat[_, _]])

  // Manually set the "block size" used for splitting to 30 MB
  val sequenceFileBlockSize: Long = 30000000L

  @throws[IOException]
  override def createRecordReader(split: InputSplit, context: TaskAttemptContext): RecordReader[K, V] =
    new SequenceFileRecordReader[K, V]

  // Same lower bound as SequenceFileInputFormat (the SequenceFile sync interval)
  override protected def getFormatMinSplitSize: Long = 2000L

  @throws[IOException]
  override protected def listStatus(job: JobContext): List[FileStatus] = {
    val files: List[FileStatus] = super.listStatus(job)
    val len: Int = files.size
    var j: Int = 0
    var i: Int = 0
    while (i < len) {
      var file: FileStatus = files.get(i)
      // A MapFile shows up as a directory; its records live in the "data" file inside it
      if (file.isDirectory) {
        val p: Path = file.getPath
        val fs: FileSystem = p.getFileSystem(job.getConfiguration)
        file = fs.getFileStatus(new Path(p, "data"))
        files.set(i, file)
      }
      // Drop empty files
      if (file.getLen != 0L) {
        files.set(j, file)
        j += 1
      }
      i += 1
    }
    files.subList(0, j)
  }

  @throws[IOException]
  override def getSplits(job: JobContext): List[InputSplit] = {
    val sw: Stopwatch = new Stopwatch().start()
    val minSize: Long = Math.max(getFormatMinSplitSize, FileInputFormat.getMinSplitSize(job))
    val maxSize: Long = FileInputFormat.getMaxSplitSize(job)
    val splits: ArrayList[InputSplit] = new ArrayList[InputSplit]
    val files: List[FileStatus] = listStatus(job)
    val it = files.iterator
    while (it.hasNext) {
      val file: FileStatus = it.next()
      val path: Path = file.getPath
      val length: Long = file.getLen
      if (length != 0L) {
        var blkLocations: Array[BlockLocation] = null
        if (file.isInstanceOf[LocatedFileStatus]) {
          blkLocations = file.asInstanceOf[LocatedFileStatus].getBlockLocations
        } else {
          val fs: FileSystem = path.getFileSystem(job.getConfiguration)
          blkLocations = fs.getFileBlockLocations(file, 0L, length)
        }
        if (isSplitable(job, path)) {
          // Instead of file.getBlockSize, use the manually chosen value so splits come out smaller
          val blockSize: Long = sequenceFileBlockSize
          val splitSize: Long = computeSplitSize(blockSize, minSize, maxSize)
          var bytesRemaining: Long = length
          var blkIndex: Int = 0
          while (bytesRemaining.toDouble / splitSize.toDouble > 1.1D) {
            blkIndex = getBlockIndex(blkLocations, length - bytesRemaining)
            splits.add(makeSplit(path, length - bytesRemaining, splitSize,
              blkLocations(blkIndex).getHosts, blkLocations(blkIndex).getCachedHosts))
            bytesRemaining -= splitSize
          }
          if (bytesRemaining != 0L) {
            blkIndex = getBlockIndex(blkLocations, length - bytesRemaining)
            splits.add(makeSplit(path, length - bytesRemaining, bytesRemaining,
              blkLocations(blkIndex).getHosts, blkLocations(blkIndex).getCachedHosts))
          }
        } else {
          // Not splittable: one split for the whole file
          splits.add(makeSplit(path, 0L, length,
            blkLocations(0).getHosts, blkLocations(0).getCachedHosts))
        }
      } else {
        // Empty file: create an empty split with no locations
        splits.add(makeSplit(path, 0L, length, new Array[String](0)))
      }
    }
    job.getConfiguration.setLong("mapreduce.input.fileinputformat.numinputfiles", files.size.toLong)
    sw.stop()
    if (LOG.isDebugEnabled) {
      LOG.debug("Total # of splits generated by getSplits: " + splits.size +
        ", TimeTaken: " + sw.elapsedMillis)
    }
    splits
  }
}
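Why this works: FileInputFormat derives the split size from the block size clamped between the configured minimum and maximum split sizes, so feeding it a smaller "block size" directly shrinks the splits. A rough sketch of that calculation, with illustrative numbers and the defaults assumed above:

// computeSplitSize in FileInputFormat is Math.max(minSize, Math.min(maxSize, blockSize)).
// With the 30 MB value above, a 2000-byte minimum and the default unlimited maximum,
// each split of a large file comes out at roughly 30 MB instead of a full HDFS block.
val minSize   = 2000L          // getFormatMinSplitSize
val maxSize   = Long.MaxValue  // default mapreduce.input.fileinputformat.split.maxsize
val splitSize = Math.max(minSize, Math.min(maxSize, 30000000L)) // = 30000000 bytes (~30 MB)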
Change sequenceFileBlockSize to whatever size you want.
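If you would rather not recompile to change the value, one option is to read it from the job's Hadoop Configuration inside getSplits. A sketch only; "my.sequencefile.split.size" is a made-up key name, with 30 MB kept as the fallback:

// Inside getSplits, instead of the hard-coded val:
val blockSize: Long =
  job.getConfiguration.getLong("my.sequencefile.split.size", 30000000L)

// Driver side, before creating the RDD:
sc.hadoopConfiguration.setLong("my.sequencefile.split.size", 10000000L) // e.g. 10 MB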
Usage:
val dd = sc.newAPIHadoopFile[BytesWritable, BytesWritable,
    MySequenceFileInputFormat[BytesWritable, BytesWritable]](sourceDir)
  .flatMap { x =>
    // Only the first getLength bytes of a BytesWritable buffer are valid data
    function(new String(x._2.getBytes, 0, x._2.getLength))
  }
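To confirm that the smaller splits actually raise the parallelism, you can check the partition count of the resulting RDD; each input split becomes one partition, and hence one Spark task:

// Should be roughly (total input size / sequenceFileBlockSize) for large files
println("partitions: " + dd.partitions.length)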
Modify the SequenceFileInputFormat HDFS block size
Original: http://11670039.blog.51cto.com/11660039/1893871