MapReuce 编程总结-多MapReduce执行

标签: mapreuce 编程 mapreduce | 发表时间:2013-10-11 17:37 | 作者:WeiJonathan
出处:http://blog.csdn.net

学习hadoop,必不可少的就是写MapReduce程序,当然,对于简单的分析程序,我们只需一个MapReduce就能搞定,这里就不提单MapReuce的情况了,网上例子很多,大家可以百度Google一下。对于比较复杂的分析程序,我们可能需要多个Job或者多个Map或者Reduce进行分析计算。

        多Job或者多MapReduce的编程形式有以下几种:

1、迭代式MapReduce

MapReduce迭代方式,通常是前一个MapReduce的输出作为下一个MapReduce的输入,最终可只保留最终结果,中间数据可以删除或保留,根据业务需要自己决定

        示例代码如下:

        

Configuration conf = new Configuration();
//first Job
Job job1 = new Job(conf,"job1");
.....
FileInputFormat.addInputPath(job1,InputPaht1);
FileOutputFromat.setOutputPath(job1,Outpath1);
job1.waitForCompletion(true);
//second Mapreduce
Job job2 = new Job(conf1,"job1");
.....
FileInputFormat.addInputPath(job2,Outpath1);
FileOutputFromat.setOutputPath(job2,Outpath2);
job2.waitForCompletion(true);
//third Mapreduce
Job job3 = new Job(conf1,"job1");
.....
FileInputFormat.addInputPath(job3,Outpath2);
FileOutputFromat.setOutputPath(job3,Outpath3);
job3.waitForCompletion(true);
.....

下面列举一个mahout怎样运用mapreduce迭代的,下面的代码快就是mahout中kmeans的算法的代码,在main函数中用一个while循环来做mapreduce的迭代,其中:runIteration()是一次mapreduce的过程。

但个人感觉现在的mapreduce迭代设计不太满意的地方。

1. 每次迭代,如果所有Job(task)重复创建,代价将非常高。

2.每次迭代,数据都写入本地和读取本地,I/O和网络传输的代价比较大。

好像Twister和Haloop的模型能过比较好的解决这些问题,但他们抽象度不够高,支持的计算有限。

期待着下个版本hadoop更好的支持迭代算法。

