大数据框架hadoop的序列化机制

标签: 大数据 框架 hadoop | 发表时间:2014-11-24 23:50 | 作者:
出处:http://www.iteye.com

       对象的序列化(Serialization)用于将对象编码成一个字节流,以及从字节流中重新构建对象。“将一个对象编码成一个字节流”称为序列化该对象(Serializing);相反的处理过程称为反序列化(Deserializing)。

1.1              Java内建序列化机制

Java序列化机制将对象转换为连续的byte数据,这些数据可以在日后还原为原先的对象状态,该机制还能自动处理不同操作系统上的差异,在Windows系统上序列化的Java对象,可以在UNIX系统上被重建出来,不需要担心不同机器上的数据表示方法,也不需要担心字节排列次序。

在Java中,使一个类的实例可被序列化非常简单,只需要在类声明中加入implements Serializable即可。Serializable接口是一个标志,不具有任何成员函数,其定义如下:

    public interface Serializable {

}

    Serializable接口没有任何方法,所以不需要对类进行修改,Block类通过声明它实现了Serializable 接口,立即可以获得Java提供的序列化功能。代码如下:

public classBlock implements Writable, Comparable<Block>, Serializable

由于序列化主要应用在与I/O相关的一些操作上,其实现是通过一对输入/输出流来实现的。如果想对某个对象执行序列化动作,可以在某种OutputStream对象的基础上创建一个对象流ObjectOutputStream对象,然后调用writeObject()就可达到目的。

writeObject()方法负责写入实现了Serializable接口对象的状态信息,输出数据将被送至该OutputStream。多个对象的序列化可以在ObjectOutputStream对象上多次调用writeObject(),分别写入这些对象。下面是序列化对象的例子:

Block block1=new Block(7806259420524417791L,39447755L,56736651L);

... ...

ByteArrayOutputStream out =new ByteArrayOutputStream();

ObjectOutputStream objOut=new ObjectOutputStream(out);

objOut.writeObject(block1);

但是,序列化以后的对象在尺寸上有点过于充实了,以Block类为例,它只包含3个长整数,但是它的序列化结果竟然有112字节。包含3个长整数的Block对象的序列化结果如下:

-84, -19, 0, 5, 115, 114, 0, 23, 111, 114, 103, 46, 115, 101, 97, 110, 100, 101, 110, 103, 46, 116, 101, 115, 116, 46, 66, 108, 111, 99, 107, 40, -7, 56, 46, 72, 64, -69, 45, 2, 0, 3, 74, 0, 7, 98, 108, 111, 99, 107, 73, 100, 74, 0, 16, 103, 101, 110, 101, 114, 97, 116, 105, 111, 110, 115, 83, 116, 97, 109, 112, 74, 0, 8, 110, 117, 109, 66, 121, 116, 101, 115, 120, 112, 108, 85, 103, -107, 104, -25, -110, -1, 0, 0, 0, 0, 3, 97, -69, -117, 0, 0, 0, 0, 2, 89, -20, -53

1.2              Hadoop序列化机制

和Java序列化机制不同(在对象流ObjectOutputStream对象上调用writeObject()方法),Hadoop的序列化机制通过调用对象的write()方法(它带有一个类型为DataOutput的参数),将对象序列化到流中。反序列化的过程也是类似,通过对象的readFields(),从流中读取数据。值得一提的是,Java序列化机制中,反序列化过程会不断地创建新的对象,但在Hadoop的序列化机制的反序列化过程中,用户可以复用对象,这减少了Java对象的分配和回收,提高了应用的效率。

public static void main(String[] args) {

    try {

        Block block1 = new Block(1L,2L,3L);

        ... ...

        ByteArrayOutputStream bout = new ByteArrayOutputStream();

        DataOutputStream dout = new DataOutputStream();

        block1.write(dout);

        dout.close();

        ... ...

    }

    ... ...

}

由于Block对象序列化时只输出了3个长整数,block1的序列化结果一共有24字节。

1.3              Hadoop Writable机制

Hadoop引入org.apache.hadoop.io.Writable接口,作为所有可序列化对象必须实现的接口,在eclipse开发工具里看到的大纲视图如下:



 

