Hadoop中共享全局信息的几种方法

标签: hadoop 共享 信息 | 发表时间:2014-04-21 00:27 | 作者:yanxiangtianji
出处:http://blog.csdn.net

在编写Hadoop MapReduce程序的过程中有时候需要在各个Mapper或者Reducer中使用一些共享的全局数据,例如在处理整数数据表格的时候有时候需要让每个Reducer知道各个列的取值范围或是一些图算法中需要让各个Reducer知道图的连通关系。


加入key/value对 通用,但效率不高
将共享文件放在HDFS上,采用Hadoop的文件操作API访问
通用,效率一般(可读可写)
将共享信息加入JobConf/Configure对象,set/get系列方法 较适用于小信息,效率最高
将共享信息加入DistributedCache对象 较适用于大量共享信息(只能读)


1, 最基本的方法是把需要共享的信息加到key/value对中。这种方法简单易行(用Text表示value,然后在正常数据后面加间隔符和全局数据),但是网络效率和处理效率都受到非常严重的影响。

2, 把共享文件放在HDFS上,在每个Mapper/Reducer中使用Hadoop的文件API去访问。这种方法比较通用,但是需要涉及DFS的文件操作,较为复杂且效率会受到影响。

读写HDFS的API与标准Java文件API有一点差异,需要使用特定的对象来创建InputStream/OutputStream。下面举一个从HDFS文件中读取信息的例子。

其中的关键点在于:首先根据当前的JobConf获得当前的文件系统(它默认从hadoop下的配置文件中读取相关信息,同样适用于单节点模式);然后 要使用FileSystem的成员方法open打开文件(它返回一个FSDataInputStream,它是InputStream的子类), 千万不要试图使用一般的Java文件API打开输入流或直接使用Hadoop的Path打开文件,如new Scanner(p.toString())或new Scanner(new Path(hdfs.getHomeDirectory(),p).toString()),会出现找不到文件的异常(即使文件就在所显示的目录里面)

import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

class XXX{
private int N;
List<Integer> D=new ArrayList<Integer>();
.....

	private void setConfByHDFS(Path p, JobConf conf) throws IOException {
		FileSystem hdfs = FileSystem.get(conf);
		Scanner s = new Scanner(hdfs.open(p));
		N = s.nextInt();for (int i = 0; i < N; i++) {
			D.add(s.nextInt());
		}
		s.close();
	}
}
3, 使用JobConf的set*方法写入配置信息,再在Mapper/Reducer的configure方法里面使用JobConf的get*方法读取相关信息。

由于信息是写入JobConf的,读取的时候不设计HDFS的读写,效率最高。但是这种方法难以共享大量信息。比较适合设置一些全局变量。

实现的时候需要 重载Mapper/Reducer的configure方法。

set*方法在JobConf中根据指定的名字创建一个指定类型值,get*方法根据名字访问已经存入的值,对于基本类型可以通过一个额外的参数指定访问失败时返回的默认值(class方法失败时返回null)。可以使用setInt/getInt,setFloat/getFloat这样的方法存取如int、float这样的类型;存取单个字符串直接使用set/get方法;setStrings/getStrings方法的访问的是一个String类型的数组。

class XXX{
...
	public static class CSVReducer extends MapReduceBase implements
			Reducer<IntWritable, IntWritable, IntWritable, VectorIntWritable> {
		private int N=0;
		private ArrayList<Integer> D = new ArrayList<Integer>();

		@Override
		public void configure(JobConf job) {//只有这里能访问到JobConf
			super.configure(job);
			N=job.getInt("csvcount.conf.num", -1);//访问共享信息
			String str = job.get("csvcount.conf.d");
			for (String s : str.split(",")) {
				D.add(Integer.parseInt(s));
			}
		}

		@Override
		public void reduce(IntWritable key, Iterator<IntWritable> values,
				OutputCollector<IntWritable, VectorIntWritable> output, Reporter reporter) throws IOException {
			int[] res = new int[D.get(key.get())];
			// System.out.println(D.get(key.get()));
			...
		}
	}

