Compression and Decompression in the Hadoop Big Data Framework

Posted: 2014-11-26 02:08:02
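gzip and zip are general-purpose compression tools that strike a reasonable balance between time and space. bzip2 compresses more effectively than gzip or zip, but it is slower; its decompression is faster than its compression.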

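When MapReduce processes compressed files, the splittability of the compression format has to be considered. If the input is a bzip2 file, a MapReduce job can use the blocks inside the file to divide the input into several input splits and start decompressing at the beginning of a block. A bzip2 file places a 48-bit synchronization marker between blocks, which is why bzip2 supports splitting. The table below lists common compression formats that can be used with Hadoop, along with their properties.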

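Compression format | UNIX tool | Algorithm | File extension | Multiple files | Splittable
DEFLATE            | N/A       | DEFLATE   | .deflate       | No             | No
gzip               | gzip      | DEFLATE   | .gz            | No             | No
zip                | zip       | DEFLATE   | .zip           | Yes            | Yes (at file boundaries)
bzip2              | bzip2     | bzip2     | .bz2           | No             | Yes
LZO                | lzop      | LZO       | .lzo           | No             | No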
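To make the splittability of bzip2 more concrete: every bzip2 block begins with the 48-bit marker 0x314159265359, and blocks are bit-aligned rather than byte-aligned, so a reader that starts at an arbitrary offset has to scan bit by bit for the next marker. The sketch below does that over an in-memory byte array. The class and method names are hypothetical, and this is only a minimal illustration of the idea, not how Hadoop's BZip2Codec is implemented. The same bit pattern can also occur by chance inside compressed data, so a real reader needs further validation.

```java
import java.util.ArrayList;
import java.util.List;

final class Bzip2BlockScan {
    /** 48-bit magic number that starts every bzip2 compressed block. */
    static final long BLOCK_MAGIC = 0x314159265359L;
    private static final long MASK_48 = 0xFFFFFFFFFFFFL;

    /** Returns the bit offsets at which the block marker occurs in data. */
    static List<Long> findBlockStarts(byte[] data) {
        List<Long> offsets = new ArrayList<>();
        long window = 0;   // sliding window holding the last 48 bits seen
        long bitsSeen = 0;
        for (byte b : data) {
            for (int bit = 7; bit >= 0; bit--) {
                // Shift the next bit (most significant first) into the window
                window = ((window << 1) | ((b >> bit) & 1)) & MASK_48;
                bitsSeen++;
                if (bitsSeen >= 48 && window == BLOCK_MAGIC) {
                    offsets.add(bitsSeen - 48);
                }
            }
        }
        return offsets;
    }

    public static void main(String[] args) {
        // Synthetic input: four zero bits, the marker, four zero bits
        byte[] sample = {0x03, 0x14, 0x15, (byte) 0x92, 0x65, 0x35, (byte) 0x90};
        System.out.println(findBlockStarts(sample)); // prints [4]
    }
}
```

A split reader built on this idea would seek to its split offset, scan forward to the first marker, and begin decompressing there.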

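The codecs currently supported by Hadoop are listed in the following table:

Compression format | Codec
DEFLATE            | org.apache.hadoop.io.compress.DefaultCodec
gzip               | org.apache.hadoop.io.compress.GzipCodec
bzip2              | org.apache.hadoop.io.compress.BZip2Codec
Snappy             | org.apache.hadoop.io.compress.SnappyCodec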

1 Hadoop compression API examples

The compress() method takes a string argument naming the codec class and compresses the text file README.txt with the corresponding algorithm.

    import java.io.File;
    import java.io.FileInputStream;
    import java.io.FileOutputStream;
    import java.io.InputStream;
    import java.io.OutputStream;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.io.IOUtils;
    import org.apache.hadoop.io.compress.CompressionCodec;
    import org.apache.hadoop.io.compress.CompressionOutputStream;
    import org.apache.hadoop.util.ReflectionUtils;

    public static void compress(String method) throws Exception {
        File fileIn = new File("README.txt");
        Class<?> codecClass = Class.forName(method);
        Configuration conf = new Configuration();
        // Instantiate the codec from its class name
        CompressionCodec codec = (CompressionCodec) ReflectionUtils.newInstance(codecClass, conf);
        File fileOut = new File("README.txt" + codec.getDefaultExtension());
        fileOut.delete();
        // try-with-resources closes the streams even if the copy fails
        try (InputStream in = new FileInputStream(fileIn);
             OutputStream out = new FileOutputStream(fileOut);
             // Create the codec's compressing output stream
             CompressionOutputStream cout = codec.createOutputStream(out)) {
            // Compress
            IOUtils.copyBytes(in, cout, 4096, false);
        }
    }

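To decompress a file, the codec is usually inferred from the file extension, and the corresponding decompression stream is then used to decode the data.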

    public static void decompress(File file) throws IOException {
        Configuration conf = new Configuration();
        CompressionCodecFactory factory = new CompressionCodecFactory(conf);
        // Infer the codec from the file extension
        CompressionCodec codec = factory.getCodec(new Path(file.getName()));
        if (codec == null) {
            // A null result means no registered codec matches the extension
            System.out.println("No codec found for " + file.getName());
            return;
        }
        File fileOut = new File(file.getName() + ".txt");
        InputStream in = codec.createInputStream(new FileInputStream(file));
        ... ...
    }

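2 The Hadoop compression framework

2.1 Codecs

As mentioned earlier, the CompressionCodec interface represents a codec. It follows the abstract factory design pattern, which creates a family of related or interdependent objects. From a CompressionCodec you can obtain the objects that belong to one particular compression algorithm, including its compression and decompression streams.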

public interface CompressionCodec {

  /**
   * Create a {@link CompressionOutputStream} that will write to the given
   * {@link OutputStream}.
   * @param out the location for the final output stream
   * @return a stream the user can write uncompressed data to have it compressed
   * @throws IOException
   */
  CompressionOutputStream createOutputStream(OutputStream out)
  throws IOException;

  /**
   * Create a {@link CompressionOutputStream} that will write to the given
   * {@link OutputStream} with the given {@link Compressor}.
   * @param out the location for the final output stream
   * @param compressor compressor to use
   * @return a stream the user can write uncompressed data to have it compressed
   * @throws IOException
   */
  CompressionOutputStream createOutputStream(OutputStream out,
                                             Compressor compressor)
  throws IOException;

  /**
   * Get the type of {@link Compressor} needed by this {@link CompressionCodec}.
   * @return the type of compressor needed by this codec.
   */
  Class<? extends Compressor> getCompressorType();

  /**
   * Create a new {@link Compressor} for use by this {@link CompressionCodec}.
   * @return a new compressor for use by this codec
   */
  Compressor createCompressor();

  /**
   * Create a stream decompressor that will read from the given input stream.
   * @param in the stream to read compressed bytes from
   * @return a stream to read uncompressed bytes from
   * @throws IOException
   */
  CompressionInputStream createInputStream(InputStream in) throws IOException;

  /**
   * Create a {@link CompressionInputStream} that will read from the given
   * {@link InputStream} with the given {@link Decompressor}.
   * @param in the stream to read compressed bytes from
   * @param decompressor decompressor to use
   * @return a stream to read uncompressed bytes from
   * @throws IOException
   */
  CompressionInputStream createInputStream(InputStream in,
                                           Decompressor decompressor)
  throws IOException;

  /**
   * Get the type of {@link Decompressor} needed by this {@link CompressionCodec}.
   * @return the type of decompressor needed by this codec.
   */
  Class<? extends Decompressor> getDecompressorType();

  /**
   * Create a new {@link Decompressor} for use by this {@link CompressionCodec}.
   * @return a new decompressor for use by this codec
   */
  Decompressor createDecompressor();

  /**
   * Get the default filename extension for this kind of compression.
   * @return the extension including the '.'
   */
  String getDefaultExtension();
}
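The abstract factory idea behind CompressionCodec can be tried out without Hadoop on the classpath. The following is a minimal sketch under that assumption: MiniCodec, GzipMiniCodec, DeflateMiniCodec and MiniCodecDemo are hypothetical names invented for this illustration, and the streams come from the JDK's java.util.zip package rather than from Hadoop. Each factory hands out a matching pair of streams plus its file extension, and the caller never names a concrete algorithm.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.zip.DeflaterOutputStream;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;
import java.util.zip.InflaterInputStream;

/** Hypothetical, trimmed-down counterpart of CompressionCodec (not a Hadoop type). */
interface MiniCodec {
    OutputStream createOutputStream(OutputStream out) throws IOException;
    InputStream createInputStream(InputStream in) throws IOException;
    String getDefaultExtension();
}

final class GzipMiniCodec implements MiniCodec {
    public OutputStream createOutputStream(OutputStream out) throws IOException {
        return new GZIPOutputStream(out);
    }
    public InputStream createInputStream(InputStream in) throws IOException {
        return new GZIPInputStream(in);
    }
    public String getDefaultExtension() { return ".gz"; }
}

final class DeflateMiniCodec implements MiniCodec {
    public OutputStream createOutputStream(OutputStream out) {
        return new DeflaterOutputStream(out);
    }
    public InputStream createInputStream(InputStream in) {
        return new InflaterInputStream(in);
    }
    public String getDefaultExtension() { return ".deflate"; }
}

final class MiniCodecDemo {
    /** Compresses and then decompresses data using only the factory's products. */
    static byte[] roundTrip(MiniCodec codec, byte[] data) {
        try {
            ByteArrayOutputStream sink = new ByteArrayOutputStream();
            try (OutputStream out = codec.createOutputStream(sink)) {
                out.write(data);
            }
            ByteArrayOutputStream restored = new ByteArrayOutputStream();
            try (InputStream in =
                     codec.createInputStream(new ByteArrayInputStream(sink.toByteArray()))) {
                byte[] buf = new byte[4096];
                for (int n; (n = in.read(buf)) != -1; ) {
                    restored.write(buf, 0, n);
                }
            }
            return restored.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        byte[] text = "hello hadoop compression".getBytes(StandardCharsets.UTF_8);
        for (MiniCodec codec : new MiniCodec[] {new GzipMiniCodec(), new DeflateMiniCodec()}) {
            byte[] back = roundTrip(codec, text);
            System.out.println(codec.getDefaultExtension() + " -> "
                + new String(back, StandardCharsets.UTF_8));
        }
    }
}
```

Swapping the algorithm means passing a different MiniCodec; roundTrip() itself does not change, which is the point of the pattern.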

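CompressionCodecFactory is another class in the Hadoop compression framework. It applies the factory method pattern (in its parameterized form) to create a variety of CompressionCodec products, such as GzipCodec and BZip2Codec objects. The implementation is as follows: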

public class CompressionCodecFactory {

  /**
   * A map from the reversed filename suffixes to the codecs.
   * This is probably overkill, because the maps should be small, but it
   * automatically supports finding the longest matching suffix.
   */
  private SortedMap<String, CompressionCodec> codecs = null;

  private void addCodec(CompressionCodec codec) {
    String suffix = codec.getDefaultExtension();
    codecs.put(new StringBuffer(suffix).reverse().toString(), codec);
  }

  /**
   * Find the codecs specified in the config value io.compression.codecs
   * and register them. Defaults to gzip and zip.
   */
  public CompressionCodecFactory(Configuration conf) {
    codecs = new TreeMap<String, CompressionCodec>();
    List<Class<? extends CompressionCodec>> codecClasses = getCodecClasses(conf);
    if (codecClasses == null) {
      addCodec(new GzipCodec());
      addCodec(new DefaultCodec());
    } else {
      Iterator<Class<? extends CompressionCodec>> itr = codecClasses.iterator();
      while (itr.hasNext()) {
        CompressionCodec codec = ReflectionUtils.newInstance(itr.next(), conf);
        addCodec(codec);
      }
    }
  }

  /**
   * Find the relevant compression codec for the given file based on its
   * filename suffix.
   * @param file the filename to check
   * @return the codec object
   */
  public CompressionCodec getCodec(Path file) {
    CompressionCodec result = null;
    if (codecs != null) {
      String filename = file.getName();
      String reversedFilename = new StringBuffer(filename).reverse().toString();
      SortedMap<String, CompressionCodec> subMap =
        codecs.headMap(reversedFilename);
      if (!subMap.isEmpty()) {
        String potentialSuffix = subMap.lastKey();
        if (reversedFilename.startsWith(potentialSuffix)) {
          result = codecs.get(potentialSuffix);
        }
      }
    }
    return result;
  }

}

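The code of getCodec() looks complicated, but thanks to its use of the sorted map (SortedMap) the implementation is actually quite simple: extensions are stored reversed, so an extension that matches the end of a file name becomes a prefix of the reversed file name and sorts immediately before it.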
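The reversed-suffix lookup can be reproduced with plain JDK collections. The sketch below mirrors the logic of getCodec() but maps extensions to codec names (strings) instead of codec objects; SuffixLookupDemo and its methods are hypothetical names used only for this illustration.

```java
import java.util.SortedMap;
import java.util.TreeMap;

final class SuffixLookupDemo {
    /** Maps reversed extensions (".gz" is stored as "zg.") to a codec name. */
    private final SortedMap<String, String> codecs = new TreeMap<>();

    void register(String extension, String codecName) {
        codecs.put(new StringBuilder(extension).reverse().toString(), codecName);
    }

    /** Returns the codec name for the file, or null if no extension matches. */
    String lookup(String filename) {
        String reversed = new StringBuilder(filename).reverse().toString();
        // All keys that sort strictly before the reversed file name
        SortedMap<String, String> head = codecs.headMap(reversed);
        if (!head.isEmpty()) {
            // The closest such key is the only candidate that gets checked
            String candidate = head.lastKey();
            if (reversed.startsWith(candidate)) {
                return codecs.get(candidate);
            }
        }
        return null;
    }

    static SuffixLookupDemo withDefaults() {
        SuffixLookupDemo demo = new SuffixLookupDemo();
        demo.register(".gz", "GzipCodec");
        demo.register(".deflate", "DefaultCodec");
        demo.register(".bz2", "BZip2Codec");
        return demo;
    }

    public static void main(String[] args) {
        SuffixLookupDemo demo = withDefaults();
        System.out.println(demo.lookup("README.txt.gz")); // prints GzipCodec
        System.out.println(demo.lookup("README.txt"));    // prints null
    }
}
```

For "README.txt.gz" the reversed name is "zg.txt.EMDAER"; the greatest key below it is "zg.", which is a prefix, so the gzip entry is returned. For "README.txt" the greatest key below "txt.EMDAER" is "etalfed.", which is not a prefix, so the lookup yields null.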

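2.2 Compressors and decompressors

The compressor (Compressor) and the decompressor (Decompressor) are an important pair of concepts in the Hadoop compression framework. A Compressor can be plugged into the implementation of a compression output stream to provide the actual compression; conversely, a Decompressor provides the actual decompression and is plugged into a CompressionInputStream.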

[Figure: outline view of Compressor and Decompressor in the Eclipse IDE]

A typical use of Compressor looks like this:

    import java.io.IOException;
    import java.nio.file.Files;
    import java.nio.file.Paths;
    import org.apache.hadoop.io.compress.Compressor;
    import org.apache.hadoop.io.compress.zlib.BuiltInZlibDeflater;

    public static void compressor() throws IOException {
        // Read the content to be compressed
        byte[] inbuf = Files.readAllBytes(Paths.get("README.txt"));
        int datalength = inbuf.length;

        // A deliberately small output buffer, used to demonstrate finished()
        int compressorOutputBufferSize = 100;
        byte[] outbuf = new byte[compressorOutputBufferSize];
        Compressor compressor = new BuiltInZlibDeflater(); // construct the compressor
        int step = 100;      // number of bytes handed to each setInput() call
        int inputPos = 0;
        int putcount = 0;    // number of setInput() calls
        int getcount = 0;    // number of compress() calls counted
        int compressedlen = 0;

        while (inputPos < datalength) {
            // Feed the input through several setInput() calls
            int len = (datalength - inputPos >= step) ? step : datalength - inputPos;
            compressor.setInput(inbuf, inputPos, len);
            putcount++;

            while (!compressor.needsInput()) {
                compressedlen = compressor.compress(outbuf, 0, compressorOutputBufferSize);
                if (compressedlen > 0) {
                    getcount++;
                }
            }
            inputPos += step;
        }
        // Signal end of input, then drain the remaining compressed data
        compressor.finish();
        while (!compressor.finished()) {
            getcount++;
            compressor.compress(outbuf, 0, compressorOutputBufferSize);
        }
        System.out.println(compressor.getBytesRead());
        System.out.println(compressor.getBytesWritten());
        System.out.println(putcount);

        compressor.end();
    }

The code above shows how setInput(), needsInput(), finish(), compress() and finished() work together.
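The same call sequence can be exercised with nothing but the JDK, because java.util.zip.Deflater offers setInput(), needsInput(), finish() and finished(), with deflate() playing the role of compress(). The sketch below is a stand-in under that assumption, not Hadoop code: DeflaterProtocolDemo and its methods are hypothetical names. Unlike the listing above, it keeps the compressed bytes so that the result can be checked by decompressing it again.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.zip.DataFormatException;
import java.util.zip.Deflater;
import java.util.zip.Inflater;

final class DeflaterProtocolDemo {
    /** Feeds input in chunks of step bytes and drains output through a small buffer. */
    static byte[] compressInSteps(byte[] input, int step, int outBufSize) {
        Deflater deflater = new Deflater();
        byte[] outbuf = new byte[outBufSize];
        ByteArrayOutputStream compressed = new ByteArrayOutputStream();
        try {
            for (int pos = 0; pos < input.length; pos += step) {
                int len = Math.min(step, input.length - pos);
                deflater.setInput(input, pos, len);
                // Keep draining until the deflater asks for more input
                while (!deflater.needsInput()) {
                    int n = deflater.deflate(outbuf, 0, outbuf.length);
                    compressed.write(outbuf, 0, n);
                }
            }
            // No more input: flush whatever is still buffered internally
            deflater.finish();
            while (!deflater.finished()) {
                int n = deflater.deflate(outbuf, 0, outbuf.length);
                compressed.write(outbuf, 0, n);
            }
        } finally {
            deflater.end(); // release native resources
        }
        return compressed.toByteArray();
    }

    static byte[] decompress(byte[] compressed) {
        Inflater inflater = new Inflater();
        byte[] buf = new byte[256];
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        try {
            inflater.setInput(compressed);
            while (!inflater.finished()) {
                int n = inflater.inflate(buf);
                if (n == 0 && inflater.needsInput()) {
                    throw new IllegalStateException("compressed data is truncated");
                }
                out.write(buf, 0, n);
            }
        } catch (DataFormatException e) {
            throw new IllegalStateException(e);
        } finally {
            inflater.end();
        }
        return out.toByteArray();
    }

    public static void main(String[] args) {
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < 200; i++) {
            sb.append("hadoop ");
        }
        byte[] input = sb.toString().getBytes(StandardCharsets.UTF_8);
        byte[] packed = compressInSteps(input, 100, 100);
        System.out.println(input.length + " -> " + packed.length + " bytes");
        System.out.println(Arrays.equals(input, decompress(packed)));
    }
}
```

The inner loop may see deflate() return 0 bytes, because the deflater buffers data internally; most of the output typically appears only after finish() has been called.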

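2.3 Compression and decompression streams

The compression stream (CompressionOutputStream) and the decompression stream (CompressionInputStream) are another important pair of concepts in the Hadoop compression framework; they provide stream-based compression and decompression. The relevant code is as follows: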

public abstract class CompressionOutputStream extends OutputStream {

  /**
   * The output stream to be compressed.
   */
  protected final OutputStream out;

  /**
   * Create a compression output stream that writes
   * the compressed bytes to the given stream.
   * @param out
   */
  protected CompressionOutputStream(OutputStream out) {
    this.out = out;
  }

  public void close() throws IOException {
    finish();
    out.close();
  }

  public void flush() throws IOException {
    out.flush();
  }

  /**
   * Write compressed bytes to the stream.
   * Made abstract to prevent leakage to underlying stream.
   */
  public abstract void write(byte[] b, int off, int len) throws IOException;

  /**
   * Finishes writing compressed data to the output stream
   * without closing the underlying stream.
   */
  public abstract void finish() throws IOException;

  /**
   * Reset the compression to the initial state.
   * Does not reset the underlying stream.
   */
  public abstract void resetState() throws IOException;
}

CompressorStream uses a compressor to implement a general-purpose compression stream. Its main code is as follows:

public class CompressorStream extends CompressionOutputStream {

  protected Compressor compressor;
  protected byte[] buffer;
  protected boolean closed = false;

  public CompressorStream(OutputStream out, Compressor compressor, int bufferSize) {
    super(out);
    if (out == null || compressor == null) {
      throw new NullPointerException();
    } else if (bufferSize <= 0) {
      throw new IllegalArgumentException("Illegal bufferSize");
    }
    this.compressor = compressor;
    buffer = new byte[bufferSize];
  }

  public void write(byte[] b, int off, int len) throws IOException {
    // Sanity checks
    if (compressor.finished()) {
      throw new IOException("write beyond end of stream");
    }
    if ((off | len | (off + len) | (b.length - (off + len))) < 0) {
      throw new IndexOutOfBoundsException();
    } else if (len == 0) {
      return;
    }
    compressor.setInput(b, off, len);
    while (!compressor.needsInput()) {
      compress();
    }
  }

  protected void compress() throws IOException {
    int len = compressor.compress(buffer, 0, buffer.length);
    if (len > 0) {
      out.write(buffer, 0, len);
    }
  }

  public void finish() throws IOException {
    if (!compressor.finished()) {
      compressor.finish();
      while (!compressor.finished()) {
        compress();
      }
    }
  }

  public void resetState() throws IOException {
    compressor.reset();
  }

  public void close() throws IOException {
    if (!closed) {
      finish();
      out.close();
      closed = true;
    }
  }

  private byte[] oneByte = new byte[1];

  public void write(int b) throws IOException {
    oneByte[0] = (byte)(b & 0xff);
    write(oneByte, 0, oneByte.length);
  }
}

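CompressorStream uses a Compressor to implement a general-purpose compression stream. To introduce a new compression algorithm into Hadoop, barring special considerations, it is usually enough to implement the corresponding compressor and decompressor; wrapping them in CompressorStream and DecompressorStream then yields the input and output streams for that algorithm.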
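The JDK has a comparable split between a generic stream and a pluggable compressor: DeflaterOutputStream and InflaterInputStream accept a caller-supplied Deflater or Inflater together with a buffer size, much as CompressorStream accepts a Compressor. The following sketch uses those JDK classes as stand-ins (PluggableStreamDemo is a hypothetical name, and no Hadoop class is involved) to show that the stream code stays the same while the plugged-in compressor is configured independently.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.zip.Deflater;
import java.util.zip.DeflaterOutputStream;
import java.util.zip.Inflater;
import java.util.zip.InflaterInputStream;

final class PluggableStreamDemo {
    /** Compresses data through a generic stream driven by a caller-configured Deflater. */
    static byte[] compress(byte[] data, int level) {
        Deflater deflater = new Deflater(level);
        ByteArrayOutputStream sink = new ByteArrayOutputStream();
        // The stream is closed (and thereby finished) before the finally block runs
        try (DeflaterOutputStream out = new DeflaterOutputStream(sink, deflater, 512)) {
            out.write(data);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } finally {
            deflater.end(); // a caller-supplied Deflater is not released by the stream
        }
        return sink.toByteArray();
    }

    static byte[] decompress(byte[] packed) {
        Inflater inflater = new Inflater();
        ByteArrayOutputStream restored = new ByteArrayOutputStream();
        try (InflaterInputStream in =
                 new InflaterInputStream(new ByteArrayInputStream(packed), inflater, 512)) {
            byte[] buf = new byte[512];
            for (int n; (n = in.read(buf)) != -1; ) {
                restored.write(buf, 0, n);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } finally {
            inflater.end();
        }
        return restored.toByteArray();
    }

    public static void main(String[] args) {
        byte[] text = "compress me, compress me, compress me".getBytes(StandardCharsets.UTF_8);
        byte[] fast = compress(text, Deflater.BEST_SPEED);
        byte[] small = compress(text, Deflater.BEST_COMPRESSION);
        System.out.println(Arrays.equals(text, decompress(fast)));
        System.out.println(Arrays.equals(text, decompress(small)));
    }
}
```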


Original article: http://seandeng888.iteye.com/blog/2160492
