当使用MapReduce处理压缩文件时,需要考虑压缩格式是否可分割。如果文件是bzip2格式的压缩文件,MapReduce作业可以按照bzip2压缩文件中的块将输入划分为若干输入分片,并从每个块的开始处解压缩数据:bzip2格式压缩文件的块与块之间有一个48位的同步标记,因此bzip2支持数据分割。下表列出了一些可以用于Hadoop的常见压缩格式及其特性。
| 压缩格式 | UNIX工具 | 算法 | 文件扩展名 | 支持多文件 | 可分割 |
|---|---|---|---|---|---|
| DEFLATE | 无 | DEFLATE | .deflate | 否 | 否 |
| gzip | gzip | DEFLATE | .gz | 否 | 否 |
| zip | zip | DEFLATE | .zip | 是 | 是 |
| bzip2 | bzip2 | bzip2 | .bz2 | 否 | 是 |
| LZO | lzop | LZO | .lzo | 否 | 否 |
目前,Hadoop支持的编码/解码器如下表:
| 压缩格式 | 对应的编码/解码器 |
|---|---|
| DEFLATE | org.apache.hadoop.io.compress.DefaultCodec |
| gzip | org.apache.hadoop.io.compress.GzipCodec |
| bzip2 | org.apache.hadoop.io.compress.BZip2Codec |
| Snappy | org.apache.hadoop.io.compress.SnappyCodec |
compress()方法接受一个字符串参数(编码/解码器的完整类名),用于指定编码/解码器,并用对应的压缩算法对文本文件README.txt进行压缩。
????public?static?void?compress(String?method)?throws?Exception?{
????????File?fileIn?=?new?File("README.txt");
????????InputStream?in??=?new?FileInputStream(fileIn);
????????Class<?>?codecClass?=?Class.forName(method);
????????Configuration?conf?=?new?Configuration();
????????//?通过名称找对应的编码/解码器
????????CompressionCodec?codec?=?(CompressionCodec)?ReflectionUtils.newInstance(codecClass,?conf);
????????File?fileOut?=?new?File("README.txt"?+?codec.getDefaultExtension());
????????fileOut.delete();
????????OutputStream?out?=?new?FileOutputStream(fileOut);
????????//?通过编码/解码器创建对应的输出流
????????CompressionOutputStream?cout?=?codec.createOutputStream(out);
????????//?压缩
????????IOUtils.copyBytes(in,?cout,?4096,?false);
????????in.close();
????????cout.close();
}
需要解压缩文件时,通常通过其扩展名来推断它对应的编码/解码器,进而用相应的解码流对数据进行解码。
????public?static?void?decompress(File?file)?throws?IOException?{
????????Configuration?conf?=?new?Configuration();
????????CompressionCodecFactory?factory?=?new?CompressionCodecFactory(conf)?;
????????//?通过文件扩展名获得相应的编码/解码器
????????CompressionCodec?codec?=?factory.getCodec(new?Path(file.getName()));
????????if?(codec?==?null)?{
????????????System.out.println("找不到文件。");
????????????return?;
????????}
????????File?fileOut?=?new?File(file.getName()+".txt");
????????InputStream?in?=?codec.createInputStream(new?FileInputStream(file));
????????...?...
????}
前面已经提过,CompressionCodec接口代表编码/解码器,它采用了抽象工厂设计模式,用于创建一系列相关或相互依赖的对象。通过CompressionCodec可以获得与某一压缩算法相关的对象,包括压缩流和解压缩流等。
public?interface?CompressionCodec?{
??/**
???*?Create?a?{@link?CompressionOutputStream}?that?will?write?to?the?given?
???*?{@link?OutputStream}.
???*?@param?out?the?location?for?the?final?output?stream
???*?@return?a?stream?the?user?can?write?uncompressed?data?to?have?it?compressed
???*?@throws?IOException
???*/
??CompressionOutputStream?createOutputStream(OutputStream?out)?
??throws?IOException;
??/**
???*?Create?a?{@link?CompressionOutputStream}?that?will?write?to?the?given?
???*?{@link?OutputStream}?with?the?given?{@link?Compressor}.
???*?@param?out?the?location?for?the?final?output?stream
???*?@param?compressor?compressor?to?use
???*?@return?a?stream?the?user?can?write?uncompressed?data?to?have?it?compressed
???*?@throws?IOException
???*/
??CompressionOutputStream?createOutputStream(OutputStream?out,?
?????????????????????????????????????????????Compressor?compressor)?
??throws?IOException;
?
??/**
???*?Get?the?type?of?{@link?Compressor}?needed?by?this?{@link?CompressionCodec}.
???*?@return?the?type?of?compressor?needed?by?this?codec.
???*/
??Class<??extends?Compressor>?getCompressorType();
??/**
???*?Create?a?new?{@link?Compressor}?for?use?by?this?{@link?CompressionCodec}.
???*?@return?a?new?compressor?for?use?by?this?codec
???*/
??Compressor?createCompressor();
??/**
???*?Create?a?stream?decompressor?that?will?read?from?the?given?input?stream.
???*?@param?in?the?stream?to?read?compressed?bytes?from
???*?@return?a?stream?to?read?uncompressed?bytes?from
???*?@throws?IOException
???*/
??CompressionInputStream?createInputStream(InputStream?in)?throws?IOException;
??/**
???*?Create?a?{@link?CompressionInputStream}?that?will?read?from?the?given?
???*?{@link?InputStream}?with?the?given?{@link?Decompressor}.
???*?@param?in?the?stream?to?read?compressed?bytes?from
???*?@param?decompressor?decompressor?to?use
???*?@return?a?stream?to?read?uncompressed?bytes?from
???*?@throws?IOException
???*/
??CompressionInputStream?createInputStream(InputStream?in,?
???????????????????????????????????????????Decompressor?decompressor)?
??throws?IOException;??/**
???*?Get?the?type?of?{@link?Decompressor}?needed?by?this?{@link?CompressionCodec}.
???*?@return?the?type?of?decompressor?needed?by?this?codec.
???*/
??Class<??extends?Decompressor>?getDecompressorType();
??
??/**
???*?Create?a?new?{@link?Decompressor}?for?use?by?this?{@link?CompressionCodec}.
???*?@return?a?new?decompressor?for?use?by?this?codec
???*/
??Decompressor?createDecompressor();
??
??/**
???*?Get?the?default?filename?extension?for?this?kind?of?compression.
???*?@return?the?extension?including?the?‘.‘
???*/
??String?getDefaultExtension();
}
CompressionCodecFactory是Hadoop压缩框架中的另一个类,它应用了工厂方法(参数化工厂方法)模式,用于提供多种产品,即GzipCodec、BZip2Codec等不同的CompressionCodec实现对象。实现代码如下:
public?class?CompressionCodecFactory?{
??/**
???*?A?map?from?the?reversed?filename?suffixes?to?the?codecs.
???*?This?is?probably?overkill,?because?the?maps?should?be?small,?but?it?
???*?automatically?supports?finding?the?longest?matching?suffix.?
???*/
??private?SortedMap<String,?CompressionCodec>?codecs?=?null;
??private?void?addCodec(CompressionCodec?codec)?{
????String?suffix?=?codec.getDefaultExtension();
????codecs.put(new?StringBuffer(suffix).reverse().toString(),?codec);
??}
??/**
???*?Find?the?codecs?specified?in?the?config?value?io.compression.codecs?
???*?and?register?them.?Defaults?to?gzip?and?zip.
???*/
??public?CompressionCodecFactory(Configuration?conf)?{
????codecs?=?new?TreeMap<String,?CompressionCodec>();
????List<Class<??extends?CompressionCodec>>?codecClasses?=?getCodecClasses(conf);
????if?(codecClasses?==?null)?{
??????addCodec(new?GzipCodec());
??????addCodec(new?DefaultCodec());??????
????}?else?{
??????Iterator<Class<??extends?CompressionCodec>>?itr?=?codecClasses.iterator();
??????while?(itr.hasNext())?{
????????CompressionCodec?codec?=?ReflectionUtils.newInstance(itr.next(),?conf);
????????addCodec(codec);?????
??????}
????}
??}
??
??/**
???*?Find?the?relevant?compression?codec?for?the?given?file?based?on?its
???*?filename?suffix.
???*?@param?file?the?filename?to?check
???*?@return?the?codec?object
???*/
??public?CompressionCodec?getCodec(Path?file)?{
????CompressionCodec?result?=?null;
????if?(codecs?!=?null)?{
??????String?filename?=?file.getName();
??????String?reversedFilename?=?new?StringBuffer(filename).reverse().toString();
??????SortedMap<String,?CompressionCodec>?subMap?=?
????????codecs.headMap(reversedFilename);
??????if?(!subMap.isEmpty())?{
????????String?potentialSuffix?=?subMap.lastKey();
????????if?(reversedFilename.startsWith(potentialSuffix))?{
??????????result?=?codecs.get(potentialSuffix);
????????}
??????}
????}
????return?result;
??}
?
}
getCodec()方法的代码看似复杂,但借助有序映射SortedMap(以反转后的扩展名作为键),实现其实相当简单。
压缩器(Compressor)和解压缩器(Decompressor)是Hadoop压缩框架中的一对重要概念。Compressor可以插入压缩输出流的实现中,提供具体的压缩功能;相反,Decompressor提供具体的解压缩功能,并插入CompressionInputStream中。
在Eclipse开发工具中,Compressor和Decompressor的大纲视图如下所示(原文为截图,此处从略):

