回顾:
在上一篇https://www.cnblogs.com/superlsj/p/11857691.html详细介绍了InputFormat的原理和常见的实现类。总结来说,一个InputFormat是将文件切片----->再转化为<key--value>对转交给Mapper处理。
所以我们看到在InputFormat类中只有两个方法,一个负责切片,一个返回能将切片信息转化为相应的键值对的对象:
public abstract class InputFormat<K, V> { public InputFormat() { } public abstract List<InputSplit> getSplits(JobContext var1) throws IOException, InterruptedException; public abstract RecordReader<K, V> createRecordReader(InputSplit var1, TaskAttemptContext var2) throws IOException, InterruptedException; }
以KeyValueInputFormat为例:
@Stable public class KeyValueTextInputFormat extends FileInputFormat<Text, Text> { public KeyValueTextInputFormat() { } protected boolean isSplitable(JobContext context, Path file) { CompressionCodec codec = (new CompressionCodecFactory(context.getConfiguration())).getCodec(file); return null == codec ? true : codec instanceof SplittableCompressionCodec; } public RecordReader<Text, Text> createRecordReader(InputSplit genericSplit, TaskAttemptContext context) throws IOException { context.setStatus(genericSplit.toString()); return new KeyValueLineRecordReader(context.getConfiguration()); } }
我们知道:当使用KeyValueInputFormat并设置分隔符后,Mapper以分隔符前的内容作为Key,以分隔符后面的内容作为Value,都为Text类型,那么在数据提交到Mapper之前,数据就必须被格式化为满足Mapper接收的格式,这个工作就是由RecordReader来完成的,因此,其泛型也必须与Mapper接收类型一致。顺带一提:isSplitable方法返回文件是否可以切片,当返回false时,表示在格式化输入文件时,不对文件进行切片,而直接进行文本数据至键值对的转化。
设计自己的InputFormat:
现有的那些InputFormat肯定是无法满足现实中花里胡哨的需求的,所以自定义InputFormat是一项不可避免的工作。下面以将三个小文件合并成一个SquenceFile文件(SuquenceFile文件是Hadoop用来村塾二进制形式的key-value对的文件格式),SuquenceFile里面存储三个小文件,存储形式为文件路径+文件名为key,文件内容为value为例,演示自定义InputFormat的流程。
1、自定义InputFormat
public class WholeFileInputFormat extends FileInputFormat<Text, BytesWritable> { public RecordReader createRecordReader(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException { return new WholeFileRecordReader(); } @Override protected boolean isSplitable(JobContext context, Path filename) { return false; } }
2、自定义RecordReader
public class WholeFileRecordReader extends RecordReader<Text, BytesWritable> { private boolean notRead = true; private Text key = new Text(); private BytesWritable value = new BytesWritable(); private FSDataInputStream inputStream; private FileSplit fs; /** * 初始化方法,框架会在开始的时候调用此方法, * 因此,一些在RecordReader工作时需要使用的资源可以此方法中初始化 */ public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException { //转换切片类型到文件切片 fs = (FileSplit)inputSplit; //通过qiepianhuodeluj Path path = fs.getPath(); //通过路径获取文件系统 FileSystem fileSystem = path.getFileSystem(taskAttemptContext.getConfiguration()); //开流 inputStream = fileSystem.open(path); } /** * 此方法用于读取下一组数据,类似于迭代器,如果读到数据返回true * 因为将路径+文件名作为key,文件内容作为value,所以一个文件只会读取一次,要么没读过,要么读过 */ public boolean nextKeyValue() throws IOException, InterruptedException { if(notRead){ //具体读文件的操作 //读Key key.set(fs.getPath().toString()); //读Value byte[] bytes = new byte[(int)fs.getLength()]; inputStream.read(bytes); value.set(bytes,0,bytes.length); notRead = true; return true; }else{ return false; } } /** * 获取当前读到的Key-value对并返回 */ public Text getCurrentKey() throws IOException, InterruptedException { return key; } public BytesWritable getCurrentValue() throws IOException, InterruptedException { return value; } /** * 返回当前数据的读取进度:0.0~1.0 * 由于本案例中以路径+整个文件名作为Key,只存在一个K-V对, * 所以读取进度只存在两种情况:要么0没读,要么1读完了。 */ public float getProgress() throws IOException, InterruptedException { return notRead ? 0 : 1; } /** * 常用于关闭资源 */ public void close() throws IOException { IOUtils.closeStream(inputStream); } }
3、测试,本案例中Mapper和Redu啥也不用干,所以不用写,用默认提供的就行,是需要写一个Driver。
public class WholeFileDriver { public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException { Job job = Job.getInstance(new Configuration()); job.setJarByClass(WholeFileDriver.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(BytesWritable.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(BytesWritable.class); job.setInputFormatClass(WholeFileInputFormat.class); job.setOutputFormatClass(SequenceFileOutputFormat.class);//【注意】 FileInputFormat.setInputPaths(job, new Path("d:\\input")); FileOutputFormat.setOutputPath(job, new Path("d:\\output")); boolean b = job.waitForCompletion(true); System.out.println(b ? 0:1); } }
原文:https://www.cnblogs.com/superlsj/p/11878229.html