和java.io.Serializable不同,Writable接口不是一个说明性接口,它包含两个方法:

public interface Writable {

  /**

   * Serialize the fields of this object to <code>out</code>.

   * @param out <code>DataOuput</code> to serialize this object into.

   * @throws IOException

   */

  void write(DataOutput out) throws IOException;

  /**

   * Deserialize the fields of this object from <code>in</code>. 

   * For efficiency, implementations should attempt to re-use storage in the

   * existing object where possible.</p>

   * @param in <code>DataInput</code> to deseriablize this object from.

   * @throws IOException

   */

  void readFields(DataInput in) throws IOException;

}

Writable.write(DataOutput out)方法用于将对象写入二进制的DataOutput中,反序列化的过程由readFields(DataInput in)从DataInput流中读取状态完成。下面是一个例子:

public class Block {

    private long blockId;

    private long numBytes;

    private long generationsStamp;

    public void write(DataOutput out) throws IOException {

        out.writeLong(blockId);

        out.writeLong(numBytes);

        out.writeLong(generationsStamp);

    }

    public void readFields(DataInput in) throws IOException {

        this.blockId = in.readLong();

        this.numBytes = in.readLong();

        this.generationsStamp = in.readLong();

        if (numBytes < 0 ) {

            throw new IOException("Unexpected block size:" + numBytes);

        }

    }

}

Hadoop序列化机制中还包括另外几个重要接口:WritableComparable、RawComparator和WritableComparator。

Comparable是一个对象本身就已经支持自比较所需要实现的接口(如Integer自己就可以完成比较大小操作),实现Comparable接口的方法compareTo(),通过传入要比较的对象即可进行比较。

   而Comparator是一个专用的比较器,可以完成两个对象之间大小的比较。实现Comparator接口的compare()方法,通过传入需要比较的两个对象来实现对两个对象之间大小的比较。

1.4              典型的Writable类详解

1.4.1       Java基本类型的Writable封装

Java基本类型对应的Writable封装如下表:

Java 基本类型

Writable

布尔型(Boolean)

BooleanWritable

字节型(byte)

ByteWritable

整型(int)

IntWritable

VIntWritable

浮点型(float)

FloatWritable

长整型(long)

LongWritable

VLongWritable

双精度浮点型(double)

DoubleWritable

下面以VIntWritable为例,代码如下:

public class VIntWritable implements WritableComparable {

  private intvalue;

  public VIntWritable() {}

  public VIntWritable( intvalue) { set(value); }

  /** Set the value of this VIntWritable. */

  public void set( intvalue) { this.value = value; }

  /** Return the value of this VIntWritable. */

  public int get() { returnvalue; }

  public void readFields(DataInput in) throws IOException {

    value = WritableUtils. readVInt(in);

  }

  public void write(DataOutput out) throws IOException {

    WritableUtils. writeVInt(out, value);

  }

  /** Compares two VIntWritables. */

  public int compareTo(Object o) {

    intthisValue = this.value;

    intthatValue = ((VIntWritable)o).value;

    return (thisValue < thatValue ? -1 : (thisValue == thatValue ? 0 : 1));

  }

}

    VIntWritable是通过调用Writable工具类中提供的readVInt()和writeVInt()读/写数据。

1.4.2       ObjectWritable类的实现

针对类实例,ObjectWritable提供了一个封装。相关代码如下:

public class ObjectWritable implements Writable, Configurable {

  private Class declaredClass;

  private Object instance;

  private Configuration conf;

  public ObjectWritable() {}

  public ObjectWritable(Object instance) {

    set(instance);

  }

  public ObjectWritable(Class declaredClass, Object instance) {

    this.declaredClass = declaredClass;

    this.instance = instance;

  }

  /** Return the instance, or null if none. */

  public Object get() { returninstance; }

  /** Return the class this is meant to be. */

  public Class getDeclaredClass() { returndeclaredClass; }

  /** Reset the instance. */

  public void set(Object instance) {

    this.declaredClass = instance.getClass();

    this.instance = instance;

  }

