Writable接口大家可能都知道,它是一个实现了序列化协议的序列化对象。在Hadoop中定义一个结构化对象都要实现Writable接口,使得该结构化对象可以序列化为字节流,字节流也可以反序列化为结构化对象。那WritableComparable接口是可序列化并且可比较的接口。MapReduce中所有的key值类型都必须实现这个接口,既然是可序列化的那就必须得实现readFiels()和write()这两个序列化和反序列化函数,既然也是可比较的那就必须得实现compareTo()函数,该函数即是比较和排序规则的实现。这样MR中的key值就既能可序列化又是可比较的。下面几符图是API中对WritableComparable接口的解释及其方法,还有一个实现了该接口的对象的列子:
public interface WritableComparable<T> extends Writable, Comparable<T>
A
Writable
which is alsoComparable
.
WritableComparable
s can be compared to each other, typically via
Comparator
s. Any type which is to be used as a key
in the
Hadoop Map-Reduce framework should implement this interface.
Example:
public class MyWritableComparable implements WritableComparable
{ // Some data private int counter; private long timestamp;
public void write(DataOutput out) throws IOException {
out.writeInt(counter);
out.writeLong(timestamp); }
public void readFields(DataInput in) throws IOException {
counter = in.readInt(); timestamp = in.readLong(); }
public int compareTo(MyWritableComparable w) {
int thisValue = this.value; int thatValue = ((IntWritable)o).value;
return (thisValue < thatValue ? -1 : (thisValue==thatValue ? 0 : 1));
}
在Hadoop中,Writable接口定义了两个方法:一个用于将其状态写入二进制格式的DataOutput流,另一个用于从二进制格式的DataInput流读取其态。
package org.apache.hadoop.io; import java.io.DataOutput; import java.io.DataInput; import java.io.IOException; public interface Writable { void write(DataOutput out) throws IOException; void readFields(DataInput in) throws IOException; }
让我们来看一个特别的Writable,看看可以对它进行哪些操作。我们要使用IntWritable,这是一个Java的int对象的封装。可以使用set()函数来创建和设置它的值:
IntWritable writable = new IntWritable(); writable.set(163);
类似地,我们也可以使用构造函数:
IntWritable writable = new IntWritable(163);
为了检查IntWritable的序列化形式,我们写一个小的辅助方法,它把一个java.io.ByteArrayOutputStream封装到java.io.DataOutputStream中(java.io.DataOutput的一个实现),以此来捕获序列化的数据流中的字节:
public static byte[] serialize(Writable writable) throws IOException { ByteArrayOutputStream out = new ByteArrayOutputStream(); DataOutputStream dataOut = new DataOutputStream(out); writable.write(dataOut); dataOut.close(); return out.toByteArray(); }
整数用四个字节写入(我们使用JUnit 4断言):
byte[] bytes = serialize(writable); assertThat(bytes.length, is(4));
字节使用大端顺序写入(所以,最重要的字节写在数据流的开始处,这是由java.io.DataOutput接口规定的),我们可以使用Hadoop的StringUtils方法看到它们的十六进制表示:
assertThat(StringUtils.byteToHexString(bytes), is("000000a3"));
让我们再来试试反序列化。我们创建一个帮助方法来从一个字节数组读取一个Writable对象:
public static byte[] deserialize(Writable writable, byte[] bytes) throws IOException { ByteArrayInputStream in = new ByteArrayInputStream(bytes); DataInputStream dataIn = new DataInputStream(in); writable.readFields(dataIn); dataIn.close(); return bytes; }
我们构造一个新的、缺值的IntWritable,然后调用deserialize()方法来读取刚写入的输出流。然后发现它的值(使用get方法检索得到)还是原来的值163:
IntWritable newWritable = new IntWritable(); deserialize(newWritable, bytes); assertThat(newWritable.get(), is(163));
WritableComparable 和comparators
IntWritable实现了WritableComparable接口,后者是Writable和java.lang.Comparable接口的子接口。
package org.apache.hadoop.io; public interface WritableComparable extends Writable, Comparable { }
类型的比较对MapReduce而言至关重要的,键和键之间的比较是在排序阶段完成。Hadoop提供的一个优化方法是从Java Comparator的RawComparator扩展:
package org.apache.hadoop.io; import java.util.Comparator; public interface RawComparator extends Comparator { public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2); }
这个接口允许执行者比较从流中读取的未被反序列化为对象的记录,从而省去了创建对象的所有开销。例如,IntWritables的comparator使用原始的compare()方法从每个字节数组的指定开始位置(S1和S2)和长度(L1和L2)读取整数b1和b2然后直接进行比较。
WritableComparator是RawComparator对WritableComparable类的一个通用实现。它提供两个主要功能。首先,它提供了一个默认的对原始compare()函数的调用,对从数据流对要比较的对象进行反序列化,然后调用对象的compare()方法。其次,它充当的是RawComparator实例的一个工厂方法(Writable方法已经注册)。例如,为获得IntWritable的comparator,我们只需使用:
RawComparator comparator = WritableComparator.get(IntWritable.class);
comparator可以用来比较两个IntWritable:
IntWritable w1 = new IntWritable(163); IntWritable w2 = new IntWritable(67); assertThat(comparator.compare(w1, w2), greaterThan(0));
或者它们的序列化描述:
byte[] b1 = serialize(w1); byte[] b2 = serialize(w2); assertThat(comparator.compare(b1, 0, b1.length, b2, 0, b2.length), greaterThan(0));
Hadoop WritableComparable接口收集的知识
原文:http://blog.csdn.net/michaelzhou224/article/details/19159425