InputFormat is one of the five programmable components of the MapReduce programming model; the other four are Mapper, Partitioner, Reducer, and OutputFormat.
In the new Hadoop API (org.apache.hadoop.mapreduce), InputFormat is an abstract class; in the old API (org.apache.hadoop.mapred) it was an interface.
The InputFormat class declares two abstract methods.
The getSplits method divides the input data into InputSplits. The number of InputSplits determines the number of map tasks, and each InputSplit defaults to the size of an HDFS block, i.e. 64 MB:
public abstract List<InputSplit> getSplits(JobContext context) throws IOException, InterruptedException;
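For FileInputFormat-based formats, the split size can be nudged through the min/max split-size bounds (getSplits clamps the block size between them: max(minSize, min(maxSize, blockSize))). A minimal sketch, assuming the new-API FileInputFormat helpers; the job name and input path are placeholders:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

public class SplitSizeDemo {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "split-size-demo"); // Hadoop 2.x style
        // Bounds honored by FileInputFormat.getSplits() when computing split
        // sizes; fewer, larger splits mean fewer map tasks.
        FileInputFormat.setMinInputSplitSize(job, 32 * 1024 * 1024L);   // 32 MB
        FileInputFormat.setMaxInputSplitSize(job, 128 * 1024 * 1024L);  // 128 MB
        FileInputFormat.addInputPath(job, new Path("/input"));         // placeholder path
    }
}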
The createRecordReader method creates a RecordReader for each InputSplit; the RecordReader then parses its split into <K,V> pairs:
public abstract RecordReader<K,V> createRecordReader(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException;
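To make the contract concrete, this is roughly how the framework drains a RecordReader: initialize it, iterate the <K,V> pairs, then close it. A simplified sketch of what Mapper.run() does, not the actual Hadoop source:

import java.io.IOException;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;

public class RecordReaderLoop {
    public static <K, V> void drain(RecordReader<K, V> reader,
                                    InputSplit split,
                                    TaskAttemptContext context)
            throws IOException, InterruptedException {
        reader.initialize(split, context);
        while (reader.nextKeyValue()) {          // advance to the next pair
            K key = reader.getCurrentKey();      // current key
            V value = reader.getCurrentValue();  // current value
            // ... each <K,V> pair becomes one call to Mapper.map(key, value, context)
        }
        reader.close();
    }
}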
In other words, an InputFormat does two jobs: it carves the input data into InputSplits, and it supplies a RecordReader that turns each InputSplit into the <K,V> pairs handed to the map tasks.
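For contrast, before looking at a custom implementation: with the stock TextInputFormat each record is a line of text, keyed by its byte offset in the file. A hypothetical mapper over that format (not from the original post) would look like this:

import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// With the default TextInputFormat, the key is the line's byte offset in the
// file (LongWritable) and the value is the line itself (Text).
public class LineMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    private static final IntWritable ONE = new IntWritable(1);
    private final Text word = new Text();

    @Override
    protected void map(LongWritable offset, Text line, Context context)
            throws IOException, InterruptedException {
        for (String token : line.toString().split("\\s+")) {
            if (!token.isEmpty()) {
                word.set(token);
                context.write(word, ONE);  // emit <word, 1>
            }
        }
    }
}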
Below is a RecordReader implemented by hand. The key it produces is the name of the file's parent directory (used as a category label), and the value is the entire content of the file:
package tokenize.inputformat;

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit;

public class MyRecordReader extends RecordReader<Text, Text> {

    private CombineFileSplit combineFileSplit; // the split being processed
    private int totalLength;                   // number of files in the split
    private int currentIndex;                  // index of the file being processed
    private float currentProgress = 0;         // current progress
    private Text currentKey = new Text();      // current key
    private Text currentValue = new Text();    // current value
    private Configuration conf;                // job configuration
    private boolean processed;                 // whether the current file has been read

    public MyRecordReader(CombineFileSplit combineFileSplit,
                          TaskAttemptContext context, Integer index)
            throws IOException {
        super();
        this.currentIndex = index;
        this.combineFileSplit = combineFileSplit;
        conf = context.getConfiguration();
        totalLength = combineFileSplit.getPaths().length;
        processed = false;
    }

    @Override
    public void initialize(InputSplit split, TaskAttemptContext context)
            throws IOException, InterruptedException {
    }

    @Override
    public Text getCurrentKey() throws IOException, InterruptedException {
        return currentKey;
    }

    @Override
    public Text getCurrentValue() throws IOException, InterruptedException {
        return currentValue;
    }

    @Override
    public float getProgress() throws IOException {
        if (currentIndex >= 0 && currentIndex < totalLength) {
            currentProgress = (float) currentIndex / totalLength;
            return currentProgress;
        }
        return currentProgress;
    }

    @Override
    public void close() throws IOException {
    }

    @Override
    public boolean nextKeyValue() throws IOException {
        if (!processed) {
            // Key: the name of the file's parent directory (the category name).
            Path file = combineFileSplit.getPath(currentIndex);
            currentKey.set(file.getParent().getName());

            // Value: the entire content of the file.
            byte[] contents = new byte[(int) combineFileSplit.getLength(currentIndex)];
            FSDataInputStream in = null;
            try {
                FileSystem fs = file.getFileSystem(conf);
                in = fs.open(file);
                in.readFully(contents);
                currentValue.set(contents);
            } finally {
                if (in != null) {  // guard against an NPE when fs.open() fails
                    in.close();
                }
            }
            processed = true;
            return true;
        }
        return false; // the file has already been consumed; must return false
    }
}
The matching InputFormat plugs the reader in. CombineFileRecordReader creates one MyRecordReader per file in the combined split, invoking the (CombineFileSplit, TaskAttemptContext, Integer) constructor above via reflection:

package tokenize.inputformat;

import java.io.IOException;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.CombineFileRecordReader;
import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit;

public class MyInputFormat extends CombineFileInputFormat<Text, Text> {

    /**
     * Make sure a file will not be split.
     */
    @Override
    protected boolean isSplitable(JobContext context, Path file) {
        return false;
    }

    /**
     * Specify the record reader: one MyRecordReader per file in the split.
     */
    @Override
    public RecordReader<Text, Text> createRecordReader(InputSplit split,
            TaskAttemptContext context) throws IOException {
        return new CombineFileRecordReader<Text, Text>(
                (CombineFileSplit) split, context, MyRecordReader.class);
    }
}
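Wiring the pair into a job then takes a single setInputFormatClass call in the driver. A minimal sketch, assuming the Hadoop 2.x Job.getInstance API; the class name TokenizeDriver and the omitted mapper/reducer setup are placeholders:

package tokenize.inputformat;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

// Hypothetical driver showing how MyInputFormat is registered with a job.
public class TokenizeDriver {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "tokenize");   // Hadoop 2.x style
        job.setJarByClass(TokenizeDriver.class);

        job.setInputFormatClass(MyInputFormat.class);  // use the custom InputFormat
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        // Mapper/Reducer classes omitted; they would consume <Text category, Text content>.

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}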
Original post (Chinese): http://www.cnblogs.com/pingh/p/3652827.html