	private void setConfByConfigure(Path p, JobConf conf) throws IOException {//创建任务后调用本函数类写入全局共享信息
		FileSystem hdfs = FileSystem.get(conf);
		Scanner s = new Scanner(hdfs.open(p));
		int N = s.nextInt();
		ArrayList<Integer> D = new ArrayList<Integer>();
		for (int i = 0; i < N; i++) {
			D.add(s.nextInt());
		}
		s.close();
		conf.setInt("csvcount.conf.num", N);//写入共享信息
		conf.set("csvcount.conf.d", D.toString().replaceAll("[\\[\\] ]", ""));
	}


4, 写入DistributedCache。它是Hadoop专门为共享一些只读的全局信息提供的一个较为简单的机制。由于任务信息块里面只记录了文件的路径,具体的文件读写还是在HDFS上进行,所有可以共享大量的数据;另外由于只允许读操作,以及其他一些内部优化,效率要比使用Hadoop文件API读写文件高一些。
  

使用的时候需要先调用DistributedCache的静态方法addCacheFile将共享文件/目录的URI加入到任务JobConf中;访问之前使用DistributedCache的另一个静态方法getLocalCachedFiles将job中的共享文件全都列出来,然后就可以使用标准的Java文件API打开文件了。

在Mapper/Reducer中需要重载configure方法。

public class XXX {

	public static class Map extends MapReduceBase implements Mapper<LongWritable, Text, Text, IntWritable> {
		private Set<String> patternsToSkip = new HashSet<String>();
   
