// Illustrative fragments: "..." marks initialization that the article elides.
// NOTE(review): conf is declared but not used in the lines shown — presumably an
// initialization call on fs was omitted from the snippet; confirm.
Configuration conf = ...
// Instantiate the raw local filesystem class directly.
FileSystem fs = new RawLocalFileSystem();
FileSystem rawFs = ...
// Wrap the existing filesystem instance in a ChecksumFileSystem.
FileSystem checksummedFs = new ChecksumFileSystem(rawFs);
所有压缩算法都需要在时间和空间之间权衡:压缩工具一般提供 -1 到 -9 共九个级别,-1 表示优化压缩速度,-9 表示优化压缩空间(压缩率)。例如:gzip -1 file 会以最快的压缩方式生成 file.gz。
import java.io.FileInputStream; import java.io.InputStream; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.compress.CompressionCodec; import org.apache.hadoop.io.compress.CompressionOutputStream; import org.apache.hadoop.util.ReflectionUtils; public class StreamCompressor { public static void main(String[] args) throws Exception { String codeClassname = "org.apache.hadoop.io.compress.GzipCodec"; Class<?> codecClass = Class.forName(codeClassname); Configuration conf = new Configuration(); CompressionCodec codec = (CompressionCodec) ReflectionUtils .newInstance(codecClass, conf); CompressionOutputStream out = codec.createOutputStream(System.out); //InputStream in = new FileInputStream("/test/input/wc/file01.txt"); InputStream in = System.in; IOUtils.copyBytes(in, out, 4096, false); out.finish(); //这里只是完成到这个数据流的写操作,并没有关闭,所以可以接着往下流 } }
hadoop集群执行命令:echo "Text" | hadoop jar test.jar StreamCompressor | gunzip ,可以看到正确的输出。
import java.io.InputStream; import java.io.OutputStream; import java.net.URI; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.compress.CompressionCodec; import org.apache.hadoop.io.compress.CompressionCodecFactory; public class FileDecompressor { public static void main(String[] args) throws Exception { String uri = "/test/input/t/1901.gz"; Configuration conf = new Configuration(); FileSystem fs = FileSystem.get(URI.create(uri), conf); Path inputPath = new Path(uri); CompressionCodecFactory factory = new CompressionCodecFactory(conf); CompressionCodec codec = factory.getCodec(inputPath); if (codec == null) { System.err.println("No codec found for " + uri); System.exit(1); } String outputUri = CompressionCodecFactory.removeSuffix(uri, codec.getDefaultExtension()); InputStream in = null; OutputStream out = null; try { in = codec.createInputStream(fs.open(inputPath)); out = fs.create(new Path(outputUri)); IOUtils.copyBytes(in, out, conf); } finally { IOUtils.closeStream(in); IOUtils.closeStream(out); } } }
hadoop jar test.jar FileDecompressor。
hadoop fs -ls /test/input/t ,可以看到 /test/input/t/1901.gz 文件已经被解压为 /test/input/t/1901。
hadoop fs -cat /test/input/t/1901,可以查看文件内容。
import java.io.InputStream; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.compress.CodecPool; import org.apache.hadoop.io.compress.CompressionCodec; import org.apache.hadoop.io.compress.CompressionOutputStream; import org.apache.hadoop.io.compress.Compressor; import org.apache.hadoop.util.ReflectionUtils; public class PooledStreamCompressor { public static void main(String[] args) throws Exception { String codeClassname = "org.apache.hadoop.io.compress.GzipCodec"; Class<?> codecClass = Class.forName(codeClassname); Configuration conf = new Configuration(); CompressionCodec codec = (CompressionCodec) ReflectionUtils .newInstance(codecClass, conf); Compressor compressor = null; try { compressor = CodecPool.getCompressor(codec); CompressionOutputStream out = codec.createOutputStream(System.out, compressor); // InputStream in = new // FileInputStream("/test/input/wc/file01.txt"); InputStream in = System.in; IOUtils.copyBytes(in, out, 4096, false); out.finish(); // 这里只是完成到这个数据流的写操作,并没有关闭,所以可以接着往下流 } finally { CodecPool.returnCompressor(compressor);// 返回池子 } } }
import java.io.IOException; import java.util.Iterator; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.compress.CompressionCodec; import org.apache.hadoop.io.compress.GzipCodec; import org.apache.hadoop.mapred.FileInputFormat; import org.apache.hadoop.mapred.FileOutputFormat; import org.apache.hadoop.mapred.JobClient; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.MapReduceBase; import org.apache.hadoop.mapred.Mapper; import org.apache.hadoop.mapred.OutputCollector; import org.apache.hadoop.mapred.Reducer; import org.apache.hadoop.mapred.Reporter; public class MaxTemperatureWithCompression { public static void main(String[] args) throws Exception { JobConf conf = new JobConf(MaxTemperatureWithCompression.class); conf.setJobName("Max Temperature With Compression"); // FileInputFormat.addInputPaths(conf, new Path(args[0])); // FileOutputFormat.setOutputPath(conf, new Path(args[1])); FileInputFormat.setInputPaths(conf, new Path("/test/input/t")); FileOutputFormat.setOutputPath(conf, new Path("/test/output/t")); // 设置压缩(输出gz压缩文件) conf.setBoolean("mapred.output.compress", true); conf.setClass("mapred.output.compression.codec", GzipCodec.class, CompressionCodec.class); conf.setMapperClass(MaxTemperatureWithCompressionMapper.class); conf.setCombinerClass(MaxTemperatureWithCompressionReduce.class); conf.setReducerClass(MaxTemperatureWithCompressionReduce.class); conf.setOutputKeyClass(Text.class); conf.setOutputValueClass(IntWritable.class); JobClient.runJob(conf); } } class MaxTemperatureWithCompressionMapper extends MapReduceBase implements Mapper<LongWritable, Text, Text, IntWritable> { private static final int MISSING = 9999; public void map(LongWritable key, Text value, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException { String line = value.toString(); String year = 
line.substring(15, 19); int airTemperature; if (line.charAt(87) == ‘+‘) { airTemperature = Integer.parseInt(line.substring(88, 92)); } else { airTemperature = Integer.parseInt(line.substring(87, 92)); } String quality = line.substring(92, 93); if (airTemperature != MISSING && quality.matches("[01459]")) { output.collect(new Text(year), new IntWritable(airTemperature)); } } } class MaxTemperatureWithCompressionReduce extends MapReduceBase implements Reducer<Text, IntWritable, Text, IntWritable> { public void reduce(Text key, Iterator<IntWritable> values, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException { int maxValue = Integer.MIN_VALUE; while (values.hasNext()) { maxValue = Math.max(maxValue, values.next().get()); } output.collect(key, new IntWritable(maxValue)); } }查理结果:
hadoop fs -copyToLocal /test/output/t/part-00000.gz
gunzip -c part-00000.gz
(4)使用顺序文件(Sequence File),它支持压缩和切分
package org.apache.hadoop.io;

import java.io.DataOutput;
import java.io.DataInput;
import java.io.IOException;

/**
 * Contract for objects that can write themselves to a {@link DataOutput}
 * and populate themselves from a {@link DataInput}.
 */
public interface Writable {
    /**
     * Writes this object to the given output.
     *
     * @param output destination to write to
     * @throws IOException if writing fails
     */
    void write(DataOutput output) throws IOException;

    /**
     * Reads this object's fields from the given input.
     *
     * @param in source to read from
     * @throws IOException if reading fails
     */
    void readFields(DataInput in) throws IOException;
}
import java.io.*; import org.apache.hadoop.io.*; public class TextPair implements WritableComparable<TextPair> { private Text first; private Text second; public TextPair() { set(new Text(), new Text()); } public TextPair(Text first, Text second) { set(first, second); } public void set(Text first, Text second) { this.first = first; this.second = second; } public Text getFirst() { return first; } public Text getSecond() { return second; } // @Override public void write(DataOutput out) throws IOException { first.write(out); second.write(out); } // @Override public void readFields(DataInput in) throws IOException { first.readFields(in); second.readFields(in); } @Override public int hashCode() { return first.hashCode() * 163 + second.hashCode(); } @Override public boolean equals(Object o) { if (o instanceof TextPair) { TextPair tp = (TextPair) o; return first.equals(tp.first) && second.equals(tp.second); } return false; } @Override public String toString() { return first + "\t" + second; } // @Override public int compareTo(TextPair tp) { int cmp = first.compareTo(tp.first); if (cmp != 0) { return cmp; } return second.compareTo(tp.second); } }
import java.io.IOException; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.WritableComparator; import org.apache.hadoop.io.WritableUtils; public class Comparator extends WritableComparator { private static final Text.Comparator TEXT_COMPARATOR = new Text.Comparator(); public Comparator() { super(TextPair.class); } @Override public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) { try { int firstL1 = WritableUtils.decodeVIntSize(b1[s1]) + readVInt(b1, s1); int firstL2 = WritableUtils.decodeVIntSize(b2[s2]) + readVInt(b2, s2); int cmp = TEXT_COMPARATOR.compare(b1, s1, firstL1, b2, s2, firstL2); if (cmp != 0) { return cmp; } return TEXT_COMPARATOR.compare(b1, s1 + firstL1, l1 - firstL1, b2, s2 + firstL2, l2 - firstL2); } catch (IOException e) { throw new IllegalArgumentException(e); } } static { WritableComparator.define(TextPair.class, new Comparator()); } }
Doug Cutting这样解释:“为什么开始设计Hadoop的时候我不用Java Serialization?因为它看起来太复杂,而我认为需要有一个非常精简的机制,可以用于精确控制对象的读和写,因为这个机制是Hadoop的核心。使用Java Serialization后,虽然可以获得一些控制权,但用起来非常纠结。不用RMI也出于类似的考虑。高效、高性能的进程间通信是Hadoop的关键。我觉得我们需要精确控制连接、延迟和缓冲的处理方式,然而RMI对此无能为力。”
精简:Writable不把类名写到数据流,它假设客户端知道会收到什么类型,结果是这个格式比Java序列化更加精简,同时支持随机存取和访问,因为流中的每一条记录均独立于其他记录。
Apache Avro是一个独立于编程语言的数据序列化框架。该项目是由Doug Cutting创建的,旨在解决Hadoop中Writable类型的不足:缺乏语言的可移植性。
本篇不介绍这个框架,可以参阅官方网址:http://avro.apache.org 。
import java.io.*; import java.net.URI; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.*; import org.apache.hadoop.io.*; public class SequenceFileWriteDemo { private static final String[] DATA = { "One,two", "Threw,four", "Five,six", "Seven,eitht", "Nine,ten" }; public static void main(String[] args) throws IOException { String uri = "/test/numbers.seq"; Configuration conf = new Configuration(); FileSystem fs = FileSystem.get(URI.create(uri), conf); Path path = new Path(uri); IntWritable key = new IntWritable(); Text value = new Text(); SequenceFile.Writer writer = null; try { writer = SequenceFile.createWriter(fs, conf, path, key.getClass(), value.getClass()); for (int i = 0; i < 100; i++) { key.set(100 - i); value.set(DATA[i % DATA.length]); System.out.printf("[%s]\t%s\t%s\n", writer.getLength(), key, value); writer.append(key, value); } } finally { IOUtils.closeStream(writer); } } }
import java.io.*; import java.net.URI; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.*; import org.apache.hadoop.io.*; import org.apache.hadoop.util.ReflectionUtils; public class SequenceFileReadDemo { public static void main(String[] args) throws IOException { String uri = "/test/numbers.seq"; Configuration conf = new Configuration(); FileSystem fs = FileSystem.get(URI.create(uri), conf); Path path = new Path(uri); SequenceFile.Reader reader = null; try { reader = new SequenceFile.Reader(fs, path, conf); Writable key = (Writable) ReflectionUtils.newInstance( reader.getKeyClass(), conf); Writable value = (Writable) ReflectionUtils.newInstance( reader.getValueClass(), conf); long position = reader.getPosition(); while (reader.next(key, value)) { String syncSeen = reader.syncSeen() ? "*" : "";// 同步点 System.out.printf("[%s%s]\t%s\t%s\n", position, syncSeen, key, value); position = reader.getPosition();// beginning of next record } } finally { IOUtils.closeStream(reader); } } }
hadoop fs -text 可以识别gzip压缩文件及顺序文件,其他格式,则认为是文本文件。
hadoop fs -text /test/numbers.seq
import java.io.*; import java.net.URI; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.*; import org.apache.hadoop.io.*; public class MapFileWriteDemo { private static final String[] DATA = { "One,two", "Threw,four", "Five,six", "Seven,eitht", "Nine,ten" }; public static void main(String[] args) throws IOException { String uri = "/test/numbers.map"; Configuration conf = new Configuration(); FileSystem fs = FileSystem.get(URI.create(uri), conf); IntWritable key = new IntWritable(); Text value = new Text(); MapFile.Writer writer = null; try { writer = new MapFile.Writer(conf, fs, uri, key.getClass(), value.getClass()); for (int i = 0; i < 1024; i++) { key.set(i+1); value.set(DATA[i % DATA.length]); writer.append(key, value); } } finally { IOUtils.closeStream(writer); } } }
hadoop fs -text /test/numbers.map/data | head
hadoop fs -text /test/numbers.map/index | head