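Several factors determine the number of map tasks in Hadoop, and they differ between versions (here, the old mapred API versus the new mapreduce API). Knowing these factors helps a great deal in understanding how Hadoop divides its input into splits, and it is also very useful when tuning Hadoop performance.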
The getSplits method in the old API (org.apache.hadoop.mapred.FileInputFormat):
```java
public InputSplit[] getSplits(JobConf job, int numSplits)
    throws IOException {
  FileStatus[] files = listStatus(job);

  // Save the number of input files in the job-conf
  job.setLong(NUM_INPUT_FILES, files.length);
  long totalSize = 0;                           // compute total size
  for (FileStatus file: files) {                // check we have valid files
    if (file.isDir()) {
      throw new IOException("Not a file: "+ file.getPath());
    }
    totalSize += file.getLen();
  }

  long goalSize = totalSize / (numSplits == 0 ? 1 : numSplits);
  long minSize = Math.max(job.getLong("mapred.min.split.size", 1),
                          minSplitSize);

  // generate splits
  ArrayList<FileSplit> splits = new ArrayList<FileSplit>(numSplits);
  NetworkTopology clusterMap = new NetworkTopology();
  for (FileStatus file: files) {
    Path path = file.getPath();
    FileSystem fs = path.getFileSystem(job);
    long length = file.getLen();
    BlockLocation[] blkLocations = fs.getFileBlockLocations(file, 0, length);
    if ((length != 0) && isSplitable(fs, path)) {
      long blockSize = file.getBlockSize();
      long splitSize = computeSplitSize(goalSize, minSize, blockSize);

      long bytesRemaining = length;
      while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
        String[] splitHosts = getSplitHosts(blkLocations,
            length-bytesRemaining, splitSize, clusterMap);
        splits.add(new FileSplit(path, length-bytesRemaining, splitSize,
            splitHosts));
        bytesRemaining -= splitSize;
      }

      if (bytesRemaining != 0) {
        splits.add(new FileSplit(path, length-bytesRemaining, bytesRemaining,
            blkLocations[blkLocations.length-1].getHosts()));
      }
    } else if (length != 0) {
      String[] splitHosts = getSplitHosts(blkLocations,0,length,clusterMap);
      splits.add(new FileSplit(path, 0, length, splitHosts));
    } else {
      //Create empty hosts array for zero length files
      splits.add(new FileSplit(path, 0, length, new String[0]));
    }
  }
  LOG.debug("Total # of splits: " + splits.size());
  return splits.toArray(new FileSplit[splits.size()]);
}

protected long computeSplitSize(long goalSize, long minSize,
                                long blockSize) {
  return Math.max(minSize, Math.min(goalSize, blockSize));
}
```
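To isolate the arithmetic that decides the old-API split count, the following is a minimal, Hadoop-free sketch of the loop above for a single non-empty, splittable file. It keeps only offsets and lengths and drops host lookup. The class name OldApiSplitSketch is hypothetical, and SPLIT_SLOP = 1.1 is assumed to match the 10% slop constant in Hadoop's FileInputFormat (the listing above references SPLIT_SLOP but does not show its value).

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of the old-API split arithmetic (not Hadoop's own class).
// Models one non-empty, splittable file; block host lookup is omitted.
public class OldApiSplitSketch {
    // Assumption: same value as SPLIT_SLOP in Hadoop's FileInputFormat (10% slop).
    static final double SPLIT_SLOP = 1.1;

    static long computeSplitSize(long goalSize, long minSize, long blockSize) {
        return Math.max(minSize, Math.min(goalSize, blockSize));
    }

    // Returns {offset, length} pairs, mirroring the while loop and the tail split.
    static List<long[]> split(long fileLength, long totalSize, int numSplits,
                              long minSize, long blockSize) {
        long goalSize = totalSize / (numSplits == 0 ? 1 : numSplits);
        long splitSize = computeSplitSize(goalSize, minSize, blockSize);
        List<long[]> splits = new ArrayList<>();
        long bytesRemaining = fileLength;
        while (((double) bytesRemaining) / splitSize > SPLIT_SLOP) {
            splits.add(new long[] {fileLength - bytesRemaining, splitSize});
            bytesRemaining -= splitSize;
        }
        if (bytesRemaining != 0) {
            // Leftover bytes (at most SPLIT_SLOP * splitSize) form the last split.
            splits.add(new long[] {fileLength - bytesRemaining, bytesRemaining});
        }
        return splits;
    }

    public static void main(String[] args) {
        // Inputs taken from the old-API log in this article.
        for (long[] s : split(529, 529, 2, 1, 67108864L)) {
            System.out.println("offset=" + s[0] + " length=" + s[1]);
        }
    }
}
```

The slop factor lets the last split be up to 10% larger than splitSize, which avoids creating a tiny trailing split.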
The getSplits method in the new API (org.apache.hadoop.mapreduce.lib.input.FileInputFormat):
```java
public List<InputSplit> getSplits(JobContext job
                                  ) throws IOException {
  long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));
  long maxSize = getMaxSplitSize(job);

  // generate splits
  List<InputSplit> splits = new ArrayList<InputSplit>();
  List<FileStatus>files = listStatus(job);
  for (FileStatus file: files) {
    Path path = file.getPath();
    FileSystem fs = path.getFileSystem(job.getConfiguration());
    long length = file.getLen();
    BlockLocation[] blkLocations = fs.getFileBlockLocations(file, 0, length);
    if ((length != 0) && isSplitable(job, path)) {
      long blockSize = file.getBlockSize();
      long splitSize = computeSplitSize(blockSize, minSize, maxSize);

      long bytesRemaining = length;
      while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
        int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
        splits.add(new FileSplit(path, length-bytesRemaining, splitSize,
                                 blkLocations[blkIndex].getHosts()));
        bytesRemaining -= splitSize;
      }

      if (bytesRemaining != 0) {
        splits.add(new FileSplit(path, length-bytesRemaining, bytesRemaining,
                   blkLocations[blkLocations.length-1].getHosts()));
      }
    } else if (length != 0) {
      splits.add(new FileSplit(path, 0, length, blkLocations[0].getHosts()));
    } else {
      //Create empty hosts array for zero length files
      splits.add(new FileSplit(path, 0, length, new String[0]));
    }
  }

  // Save the number of input files in the job-conf
  job.getConfiguration().setLong(NUM_INPUT_FILES, files.size());

  LOG.debug("Total # of splits: " + splits.size());
  return splits;
}

protected long computeSplitSize(long blockSize, long minSize,
                                long maxSize) {
  return Math.max(minSize, Math.min(maxSize, blockSize));
}
```
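The main difference from the old API is that numSplits no longer appears: splitSize depends only on blockSize, minSize and maxSize. The sketch below is a hypothetical, Hadoop-free model of that arithmetic for one non-empty, splittable file (the class name NewApiSplitSketch and the 200-byte cap are illustrative; SPLIT_SLOP = 1.1 is assumed to match Hadoop's FileInputFormat).

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of the new-API split arithmetic (not Hadoop's own class).
// Models one non-empty, splittable file; block host lookup is omitted.
public class NewApiSplitSketch {
    // Assumption: same value as SPLIT_SLOP in Hadoop's FileInputFormat (10% slop).
    static final double SPLIT_SLOP = 1.1;

    static long computeSplitSize(long blockSize, long minSize, long maxSize) {
        return Math.max(minSize, Math.min(maxSize, blockSize));
    }

    // Returns {offset, length} pairs, mirroring the while loop and the tail split.
    static List<long[]> split(long fileLength, long blockSize, long minSize, long maxSize) {
        long splitSize = computeSplitSize(blockSize, minSize, maxSize);
        List<long[]> splits = new ArrayList<>();
        long bytesRemaining = fileLength;
        while (((double) bytesRemaining) / splitSize > SPLIT_SLOP) {
            splits.add(new long[] {fileLength - bytesRemaining, splitSize});
            bytesRemaining -= splitSize;
        }
        if (bytesRemaining != 0) {
            splits.add(new long[] {fileLength - bytesRemaining, bytesRemaining});
        }
        return splits;
    }

    public static void main(String[] args) {
        long blockSize = 67108864L; // 64 MB, as in the test log
        // minSize = 1, maxSize = Long.MAX_VALUE (values from the log): splitSize = blockSize.
        System.out.println(split(529, blockSize, 1, Long.MAX_VALUE).size());
        // Hypothetical cap: maxSize = 200 bytes forces smaller splits.
        System.out.println(split(529, blockSize, 1, 200).size());
    }
}
```

In this model the first call yields one split and the second yields three (200 + 200 + 129 bytes). In a real job, maxSize and minSize are the knobs to turn, for example through FileInputFormat.setMaxInputSplitSize and FileInputFormat.setMinInputSplitSize in the new API.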
Testing with a single input file of 0.52 KB (529 bytes) produces the following logs:
New API:
blockSize:67108864 minSize:1 maxSize:9223372036854775807
splitSize:67108864
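Here the deciding factor is blockSize: minSize is 1 and maxSize is Long.MAX_VALUE, so computeSplitSize returns blockSize (64 MB), and the whole 529-byte file fits into a single split. This is easy to understand.
Old API: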
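Plugging the logged new-API values into computeSplitSize, and assuming SPLIT_SLOP = 1.1 as in Hadoop's FileInputFormat, the split loop never runs and the file becomes one tail split:

```latex
\begin{aligned}
\text{splitSize} &= \max\bigl(\text{minSize},\ \min(\text{maxSize},\ \text{blockSize})\bigr) \\
&= \max\bigl(1,\ \min(9223372036854775807,\ 67108864)\bigr) = 67108864 \\
\frac{\text{bytesRemaining}}{\text{splitSize}} &= \frac{529}{67108864} \approx 7.9 \times 10^{-6} \le 1.1
\;\Rightarrow\; \text{one split } [0, 529)
\end{aligned}
```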
blockSize:67108864 totalSize:529 numSplits:2 goalSize:264 minSplitSize:1 minSize:1
splitSize:264
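Working the old-API log values through the same loop (again assuming SPLIT_SLOP = 1.1) shows where the second split comes from; goalSize uses integer division:

```latex
\begin{aligned}
\text{goalSize} &= \left\lfloor \frac{\text{totalSize}}{\text{numSplits}} \right\rfloor = \left\lfloor \frac{529}{2} \right\rfloor = 264 \\
\text{splitSize} &= \max\bigl(\text{minSize},\ \min(\text{goalSize},\ \text{blockSize})\bigr) = \max\bigl(1,\ \min(264,\ 67108864)\bigr) = 264 \\
\frac{529}{264} &\approx 2.004 > 1.1 \;\Rightarrow\; \text{split } [0, 264),\ \text{bytesRemaining} = 265 \\
\frac{265}{264} &\approx 1.004 \le 1.1 \;\Rightarrow\; \text{tail split } [264, 529) \text{ of } 265 \text{ bytes}
\end{aligned}
```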
numSplits is 2. It is the argument passed in when getSplits is called, and it deserves attention: tracing it shows that it is the value of job.getNumMapTasks():
In JobConf:
```java
public int getNumMapTasks() { return getInt("mapred.map.tasks", 1); }
```
In mapred-default.xml:
```xml
<property>
  <name>mapred.map.tasks</name>
  <value>2</value>
  <description>The default number of map tasks per job.
  Ignored when mapred.job.tracker is "local".
  </description>
</property>
```
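The lookup chain is: getNumMapTasks asks the configuration for "mapred.map.tasks" and falls back to 1 only when the key is absent; because mapred-default.xml supplies 2, a job that sets nothing ends up with numSplits = 2. The sketch below is a hypothetical model of that lookup, with java.util.Properties standing in for JobConf (a real JobConf also loads the default resource files, which this sketch does not).

```java
import java.util.Properties;

// Hypothetical stand-in for JobConf.getNumMapTasks(): java.util.Properties plays
// the role of the job configuration. It does not load mapred-default.xml itself.
public class NumMapTasksSketch {
    static int getNumMapTasks(Properties conf) {
        // Same key and fallback as JobConf: "mapred.map.tasks", default 1.
        return Integer.parseInt(conf.getProperty("mapred.map.tasks", "1"));
    }

    public static void main(String[] args) {
        Properties bare = new Properties();
        System.out.println(getNumMapTasks(bare)); // key absent, so the code default 1 applies

        Properties withDefaults = new Properties();
        // Simulates the value that mapred-default.xml contributes.
        withDefaults.setProperty("mapred.map.tasks", "2");
        System.out.println(getNumMapTasks(withDefaults)); // 2
    }
}
```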
So an MR program written against the old API produces 2 map tasks, while one written against the new API produces 1 map task.
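Since the old API feeds mapred.map.tasks into goalSize, varying that setting changes the split count for the same small file. The following hypothetical sketch (class name NumSplitsSweep is illustrative) reruns the old-API arithmetic on the 529-byte, 64 MB-block file from the log for numSplits 1 to 4, assuming minSize = 1 and SPLIT_SLOP = 1.1.

```java
// Hypothetical sketch: how the numSplits hint (mapred.map.tasks) changes the old-API
// split count for one 529-byte file. Assumes SPLIT_SLOP = 1.1 and minSize = 1.
public class NumSplitsSweep {
    static final double SPLIT_SLOP = 1.1;

    static int countSplits(long length, int numSplits, long minSize, long blockSize) {
        // Single-file input, so totalSize == length.
        long goalSize = length / (numSplits == 0 ? 1 : numSplits);
        long splitSize = Math.max(minSize, Math.min(goalSize, blockSize));
        int count = 0;
        long bytesRemaining = length;
        while (((double) bytesRemaining) / splitSize > SPLIT_SLOP) {
            count++;
            bytesRemaining -= splitSize;
        }
        if (bytesRemaining != 0) {
            count++; // tail split
        }
        return count;
    }

    public static void main(String[] args) {
        for (int numSplits = 1; numSplits <= 4; numSplits++) {
            System.out.println("mapred.map.tasks=" + numSplits + " -> splits="
                    + countSplits(529, numSplits, 1, 67108864L));
        }
    }
}
```

Under these assumptions the split count tracks the hint exactly (1, 2, 3, 4). The hint only works in one direction, though: because splitSize is min(goalSize, blockSize) when minSize is 1, a larger mapred.map.tasks can shrink splits below the block size and add map tasks, but a smaller one cannot merge blocks into fewer, larger splits. For large inputs where goalSize exceeds blockSize, blockSize decides.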
Original article: http://www.cnblogs.com/lvfeilong/p/23849jffdslkfjd.html