protected class SpillThread extends Thread { @Override public void run() { spillLock.lock(); spillThreadRunning = true; try { while (true) { spillDone.signal(); while (kvstart == kvend) { // 等待被唤醒 spillReady.await(); } try { spillLock.unlock(); // spill处理 sortAndSpill(); } catch (...) { ... } finally { spillLock.lock(); // 重置索引区,更新buf缓冲区的尾部位置信息 if (bufend < bufindex && bufindex < bufstart) { bufvoid = kvbuffer.length; } kvstart = kvend; bufstart = bufend; } } } catch (InterruptedException e) { Thread.currentThread().interrupt(); } finally { spillLock.unlock(); spillThreadRunning = false; } } }
private void sortAndSpill() throws IOException, ClassNotFoundException, InterruptedException { //approximate the length of the output file to be the length of the //buffer + header lengths for the partitions long size = (bufend >= bufstart ? bufend - bufstart : (bufvoid - bufend) + bufstart) + partitions * APPROX_HEADER_LENGTH; FSDataOutputStream out = null; try { // part1 // create spill file final SpillRecord spillRec = new SpillRecord(partitions); final Path filename = mapOutputFile.getSpillFileForWrite(numSpills, size); out = rfs.create(filename); // part2 final int endPosition = (kvend > kvstart) ? kvend : kvoffsets.length + kvend; sorter.sort(MapOutputBuffer.this, kvstart, endPosition, reporter); int spindex = kvstart; IndexRecord rec = new IndexRecord(); InMemValBytes value = new InMemValBytes(); for (int i = 0; i < partitions; ++i) { IFile.Writer<K, V> writer = null; try { // part3 long segmentStart = out.getPos(); writer = new Writer<K, V>(job, out, keyClass, valClass, codec, spilledRecordsCounter); // part4 if (combinerRunner == null) { // spill directly DataInputBuffer key = new DataInputBuffer(); while (spindex < endPosition && kvindices[kvoffsets[spindex % kvoffsets.length] + PARTITION] == i) { final int kvoff = kvoffsets[spindex % kvoffsets.length]; getVBytesForOffset(kvoff, value); key.reset(kvbuffer, kvindices[kvoff + KEYSTART], (kvindices[kvoff + VALSTART] - kvindices[kvoff + KEYSTART])); writer.append(key, value); ++spindex; } } else { // part5 int spstart = spindex; while (spindex < endPosition && kvindices[kvoffsets[spindex % kvoffsets.length] + PARTITION] == i) { ++spindex; } // Note: we would like to avoid the combiner if we've fewer // than some threshold of records for a partition if (spstart != spindex) { combineCollector.setWriter(writer); RawKeyValueIterator kvIter = new MRResultIterator(spstart, spindex); combinerRunner.combine(kvIter, combineCollector); } } // part6 // close the writer writer.close(); // record offsets rec.startOffset = segmentStart; rec.rawLength = writer.getRawLength(); rec.partLength = writer.getCompressedLength(); spillRec.putIndex(rec, i); writer = null; } finally { if (null != writer) writer.close(); } } // part7 if (totalIndexCacheMemory >= INDEX_CACHE_MEMORY_LIMIT) { // create spill index file Path indexFilename = mapOutputFile.getSpillIndexFileForWrite(numSpills, partitions * MAP_OUTPUT_INDEX_RECORD_LENGTH); spillRec.writeToFile(indexFilename, job); } else { indexCacheList.add(spillRec); totalIndexCacheMemory += spillRec.size() * MAP_OUTPUT_INDEX_RECORD_LENGTH; } LOG.info("Finished spill " + numSpills); ++numSpills; } finally { if (out != null) out.close(); } }
public SpillRecord(int numPartitions) { buf = ByteBuffer.allocate( numPartitions * MapTask.MAP_OUTPUT_INDEX_RECORD_LENGTH); entries = buf.asLongBuffer(); }
创建文件流,文件的路径是根据numspill来产生的,第一个溢出的文件就是spill0.out,以此类推后续的溢出的文件就是spill0.out,spill1.out ...
private static void sortInternal(final IndexedSortable s, int p, int r, final Progressable rep, int depth) { if (null != rep) { rep.progress(); } while (true) { if (r-p < 13) { //p为其实位置,r为结束位置,s为MapOutputBuffer for (int i = p; i < r; ++i) { for (int j = i; j > p && s.compare(j-1, j) > 0; --j) { s.swap(j, j-1); } } return; } .... }
public int compare(int i, int j) { final int ii = kvoffsets[i % kvoffsets.length]; final int ij = kvoffsets[j % kvoffsets.length]; // sort by partition if (kvindices[ii + PARTITION] != kvindices[ij + PARTITION]) { return kvindices[ii + PARTITION] - kvindices[ij + PARTITION]; } // sort by key return comparator.compare(kvbuffer, kvindices[ii + KEYSTART], kvindices[ii + VALSTART] - kvindices[ii + KEYSTART], kvbuffer, kvindices[ij + KEYSTART], kvindices[ij + VALSTART] - kvindices[ij + KEYSTART]); } public void swap(int i, int j) { i %= kvoffsets.length; j %= kvoffsets.length; int tmp = kvoffsets[i]; kvoffsets[i] = kvoffsets[j]; kvoffsets[j] = tmp; }
KeyLength |
valueLength |
key |
Value |
KeyLength |
valueLength |
key |
Value |
4字节CRC |
//用于io操作的输出流,基于checksum的流产生 FSDataOutputStream out; //记录原始的输出流,也就是第一部分中产生的文件流 FSDataOutputStream rawOut; //基于文件流产生的checksum输出流,特点是write时内部会做crc IFileOutputStream checksumOut; //key,value的序列化,和"核心成员变量中的key,value序列化类一样的功能" Class<K> keyClass; Class<V> valueClass; Serializer<K> keySerializer; Serializer<V> valueSerializer; public Writer(Configuration conf, FSDataOutputStream out, Class<K> keyClass, Class<V> valueClass, CompressionCodec codec, Counters.Counter writesCounter) throws IOException { //根据文件流封了一层可以在输出时做crc this.checksumOut = new IFileOutputStream(out); this.rawOut = out; this.start = this.rawOut.getPos(); if (codec != null) { ... } else { //writer内部用于io输出的流是基于checksumOut产生的。 this.out = new FSDataOutputStream(checksumOut,null); } // key,value序列化类,是输出key,value到buffer中,真正写的时候从buffer中取出 this.keyClass = keyClass; this.valueClass = valueClass; SerializationFactory serializationFactory = new SerializationFactory(conf); this.keySerializer = serializationFactory.getSerializer(keyClass); this.keySerializer.open(buffer); this.valueSerializer = serializationFactory.getSerializer(valueClass); this.valueSerializer.open(buffer); }
public void append(DataInputBuffer key, DataInputBuffer value) throws IOException { int keyLength = key.getLength() - key.getPosition(); if (keyLength < 0) { throw new IOException("Negative key-length not allowed: " + keyLength + " for " + key); } int valueLength = value.getLength() - value.getPosition(); if (valueLength < 0) { throw new IOException("Negative value-length not allowed: " + valueLength + " for " + value); } WritableUtils.writeVInt(out, keyLength); WritableUtils.writeVInt(out, valueLength); out.write(key.getData(), key.getPosition(), keyLength); out.write(value.getData(), value.getPosition(), valueLength); // Update bytes written decompressedBytesWritten += keyLength + valueLength + WritableUtils.getVIntSize(keyLength) + WritableUtils.getVIntSize(valueLength); ++numRecordsWritten; }
public synchronized void collect(K key, V value) throws IOException { outCounter.increment(1); writer.append(key, value); if ((outCounter.getValue() % progressBar) == 0) { progressable.progress(); } }
combinerRunner.combine(kvIter, combineCollector);是如何执行的呢,这里会因为combinerRunner的不同而不同,我们关注的是旧的MR处理,因此我们跟踪到OldCombinerRunner.combiner,可以看到流程实际上非常简单,整个迭代的过程是判断是否还有数据没有被处理掉,有则一直循环,依次调用reduce函数,每处理一次相同的key的数据后,通过nextKey切换到下一个不同的key再次重复。在用户的reduce函数内,因为collector是CombineOutputCollector,因此用户在collector.collect输出key,value,实际上是输出到IFile.Writer流中。
protected void combine(RawKeyValueIterator kvIter, OutputCollector<K,V> combineCollector ) throws IOException { //combiner是一个Reduer Reducer<K,V,K,V> combiner = ReflectionUtils.newInstance(combinerClass, job); try { //取得value的迭代器 CombineValuesIterator<K,V> values = new CombineValuesIterator<K,V>(kvIter, comparator, keyClass, valueClass, job, Reporter.NULL, inputCounter); //判断依据是spstart是否走到spindex while (values.more()) { combiner.reduce(values.getKey(), values, combineCollector, Reporter.NULL); //跳过相同key直到读取到下一个不相同的key values.nextKey(); } } finally { combiner.close(); } }
Reducer的处理函数reduce大家都知道入参是key和一串value,这里的一串value通过Iterator来表示。void reduce(K2 key,Iterator<V2> values, OutputCollector<K3, V3> output, Reporterreporter) throws IOException;
private void readNextKey() throws IOException { //根据spstart是否到达spindex指向的区间尾部 more = in.next(); if (more) { DataInputBuffer nextKeyBytes = in.getKey(); keyIn.reset(nextKeyBytes.getData(), nextKeyBytes.getPosition(), nextKeyBytes.getLength() - nextKeyBytes.getPosition()); //将keyIn中的key反序列化到nextKey变量中 nextKey = keyDeserializer.deserialize(nextKey); //判断是否还有相同的key存在 hasNext = key != null && (comparator.compare(key, nextKey) == 0); } else { hasNext = false; } } public boolean hasNext() { return hasNext; } private int ctr = 0; public VALUE next() { if (!hasNext) { throw new NoSuchElementException("iterate past last value"); } try { //返回相同的key的value,读取下一个key,如果key相同, //下一次仍会进入到next函数中,读取到相同key的value readNextValue(); readNextKey(); } catch (IOException ie) { throw new RuntimeException("problem advancing post rec#"+ctr, ie); } reporter.progress(); return value; } //在每调用一次reduce处理完相同key所对应的一串value, //会通过nextKey函数取得下一个不同的key,重新进入到reduce函数。 /** Start processing next unique key. */ void nextKey() throws IOException { //读取到下一个不同的key,实际上combiner的处理,是不会进入到while循环内 while (hasNext) { readNextKey(); } ++ctr; // move the next key to the current one KEY tmpKey = key; key = nextKey; nextKey = tmpKey; hasNext = more; }
public void close() throws IOException { // Close the serializers keySerializer.close(); valueSerializer.close(); // Write EOF_MARKER for key/value length WritableUtils.writeVInt(out, EOF_MARKER); WritableUtils.writeVInt(out, EOF_MARKER); decompressedBytesWritten += 2 * WritableUtils.getVIntSize(EOF_MARKER); //Flush the stream out.flush(); if (compressOutput) { // Flush compressedOut.finish(); compressedOut.resetState(); } // Close the underlying stream iff we own it... if (ownOutputStream) { out.close(); } else { //写入checksum checksumOut.finish(); } compressedBytesWritten = rawOut.getPos() - start; if (compressOutput) { // Return back the compressor CodecPool.returnCompressor(compressor); compressor = null; } out = null; if(writtenRecordsCounter != null) { writtenRecordsCounter.increment(numRecordsWritten); } }
public void finish() throws IOException { if (finished) { return; } finished = true; sum.writeValue(barray, 0, false); out.write (barray, 0, sum.getChecksumSize()); out.flush(); }
每个IFile数据文件就有一个对应的索引文件和它一一对应,这个索引文件有可能在内存,也有可能在磁盘上真实存在的索引文件。IFile文件对应的的索引信息会在满足条件的情况下内存中缓存着,一个IFile对应的索引信息封装在SpillRecord中,这个索引信息SpillRecord存储在indexCacheList中,当索引的cache超过1M大小后,那么会将后来产生的索引信息输出到磁盘上形成一个索引文件。这个索引文件的文件名为"spill"+ spillNumber +".out.index",spillNumber就是:numSpills变量所记录的当前进行到第几次spill。
索引对应的数据 |
索引文件存储内容 |
Spill0的partition0 |
startOffset |
rawLength |
partLength |
Spill0的partition1 |
startOffset |
rawLength |
partLength |
Spill1的partition0 |
startOffset |
rawLength |
partLength |
Spill1的partition1 |
startOffset |
rawLength |
partLength |
8字节的crc |
runOldMapper: runner.run(in, new OldOutputCollector(collector, conf), reporter); collector.flush(); // MapOutputBuffer.flush public synchronized void flush() throws IOException, ClassNotFoundException, InterruptedException { LOG.info("Starting flush of map output"); spillLock.lock(); try { //等待正在进行中的spill动作完成 while (kvstart != kvend) { reporter.progress(); spillDone.await(); } if (sortSpillException != null) { throw (IOException)new IOException("Spill failed" ).initCause(sortSpillException); } //缓存中剩余的数据,需要触发一次spill动作将剩余数据spill到磁盘上 if (kvend != kvindex) { kvend = kvindex; bufend = bufmark; sortAndSpill(); } } catch (InterruptedException e) { throw (IOException)new IOException( "Buffer interrupted while waiting for the writer" ).initCause(e); } finally { spillLock.unlock(); } assert !spillLock.isHeldByCurrentThread(); // shut down spill thread and wait for it to exit. Since the preceding // ensures that it is finished with its work (and sortAndSpill did not // throw), we elect to use an interrupt instead of setting a flag. // Spilling simultaneously from this thread while the spill thread // finishes its work might be both a useful way to extend this and also // sufficient motivation for the latter approach. try { spillThread.interrupt(); spillThread.join(); } catch (InterruptedException e) { throw (IOException)new IOException("Spill failed" ).initCause(e); } // release sort buffer before the merge kvbuffer = null; //触发将小的spill文件合并为大的spill文件。 mergeParts(); Path outputPath = mapOutputFile.getOutputFile(); fileOutputByteCounter.increment(rfs.getFileStatus(outputPath).getLen()); } merge的动作会有mergeParts函数触发,先看该函数的实现: private void mergeParts() throws IOException, InterruptedException, ClassNotFoundException { // get the approximate size of the final output/index files long finalOutFileSize = 0; long finalIndexFileSize = 0; final Path[] filename = new Path[numSpills]; final TaskAttemptID mapId = getTaskID(); //获取磁盘上的所有spill文件的文件名 for(int i = 0; i < numSpills; i++) { filename[i] = mapOutputFile.getSpillFile(i); finalOutFileSize += rfs.getFileStatus(filename[i]).getLen(); } //只有一个spill文件,那么只需要将数据文件和索引文件rename即可 if (numSpills == 1) { //the spill is the final output rfs.rename(filename[0], new Path(filename[0].getParent(), "file.out")); if (indexCacheList.size() == 0) { rfs.rename(mapOutputFile.getSpillIndexFile(0), new Path(filename[0].getParent(),"file.out.index")); } else { indexCacheList.get(0).writeToFile( new Path(filename[0].getParent(),"file.out.index"), job); } return; } // read in paged indices //加载因内存中缓存不下而刷出到磁盘上的索引文件到内存中 for (int i = indexCacheList.size(); i < numSpills; ++i) { Path indexFileName = mapOutputFile.getSpillIndexFile(i); indexCacheList.add(new SpillRecord(indexFileName, job, null)); } //make correction in the length to include the sequence file header //lengths for each partition finalOutFileSize += partitions * APPROX_HEADER_LENGTH; finalIndexFileSize = partitions * MAP_OUTPUT_INDEX_RECORD_LENGTH; //获取最终的spill文件和index文件的路径名 Path finalOutputFile = mapOutputFile.getOutputFileForWrite(finalOutFileSize); Path finalIndexFile = mapOutputFile.getOutputIndexFileForWrite(finalIndexFileSize); //The output stream for the final single output file FSDataOutputStream finalOut = rfs.create(finalOutputFile, true, 4096); //没有触发过spill动作,则形成一个空的spill文件和index文件。 if (numSpills == 0) { //create dummy files IndexRecord rec = new IndexRecord(); SpillRecord sr = new SpillRecord(partitions); try { for (int i = 0; i < partitions; i++) { long segmentStart = finalOut.getPos(); Writer<K, V> writer = new Writer<K, V>(job, finalOut, keyClass, valClass, codec, null); writer.close(); rec.startOffset = segmentStart; rec.rawLength = writer.getRawLength(); rec.partLength = writer.getCompressedLength(); sr.putIndex(rec, i); } sr.writeToFile(finalIndexFile, job); } finally { finalOut.close(); } return; } { IndexRecord rec = new IndexRecord(); final SpillRecord spillRec = new SpillRecord(partitions); for (int parts = 0; parts < partitions; parts++) { //create the segments to be merged //抽取spill文件中属于该paritition的索引形成一个segment //所有属于同一个分区的信息性能一个segment列表 List<Segment<K,V>> segmentList = new ArrayList<Segment<K, V>>(numSpills); for(int i = 0; i < numSpills; i++) { IndexRecord indexRecord = indexCacheList.get(i).getIndex(parts); Segment<K,V> s = new Segment<K,V>(job, rfs, filename[i], indexRecord.startOffset, indexRecord.partLength, codec, true); segmentList.add(i, s); if (LOG.isDebugEnabled()) { LOG.debug("MapId=" + mapId + " Reducer=" + parts + "Spill =" + i + "(" + indexRecord.startOffset + "," + indexRecord.rawLength + ", " + indexRecord.partLength + ")"); } } //将属于同一个分区的数据进行merge //merge @SuppressWarnings("unchecked") RawKeyValueIterator kvIter = Merger.merge(job, rfs, keyClass, valClass, codec, segmentList, job.getInt("io.sort.factor", 100), new Path(mapId.toString()), job.getOutputKeyComparator(), reporter, null, spilledRecordsCounter); //write merged output to disk //将merge后的数据写入到最终的spill文件中 long segmentStart = finalOut.getPos(); Writer<K, V> writer = new Writer<K, V>(job, finalOut, keyClass, valClass, codec, spilledRecordsCounter); if (combinerRunner == null || numSpills < minSpillsForCombine) { Merger.writeFile(kvIter, writer, reporter, job); } else { combineCollector.setWriter(writer); combinerRunner.combine(kvIter, combineCollector); } //close writer.close(); // record offsets //记录当前分区在spill.out文件中的索引信息 rec.startOffset = segmentStart; rec.rawLength = writer.getRawLength(); rec.partLength = writer.getCompressedLength(); spillRec.putIndex(rec, parts); } //将索引信息写入到spill.index.out spillRec.writeToFile(finalIndexFile, job); finalOut.close(); //将每次触发spill而产生的spill文件删除 for(int i = 0; i < numSpills; i++) { rfs.delete(filename[i],true); } } }
2)Merge.merge需要根据segment 列表将不同spill文件中同一个parition的数据进行merge。