  public void readFields(DataInput in) throws IOException {

    readObject(in, this, this.conf);

  }

  public void write(DataOutput out) throws IOException {

    writeObject(out, instance, declaredClass, conf);

  }

  /** Write a {@link Writable}, {@link String}, primitive type, or an array of

   * the preceding. */

  public static void writeObject(DataOutput out, Object instance,

                                 Class declaredClass,

                                 Configuration conf) throws IOException {

    if (instance == null) {                       // null

      instance = new NullInstance(declaredClass, conf);

      declaredClass = Writable. class;

    }

    UTF8. writeString(out, declaredClass.getName()); // always write declared

    if (declaredClass.isArray()) {                // array

      intlength = Array. getLength(instance);

      out.writeInt(length);

      for ( inti = 0; i < length; i++) {

        writeObject(out, Array. get(instance, i),

        declaredClass.getComponentType(), conf);

      }

    } else if (declaredClass == String. class) {   // String

      UTF8. writeString(out, (String)instance);

    } else if (declaredClass.isPrimitive()) {     // primitive type

      if (declaredClass == Boolean. TYPE) {        // boolean

        out.writeBoolean(((Boolean)instance).booleanValue());

      } else if (declaredClass == Character. TYPE) { // char

        out.writeChar(((Character)instance).charValue());

      } else if (declaredClass == Byte. TYPE) {    // byte

        out.writeByte(((Byte)instance).byteValue());

      } else if (declaredClass == Short. TYPE) {   // short

        out.writeShort(((Short)instance).shortValue());

      } else if (declaredClass == Integer. TYPE) { // int

        out.writeInt(((Integer)instance).intValue());

      } else if (declaredClass == Long. TYPE) {    // long

        out.writeLong(((Long)instance).longValue());

      } else if (declaredClass == Float. TYPE) {   // float

        out.writeFloat(((Float)instance).floatValue());

      } else if (declaredClass == Double. TYPE) {  // double

        out.writeDouble(((Double)instance).doubleValue());

      } else if (declaredClass == Void. TYPE) {    // void

      } else {

        throw new IllegalArgumentException("Not a primitive: "+declaredClass);

      }

    } else if (declaredClass.isEnum()) {         // enum

      UTF8. writeString(out, ((Enum)instance).name());

    } else if (Writable. class.isAssignableFrom(declaredClass)) { // Writable

      UTF8. writeString(out, instance.getClass().getName());

      ((Writable)instance).write(out);

    } else {

      throw new IOException("Can't write: "+instance+" as "+declaredClass);

    }

  }

  /** Read a {@link Writable}, {@link String}, primitive type, or an array of

   * the preceding. */

  public static Object readObject(DataInput in, Configuration conf)

    throws IOException {

    return readObject(in, null, conf);

  }

  /** Read a {@link Writable}, {@link String}, primitive type, or an array of

   * the preceding. */

  @SuppressWarnings("unchecked")

  public static Object readObject(DataInput in, ObjectWritable objectWritable, Configuration conf)

