最近一直在找工作,写论文,对MapReduce源代码的学习搁置了很久,想来想去认为不能放弃,有意义的事情一定要做好,要做到底,要尽力。前面的文章到后来写的有些心不在焉,有应付之嫌,如今重新拾起,认真学习,认真写下去。MR 2.0已经发布很久了,新架构新思想很值得学习,学无止境啊。
参考书目:
【1】《Java编程思想(第四版)》
【2】《Hadoop 技术内幕:深入解析MapReduce架构设计与实现原理》
【3】《Hadoop 技术内幕:深入解析Hadoop Common和HDFS架构设计与实现原理》
从Map阶段的Collect过程开始吧,力求有所收获。
看MapTask类的入口函数run,根据配置判断启用old mapper或new mapper。如果是前者:
准备一个MapRunner跑用户的map函数。这个MapRunner实现了MapRunnable泛型接口,四个泛型参数分别代表map的输入键值对和输出键值对的类型(INKEY,INVALUE,OUTKEY,OUTVALUE)。对于MapRunner来说,两个泛型参数来自RecordReader<INKEY,INVALUE>对象;另外两个来自OldOutputCollector对象。后者使用MapOutputBuffer<OUTKEY,OUTVALUE>对象构造。MapOutputBuffer类实现了泛型接口MapOutputCollector,因而具有collect功能。这样一个MapRunner就具备了读取数据(read)和输出数据(collect)的功能。MapRunner通过run函数使用上述两个功能对象。
如果是后者,就有了新的一套RecordReader和OutputCollector,并使用了一个Context对象封装上述功能,传入run函数。不打算详细学习这部分内容。
回到old mapper的实现中,前面提到泛型,由于对java中的泛型技术比较陌生,这里详细学习一下MapRunner.run方法中涉及到泛型技术,顺便还有反射的内容。
首先,看RecordReader,它是一个泛型接口。使用泛型而不是普通接口的好处是,实现接口不仅仅具有了接口的功能,同时接口方法的参数和返回值支持多种类型。对于RecordReader来说,支持多种类型的key和value。如果不使用泛型,则在接口中使用key和value类型的基类,这样就只支持基类及其派生类,不支持该派生体系外的类型。Java支持泛型方法,这使得方法能够独立于类产生变化【1】。如果能使用泛型方法解决问题,就不使用泛型类。MapTask类的runOldMapper方法就是一个泛型方法,其签名如下:
private <INKEY,INVALUE,OUTKEY,OUTVALUE> void runOldMapper(final JobConf job, final TaskSplitIndex splitIndex, final TaskUmbilicalProtocol umbilical, TaskReporter reporter )MapTask本身不是个泛型类。四个泛型参数在构造MapRunnable对象时使用:
MapRunnable<INKEY,INVALUE,OUTKEY,OUTVALUE> runner = ReflectionUtils.newInstance(job.getMapRunnerClass(), job);MapRunnable也是个泛型接口,其run方法的参数RecordReader和OutputCollector使用了泛型参数:
public void run(RecordReader<K1, V1> input, OutputCollector<K2, V2> output, Reporter reporter)再看runner的构造,使用ReflectionUtils的静态方法实现,该类是MapReduce提供的一个反射工具类。newInstance方法是个静态泛型方法:
public static <T> T newInstance(Class<T> theClass, Configuration conf) { T result; try { Constructor<T> meth = (Constructor<T>) CONSTRUCTOR_CACHE.get(theClass); if (meth == null) { meth = theClass.getDeclaredConstructor(EMPTY_ARRAY); meth.setAccessible(true); CONSTRUCTOR_CACHE.put(theClass, meth); } result = meth.newInstance(); } catch (Exception e) { throw new RuntimeException(e); } setConf(result, conf); return result; }
该方法的作用是根据给定的类型和配置创建对象。在Java中,一个static方法无法访问泛型类的泛型参数。因此,如果static方法需要使用泛型能力,就必须使其成为泛型方法【1】。静态泛型方法经常用于一些工具类作为创建对象的工具。具体看方法实现,首先查看缓存中有没有该类型的构造方法对象,这个缓存对象是这样实现的:
/** * Cache of constructors for each class. Pins the classes so they * can‘t be garbage collected until ReflectionUtils can be collected. */ private static final Map<Class<?>, Constructor<?>> CONSTRUCTOR_CACHE = new ConcurrentHashMap<Class<?>, Constructor<?>>();
public class Maps{ public static <K,V> Map<K,V> map(){ return new HashMap<K,V>(); } public static <K,V> ConcurrentHashMap<K,V> cMap(){ return new ConcurrentHashMap<K,V>(); } public static void main(String[] args){ Map<Class<?>, Constructor<?>> CONSTRUCTOR_CACHE = Maps.cMap(); } }
final Constructor<T> c = getConstructor0(empty, Member.DECLARED); cachedConstructor = c; Constructor<T> tmpConstructor = cachedConstructor; return tmpConstructor.newInstance((Object[])null);
回到run方法,MapRunnable的run有两种实现:一是普通的MapRunner,另一个是多线程MultithreadedMapRunner,这里先学习前者。每个MapRunner对象中都有一个Mapper泛型对象用于执行用户提交的Map函数。
this.mapper = ReflectionUtils.newInstance(job.getMapperClass(), job); mapper.map(key, value, output, reporter);
前面的文章提到过,collect方法是由用户的map函数调用的,例如Grep应用的mapper类RegexMap类中的map函数:
public void map(K key, Text value, OutputCollector<Text, LongWritable> output, Reporter reporter) throws IOException { String text = value.toString(); Matcher matcher = pattern.matcher(text); while (matcher.find()) { output.collect(new Text(matcher.group(group)), new LongWritable(1)); }
collector.collect(key, value, partitioner.getPartition(key, value, numPartitions));
private final ReentrantLock spillLock = new ReentrantLock();
Thinking in Java中对可重入互斥锁的解释为:ReentrantLock允许你尝试着获取但最终未获取锁,这样如果其他人已经获取了这个锁,那你就可以决定离开去执行其他一些事情,而不是等待直至这个锁被释放。在Java中显式使用锁对象Lock的情况比较少,因为Lock对象必须显式地创建、锁定和释放。但有时synchronized关键字不能实现一些特殊需求:尝试着获取锁且最终获取失败,或者尝试获取锁一段时间,然后放弃。这里举一个书上的例子:
private ReentrantLock lock = new ReentrantLock(); public void untimed(){ boolean captured = lock.tryLock(); try{ System.out.println("tryLock(): " + captured); }finally{ if(captured) lock.unlock(); } }
lock.tryLock()如果没有获得锁,captured为false,此时不会阻塞线程,而是会继续执行下面语句输出一行。在finally块中,根据是否捕获到锁来释放锁。另外,在ReentrantLock上阻塞的任务具备可以被中断的能力,这与在synchronized方法或临界区上阻塞的任务不同(后者是不可中断的阻塞,不会抛出InterruptedException异常)。在collect方法中之所以使用可重入锁,我想就是因为使用上述后一种特性,使其在阻塞时可中断,抛出异常。在MapOutputBuffer中还定义了两个条件变量spillReady和spillDone
private final Condition spillDone = spillLock.newCondition(); private final Condition spillReady = spillLock.newCondition();
spillReady.signal();
spillDone.await();
public void run() { spillLock.lock(); spillThreadRunning = true; try { while (true) { spillDone.signal(); while (kvstart == kvend) { spillReady.await(); } try { spillLock.unlock(); sortAndSpill(); }finally { spillLock.lock(); ... kvstart = kvend; bufstart = bufend; } } } catch (InterruptedException e) { Thread.currentThread().interrupt(); } finally { spillLock.unlock(); spillThreadRunning = false; } } }
首先获得锁,然后唤醒在spillDone上等待的collect线程,表明spill溢写结束,可以继续写入缓冲区了。当进入正常写入状态后(kvstart==kvend),调用spillReady.await(),挂起spill线程,暂停溢写,直到collect线程再次调用startSpill方法。当spill线程被唤醒并再次获得锁时,调用sortAndSpill对缓冲区数据进行依次快速排序然后写入磁盘。
注意,在调用await,signal和signalAll之前必须拥有锁,这里两个Condition变量在使用前必须拥有锁spillLock。collect同步控制的核心逻辑如下,用于对比:
spillLock.lock(); try { boolean kvfull; do { // sufficient acct space kvfull = kvnext == kvstart; final boolean kvsoftlimit = ((kvnext > kvend) ? kvnext - kvend > softRecordLimit : kvend - kvnext <= kvoffsets.length - softRecordLimit); if (kvstart == kvend && kvsoftlimit) { startSpill(); } if (kvfull) { while (kvstart != kvend) { spillDone.await(); } } } while (kvfull); } finally { spillLock.unlock(); }
有一个疑问,如果spill线程发现collect正在写缓冲区而挂起,那么spill获得的锁就挂起了,collect就获得不到锁了,也就无法调用startSpill方法去唤醒挂起的spill线程,这样岂不是死锁了?
研究了相关资料,找到答案:await操作在调用时会先释放锁,然后挂起线程,并将线程加入一个等待队列。在调用signal时会让等待队列中第一个线程重新获得锁,并继续执行。这样spill挂起时,spillLock被释放掉,collect线程会持续获得锁,直到满足spill条件,调用startSpill方法,唤醒挂起的spill线程,当collect释放锁时,spill线程会重新获得spillLock,并继续执行。
使用Condition对象的目的相当于在锁上加上一个条件,实现更精细的同步控制。这里在同一个锁spillLock上使用了两个条件,spillReady条件表示只有缓冲区满足一定条件才能发生spill,读取缓冲区(消费者行为);spillDone条件表示只有满足一定条件(在这里只有缓冲区满的时候collect线程才挂起,即便是spill正在进行,缓冲区依然可以写入,读写不冲突,这体现了环形缓冲区的优势)才能写缓冲区(生产者行为)。
这个地方关于条件的锁的理解有什么错误或不足请不吝赐教。关于缓冲区的操作比较复杂,参考书目【2】第8章内容中作者针对各类情况给出了图文描述,推荐阅读。
看看sortAndSpill做了些什么,首先对bufstart和bufend下标之间的数据排序:
sorter.sort(MapOutputBuffer.this, kvstart, endPosition, reporter);
Reducer<K,V,K,V> combiner = ReflectionUtils.newInstance(combinerClass, job); try { CombineValuesIterator<K,V> values = new CombineValuesIterator<K,V>(kvIter, comparator, keyClass, valueClass, job, Reporter.NULL, inputCounter); while (values.more()) { combiner.reduce(values.getKey(), values, combineCollector, Reporter.NULL); values.nextKey(); } } finally { combiner.close(); }
最后研究一下Hadoop中的序列化【3】。Hadoop重新定义了序列化的机制,原因是:在Java序列化的过程中,序列化输出中保存了大量的附加信息,导致序列化结果膨胀,对于需要保存和处理大规模数据的Hadoop来说,需要一个新的序列化机制。使用Java实现对象的序列化简单概括为:
1. 实现Serializable接口。
2. 在某种OutputStream的基础上创建ObjectOutputStream对象。
3. 调用writeObject方法进行序列化。
反序列化的过程类似,只需要使用对应的输入流,并调用readObject即可。Hadoop对序列化过程的优化为:同一个类对象的序列化结果只输出一份元数据;重用对象,在已有对象上进行反序列化操作。
具体的实现机制是:
可序列化的对象需要实现Writable接口,该接口含有两个方法:write(DataOutput out) 和 readFields(DataInput in)。前者借助Java的DataOutput类对象将Java原生类型以字节形式写入二进制流,后者从二进制流中读取字段。
Hadoop还提供了带有比较功能的WritableComparable接口,具有高效比较能力的RawComparator接口。前者兼具比较和序列化的功能;后者可以比较流中读取的未被反序列化为对象的记录,节省了创建对象的开销,十分高效。
Hadoop中的Text类型即为一种常见的键类型,其声明如下:
public class Text extends BinaryComparable implements WritableComparable<BinaryComparable> {}
public class ObjectWritable implements Writable, Configurable { private Class declaredClass;//实际类型的类对象 private Object instance;//需要序列化的对象 private Configuration conf; public ObjectWritable() {} public ObjectWritable(Object instance) { set(instance); } public ObjectWritable(Class declaredClass, Object instance) { this.declaredClass = declaredClass; this.instance = instance; } ... }
Hadoop还提供了简单的序列化框架API。通过Serialization实现获得一个Serializer对象,可将一个对象转换为一个字节流的实现实例。在collect过程中,会将key/value序列化到缓冲区中。这里使用了Serializer:
private final Serializer<K> keySerializer; private final Serializer<V> valSerializer;
static class WritableSerializer implements Serializer<Writable> { private DataOutputStream dataOut; public void open(OutputStream out) { if (out instanceof DataOutputStream) { dataOut = (DataOutputStream) out; } else { dataOut = new DataOutputStream(out); } } public void serialize(Writable w) throws IOException { w.write(dataOut); } public void close() throws IOException { dataOut.close(); } }
keySerializer.serialize(key); valSerializer.serialize(value);
Collect过程结束,以后可能会就某个话题有补充。下篇计划学习Reduce任务的过程。
2014.04.01
MapReduce任务执行过程研究之Collect过程,布布扣,bubuko.com
原文:http://blog.csdn.net/jaytalent/article/details/21335847