用Hadoop AVRO进行大量小文件的处理

标签: hadoop avro 文件 | 发表时间:2013-06-18 19:41 | 作者:zuochanxiaoheshang
出处:http://blog.csdn.net

使用 使用使用 使用 HDFS 保存大量小文件的缺点:
1.Hadoop NameNode 在内存中保存所有文件的“元信息”数据。据统计,每一个文件需要消耗 NameNode600 字节内存。如果需要保存大量的小文件会对NameNode 造成极大的压力。
2.如果采用 Hadoop MapReduce 进行小文件的处理,那么 Mapper 的个数就会跟小文件的个数成线性相关(备注:FileInputFormat 默认只对大于 HDFS Block Size的文件进行划分)。如果小文件特别多,MapReduce 就会在消耗大量的时间进行Map 进程的创建和销毁。
为了解决大量小文件带来的问题,我们可以将很多小文件打包,组装成一个大文件。 Apache Avro 是语言独立的数据序列化系统。 Avro 在概念上分为两部分:模式(Schema)和数据(一般为二进制数据)。Schema 一般采用 Json 格式进行描述。Avro 同时定义了一些自己的数据类型如表所示:

Avro基础数据类型

类型

描述

模式

null 

The absence of a value

 "null"

boolean

A binary value

"boolean"

int

32位带符号整数

"int"

long

64位带符号整数

"long"

float

32位单精度浮点数

"float"

double

64位双精度浮点数

"double"

bytes

byte数组

"bytes"

string

Unicode字符串

"string"

类型

描述

模式

array

An ordered collection of objects. All objects in a particular array must have the same schema.

{

"type": "array",

"items": "long"

}

map

An unordered collection of key-value pairs. Keys must be strings and values may be any type, although within a particular map, all values must have the same schema.

{

"type": "map",

"values": "string"

}

record

A collection of named fields of any type.

{

"type": "record",

"name": "WeatherRecord",

"doc": "A weather reading.",

"fields": [

{"name": "year", "type": "int"},

{"name": "temperature", "type": "int"},

{"name": "stationId", "type": "string"}

]

}

enum

A set of named values.

{

"type": "enum",

"name": "Cutlery",

"doc": "An eating utensil.",

"symbols": ["KNIFE", "FORK", "SPOON"]

}

fixed

A fixed number of 8-bit unsigned bytes.

{

"type": "fixed",

"name": "Md5Hash",

"size": 16

}

union

A union of schemas. A union is represented by a JSON

array, where each element in the array is a schema. Data represented by a union must match one of the schemas in the union.

[

"null",

"string",

{"type": "map", "values": "string"}

]

Avro复杂数据类型


 通过上图所示,通过程序可以将本地的小文件进行打包,组装成一个大文件在HDFS中进行保存,本地的小文件成为Avro的记录。具体的程序如下面的代码所示:

public class Demo {
	public static final String FIELD_CONTENTS = "contents";
	public static final String FIELD_FILENAME = "filename";
	public static final String SCHEMA_JSON = "{\"type\": \"record\",\"name\": \"SmallFilesTest\", "
			+ "\"fields\": ["
			+ "{\"name\":\""
			+ FIELD_FILENAME
			+ "\",\"type\":\"string\"},"
			+ "{\"name\":\""
			+ FIELD_CONTENTS
			+ "\", \"type\":\"bytes\"}]}";
	public static final Schema SCHEMA = new Schema.Parser().parse(SCHEMA_JSON);

	public static void writeToAvro(File srcPath, OutputStream outputStream) throws IOException {
		DataFileWriter<Object> writer = new  DataFileWriter<Object>(new GenericDatumWriter<Object>()).setSyncInterval(100);
		writer.setCodec(CodecFactory.snappyCodec());
		writer.create(SCHEMA, outputStream);
		for (Object obj : FileUtils.listFiles(srcPath, null, false)){
			File file = (File) obj;
			String filename = file.getAbsolutePath();
			byte content[] = FileUtils.readFileToByteArray(file);
			GenericRecord record = new GenericData.Record(SCHEMA);
			record.put(FIELD_FILENAME, filename);
			record.put(FIELD_CONTENTS, ByteBuffer.wrap(content));
			writer.append(record);
			System.out.println(file.getAbsolutePath() + ":"+ DigestUtils.md5Hex(content));
		}
		IOUtils.cleanup(null, writer);
		IOUtils.cleanup(null, outputStream);
	}

	public static void main(String args[]) throws Exception {
		Configuration config = new Configuration();
		FileSystem hdfs = FileSystem.get(config);
		File sourceDir = new File(args[0]);
		Path destFile = new Path(args[1]);
		OutputStream os = hdfs.create(destFile);
		writeToAvro(sourceDir, os);
	}
}

public class Demo {
	private static final String FIELD_FILENAME = "filename";
	private static final String FIELD_CONTENTS = "contents";

	public static void readFromAvro(InputStream is) throws  IOException {
		DataFileStream<Object> reader = new DataFileStream<Object>(is,new GenericDatumReader<Object>());
		for (Object o : reader) {
			GenericRecord r = (GenericRecord) o;
			System.out.println(r.get(FIELD_FILENAME)+ ":"+DigestUtils.md5Hex(((ByteBuffer)r.get(FIELD_CONTENTS)).array()));
		}
		IOUtils.cleanup(null, is);
		IOUtils.cleanup(null, reader);
	}