    throws IOException {

    String className = UTF8. readString(in);

    Class<?> declaredClass = PRIMITIVE_NAMES.get(className);

    if (declaredClass == null) {

      try {

        declaredClass = conf.getClassByName(className);

      } catch (ClassNotFoundException e) {

        throw new RuntimeException("readObject can't find class " + className, e);

      }

    }

    Object instance;

    if (declaredClass.isPrimitive()) {            // primitive types

      if (declaredClass == Boolean. TYPE) {             // boolean

        instance = Boolean. valueOf(in.readBoolean());

      } else if (declaredClass == Character. TYPE) {    // char

        instance = Character. valueOf(in.readChar());

      } else if (declaredClass == Byte. TYPE) {         // byte

        instance = Byte. valueOf(in.readByte());

      } else if (declaredClass == Short. TYPE) {        // short

        instance = Short. valueOf(in.readShort());

      } else if (declaredClass == Integer. TYPE) {      // int

        instance = Integer. valueOf(in.readInt());

      } else if (declaredClass == Long. TYPE) {         // long

        instance = Long. valueOf(in.readLong());

      } else if (declaredClass == Float. TYPE) {        // float

        instance = Float. valueOf(in.readFloat());

      } else if (declaredClass == Double. TYPE) {       // double

        instance = Double. valueOf(in.readDouble());

      } else if (declaredClass == Void. TYPE) {         // void

        instance = null;

      } else {

        throw new IllegalArgumentException("Not a primitive: "+declaredClass);

      }

    } else if (declaredClass.isArray()) {              // array

      intlength = in.readInt();

      instance = Array. newInstance(declaredClass.getComponentType(), length);

      for ( inti = 0; i < length; i++) {

        Array. set(instance, i, readObject(in, conf));

      }

    } else if (declaredClass == String. class) {        // String

      instance = UTF8. readString(in);

    } else if (declaredClass.isEnum()) {         // enum

      instance = Enum. valueOf((Class<? extends Enum>) declaredClass, UTF8. readString(in));

    } else {                                      // Writable

      Class instanceClass = null;

      String str = "";

      try {

        str = UTF8. readString(in);

        instanceClass = conf.getClassByName(str);

      } catch (ClassNotFoundException e) {

         throw new RuntimeException("readObject can't find class " + str, e);

      }

      Writable writable = WritableFactories. newInstance(instanceClass, conf);

      writable.readFields(in);

      instance = writable;

      if (instanceClass == NullInstance. class) {  // null

        declaredClass = ((NullInstance)instance).declaredClass;

        instance = null;

      }

    }

    if (objectWritable != null) {                 // store values

      objectWritable.declaredClass = declaredClass;

      objectWritable.instance = instance;

    }

    returninstance;

  }

  ... ...

}

通过readFields方法反序列化一个object。而如果DataInput中传过来的是Writable 类型,则会在readObject再去调用readFields方法(writable.readFields(in)),直到DataInput中传递 的是非Writable 类型,就这样递归的反序列化DataInput中的Writable对象。

readObject()方法依赖于WritableFactories类。WritableFactories类允许非公有的Writable子类定义一个对象工厂,由该工厂创建Writable对象。相关代码如下:

public class WritableFactories {

  private static final HashMap<Class, WritableFactory> CLASS_TO_FACTORY =

    new HashMap<Class, WritableFactory>();

  private WritableFactories() {}                  // singleton

  /** Define a factory for a class. */

  public static synchronized void setFactory(Class c, WritableFactory factory) {

    CLASS_TO_FACTORY.put(c, factory);

  }

  /** Define a factory for a class. */

  public static synchronized WritableFactory getFactory(Class c) {

    return CLASS_TO_FACTORY.get(c);

  }

  /** Create a new instance of a class with a defined factory. */

  public static Writable newInstance(Class<? extends Writable> c, Configuration conf) {

    WritableFactory factory = WritableFactories. getFactory(c);

    if (factory != null) {

      Writable result = factory.newInstance();

      if (result instanceof Configurable) {

        ((Configurable) result).setConf(conf);

      }

      returnresult;

    } else {

      return ReflectionUtils. newInstance(c, conf);

    }

  }

  /** Create a new instance of a class with a defined factory. */

  public static Writable newInstance(Class<? extends Writable> c) {

    return newInstance(c, null);

  }

}

 

WritableFacories.newInstance()方法根据输入的类型查找对应的WritableFactory工厂对象,然后调用该对象的newInstance()创建对象,如果该对象是可配置的,newInstance()还会通过对象的setConf()方法配置对象。



已有 0 人发表留言,猛击->> 这里<<-参与讨论


ITeye推荐



相关 [大数据 框架 hadoop] 推荐:

大数据框架hadoop的序列化机制

- - ITeye博客
       对象的序列化(Serialization)用于将对象编码成一个字节流,以及从字节流中重新构建对象. “将一个对象编码成一个字节流”称为序列化该对象(Serializing);相反的处理过程称为反序列化(Deserializing). 1.1              Java内建序列化机制.

大数据架构hadoop