//main function
while (!converged && iteration <= maxIterations) {
      log.info("K-Means Iteration {}", iteration);
      // point the output to a new directory per iteration
      Path clustersOut = new Path(output, AbstractCluster.CLUSTERS_DIR + iteration);
      converged = runIteration(conf, input, clustersIn, clustersOut, measure.getClass().getName(), delta);
      // now point the input to the old output directory
      clustersIn = clustersOut;
      iteration++;
}

  private static boolean runIteration(Configuration conf,
                                      Path input,
                                      Path clustersIn,
                                      Path clustersOut,
                                      String measureClass,
                                      String convergenceDelta)
    throws IOException, InterruptedException, ClassNotFoundException {

    conf.set(KMeansConfigKeys.CLUSTER_PATH_KEY, clustersIn.toString());
    conf.set(KMeansConfigKeys.DISTANCE_MEASURE_KEY, measureClass);
    conf.set(KMeansConfigKeys.CLUSTER_CONVERGENCE_KEY, convergenceDelta);

    Job job = new Job(conf, "KMeans Driver running runIteration over clustersIn: " + clustersIn);
    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(ClusterObservations.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(Cluster.class);

    job.setInputFormatClass(SequenceFileInputFormat.class);
    job.setOutputFormatClass(SequenceFileOutputFormat.class);
    job.setMapperClass(KMeansMapper.class);
    job.setCombinerClass(KMeansCombiner.class);
    job.setReducerClass(KMeansReducer.class);

    FileInputFormat.addInputPath(job, input);
    FileOutputFormat.setOutputPath(job, clustersOut);

    job.setJarByClass(KMeansDriver.class);
    HadoopUtil.delete(conf, clustersOut);
    if (!job.waitForCompletion(true)) {
      throw new InterruptedException("K-Means Iteration failed processing " + clustersIn);
    }
    FileSystem fs = FileSystem.get(clustersOut.toUri(), conf);

    return isConverged(clustersOut, conf, fs);
  }

2、依赖关系式MapReuce-JobControl

依赖关系式主要是由JobControl来实现,JobControl由两个类组成:Job和JobControl。其中,Job类封装了一个MapReduce作业及其对应的依赖关系,主要负责监控各个依赖作业的运行状态,以此更新自己的状态。

JobControl包含了一个线程用于周期性的监控和更新各个作业的运行状态,调度依赖作业运行完成的作业,提交处于READY状态的作业等,同事,还提供了一些API用于挂起、回复和暂停该线程。

示例代码如下:

Configuration job1conf = new Configuration();
Job job1 = new Job(job1conf,"Job1");
.........//job1 其他设置
Configuration job2conf = new Configuration();
Job job2 = new Job(job2conf,"Job2");
.........//job2 其他设置
Configuration job3conf = new Configuration();
Job job3 = new Job(job3conf,"Job3");
.........//job3 其他设置
job3.addDepending(job1);//设置job3和job1的依赖关系
job3.addDepending(job2);
JobControl JC = new JobControl("123");
JC.addJob(job1);//把三个job加入到jobcontorl中
JC.addJob(job2);
JC.addJob(job3);
JC.run();

3、线性链式MapReduce-ChainMapper/ChainReduce

ChainMapper/ChainReduce主要为了解决线性链式Mapper提出的。在Map或者Reduce阶段存在多个Mapper,这些Mapper像Linux管道一样,前一个Mapper的输出结果直接重定向到下一个Mapper的输入,行程流水线。

需要注意的是,对于任意一个MapReduce作业,Map和Reduce阶段可以有无线个Mapper,但是Reduce只能有一个。所以包含多个Reduce的作业,不能使用ChainMapper/ChainReduce来完成。

代码如下:

...
conf.setJobName("chain");
conf.setInputFormat(TextInputFormat.class);
conf.setOutputFormat(TextOutputFormat.class);

JobConf mapper1Conf=new JobConf(false);
JobConf mapper2Conf=new JobConf(false);
JobConf redduce1Conf=new JobConf(false);
JobConf mappe3Conf=new JobConf(false);
...
ChainMapper.addMapper(conf,Mapper1.class,LongWritable.class,Text.class,Text.class,Text.class,true,mapper1Conf);
ChainMapper.addMapper(conf,Mapper2.class,Text.class,Text.class,LongWritable.class,Text.class,false,mapper2Conf);
ChainReducer.setReduce(conf,Reducer.class,LongWritable.class,Text.class,Text.class,Text.class,true,reduce1Conf);
ChainReducer.addMapper(conf,Mapper3.class,Text.class,Text.class,LongWritable.class,Text.class,true,mapper3Conf);
JobClient.runJob(conf);

4、子Job式MapReduce

子Job式其实也是迭代式中的一种,我这里单独的提取出来了,说白了,就是一个父Job包含多个子Job。

在nutch中,Crawler是一个父Job,通过run方法中调用runTool工具进行子Job的调用,而runTool是通过反射来调用子Job执行。

下面来看下Nutch里面是如何实现的

....
private NutchTool currentTool = null;
....
private Map<String, Object> runTool(Class<? extends NutchTool> toolClass,
			Map<String, Object> args) throws Exception {
		currentTool = (NutchTool) ReflectionUtils.newInstance(toolClass,
				getConf());
		return currentTool.run(args);
	}
...
@Override
	public Map<String, Object> run(Map<String, Object> args) throws Exception {
		results.clear();
		status.clear();
		String crawlId = (String) args.get(Nutch.ARG_CRAWL);
		if (crawlId != null) {
			getConf().set(Nutch.CRAWL_ID_KEY, crawlId);
		}
		String seedDir = null;
		String seedList = (String) args.get(Nutch.ARG_SEEDLIST);
		if (seedList != null) { // takes precedence
			String[] seeds = seedList.split("\\s+");
			// create tmp. dir
			String tmpSeedDir = getConf().get("hadoop.tmp.dir") + "/seed-"
					+ System.currentTimeMillis();
			FileSystem fs = FileSystem.get(getConf());
			Path p = new Path(tmpSeedDir);
			fs.mkdirs(p);
			Path seedOut = new Path(p, "urls");
			OutputStream os = fs.create(seedOut);
			for (String s : seeds) {
				os.write(s.getBytes());
				os.write('\n');
			}
			os.flush();
			os.close();
			cleanSeedDir = true;
			seedDir = tmpSeedDir;
		} else {
			seedDir = (String) args.get(Nutch.ARG_SEEDDIR);
		}
		Integer depth = (Integer) args.get(Nutch.ARG_DEPTH);
		if (depth == null)
			depth = 1;
		boolean parse = getConf().getBoolean(FetcherJob.PARSE_KEY, false);
		String solrUrl = (String) args.get(Nutch.ARG_SOLR);
		int onePhase = 3;
		if (!parse)
			onePhase++;
		float totalPhases = depth * onePhase;
		if (seedDir != null)
			totalPhases++;
		float phase = 0;
		Map<String, Object> jobRes = null;
		LinkedHashMap<String, Object> subTools = new LinkedHashMap<String, Object>();
		status.put(Nutch.STAT_JOBS, subTools);
		results.put(Nutch.STAT_JOBS, subTools);
		// inject phase
		if (seedDir != null) {
			status.put(Nutch.STAT_PHASE, "inject");
			jobRes = runTool(InjectorJob.class, args);
			if (jobRes != null) {
				subTools.put("inject", jobRes);
			}
			status.put(Nutch.STAT_PROGRESS, ++phase / totalPhases);
			if (cleanSeedDir && tmpSeedDir != null) {
				LOG.info(" - cleaning tmp seed list in " + tmpSeedDir);
				FileSystem.get(getConf()).delete(new Path(tmpSeedDir), true);
			}
		}
		if (shouldStop) {
			return results;
		}
		// run "depth" cycles
		for (int i = 0; i < depth; i++) {
			
			status.put(Nutch.STAT_PHASE, "generate " + i);
			jobRes = runTool(GeneratorJob.class, args);
			if (jobRes != null) {
				subTools.put("generate " + i, jobRes);
			}

			status.put(Nutch.STAT_PROGRESS, ++phase / totalPhases);
			if (shouldStop) {
				return results;
			}
			status.put(Nutch.STAT_PHASE, "fetch " + i);
			jobRes = runTool(FetcherJob.class, args);
			if (jobRes != null) {
				subTools.put("fetch " + i, jobRes);
			}
			status.put(Nutch.STAT_PROGRESS, ++phase / totalPhases);
			if (shouldStop) {
				return results;
			}
			if (!parse) {
				status.put(Nutch.STAT_PHASE, "parse " + i);
				jobRes = runTool(ParserJob.class, args);
				if (jobRes != null) {
					subTools.put("parse " + i, jobRes);
				}
				status.put(Nutch.STAT_PROGRESS, ++phase / totalPhases);
				if (shouldStop) {
					return results;
				}
			}
			status.put(Nutch.STAT_PHASE, "updatedb " + i);
			jobRes = runTool(DbUpdaterJob.class, args);
			if (jobRes != null) {
				subTools.put("updatedb " + i, jobRes);
			}
			status.put(Nutch.STAT_PROGRESS, ++phase / totalPhases);
			if (shouldStop) {
				return results;
			}
		}
		if (solrUrl != null) {
			status.put(Nutch.STAT_PHASE, "index");
			jobRes = runTool(SolrIndexerJob.class, args);
			if (jobRes != null) {
				subTools.put("index", jobRes);
			}
		}
		return results;
	}


作者:WeiJonathan 发表于2013-10-11 9:37:14 原文链接
阅读:133 评论:1 查看评论

相关 [mapreuce 编程 mapreduce] 推荐:

MapReuce 编程总结-多MapReduce执行

- - CSDN博客云计算推荐文章
学习hadoop,必不可少的就是写MapReduce程序,当然,对于简单的分析程序,我们只需一个MapReduce就能搞定,这里就不提单MapReuce的情况了,网上例子很多,大家可以百度Google一下. 对于比较复杂的分析程序,我们可能需要多个Job或者多个Map或者Reduce进行分析计算.         多Job或者多MapReduce的编程形式有以下几种:.

MapReduce编程模型

- - CSDN博客云计算推荐文章
MapReduce是一个Google发明的编程模型,也是一个处理和生成超大规模数据集的算法模型的相关实现. 用户首先创建一个Map函数处理一个基于对的数据集合,输出的中间结果基于对的数据集合,然后再创建一个Reduce函数用来合并所有的具有相同中间Key值的中间Value值.

Hadoop MapReduce高级编程

- - 互联网 - ITeye博客
•combine函数把一个map函数产生的对(多个key, value)合并成一个新的. 将新的作为输入到reduce函数中,其格式与reduce函数相同. •这样可以有效的较少中间结果,减少网络传输负荷. •什么情况下可以使用Combiner.

MapReduce编程实战之“高级特性”

- - CSDN博客云计算推荐文章
本篇介绍MapReduce的一些高级特性,如计数器、数据集的排序和连接. 计数器是一种收集作业统计信息的有效手段,排序是MapReduce的核心技术,MapReduce也能够执行大型数据集间的“”连接(join)操作. 计数器是一种收集作业统计信息的有效手段,用于质量控制或应用级统计. 计数器还可用于辅助诊断系统故障.

Hadoop MapReduce编程入门案例

- - CSDN博客云计算推荐文章
Hadoop入门例程简析中. (下面的程序下载地址: http://download.csdn.net/detail/zpcandzhj/7810829). (1)Hadoop新旧API的区别. 新的API倾向于使用虚类(抽象类),而不是接口,因为这更容易扩展. 例如,可以无需修改类的实现而在虚类中添加一个方法(即用默认的实现).

MapReduce 编程之 倒排索引

- - CSDN博客云计算推荐文章
本文调试环境: ubuntu 10.04 , hadoop-1.0.2. hadoop装的是伪分布模式,就是只有一个节点,集namenode, datanode, jobtracker, tasktracker...于一体. 本文实现了简单的倒排索引,单词,文档路径,词频,重要的解释都会在代码注视中.

文章: Apache Crunch:用于简化MapReduce编程的Java库

- - InfoQ cn
Apache Crunch(孵化器项目)是基于Google的 FlumeJava库编写的Java库,用于创建MapReduce流水线. 与其他用来创建MapReduce作业的高层工具(如Apache Hive、Apache Pig和Cascading等)类似,Crunch提供了用于实现如连接数据、执行聚合和排序记录等常见任务的模式库.

Mapreduce小结

- MAGI-CASPER/Peter Pan - 博客园-唯有前进值得敬仰
读完mapreduce论文小结一下. 1.MapReduce是一个编程模型,封装了并行计算、容错、数据分布、负载均衡等细节问题. 输入是一个key-value对的集合,中间输出也是key-value对的集合,用户使用两个函数:Map和Reduce. Map函数接受一个输入的key-value对,然后产生一个中间key-value 对的集合.

Hadoop MapReduce技巧

- - 简单文本
我在使用Hadoop编写MapReduce程序时,遇到了一些问题,通过在Google上查询资料,并结合自己对Hadoop的理解,逐一解决了这些问题. Hadoop对MapReduce中Key与Value的类型是有要求的,简单说来,这些类型必须支持Hadoop的序列化. 为了提高序列化的性能,Hadoop还为Java中常见的基本类型提供了相应地支持序列化的类型,如IntWritable,LongWritable,并为String类型提供了Text类型.

MapReduce原理

- - C++博客-牵着老婆满街逛
       MapReduce 是由Google公司的Jeffrey Dean 和 Sanjay Ghemawat 开发的一个针对大规模群组中的海量数据处理的分布式编程模型. MapReduce实现了两个功能. Map把一个函数应用于集合中的所有成员,然后返回一个基于这个处理的结果集. 而Reduce是把从两个或更多个Map中,通过多个线程,进程或者独立系统并行执行处理的结果集进行分类和归纳.