使用Compressor的一个典型实例如下:
????public?static?void?compressor()?throws?ClassNotFoundException,?IOException?{
????????//?读入被压缩的内容
????????File?fileIn?=?new?File("README.txt");
????????InputStream?in?=?new?FileInputStream(fileIn);
????????int?datalength?=?in.available();
????????byte[]?inbuf?=?new?byte[datalength];
????????in.read(inbuf,?0,?datalength);
????????in.close();
?
????????//?长度受限制的输出缓冲区,用于说明finished()方法
????????int?compressorOutputBufferSize?=?100;
????????byte[]?outbuf?=?new?byte[compressorOutputBufferSize];
????????Compressor?compressor?=?new?BuiltInZlibDeflater();?//?构造压缩器
????????int?step?=?100;??//?一些计数器
????????int?inputPos?=?0;
????????int?putcount?=?0;
????????int?getcount?=?0;
????????int?compressedlen?=?0;
?
????????while?(inputPos?<?datalength)?{
????????????//?进行多次setInput
????????????int?len?=?(datalength-inputPos>=step)?step:datalength-inputPos;
????????????compressor.setInput(inbuf,?inputPos,?len);
????????????putcount++;
?
????????????while?(!compressor.needsInput())?{
????????????????compressedlen?=?compressor.compress(outbuf,?0?,?100);
????????????????if?(compressedlen>0)?{
????????????????????getcount++
????????????????}
????????????}
????????????inputPos+=step;
????????}
????????compressor.finish();
????????while?(!compressor.finished())?{
????????????getcount++;
????????????compressor.compress(outbuf,?0,?compressorOutputBufferSize);
????????}
????????System.out.println(compressor.getBytesRead());
????????System.out.println(compressor.getBytesWritten());
????????System.out.println(putcount);
?
????????compressor.end();
}
以上代码演示了setInput()、needsInput()、finish()、compress()和finished()之间的配合过程。
压缩流(CompressionOutputStream)和解压缩流(CompressionInputStream)是Hadoop压缩框架中的另一对重要概念,它们提供了基于流的压缩和解压缩能力。相关代码如下:
public abstract class CompressionOutputStream extends OutputStream {
  /**
   * The underlying stream that receives the compressed bytes.
   */
  protected final OutputStream out;