- - CSDN博客云计算推荐文章
摘要:Admaster数据挖掘总监 随着互联网、移动互联网和物联网的发展,谁也无法否认,我们已经切实地迎来了一个海量数据的时代,数据调查公司IDC预计2011年的数据总量将达到1.8万亿GB,对这些海量数据的分析已经成为一个非常重要且紧迫的需求. 随着互联网、移动互联网和物联网的发展,谁也无法否认,我们已经切实地迎来了一个海量数据的时代,数据调查公司IDC预计2011年的数据总量将达到1.8万亿GB,对这些海量数据的分析已经成为一个非常重要且紧迫的需求.

王家林“云计算分布式大数据Hadoop实战高手之路---从零开始”的第一讲Hadoop图文训练课程:10分钟理解云计算分布式大数据处理框架Hadoop

- - CSDN博客云计算推荐文章
                                                                                                                                                     .

Hadoop之HDFS子框架

- - CSDN博客云计算推荐文章
由图片可以看到HDFS主要包含这样几个功能组件. Namenode:存储文档的元数据信息,还有整个文件系统的目录结构. DataNode:存储文档块信息,并且文档块之间是有冗余备份的. 这里面提到了文档块的概念,同本地文件系统一样,HDFS也是按块存储的,只不过块的大小设置的相对大一些,默认为64M.

Hadoop掀起大数据革命 三巨头齐发力

- - 慕容鱼吐的新闻泡
导读:开源的数据处理平台凭借其低成本、高扩展性和灵活性的优势已经赢得了多数网络巨头的认可. 现在Hadoop将进入更多企业. IBM将在明年推出内置NoSQL技术的DB2旗舰级数据库管理系统. 上个月Oracle和Microsoft也分别透露了将计划在明年发布基于Hadoop的产品. 两家公司都计划提供协助部署服务和企业级支持.

如何挑选合适的大数据或Hadoop平台

- - 互联网旁观者
今年,大数据在很多公司都成为相关话题. 虽然没有一个标准的定义来解释何为 “大数据”,但在处理大数据上,Hadoop已经成为事实上的标准. IBM、Oracle、SAP、甚至Microsoft等几乎所有的大型软件提供商都采用了Hadoop. 然而,当你已经决定要使用Hadoop来处理大数据时,首先碰到的问题就是如何开始以及选择哪一种产品.

大数据-Hadoop小文件问题解决方案

- - IT瘾-geek
HDFS中小文件是指文件size小于HDFS上block(. dfs.block.size)大小的文件. 大量的小文件会给Hadoop的扩展性和性能带来严重的影响. 动态分区插入数据,产生大量的小文件,从而导致map数量剧增. reduce数量越多,小文件也越多,reduce的个数和输出文件个数一致.

分布式计算开源框架Hadoop入门实践

- - ITeye博客
一、分布式计算开源框架Hadoop实践. 在 SIP项目设计的过程中,对于它庞大的日志在开始时就考虑使用任务分解的多线程处理模式来分析统计,在我从前写的文章《Tiger Concurrent Practice --日志分析并行分解设计与实现》中有所提到. 但是由于统计的内容暂时还是十分简单,所以就采用Memcache作为计数器,结合MySQL就完成了访问 控制以及统计的工作.

Oracle大数据机和连接器产品支持与Hadoop和Cloudera Manager集成

- - InfoQ cn
Oracle大数据机和大数据连接器软件支持与Hadoop、Cloudera Manager以及Oracle NoSQL数据库的集成. 上月Oracle 宣布携手Cloudera进军大数据机和连接器软件领域. 大数据机融合了Cloudera公司的 Apache Hadoop(CDH)和 Cloudera Manager管理应用,以及一个开源统计性编程语言 R.

自学大数据:用以生产环境的Hadoop版本比较

- - CSDN博客云计算推荐文章
生产环境中,hadoop的版本选择是一个公司架构之时,很重要的一个考虑因素. 这篇文章根据就谈谈现在主流的hadoop版本的比较. 如果有不同意见,或者指正,希望大家能交流. Apache Hadoop:Apache Hadoop是一款支持数据密集型分布式应用并以Apache 2.0许可协议发布的开源软件框架.