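Reposted from: http://blog.csdn.net/androidlushangderen/article/details/41114259
Yesterday I spent a few hours studying the first stage of MapReduce: the very beginning, where the data in a file is mapped to key-value pairs, that is, the InputFormat process. The process is not especially hard, but it has a lot of details, and few people study it closely. Today I will walk through how this code works. I also took some time to draw a few structure diagrams to make it easier to follow. One note up front: what I study here is mainly the old MapReduce API, the one in the mapred package.
In its most basic form, InputFormat is just an interface; all the concrete Formats that appear later derive from it. Its structure is shown below; it contains only the 2 most important methods: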
- public interface InputFormat<K, V> {
- // Logically split the job's input; numSplits is a hint for the desired number of splits.
- InputSplit[] getSplits(JobConf job, int numSplits) throws IOException;
- // Get the RecordReader that reads key-value records from the given split.
- RecordReader<K, V> getRecordReader(InputSplit split,
- JobConf job,
- Reporter reporter) throws IOException;
- }
So the analysis below revolves around these 2 methods only. The input source we use most often is files, which is handled by the FileInputFormat class. Its declaration is:
- public abstract class FileInputFormat<K, V> implements InputFormat<K, V>
Let's look at one of its main methods:
- public InputSplit[] getSplits(JobConf job, int numSplits)
It returns an array of InputSplit objects. InputSplit is an abstraction for one slice of the input. Its structure is:
- public interface InputSplit extends Writable {
- // Size of the split in bytes.
- long getLength() throws IOException;
- // Hostnames of the nodes that hold the split's data.
- String[] getLocations() throws IOException;
- }
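It provides 2 data-related methods: getLength gives the split's size in bytes, and getLocations gives the hosts that hold its data. The returned splits are later passed on to the RecordReader. Before digging into getSplits there is one more class to understand, FileStatus, which wraps a file's basic metadata: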
- public class FileStatus implements Writable, Comparable {
-
- private Path path;
- private long length;
- private boolean isdir;
- private short block_replication;
- private long blocksize;
- private long modification_time;
- private long access_time;
- private FsPermission permission;
- private String owner;
- private String group;
.....
By now you may be feeling a little dizzy, so here is a small class diagram I drew:

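As the diagram shows, to stay compatible with both the old and the new API, FileSplit extends the new abstract class InputSplit while also implementing the old interface form of InputSplit. Now let's look at the core of getSplits: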
- @SuppressWarnings("deprecation")
- public InputSplit[] getSplits(JobConf job, int numSplits)
- throws IOException {
- // List all input files of the job.
- FileStatus[] files = listStatus(job);
- // Record the number of input files in the job configuration.
- job.setLong(NUM_INPUT_FILES, files.length);
- long totalSize = 0;
-
- for (FileStatus file: files) {
- if (file.isDir()) {
- // A directory is not valid input at this point, so fail.
- throw new IOException("Not a file: "+ file.getPath());
- }
- totalSize += file.getLen();
- }
- // Goal size per split: total input size divided by the requested number of splits.
- long goalSize = totalSize / (numSplits == 0 ? 1 : numSplits);
- // Lower bound on the split size: the larger of the configured value and minSplitSize.
- long minSize = Math.max(job.getLong("mapred.min.split.size", 1),
- minSplitSize);
- // Generate the splits file by file.
- ArrayList<FileSplit> splits = new ArrayList<FileSplit>(numSplits);
- NetworkTopology clusterMap = new NetworkTopology();
- for (FileStatus file: files) {
- Path path = file.getPath();
- FileSystem fs = path.getFileSystem(job);
- long length = file.getLen();
- // Block locations of the whole file, used to choose hosts for each split.
- BlockLocation[] blkLocations = fs.getFileBlockLocations(file, 0, length);
- // Case 1: the file is non-empty and splittable.
- if ((length != 0) && isSplitable(fs, path)) {
-
- long blockSize = file.getBlockSize();
-
- long splitSize = computeSplitSize(goalSize, minSize, blockSize);
-
- long bytesRemaining = length;
- // Cut full-size splits while the remainder exceeds SPLIT_SLOP times the split size.
- while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
-
- String[] splitHosts = getSplitHosts(blkLocations,
- length-bytesRemaining, splitSize, clusterMap);
-
- splits.add(new FileSplit(path, length-bytesRemaining, splitSize,
- splitHosts));
-
- bytesRemaining -= splitSize;
- }
- // Whatever is left becomes the final, possibly smaller or slightly larger, split.
- if (bytesRemaining != 0) {
-
- splits.add(new FileSplit(path, length-bytesRemaining, bytesRemaining,
- blkLocations[blkLocations.length-1].getHosts()));
- }
- } else if (length != 0) {
- // Case 2: the file is non-empty but not splittable, so the whole file is one split.
- String[] splitHosts = getSplitHosts(blkLocations,0,length,clusterMap);
- splits.add(new FileSplit(path, 0, length, splitHosts));
- } else {
- // Case 3: the file is empty, so emit an empty split with no hosts.
- splits.add(new FileSplit(path, 0, length, new String[0]));
- }
- }
-
-
- LOG.debug("Total # of splits: " + splits.size());
- return splits.toArray(new FileSplit[splits.size()]);
- }
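The computeSplitSize method called inside deserves a closer look, because it weighs three values: the goal size, the block size, and the minimum size the system allows. The result is the smaller of the goal size and the block size, but never less than the configured minimum: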
- protected long computeSplitSize(long goalSize, long minSize,
- long blockSize) {
- return Math.max(minSize, Math.min(goalSize, blockSize));
- }
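To make the formula concrete, here is a small worked example. It is only a sketch: the class name SplitSizeDemo, the 64 MB block size and the 1 GB input size are assumptions chosen for illustration, and only computeSplitSize itself mirrors the listing above.

```java
class SplitSizeDemo {
    // Same formula as computeSplitSize in the listing above.
    static long computeSplitSize(long goalSize, long minSize, long blockSize) {
        return Math.max(minSize, Math.min(goalSize, blockSize));
    }

    public static void main(String[] args) {
        final long MB = 1024L * 1024L;
        long blockSize = 64 * MB;    // assumed block size
        long totalSize = 1024 * MB;  // assumed total input size (1 GB)

        // 2 splits requested: goalSize is 512 MB, larger than a block, so the block size wins.
        long few = computeSplitSize(totalSize / 2, 1, blockSize);

        // 64 splits requested: goalSize is 16 MB, smaller than a block, so goalSize wins.
        long many = computeSplitSize(totalSize / 64, 1, blockSize);

        // Same request, but the minimum is raised to 128 MB: the minimum overrides both.
        long raisedMin = computeSplitSize(totalSize / 64, 128 * MB, blockSize);

        // Prints: 64 16 128
        System.out.println(few / MB + " " + many / MB + " " + raisedMin / MB);
    }
}
```

In other words, asking for more splits can shrink a split below the block size, while raising the minimum is the only way to make a split larger than a block.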
The flow chart for the getSplits process is as follows:

Three cases, three execution paths.
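The splitting loop for the first case (a non-empty, splittable file) can be tried out in isolation. The sketch below keeps only the offset arithmetic from getSplits and leaves out hosts and block locations. The class name SplitLoopDemo and the file sizes are made up for illustration, and the value 1.1 for SPLIT_SLOP is an assumption here, since the listing above uses the constant without showing its value.

```java
import java.util.ArrayList;
import java.util.List;

class SplitLoopDemo {
    // Assumed value of SPLIT_SLOP: the last split may be up to 10% larger than splitSize.
    static final double SPLIT_SLOP = 1.1;

    /** Returns {offset, length} pairs for one non-empty, splittable file. */
    static List<long[]> split(long length, long splitSize) {
        List<long[]> splits = new ArrayList<>();
        long bytesRemaining = length;
        // Cut full-size splits while the remainder is clearly larger than one split.
        while (((double) bytesRemaining) / splitSize > SPLIT_SLOP) {
            splits.add(new long[] {length - bytesRemaining, splitSize});
            bytesRemaining -= splitSize;
        }
        // The rest becomes the final split.
        if (bytesRemaining != 0) {
            splits.add(new long[] {length - bytesRemaining, bytesRemaining});
        }
        return splits;
    }

    public static void main(String[] args) {
        final long MB = 1024L * 1024L;
        // 200 MB file, 64 MB splits: three full splits plus an 8 MB tail.
        for (long[] s : split(200 * MB, 64 * MB)) {
            System.out.println("offset=" + s[0] / MB + "MB length=" + s[1] / MB + "MB");
        }
        // 70 MB file, 64 MB splits: 70/64 is below the slop factor, so it stays one split.
        System.out.println(split(70 * MB, 64 * MB).size());
    }
}
```

The slack factor is why a file only slightly larger than the split size is not broken into one full split plus a tiny leftover.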
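Once getSplits has finished, the file data has been divided into InputSplits, and the next step is for a RecordReader to pull the records out of each split one by one. In FileInputFormat, getRecordReader is an abstract method that subclasses must implement. I picked 2 typical subclasses here, SequenceFileInputFormat and TextInputFormat. Their getRecordReader implementations are shown below, SequenceFileInputFormat first and TextInputFormat second: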
- public RecordReader<K, V> getRecordReader(InputSplit split,
- JobConf job, Reporter reporter)
- throws IOException {
-
- reporter.setStatus(split.toString());
-
- return new SequenceFileRecordReader<K, V>(job, (FileSplit) split);
- }
- public RecordReader<LongWritable, Text> getRecordReader(
- InputSplit genericSplit, JobConf job,
- Reporter reporter)
- throws IOException {
-
- reporter.setStatus(genericSplit.toString());
- return new LineRecordReader(job, (FileSplit) genericSplit);
- }
As you can see, the only difference is LineRecordReader versus SequenceFileRecordReader, which suggests that the two formats read their data differently. Let's look deeper:
- public class SequenceFileRecordReader<K, V> implements RecordReader<K, V> {
-
- private SequenceFile.Reader in;
- private long start;
- private long end;
- private boolean more = true;
- protected Configuration conf;
-
- public SequenceFileRecordReader(Configuration conf, FileSplit split)
- throws IOException {
- Path path = split.getPath();
- FileSystem fs = path.getFileSystem(conf);
-
- this.in = new SequenceFile.Reader(fs, path, conf);
- this.end = split.getStart() + split.getLength();
- this.conf = conf;
- // If the split starts beyond the current position, skip ahead to the next sync point.
- if (split.getStart() > in.getPosition())
- in.sync(split.getStart());
-
- this.start = in.getPosition();
- more = start < end;
- }
-
- ......
-
-
- public synchronized boolean next(K key, V value) throws IOException {
-
- if (!more) return false;
- long pos = in.getPosition();
- boolean remaining = (in.next(key) != null);
- if (remaining) {
- getCurrentValue(value);
- }
- if (pos >= end && in.syncSeen()) {
- more = false;
- } else {
- more = remaining;
- }
- return more;
- }
We can see that SequenceFileRecordReader reads from the input stream in one key-value pair at a time. The other implementation looks like this:
- public class LineRecordReader implements RecordReader<LongWritable, Text> {
- private static final Log LOG
- = LogFactory.getLog(LineRecordReader.class.getName());
-
- private CompressionCodecFactory compressionCodecs = null;
- private long start;
- private long pos;
- private long end;
- private LineReader in;
- int maxLineLength;
-
- ....
-
-
- public synchronized boolean next(LongWritable key, Text value)
- throws IOException {
-
- while (pos < end) {
- // The key is the byte offset at which the line starts.
- key.set(pos);
- // The value is the line's text; newSize is the number of bytes consumed.
- int newSize = in.readLine(value, maxLineLength,
- Math.max((int)Math.min(Integer.MAX_VALUE, end-pos),
- maxLineLength));
- if (newSize == 0) {
- return false;
- }
- pos += newSize;
- if (newSize < maxLineLength) {
- return true;
- }
- // The line was too long: log it, skip it, and try the next one.
- LOG.info("Skipped line of size " + newSize + " at pos " + (pos - newSize));
- }
-
- return false;
- }
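It reads from the input stream line by line, tracking the read position: the key is the byte offset pos at which the line starts, and the value is the line's text. Either way, the result is a stream of new key-value pairs that feed the subsequent map operation.
I have actually skipped many details of the overall InputFormat flow; the above is the broad outline.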
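The offset-as-key idea is easy to see on a tiny in-memory input. The following is a simplified sketch rather than Hadoop's LineRecordReader: the class LineOffsetDemo and its readRecords method are hypothetical, and the sketch ignores split boundaries, maxLineLength, compression and \r\n line endings.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

class LineOffsetDemo {
    /** Returns one "offset<TAB>line" string per '\n'-terminated line in data. */
    static List<String> readRecords(byte[] data) {
        List<String> records = new ArrayList<>();
        int pos = 0;
        while (pos < data.length) {
            // Find the end of the current line (the newline itself is excluded).
            int lineEnd = pos;
            while (lineEnd < data.length && data[lineEnd] != '\n') {
                lineEnd++;
            }
            String line = new String(data, pos, lineEnd - pos, StandardCharsets.UTF_8);
            // Key: byte offset of the line start. Value: the line's text.
            records.add(pos + "\t" + line);
            // Advance past the line and its newline, if one was present.
            pos = (lineEnd < data.length) ? lineEnd + 1 : lineEnd;
        }
        return records;
    }

    public static void main(String[] args) {
        byte[] data = "hello\nworld\nfoo".getBytes(StandardCharsets.UTF_8);
        // Keys are 0, 6 and 12: each line starts right after the previous newline.
        for (String record : readRecords(data)) {
            System.out.println(record);
        }
    }
}
```

Because the key is a byte offset and not a line number, a reader never needs to know how many lines precede its position in the file.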
Learning the MapReduce InputFormat process
Original: http://www.cnblogs.com/cxzdy/p/5043991.html