  /**
   * Create a compression output stream that writes
   * the compressed bytes to the given stream.
   * @param out the stream that receives the compressed bytes
   */
  protected CompressionOutputStream(OutputStream out) {
    this.out = out;
  }

  /**
   * Finishes the compressed data and closes the underlying stream.
   * The underlying stream is closed even if {@link #finish()} throws; in that case
   * the exception from finish() propagates, with any failure from closing the
   * underlying stream attached as a suppressed exception.
   */
  @Override
  public void close() throws IOException {
    // try-with-resources guarantees out.close() runs after finish(), success or not.
    try (OutputStream target = out) {
      finish();
    }
  }

  /** Flushes the underlying stream. */
  @Override
  public void flush() throws IOException {
    out.flush();
  }

  /**
   * Write uncompressed bytes to this stream; implementations compress them
   * into the underlying stream.
   * Made abstract to prevent leakage to underlying stream.
   */
  @Override
  public abstract void write(byte[] b, int off, int len) throws IOException;

  /**
   * Finishes writing compressed data to the output stream
   * without closing the underlying stream.
   */
  public abstract void finish() throws IOException;

  /**
   * Reset the compression to the initial state.
   * Does not reset the underlying stream.
   */
  public abstract void resetState() throws IOException;
}
CompressorStream使用压缩器实现了一个通用的压缩流,主要代码如下:
public?class?CompressorStream?extends?CompressionOutputStream?{
??protected?Compressor?compressor;
??protected?byte[]?buffer;
??protected?boolean?closed?=?false;
??public?CompressorStream(OutputStream?out,?Compressor?compressor,?int?bufferSize)?{
????super(out);
????if?(out?==?null?||?compressor?==?null)?{
??????throw?new?NullPointerException();
????}?else?if?(bufferSize?<=?0)?{
??????throw?new?IllegalArgumentException("Illegal?bufferSize");
????}
????this.compressor?=?compressor;
????buffer?=?new?byte[bufferSize];
??}
??public?void?write(byte[]?b,?int?off,?int?len)?throws?IOException?{
????//?Sanity?checks
????if?(compressor.finished())?{
??????throw?new?IOException("write?beyond?end?of?stream");
????}
????if?((off?|?len?|?(off?+?len)?|?(b.length?-?(off?+?len)))?<?0)?{
??????throw?new?IndexOutOfBoundsException();
????}?else?if?(len?==?0)?{
??????return;
????}
????compressor.setInput(b,?off,?len);
????while?(!compressor.needsInput())?{
??????compress();
????}
??}
??protected?void?compress()?throws?IOException?{
????int?len?=?compressor.compress(buffer,?0,?buffer.length);
????if?(len?>?0)?{
??????out.write(buffer,?0,?len);
????}
??}
??public?void?finish()?throws?IOException?{
????if?(!compressor.finished())?{
??????compressor.finish();
??????while?(!compressor.finished())?{
????????compress();
??????}
????}
??}
??public?void?resetState()?throws?IOException?{
????compressor.reset();
??}
??public?void?close()?throws?IOException?{
????if?(!closed)?{
??????finish();
??????out.close();
??????closed?=?true;
????}
??}
??private?byte[]?oneByte?=?new?byte[1];
??public?void?write(int?b)?throws?IOException?{
????oneByte[0]?=?(byte)(b?&?0xff);
????write(oneByte,?0,?oneByte.length);
??}
}
CompressorStream利用压缩器Compressor实现了一个通用的压缩流。在Hadoop中引入一个新的压缩算法时,如果没有特殊的考虑,一般只需要实现相应的压缩器和解压缩器,然后借助CompressorStream和DecompressorStream,就能得到该压缩算法的输入/输出流。

原文:http://seandeng888.iteye.com/blog/2160492