	public static void main(String... args) throws Exception {
		Configuration config = new Configuration();
		FileSystem hdfs = FileSystem.get(config);
		Path destFile = new Path(args[0]);
		InputStream is = hdfs.open(destFile);
		readFromAvro(is);
	}
}


作者:zuochanxiaoheshang 发表于2013-6-18 19:41:18 原文链接
阅读:99 评论:0 查看评论

相关 [hadoop avro 文件] 推荐:

用Hadoop AVRO进行大量小文件的处理

- - CSDN博客云计算推荐文章
使用 使用使用 使用 HDFS 保存大量小文件的缺点:. 1.Hadoop NameNode 在内存中保存所有文件的“元信息”数据. 据统计,每一个文件需要消耗 NameNode600 字节内存. 如果需要保存大量的小文件会对NameNode 造成极大的压力. 2.如果采用 Hadoop MapReduce 进行小文件的处理,那么 Mapper 的个数就会跟小文件的个数成线性相关(备注:FileInputFormat 默认只对大于 HDFS Block Size的文件进行划分).

Avro RPC 对比测试

- - 行业应用 - ITeye博客
J2EE平台常采用多层分布式的架构体系. 分布式服务节点之间需要通讯和交互(业务节点和资源节点之间),服务端和客户端需要交互(终端客户端需要调用服务端的远程服务,客户端有C实现的,也有Java等其他语言实现的). 因此基础平台需要提供一个稳定、高效的、可伸缩的RPC服务性组件. 稳定,高性能;作为一个基础性的骨架组件,高可用性和高性能是必备的;传输层希望是面向连接的TCP通信.

hadoop多文件输出

- - CSDN博客云计算推荐文章
现实环境中,常常遇到一个问题就是想使用多个Reduce,但是迫于setup和cleanup在每一个Reduce中会调用一次,只能设置一个Reduce,无法是实现负载均衡. 问题,如果要在reduce中输出两种文件,一种是标志,另一种是正常业务数据,实现方案有三种:. (1)设置一个reduce,在reduce中将数据封装到一个集合中,在cleanup中将数据写入到hdfs中,但是如果数据量巨大,一个reduce无法充分利用资源,实现负载均衡,但是如果数据量较小,可以使用.

hadoop多文件格式输入

- - CSDN博客云计算推荐文章
hadoop多文件格式输入,一般可以使用MultipleInputs类指定不同的输入文件路径以及输入文件格式. 现在需要把user和phone按照phone number连接起来,得到下面的结果:. 那么就可以使用MultipleInputs来操作,这里把user和phone上传到hdfs目录中,分别是/multiple/user/user , /multiple/phone/phone.

hadoop 处理不同的输入文件,文件关联

- - CSDN博客云计算推荐文章
file1和file2进行关联,想要的结果:. 2、将file1的key、value颠倒 ;file1和file2的key相同,file1的value做key,file2的value做value ,输出. if("file1".equals(fileName)){//加标记. // 设置Map和Reduce处理类.

Thrift、protocolbuffer、avro这几种序列化之间的比较

- - 企业架构 - ITeye博客
        thrift和avro都提供rpc服务和序列化,而protocol buffer只是提供序列化功能.         thrift是一个跨语言的轻量级RPC消息和数据交换框架,Thrift能生成的语言有: C++, Java, Python, PHP, Ruby, Erlang, Perl, Haskell, C#, Cocoa, Smalltalk, and OCaml.

hadoop集群调优-OS和文件系统部分

- - 开源软件 - ITeye博客
根据Dell(因为我们的硬件采用dell的方案)关于hadoop调优的相关说明,改变几个Linux的默认设置,Hadoop的性能能够增长大概15%. 文件描述符是一个索引值,指向内核为每一个进程所维护的该进程打开文件的记录表. 当程序打开一个现有文件或者创建一个新文件时,内核向进程返回一个文件描述符.

Hadoop 归档 和HIVE 如何使用har 归档 文件

- - CSDN博客云计算推荐文章
但对于MapReduce 来说起不到任何作用,因为har文件就相当一个目录,仍然不能讲小文件合并到一个split中去,一个小文件一个split ,任然是低效的,这里要说一点<>对这个翻译有问题,上面说可以分配到一个split中去,但是低效的.      既然有优势自然也有劣势,这里不说它的不足之处,仅介绍如果使用har 并在hadoop中更好的使用har 文件.

大数据-Hadoop小文件问题解决方案

- - IT瘾-geek
HDFS中小文件是指文件size小于HDFS上block(. dfs.block.size)大小的文件. 大量的小文件会给Hadoop的扩展性和性能带来严重的影响. 动态分区插入数据,产生大量的小文件,从而导致map数量剧增. reduce数量越多,小文件也越多,reduce的个数和输出文件个数一致.

[原]基于hadoop搜索引擎实践——二级索引文件(五)

- - long1657的专栏
基于hadoop搜索引擎——二级索引文件.     一般生成的倒排表文件会比源文件暂用空间大,主要是倒排表文件所记录的信息比较详细. 它记录了所有的索引词记录(TERM_RECORD)信息,对于常见的关键词(TERM),其MULTI_INFO可能包含几万甚至几十万个SINGLE_INFO..     由于倒排表文件很大.