		@Override
public void configure(JobConf job) {Path[] patternsFiles = new Path[0];try {patternsFiles = DistributedCache.getLocalCacheFiles(job);//获取所有DistributedCache文件名/目录名} catch (IOException ioe) {System.err.println("Caught exception while getting cached files: "+ StringUtils.stringifyException(ioe));}for (Path patternsFile : patternsFiles) {parseSkipFile(patternsFile);}}private void parseSkipFile(Path patternsFile) {try {BufferedReader fis = new BufferedReader(new FileReader(patternsFile.toString()));String pattern = null;while ((pattern = fis.readLine()) != null) {patternsToSkip.add(pattern);}fis.close();} catch (IOException ioe) {System.err.println("Caught exception while parsing the cached file '" + patternsFile + "' : "+ StringUtils.stringifyException(ioe));}}public void map(LongWritable key, Text value, OutputCollector<Text, IntWritable> output, Reporter reporter)throws IOException {...}}public static class Reduce extends MapReduceBase implements Reducer<Text, IntWritable, Text, IntWritable> {...}public int run(String[] args) throws Exception {JobConf conf = new JobConf(getConf(), WordCount2.class);conf.setJobName("wordcount2");...List<String> other_args = new ArrayList<String>();for (int i = 0; i < args.length; ++i) {if ("-skip".equals(args[i])) {DistributedCache.addCacheFile(new Path(args[++i]).toUri(), conf);//添加DistributedCache} else {other_args.add(args[i]);}}FileInputFormat.setInputPaths(conf, new Path(other_args.get(0)));FileOutputFormat.setOutputPath(conf, new Path(other_args.get(1)));JobClient.runJob(conf);return 0;}


   


   


   
作者:yanxiangtianji 发表于2014-4-20 16:27:56 原文链接
阅读:0 评论:0 查看评论

相关 [hadoop 共享 信息] 推荐:

Hadoop中共享全局信息的几种方法

- - CSDN博客推荐文章
在编写Hadoop MapReduce程序的过程中有时候需要在各个Mapper或者Reducer中使用一些共享的全局数据,例如在处理整数数据表格的时候有时候需要让每个Reducer知道各个列的取值范围或是一些图算法中需要让各个Reducer知道图的连通关系. 将共享文件放在HDFS上,采用Hadoop的文件操作API访问.

Hadoop Streaming 编程

- - 学着站在巨人的肩膀上
Hadoop Streaming是Hadoop提供的一个编程工具,它允许用户使用任何可执行文件或者脚本文件作为Mapper和Reducer,例如:. 采用shell脚本语言中的一些命令作为mapper和reducer(cat作为mapper,wc作为reducer). 本文安排如下,第二节介绍Hadoop Streaming的原理,第三节介绍Hadoop Streaming的使用方法,第四节介绍Hadoop Streaming的程序编写方法,在这一节中,用C++、C、shell脚本 和python实现了WordCount作业,第五节总结了常见的问题.

Hadoop使用(一)

- Pei - 博客园-首页原创精华区
Hadoop使用主/从(Master/Slave)架构,主要角色有NameNode,DataNode,secondary NameNode,JobTracker,TaskTracker组成. 其中NameNode,secondary NameNode,JobTracker运行在Master节点上,DataNode和TaskTracker运行在Slave节点上.

Hadoop MapReduce技巧

- - 简单文本
我在使用Hadoop编写MapReduce程序时,遇到了一些问题,通过在Google上查询资料,并结合自己对Hadoop的理解,逐一解决了这些问题. Hadoop对MapReduce中Key与Value的类型是有要求的,简单说来,这些类型必须支持Hadoop的序列化. 为了提高序列化的性能,Hadoop还为Java中常见的基本类型提供了相应地支持序列化的类型,如IntWritable,LongWritable,并为String类型提供了Text类型.

Hadoop TaskScheduler浅析

- - kouu&#39;s home
TaskScheduler,顾名思义,就是MapReduce中的任务调度器. 在MapReduce中,JobTracker接收JobClient提交的Job,将它们按InputFormat的划分以及其他相关配置,生成若干个Map和Reduce任务. 然后,当一个TaskTracker通过心跳告知JobTracker自己还有空闲的任务Slot时,JobTracker就会向其分派任务.

HADOOP安装

- - OracleDBA Blog---三少个人自留地
最近有时间看看hadoop的一些东西,而且在测试的环境上做了一些搭建的工作. 首先,安装前需要做一些准备工作. 使用一台pcserver作为测试服务器,同时使用Oracle VM VirtualBox来作为虚拟机的服务器. 新建了三个虚拟机以后,安装linux,我安装的linux的版本是redhat linux 5.4 x64版本.

Hadoop Corona介绍

- - 董的博客
Dong | 可以转载, 但必须以超链接形式标明文章原始出处和作者信息及 版权声明. 网址: http://dongxicheng.org/hadoop-corona/hadoop-corona/. Hadoop Corona是facebook开源的下一代MapReduce框架. 其基本设计动机和Apache的YARN一致,在此不再重复,读者可参考我的这篇文章 “下一代Apache Hadoop MapReduce框架的架构”.

Hadoop RPC机制

- - 企业架构 - ITeye博客
RPC(Remote Procedure Call Protocol)远程过程调用协议,它是一种通过网络从远程计算机程序上请求服务,而不需要了解底层网络技术的协议. Hadoop底层的交互都是通过 rpc进行的. 例如:datanode和namenode 、tasktracker和jobtracker、secondary namenode和namenode之间的通信都是通过rpc实现的.

Hadoop Rumen介绍

- - 董的博客
Dong | 新浪微博: 西成懂 | 可以转载, 但必须以超链接形式标明文章原始出处和作者信息及 版权声明. 网址: http://dongxicheng.org/mapreduce/hadoop-rumen-introduction/. 什么是Hadoop Rumen?. Hadoop Rumen是为Hadoop MapReduce设计的日志解析和分析工具,它能够将JobHistory 日志解析成有意义的数据并格式化存储.

Hadoop contrib介绍

- - 董的博客
Dong | 可以转载, 但必须以超链接形式标明文章原始出处和作者信息及 版权声明. 网址: http://dongxicheng.org/mapreduce/hadoop-contrib/. Hadoop Contrib是Hadoop代码中第三方公司贡献的工具包,一般作为Hadoop kernel的扩展功能,它包含多个非常有用的扩展包,本文以Hadoop 1.0为例对Hadoop Contrib中的各个工具包进行介绍.