
Serialization in the Hadoop Big Data Framework

Posted: 2014-11-25 02:13:49

Object serialization encodes an object into a byte stream and rebuilds the object from that byte stream. Encoding an object into a byte stream is called serializing the object; the reverse process is called deserializing.

1.1 Java's Built-in Serialization Mechanism

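Java serialization converts an object into a contiguous sequence of bytes that can later be restored to the object's original state. The mechanism also hides differences between operating systems: a Java object serialized on Windows can be rebuilt on UNIX without worrying about how different machines represent data or in what order they store bytes.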

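In Java, making a class's instances serializable is very simple: just add implements Serializable to the class declaration. Serializable is a marker interface with no member methods. It is defined as follows: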

public interface Serializable {
}

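Because Serializable has no methods, the class needs no other changes. Simply by declaring that it implements Serializable, the Block class immediately gains the serialization support Java provides: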

public class Block implements Writable, Comparable<Block>, Serializable

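Because serialization is mainly used in I/O-related operations, it is implemented with a pair of input/output streams. To serialize an object, create an ObjectOutputStream on top of some OutputStream and call its writeObject() method.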

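writeObject() writes the state of an object that implements Serializable, and the output is sent to the underlying OutputStream. To serialize several objects, call writeObject() on the same ObjectOutputStream once per object. Here is an example of serializing an object: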

Block block1=new Block(7806259420524417791L,39447755L,56736651L);

... ...

ByteArrayOutputStream out =new ByteArrayOutputStream();

ObjectOutputStream objOut=new ObjectOutputStream(out);

objOut.writeObject(block1);

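However, the serialized form is rather bloated. The Block class holds only three long integers, yet the serialized result shown below is 108 bytes long, because besides the field values the stream also records a header, the fully qualified class name, the serialVersionUID, and the name and type code of every field. The serialized result of a Block object containing three long integers is: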

-84, -19, 0, 5, 115, 114, 0, 23, 111, 114, 103, 46, 115, 101, 97, 110, 100, 101, 110, 103, 46, 116, 101, 115, 116, 46, 66, 108, 111, 99, 107, 40, -7, 56, 46, 72, 64, -69, 45, 2, 0, 3, 74, 0, 7, 98, 108, 111, 99, 107, 73, 100, 74, 0, 16, 103, 101, 110, 101, 114, 97, 116, 105, 111, 110, 115, 83, 116, 97, 109, 112, 74, 0, 8, 110, 117, 109, 66, 121, 116, 101, 115, 120, 112, 108, 85, 103, -107, 104, -25, -110, -1, 0, 0, 0, 0, 3, 97, -69, -117, 0, 0, 0, 0, 2, 89, -20, -53
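To reproduce the comparison, the following self-contained sketch serializes a Block-like class with ObjectOutputStream and rebuilds it with ObjectInputStream. The class names JdkBlock and JdkSerialization are illustrative assumptions, not the article's original Block; since the stream embeds the class name and field names, the exact byte count will differ from the dump above, but it stays well above the 24 bytes of raw field data.

```java
import java.io.*;

// Hypothetical stand-in for the article's Block: three longs plus java.io.Serializable.
class JdkBlock implements Serializable {
  long blockId;
  long numBytes;
  long generationsStamp;

  JdkBlock(long blockId, long numBytes, long generationsStamp) {
    this.blockId = blockId;
    this.numBytes = numBytes;
    this.generationsStamp = generationsStamp;
  }
}

final class JdkSerialization {
  private JdkSerialization() {}

  // Serialize with Java's built-in mechanism and return the raw bytes.
  static byte[] toBytes(Object obj) throws IOException {
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    try (ObjectOutputStream objOut = new ObjectOutputStream(out)) {
      objOut.writeObject(obj); // header + class descriptor + field values
    }
    return out.toByteArray();
  }

  // Rebuild the object from the bytes; Java allocates a new object every time.
  static Object fromBytes(byte[] bytes) throws IOException, ClassNotFoundException {
    try (ObjectInputStream objIn = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
      return objIn.readObject();
    }
  }
}
```

The first two bytes of the result are always 0xAC 0xED (the -84, -19 at the start of the dump), followed by the stream version and the class descriptor; the 24 bytes of actual long values come last.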

1.2 Hadoop's Serialization Mechanism

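Unlike Java serialization (which calls writeObject() on an ObjectOutputStream), Hadoop serializes an object into a stream by calling the object's own write() method, which takes a parameter of type DataOutput. Deserialization works the same way: the object's readFields() method reads the data back from the stream. Notably, Java deserialization keeps creating new objects, whereas Hadoop deserialization lets the caller reuse an existing object, which reduces Java object allocation and garbage collection and improves application efficiency.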

public static void main(String[] args) {
    try {
        Block block1 = new Block(1L, 2L, 3L);
        ... ...
        ByteArrayOutputStream bout = new ByteArrayOutputStream();
        // DataOutputStream has no no-argument constructor: wrap the byte array stream
        DataOutputStream dout = new DataOutputStream(bout);
        block1.write(dout);
        dout.close();
        ... ...
    }
    ... ...
}

Because serializing the Block object writes only its three long integers, block1 serializes to 24 bytes in total.
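The 24-byte figure can be checked without a Hadoop dependency. The sketch below declares a minimal SimpleWritable interface with the same two method signatures as org.apache.hadoop.io.Writable (an assumption made only to keep the example self-contained) and writes a Block-like object through a DataOutputStream wrapped around a ByteArrayOutputStream. WritableBlock and WritableBytes are hypothetical names.

```java
import java.io.*;

// Local stand-in with the same shape as org.apache.hadoop.io.Writable.
interface SimpleWritable {
  void write(DataOutput out) throws IOException;
  void readFields(DataInput in) throws IOException;
}

// Hypothetical Block-like record: only the three field values are written.
class WritableBlock implements SimpleWritable {
  long blockId, numBytes, generationsStamp;

  WritableBlock() {}

  WritableBlock(long blockId, long numBytes, long generationsStamp) {
    this.blockId = blockId;
    this.numBytes = numBytes;
    this.generationsStamp = generationsStamp;
  }

  public void write(DataOutput out) throws IOException {
    out.writeLong(blockId);          // 8 bytes, big-endian
    out.writeLong(numBytes);         // 8 bytes
    out.writeLong(generationsStamp); // 8 bytes
  }

  public void readFields(DataInput in) throws IOException {
    blockId = in.readLong();         // read back in the same order
    numBytes = in.readLong();
    generationsStamp = in.readLong();
  }
}

final class WritableBytes {
  private WritableBytes() {}

  // Serialize any SimpleWritable into a byte array.
  static byte[] serialize(SimpleWritable w) throws IOException {
    ByteArrayOutputStream bout = new ByteArrayOutputStream();
    try (DataOutputStream dout = new DataOutputStream(bout)) {
      w.write(dout);
    }
    return bout.toByteArray();
  }

  // Deserialize into an instance supplied (and possibly reused) by the caller.
  static <T extends SimpleWritable> T deserialize(byte[] bytes, T target) throws IOException {
    try (DataInputStream din = new DataInputStream(new ByteArrayInputStream(bytes))) {
      target.readFields(din);
    }
    return target;
  }
}
```

Note that no class name or field name is written: the reader must already know the type and the field order, which is exactly what makes the format compact.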

1.3 The Hadoop Writable Mechanism

Hadoop introduces the org.apache.hadoop.io.Writable interface, which every serializable object must implement. Its outline view in the Eclipse IDE looks like this:


[Figure: Eclipse outline view of the Writable interface]

Unlike java.io.Serializable, Writable is not a marker interface; it declares two methods:

public interface Writable {
  /**
   * Serialize the fields of this object to <code>out</code>.
   *
   * @param out <code>DataOutput</code> to serialize this object into.
   * @throws IOException
   */
  void write(DataOutput out) throws IOException;

  /**
   * Deserialize the fields of this object from <code>in</code>.
   *
   * <p>For efficiency, implementations should attempt to re-use storage in the
   * existing object where possible.</p>
   *
   * @param in <code>DataInput</code> to deserialize this object from.
   * @throws IOException
   */
  void readFields(DataInput in) throws IOException;
}

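Writable.write(DataOutput out) writes the object to a binary DataOutput, and deserialization is performed by readFields(DataInput in), which reads the object's state back from a DataInput stream. Here is an example: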

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.Writable;

public class Block implements Writable {
    private long blockId;
    private long numBytes;
    private long generationsStamp;

    // Serialize: write the three fields in a fixed order (3 x 8 = 24 bytes).
    public void write(DataOutput out) throws IOException {
        out.writeLong(blockId);
        out.writeLong(numBytes);
        out.writeLong(generationsStamp);
    }

    // Deserialize: read the fields back, in the same order, into this existing object.
    public void readFields(DataInput in) throws IOException {
        this.blockId = in.readLong();
        this.numBytes = in.readLong();
        this.generationsStamp = in.readLong();
        if (numBytes < 0) {   // reject corrupt input
            throw new IOException("Unexpected block size:" + numBytes);
        }
    }
}
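Because readFields() fills in an existing object, a single instance can be reused for a whole stream of records, which is the allocation saving described in section 1.2; readFields() is also the natural place for sanity checks such as the numBytes test above. The sketch below shows both. ReusableWritable, CheckedBlock and BlockStream are hypothetical local names chosen so the example runs without Hadoop.

```java
import java.io.*;

// Local stand-in with the same two methods as org.apache.hadoop.io.Writable.
interface ReusableWritable {
  void write(DataOutput out) throws IOException;
  void readFields(DataInput in) throws IOException;
}

class CheckedBlock implements ReusableWritable {
  long blockId, numBytes, generationsStamp;

  void set(long blockId, long numBytes, long generationsStamp) {
    this.blockId = blockId;
    this.numBytes = numBytes;
    this.generationsStamp = generationsStamp;
  }

  public void write(DataOutput out) throws IOException {
    out.writeLong(blockId);
    out.writeLong(numBytes);
    out.writeLong(generationsStamp);
  }

  public void readFields(DataInput in) throws IOException {
    blockId = in.readLong();
    numBytes = in.readLong();
    generationsStamp = in.readLong();
    if (numBytes < 0) { // same validation as the Block example
      throw new IOException("Unexpected block size:" + numBytes);
    }
  }
}

final class BlockStream {
  private BlockStream() {}

  // Write several {blockId, numBytes, generationsStamp} records back to back.
  static byte[] writeAll(long[][] records) throws IOException {
    ByteArrayOutputStream bout = new ByteArrayOutputStream();
    try (DataOutputStream out = new DataOutputStream(bout)) {
      CheckedBlock block = new CheckedBlock(); // one object reused for every record
      for (long[] r : records) {
        block.set(r[0], r[1], r[2]);
        block.write(out);
      }
    }
    return bout.toByteArray();
  }

  // Read every record into the same CheckedBlock and sum the block sizes.
  static long totalBytes(byte[] data) throws IOException {
    DataInputStream in = new DataInputStream(new ByteArrayInputStream(data));
    CheckedBlock block = new CheckedBlock(); // a single allocation for the whole stream
    long total = 0;
    while (in.available() > 0) {
      block.readFields(in); // overwrites the previous record's fields
      total += block.numBytes;
    }
    return total;
  }
}
```

With Java serialization, each readObject() call in such a loop would allocate a fresh object instead.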

Hadoop's serialization framework also includes several other important interfaces: WritableComparable, RawComparator and WritableComparator.

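Comparable is the interface implemented by objects that can compare themselves (for example, an Integer can compare itself with another Integer). Implementing its compareTo() method lets an object be compared with the object passed in as the argument.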

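Comparator, by contrast, is a dedicated comparator object that compares two other objects. Implementing its compare() method, which takes the two objects as arguments, determines their relative order.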
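The difference between the two interfaces shows up clearly in a small sketch. SizedBlock is a hypothetical class whose natural order (Comparable) is by blockId, while a separate Comparator orders by numBytes. The last method only hints at the idea behind RawComparator: records can be ordered by decoding a key straight from the serialized bytes (here an 8-byte big-endian long as written by DataOutput.writeLong) without building objects first. The name compareSerializedLongs is an assumption, not a Hadoop API.

```java
import java.nio.ByteBuffer;
import java.util.Comparator;

class SizedBlock implements Comparable<SizedBlock> {
  final long blockId;
  final long numBytes;

  SizedBlock(long blockId, long numBytes) {
    this.blockId = blockId;
    this.numBytes = numBytes;
  }

  // Comparable: the object compares itself with another block (natural order by id).
  @Override
  public int compareTo(SizedBlock other) {
    return Long.compare(blockId, other.blockId);
  }

  // Comparator: a separate object that compares two blocks, here by size.
  static final Comparator<SizedBlock> BY_SIZE =
      (a, b) -> Long.compare(a.numBytes, b.numBytes);

  // RawComparator-style idea: compare two serialized longs at offsets s1 and s2
  // without deserializing whole records. ByteBuffer reads big-endian by default.
  static int compareSerializedLongs(byte[] b1, int s1, byte[] b2, int s2) {
    return Long.compare(ByteBuffer.wrap(b1).getLong(s1), ByteBuffer.wrap(b2).getLong(s2));
  }
}
```

Decoding the long before comparing matters for signed values: a plain unsigned byte-by-byte comparison would put -5 (whose first byte is 0xFF) after 7.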

1.4 Typical Writable Classes in Detail

1.4.1 Writable Wrappers for Java Primitive Types

The Writable wrappers for the Java primitive types are listed in the table below:

Java primitive type    Writable wrapper
boolean                BooleanWritable
byte                   ByteWritable
int                    IntWritable, VIntWritable
float                  FloatWritable
long                   LongWritable, VLongWritable
double                 DoubleWritable
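The fixed-width wrappers in the table encode their value with the matching DataOutput method, so the encoded size equals the size of the Java primitive. The sketch below measures those sizes with a plain DataOutputStream; it assumes (rather than quotes) that BooleanWritable, IntWritable, LongWritable and the others call the corresponding writeXxx() method in write(). FixedWidthSizes and sizeOf are hypothetical names.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;

final class FixedWidthSizes {
  private FixedWidthSizes() {}

  // "Write one value to a DataOutput", so lambdas like out -> out.writeInt(42) fit.
  interface Writer {
    void write(DataOutput out) throws IOException;
  }

  // Return how many bytes a single write produces.
  static int sizeOf(Writer writer) throws IOException {
    ByteArrayOutputStream bout = new ByteArrayOutputStream();
    DataOutputStream out = new DataOutputStream(bout);
    writer.write(out);
    out.flush();
    return bout.size();
  }
}
```

For example, sizeOf(out -> out.writeInt(1)) is 4 even though the value is tiny, which is the motivation for the variable-length VIntWritable and VLongWritable.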

Take VIntWritable as an example:

public class VIntWritable implements WritableComparable {
  private int value;

  public VIntWritable() {}

  public VIntWritable(int value) { set(value); }

  /** Set the value of this VIntWritable. */
  public void set(int value) { this.value = value; }

  /** Return the value of this VIntWritable. */
  public int get() { return value; }

  public void readFields(DataInput in) throws IOException {
    value = WritableUtils.readVInt(in);
  }

  public void write(DataOutput out) throws IOException {
    WritableUtils.writeVInt(out, value);
  }

  /** Compares two VIntWritables. */
  public int compareTo(Object o) {
    int thisValue = this.value;
    int thatValue = ((VIntWritable)o).value;
    return (thisValue < thatValue ? -1 : (thisValue == thatValue ? 0 : 1));
  }
}

VIntWritable reads and writes its data by calling readVInt() and writeVInt() from the WritableUtils utility class.
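To make the variable-length format concrete, the sketch below re-implements the scheme that, to our understanding, WritableUtils.writeVLong() and readVLong() use (with writeVInt()/readVInt() delegating to them): values in [-112, 127] take a single byte; otherwise the first byte records the sign and the number of following bytes, and the magnitude (the one's complement for negative numbers) follows in big-endian order. VarLongCodec is a hypothetical class written for illustration; consult WritableUtils itself for the authoritative code.

```java
import java.io.*;

final class VarLongCodec {
  private VarLongCodec() {}

  static void writeVLong(DataOutput out, long i) throws IOException {
    if (i >= -112 && i <= 127) {   // small values: the byte itself is the value
      out.writeByte((byte) i);
      return;
    }
    int len = -112;                // first-byte base for positive numbers
    if (i < 0) {
      i ^= -1L;                    // store the one's complement of negatives
      len = -120;                  // first-byte base for negative numbers
    }
    long tmp = i;
    while (tmp != 0) {             // count how many bytes the magnitude needs
      tmp = tmp >> 8;
      len--;
    }
    out.writeByte((byte) len);
    len = (len < -120) ? -(len + 120) : -(len + 112);
    for (int idx = len; idx != 0; idx--) { // most significant byte first
      int shiftbits = (idx - 1) * 8;
      out.writeByte((byte) ((i >> shiftbits) & 0xFF));
    }
  }

  static long readVLong(DataInput in) throws IOException {
    byte first = in.readByte();
    int len = decodeSize(first);
    if (len == 1) {
      return first;
    }
    long i = 0;
    for (int idx = 0; idx < len - 1; idx++) {
      i = (i << 8) | (in.readByte() & 0xFF);
    }
    return isNegative(first) ? (i ^ -1L) : i;
  }

  // Total encoded length, first byte included, implied by the first byte.
  static int decodeSize(byte first) {
    if (first >= -112) return 1;
    if (first < -120) return -119 - first;
    return -111 - first;
  }

  static boolean isNegative(byte first) {
    return first < -120 || (first >= -112 && first < 0);
  }
}
```

With this scheme 0, 127 and -112 take 1 byte, 128 takes 2, 300 takes 3 and Integer.MAX_VALUE takes 5, so small counters and lengths are cheaper than the fixed 4 bytes of IntWritable.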

1.4.2 Implementation of the ObjectWritable Class

ObjectWritable provides a wrapper for class instances. The relevant code is as follows:

public class ObjectWritable implements Writable, Configurable {
  private Class declaredClass;
  private Object instance;
  private Configuration conf;

  public ObjectWritable() {}

  public ObjectWritable(Object instance) {
    set(instance);
  }

  public ObjectWritable(Class declaredClass, Object instance) {
    this.declaredClass = declaredClass;
    this.instance = instance;
  }

  /** Return the instance, or null if none. */
  public Object get() { return instance; }

  /** Return the class this is meant to be. */
  public Class getDeclaredClass() { return declaredClass; }

  /** Reset the instance. */
  public void set(Object instance) {
    this.declaredClass = instance.getClass();
    this.instance = instance;
  }

  public void readFields(DataInput in) throws IOException {
    readObject(in, this, this.conf);
  }

  public void write(DataOutput out) throws IOException {
    writeObject(out, instance, declaredClass, conf);
  }

  /** Write a {@link Writable}, {@link String}, primitive type, or an array of
   * the preceding. */
  public static void writeObject(DataOutput out, Object instance,
                                 Class declaredClass,
                                 Configuration conf) throws IOException {
    if (instance == null) {                       // null
      instance = new NullInstance(declaredClass, conf);
      declaredClass = Writable.class;
    }
    UTF8.writeString(out, declaredClass.getName()); // always write declared
    if (declaredClass.isArray()) {                // array
      int length = Array.getLength(instance);
      out.writeInt(length);
      for (int i = 0; i < length; i++) {
        writeObject(out, Array.get(instance, i),
                    declaredClass.getComponentType(), conf);
      }
    } else if (declaredClass == String.class) {   // String
      UTF8.writeString(out, (String)instance);
    } else if (declaredClass.isPrimitive()) {     // primitive type
      if (declaredClass == Boolean.TYPE) {        // boolean
        out.writeBoolean(((Boolean)instance).booleanValue());
      } else if (declaredClass == Character.TYPE) { // char
        out.writeChar(((Character)instance).charValue());
      } else if (declaredClass == Byte.TYPE) {    // byte
        out.writeByte(((Byte)instance).byteValue());
      } else if (declaredClass == Short.TYPE) {   // short
        out.writeShort(((Short)instance).shortValue());
      } else if (declaredClass == Integer.TYPE) { // int
        out.writeInt(((Integer)instance).intValue());
      } else if (declaredClass == Long.TYPE) {    // long
        out.writeLong(((Long)instance).longValue());
      } else if (declaredClass == Float.TYPE) {   // float
        out.writeFloat(((Float)instance).floatValue());
      } else if (declaredClass == Double.TYPE) {  // double
        out.writeDouble(((Double)instance).doubleValue());
      } else if (declaredClass == Void.TYPE) {    // void
      } else {
        throw new IllegalArgumentException("Not a primitive: "+declaredClass);
      }
    } else if (declaredClass.isEnum()) {          // enum
      UTF8.writeString(out, ((Enum)instance).name());
    } else if (Writable.class.isAssignableFrom(declaredClass)) { // Writable
      UTF8.writeString(out, instance.getClass().getName());
      ((Writable)instance).write(out);
    } else {
      throw new IOException("Can't write: "+instance+" as "+declaredClass);
    }
  }

  /** Read a {@link Writable}, {@link String}, primitive type, or an array of
   * the preceding. */
  public static Object readObject(DataInput in, Configuration conf)
    throws IOException {
    return readObject(in, null, conf);
  }

  /** Read a {@link Writable}, {@link String}, primitive type, or an array of
   * the preceding. */
  @SuppressWarnings("unchecked")
  public static Object readObject(DataInput in, ObjectWritable objectWritable, Configuration conf)
    throws IOException {
    String className = UTF8.readString(in);
    Class<?> declaredClass = PRIMITIVE_NAMES.get(className);
    if (declaredClass == null) {
      try {
        declaredClass = conf.getClassByName(className);
      } catch (ClassNotFoundException e) {
        throw new RuntimeException("readObject can't find class " + className, e);
      }
    }

    Object instance;

    if (declaredClass.isPrimitive()) {            // primitive types
      if (declaredClass == Boolean.TYPE) {             // boolean
        instance = Boolean.valueOf(in.readBoolean());
      } else if (declaredClass == Character.TYPE) {    // char
        instance = Character.valueOf(in.readChar());
      } else if (declaredClass == Byte.TYPE) {         // byte
        instance = Byte.valueOf(in.readByte());
      } else if (declaredClass == Short.TYPE) {        // short
        instance = Short.valueOf(in.readShort());
      } else if (declaredClass == Integer.TYPE) {      // int
        instance = Integer.valueOf(in.readInt());
      } else if (declaredClass == Long.TYPE) {         // long
        instance = Long.valueOf(in.readLong());
      } else if (declaredClass == Float.TYPE) {        // float
        instance = Float.valueOf(in.readFloat());
      } else if (declaredClass == Double.TYPE) {       // double
        instance = Double.valueOf(in.readDouble());
      } else if (declaredClass == Void.TYPE) {         // void
        instance = null;
      } else {
        throw new IllegalArgumentException("Not a primitive: "+declaredClass);
      }
    } else if (declaredClass.isArray()) {              // array
      int length = in.readInt();
      instance = Array.newInstance(declaredClass.getComponentType(), length);
      for (int i = 0; i < length; i++) {
        Array.set(instance, i, readObject(in, conf));
      }
    } else if (declaredClass == String.class) {        // String
      instance = UTF8.readString(in);
    } else if (declaredClass.isEnum()) {               // enum
      instance = Enum.valueOf((Class<? extends Enum>) declaredClass, UTF8.readString(in));
    } else {                                           // Writable
      Class instanceClass = null;
      String str = "";
      try {
        str = UTF8.readString(in);
        instanceClass = conf.getClassByName(str);
      } catch (ClassNotFoundException e) {
        throw new RuntimeException("readObject can't find class " + str, e);
      }
      Writable writable = WritableFactories.newInstance(instanceClass, conf);
      writable.readFields(in);
      instance = writable;
      if (instanceClass == NullInstance.class) {  // null
        declaredClass = ((NullInstance)instance).declaredClass;
        instance = null;
      }
    }

    if (objectWritable != null) {                 // store values
      objectWritable.declaredClass = declaredClass;
      objectWritable.instance = instance;
    }

    return instance;
  }

  ... ...
}

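readFields() deserializes an object by calling readObject(). If the data arriving from the DataInput is of a Writable type, readObject() in turn calls that object's readFields() method (writable.readFields(in)), and this continues until the DataInput delivers a non-Writable type; in this way the Writable objects in the DataInput are deserialized recursively. Arrays are handled similarly: each element is read with a recursive readObject() call.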
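The core idea of ObjectWritable, writing the declared class name before every value so the reader knows how to decode what follows and recursing element by element for arrays, fits in a few dozen lines. The sketch below is a deliberately reduced, hypothetical MiniObjectCodec: it supports only int, long, String and arrays of these, uses DataOutput.writeUTF() in place of Hadoop's UTF8 class, and uses Class.forName() in place of Configuration.getClassByName().

```java
import java.io.*;
import java.lang.reflect.Array;
import java.util.HashMap;
import java.util.Map;

// Simplified stand-in for ObjectWritable.writeObject/readObject (not Hadoop code).
final class MiniObjectCodec {
  private static final Map<String, Class<?>> PRIMITIVES = new HashMap<>();
  static {
    PRIMITIVES.put("int", Integer.TYPE);   // Class.forName cannot resolve "int"
    PRIMITIVES.put("long", Long.TYPE);
  }

  private MiniObjectCodec() {}

  static void writeObject(DataOutput out, Object instance, Class<?> declaredClass) throws IOException {
    out.writeUTF(declaredClass.getName());          // always write the declared class name
    if (declaredClass.isArray()) {                  // array: length, then each element recursively
      int length = Array.getLength(instance);
      out.writeInt(length);
      for (int i = 0; i < length; i++) {
        writeObject(out, Array.get(instance, i), declaredClass.getComponentType());
      }
    } else if (declaredClass == String.class) {
      out.writeUTF((String) instance);
    } else if (declaredClass == Integer.TYPE) {
      out.writeInt((Integer) instance);
    } else if (declaredClass == Long.TYPE) {
      out.writeLong((Long) instance);
    } else {
      throw new IOException("Can't write: " + instance + " as " + declaredClass);
    }
  }

  static Object readObject(DataInput in) throws IOException {
    String className = in.readUTF();
    Class<?> declaredClass = PRIMITIVES.get(className);
    if (declaredClass == null) {
      try {
        declaredClass = Class.forName(className);   // e.g. "[I" or "java.lang.String"
      } catch (ClassNotFoundException e) {
        throw new IOException("readObject can't find class " + className, e);
      }
    }
    if (declaredClass.isArray()) {
      int length = in.readInt();
      Object array = Array.newInstance(declaredClass.getComponentType(), length);
      for (int i = 0; i < length; i++) {
        Array.set(array, i, readObject(in));        // Array.set unboxes for primitive arrays
      }
      return array;
    } else if (declaredClass == String.class) {
      return in.readUTF();
    } else if (declaredClass == Integer.TYPE) {
      return in.readInt();
    } else if (declaredClass == Long.TYPE) {
      return in.readLong();
    }
    throw new IOException("Can't read class " + className);
  }
}
```

The price of this self-description is visible in the output: every single array element carries its own class-name prefix, much as in the writeObject() loop quoted above.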

readObject() relies on the WritableFactories class, which lets non-public Writable subclasses define an object factory that creates their Writable objects. The relevant code is as follows:

public class WritableFactories {
  private static final HashMap<Class, WritableFactory> CLASS_TO_FACTORY =
    new HashMap<Class, WritableFactory>();

  private WritableFactories() {}                  // singleton

  /** Define a factory for a class. */
  public static synchronized void setFactory(Class c, WritableFactory factory) {
    CLASS_TO_FACTORY.put(c, factory);
  }

  /** Return the factory defined for a class, or null if none. */
  public static synchronized WritableFactory getFactory(Class c) {
    return CLASS_TO_FACTORY.get(c);
  }

  /** Create a new instance of a class with a defined factory. */
  public static Writable newInstance(Class<? extends Writable> c, Configuration conf) {
    WritableFactory factory = WritableFactories.getFactory(c);
    if (factory != null) {
      Writable result = factory.newInstance();
      if (result instanceof Configurable) {
        ((Configurable) result).setConf(conf);
      }
      return result;
    } else {
      return ReflectionUtils.newInstance(c, conf);
    }
  }

  /** Create a new instance of a class with a defined factory. */
  public static Writable newInstance(Class<? extends Writable> c) {
    return newInstance(c, null);
  }
}


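WritableFactories.newInstance() looks up the WritableFactory registered for the given class and calls that factory's newInstance() method to create the object; if the new object is Configurable, newInstance() also configures it through its setConf() method. If no factory is registered, it falls back to ReflectionUtils.newInstance(c, conf).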
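The lookup-then-fallback logic can be sketched with a Map from Class to java.util.function.Supplier. In the hypothetical MiniFactories class below, a registered factory wins; otherwise the no-argument constructor is invoked reflectively, roughly the role ReflectionUtils.newInstance() plays in Hadoop. ConfAware is a made-up stand-in for Configurable, and a plain Map<String, String> stands in for Configuration; unlike Hadoop, where configuring the reflective path is left to ReflectionUtils, this sketch applies setConf() in both branches for brevity.

```java
import java.lang.reflect.Constructor;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

// Hypothetical stand-in for Hadoop's Configurable.
interface ConfAware {
  void setConf(Map<String, String> conf);
}

final class MiniFactories {
  private static final Map<Class<?>, Supplier<?>> CLASS_TO_FACTORY = new HashMap<>();

  private MiniFactories() {} // static utility, never instantiated

  static synchronized <T> void setFactory(Class<T> c, Supplier<? extends T> factory) {
    CLASS_TO_FACTORY.put(c, factory);
  }

  static synchronized Supplier<?> getFactory(Class<?> c) {
    return CLASS_TO_FACTORY.get(c);
  }

  static <T> T newInstance(Class<T> c, Map<String, String> conf) {
    Supplier<?> factory = getFactory(c);
    T result;
    if (factory != null) {
      result = c.cast(factory.get());              // a registered factory wins
    } else {
      try {                                        // fallback: no-arg constructor via reflection
        Constructor<T> ctor = c.getDeclaredConstructor();
        ctor.setAccessible(true);
        result = ctor.newInstance();
      } catch (ReflectiveOperationException e) {
        throw new RuntimeException("cannot instantiate " + c.getName(), e);
      }
    }
    if (result instanceof ConfAware && conf != null) {
      ((ConfAware) result).setConf(conf);          // configure the new object if it asks for it
    }
    return result;
  }
}

// Example classes: one constructible only through its factory, one with a plain constructor.
class FactoryOnlyRecord {
  final String origin;
  private FactoryOnlyRecord(String origin) { this.origin = origin; }
  static FactoryOnlyRecord create() { return new FactoryOnlyRecord("factory"); }
}

class PlainRecord implements ConfAware {
  String origin = "reflection";
  String mode;
  public void setConf(Map<String, String> conf) { mode = conf.get("mode"); }
}
```

FactoryOnlyRecord has no no-argument constructor, so without MiniFactories.setFactory(FactoryOnlyRecord.class, FactoryOnlyRecord::create) the reflective fallback would fail; this mirrors why a Writable without a usable default constructor needs a registered factory.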


Original article: http://seandeng888.iteye.com/blog/2159914
