MapReduce Programming Notes: Running Multiple MapReduce Jobs
When learning Hadoop, writing MapReduce programs is unavoidable. For simple analyses a single MapReduce job is enough; I won't cover the single-job case here, since there are plenty of examples online that a quick Baidu or Google search will turn up. For more complex analyses, however, we may need multiple jobs, or multiple map or reduce stages, to carry out the computation.
There are several ways to structure a program with multiple jobs or multiple MapReduce passes:
1. Iterative MapReduce
In the iterative style, the output of one MapReduce job becomes the input of the next. Only the final result needs to be kept; the intermediate data can be deleted or retained, depending on your needs.
Example code:
Configuration conf = new Configuration();

// first Job
Job job1 = new Job(conf, "job1");
.....
FileInputFormat.addInputPath(job1, InputPath1);
FileOutputFormat.setOutputPath(job1, Outpath1);
job1.waitForCompletion(true);

// second Job: reads the output of job1
Job job2 = new Job(conf, "job2");
.....
FileInputFormat.addInputPath(job2, Outpath1);
FileOutputFormat.setOutputPath(job2, Outpath2);
job2.waitForCompletion(true);

// third Job: reads the output of job2
Job job3 = new Job(conf, "job3");
.....
FileInputFormat.addInputPath(job3, Outpath2);
FileOutputFormat.setOutputPath(job3, Outpath3);
job3.waitForCompletion(true);
.....
Here is an example of how Mahout uses MapReduce iteration. The code below is from Mahout's k-means implementation: the main function drives the iteration with a while loop, and each call to runIteration() is one MapReduce pass.
Personally, though, I find the current approach to iterative MapReduce unsatisfying in two respects:
1. On every iteration, all the jobs (tasks) are created from scratch, which is very expensive.
2. On every iteration, the data is written out and read back again, so the I/O and network transfer costs are high.
Models such as Twister and HaLoop seem to address these problems reasonably well, but their abstractions are not high-level enough and the kinds of computation they support are limited.
I hope a future version of Hadoop will support iterative algorithms better.
// main function
while (!converged && iteration <= maxIterations) {
  log.info("K-Means Iteration {}", iteration);
  // point the output to a new directory per iteration
  Path clustersOut = new Path(output, AbstractCluster.CLUSTERS_DIR + iteration);
  converged = runIteration(conf, input, clustersIn, clustersOut,
      measure.getClass().getName(), delta);
  // now point the input to the old output directory
  clustersIn = clustersOut;
  iteration++;
}

private static boolean runIteration(Configuration conf, Path input, Path clustersIn,
    Path clustersOut, String measureClass, String convergenceDelta)
    throws IOException, InterruptedException, ClassNotFoundException {

  conf.set(KMeansConfigKeys.CLUSTER_PATH_KEY, clustersIn.toString());
  conf.set(KMeansConfigKeys.DISTANCE_MEASURE_KEY, measureClass);
  conf.set(KMeansConfigKeys.CLUSTER_CONVERGENCE_KEY, convergenceDelta);

  Job job = new Job(conf, "KMeans Driver running runIteration over clustersIn: " + clustersIn);
  job.setMapOutputKeyClass(Text.class);
  job.setMapOutputValueClass(ClusterObservations.class);
  job.setOutputKeyClass(Text.class);
  job.setOutputValueClass(Cluster.class);

  job.setInputFormatClass(SequenceFileInputFormat.class);
  job.setOutputFormatClass(SequenceFileOutputFormat.class);
  job.setMapperClass(KMeansMapper.class);
  job.setCombinerClass(KMeansCombiner.class);
  job.setReducerClass(KMeansReducer.class);

  FileInputFormat.addInputPath(job, input);
  FileOutputFormat.setOutputPath(job, clustersOut);

  job.setJarByClass(KMeansDriver.class);
  HadoopUtil.delete(conf, clustersOut);
  if (!job.waitForCompletion(true)) {
    throw new InterruptedException("K-Means Iteration failed processing " + clustersIn);
  }
  FileSystem fs = FileSystem.get(clustersOut.toUri(), conf);

  return isConverged(clustersOut, conf, fs);
}
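To connect the skeleton at the top of this section with the Mahout driver above, here is a minimal generic sketch of such an iteration loop. This is illustrative code, not Mahout's: runOnePass() is a hypothetical stand-in for configuring and running one MapReduce job. It shows the two points mentioned earlier, namely that the loop is bounded by a convergence test plus a maximum iteration count, and that intermediate output directories can be deleted once they are no longer needed.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class IterativeDriver {

  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(conf);

    Path input = new Path(args[0]);    // original input
    Path workDir = new Path(args[1]);  // parent directory for per-iteration output
    int maxIterations = 10;

    Path current = input;
    boolean converged = false;

    for (int i = 1; i <= maxIterations && !converged; i++) {
      Path next = new Path(workDir, "iter-" + i);

      // one full MapReduce pass: reads 'current', writes 'next'
      converged = runOnePass(conf, current, next);

      // drop the previous intermediate output once it is no longer needed
      if (!current.equals(input)) {
        fs.delete(current, true);
      }
      current = next;
    }
    // 'current' now points at the final result directory
  }

  // stub: configure one Job here, run it, and return true once the result has converged,
  // e.g. return job.waitForCompletion(true) && yourConvergenceCheck(out, conf);
  private static boolean runOnePass(Configuration conf, Path in, Path out) throws Exception {
    return false;
  }
}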
2. Dependency-Driven MapReduce: JobControl
The dependency-driven style is implemented with JobControl, which involves two classes: Job and JobControl. The Job class wraps a MapReduce job together with its dependencies; it monitors the state of the jobs it depends on and updates its own state accordingly.
JobControl contains a thread that periodically monitors and updates the state of each job, schedules jobs whose dependencies have all completed, submits jobs in the READY state, and so on. It also provides APIs to suspend, resume, and stop that thread.
Example code:
Configuration job1conf = new Configuration();
Job job1 = new Job(job1conf, "Job1");
.........  // other settings for job1
Configuration job2conf = new Configuration();
Job job2 = new Job(job2conf, "Job2");
.........  // other settings for job2
Configuration job3conf = new Configuration();
Job job3 = new Job(job3conf, "Job3");
.........  // other settings for job3

job3.addDepending(job1);  // job3 depends on job1
job3.addDepending(job2);  // job3 depends on job2

JobControl JC = new JobControl("123");
JC.addJob(job1);  // add all three jobs to the JobControl
JC.addJob(job2);
JC.addJob(job3);
JC.run();
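The snippet above is schematic: the Job and JobControl classes it uses come from Hadoop's jobcontrol package rather than the ordinary mapreduce.Job. With the newer Hadoop 2.x API the same wiring looks roughly like the sketch below, where each ordinary Job is wrapped in a ControlledJob; this is a minimal sketch under that assumption, with the actual mapper/reducer and path setup elided. Since JobControl's scheduling loop runs on whichever thread calls run(), the usual pattern is to start it on a background thread, poll allFinished(), and then stop it.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.jobcontrol.ControlledJob;
import org.apache.hadoop.mapreduce.lib.jobcontrol.JobControl;

public class DependentJobsDriver {
  public static void main(String[] args) throws Exception {
    Job job1 = Job.getInstance(new Configuration(), "Job1");
    Job job2 = Job.getInstance(new Configuration(), "Job2");
    Job job3 = Job.getInstance(new Configuration(), "Job3");
    // ... mapper/reducer classes, input and output paths for each job ...

    ControlledJob cJob1 = new ControlledJob(job1.getConfiguration());
    cJob1.setJob(job1);
    ControlledJob cJob2 = new ControlledJob(job2.getConfiguration());
    cJob2.setJob(job2);
    ControlledJob cJob3 = new ControlledJob(job3.getConfiguration());
    cJob3.setJob(job3);

    cJob3.addDependingJob(cJob1);  // job3 starts only after job1 ...
    cJob3.addDependingJob(cJob2);  // ... and job2 have succeeded

    JobControl control = new JobControl("dependent-jobs");
    control.addJob(cJob1);
    control.addJob(cJob2);
    control.addJob(cJob3);

    // run the scheduling loop on a background thread, poll until every job
    // has finished, then stop the monitor
    Thread monitor = new Thread(control);
    monitor.setDaemon(true);
    monitor.start();
    while (!control.allFinished()) {
      Thread.sleep(1000);
    }
    control.stop();

    System.exit(control.getFailedJobList().isEmpty() ? 0 : 1);
  }
}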
3. Linear Chained MapReduce: ChainMapper/ChainReducer
ChainMapper/ChainReducer were introduced to handle linear chains of Mappers. Multiple Mappers can run within the map or reduce phase; like a Linux pipe, the output of one Mapper is fed directly into the next, forming a pipeline.
Note that within a single MapReduce job, the map and reduce phases may contain any number of Mappers, but there can be only one Reducer. A workflow that needs more than one Reducer therefore cannot be expressed with ChainMapper/ChainReducer.
Code:
...
conf.setJobName("chain");
conf.setInputFormat(TextInputFormat.class);
conf.setOutputFormat(TextOutputFormat.class);

JobConf mapper1Conf = new JobConf(false);
JobConf mapper2Conf = new JobConf(false);
JobConf reduce1Conf = new JobConf(false);
JobConf mapper3Conf = new JobConf(false);
...
ChainMapper.addMapper(conf, Mapper1.class, LongWritable.class, Text.class,
    Text.class, Text.class, true, mapper1Conf);
ChainMapper.addMapper(conf, Mapper2.class, Text.class, Text.class,
    LongWritable.class, Text.class, false, mapper2Conf);
ChainReducer.setReducer(conf, Reducer.class, LongWritable.class, Text.class,
    Text.class, Text.class, true, reduce1Conf);
ChainReducer.addMapper(conf, Mapper3.class, Text.class, Text.class,
    LongWritable.class, Text.class, true, mapper3Conf);

JobClient.runJob(conf);
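For reference, the boolean argument in the calls above (byValue) tells the chain whether key/value objects are cloned between stages: passing by value is safer, while passing by reference avoids copies but assumes the downstream stage neither modifies nor holds on to the records. On the newer mapreduce API, the equivalent helpers live in org.apache.hadoop.mapreduce.lib.chain and, as far as I recall, take a Job plus a per-stage Configuration and drop the byValue flag. A rough sketch under those assumptions, where Mapper1, Mapper2, Mapper3 and Reducer1 are placeholders for your own classes (Reducer1 plays the role of the Reducer in the snippet above):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.chain.ChainMapper;
import org.apache.hadoop.mapreduce.lib.chain.ChainReducer;

Job job = Job.getInstance(new Configuration(), "chain");

ChainMapper.addMapper(job, Mapper1.class,
    LongWritable.class, Text.class, Text.class, Text.class, new Configuration(false));
ChainMapper.addMapper(job, Mapper2.class,
    Text.class, Text.class, LongWritable.class, Text.class, new Configuration(false));
ChainReducer.setReducer(job, Reducer1.class,
    LongWritable.class, Text.class, Text.class, Text.class, new Configuration(false));
ChainReducer.addMapper(job, Mapper3.class,
    Text.class, Text.class, LongWritable.class, Text.class, new Configuration(false));

// input/output formats and paths are configured as usual, then:
job.waitForCompletion(true);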
4. Sub-Job MapReduce
The sub-job style is really just a variant of the iterative style, but I list it separately here. Put simply, a parent Job contains and drives several child Jobs.
In Nutch, Crawler is such a parent job: its run method invokes each sub-job through the runTool helper, and runTool uses reflection to instantiate and execute the sub-job.
Let's look at how Nutch implements this:
....
private NutchTool currentTool = null;
....

private Map<String, Object> runTool(Class<? extends NutchTool> toolClass,
    Map<String, Object> args) throws Exception {
  currentTool = (NutchTool) ReflectionUtils.newInstance(toolClass, getConf());
  return currentTool.run(args);
}

...

@Override
public Map<String, Object> run(Map<String, Object> args) throws Exception {
  results.clear();
  status.clear();
  String crawlId = (String) args.get(Nutch.ARG_CRAWL);
  if (crawlId != null) {
    getConf().set(Nutch.CRAWL_ID_KEY, crawlId);
  }
  String seedDir = null;
  String tmpSeedDir = null;
  String seedList = (String) args.get(Nutch.ARG_SEEDLIST);
  if (seedList != null) { // takes precedence
    String[] seeds = seedList.split("\\s+");
    // create tmp. dir
    tmpSeedDir = getConf().get("hadoop.tmp.dir") + "/seed-" + System.currentTimeMillis();
    FileSystem fs = FileSystem.get(getConf());
    Path p = new Path(tmpSeedDir);
    fs.mkdirs(p);
    Path seedOut = new Path(p, "urls");
    OutputStream os = fs.create(seedOut);
    for (String s : seeds) {
      os.write(s.getBytes());
      os.write('\n');
    }
    os.flush();
    os.close();
    cleanSeedDir = true;
    seedDir = tmpSeedDir;
  } else {
    seedDir = (String) args.get(Nutch.ARG_SEEDDIR);
  }
  Integer depth = (Integer) args.get(Nutch.ARG_DEPTH);
  if (depth == null) depth = 1;
  boolean parse = getConf().getBoolean(FetcherJob.PARSE_KEY, false);
  String solrUrl = (String) args.get(Nutch.ARG_SOLR);
  int onePhase = 3;
  if (!parse) onePhase++;
  float totalPhases = depth * onePhase;
  if (seedDir != null) totalPhases++;
  float phase = 0;
  Map<String, Object> jobRes = null;
  LinkedHashMap<String, Object> subTools = new LinkedHashMap<String, Object>();
  status.put(Nutch.STAT_JOBS, subTools);
  results.put(Nutch.STAT_JOBS, subTools);
  // inject phase
  if (seedDir != null) {
    status.put(Nutch.STAT_PHASE, "inject");
    jobRes = runTool(InjectorJob.class, args);
    if (jobRes != null) {
      subTools.put("inject", jobRes);
    }
    status.put(Nutch.STAT_PROGRESS, ++phase / totalPhases);
    if (cleanSeedDir && tmpSeedDir != null) {
      LOG.info(" - cleaning tmp seed list in " + tmpSeedDir);
      FileSystem.get(getConf()).delete(new Path(tmpSeedDir), true);
    }
  }
  if (shouldStop) {
    return results;
  }
  // run "depth" cycles
  for (int i = 0; i < depth; i++) {
    status.put(Nutch.STAT_PHASE, "generate " + i);
    jobRes = runTool(GeneratorJob.class, args);
    if (jobRes != null) {
      subTools.put("generate " + i, jobRes);
    }
    status.put(Nutch.STAT_PROGRESS, ++phase / totalPhases);
    if (shouldStop) {
      return results;
    }
    status.put(Nutch.STAT_PHASE, "fetch " + i);
    jobRes = runTool(FetcherJob.class, args);
    if (jobRes != null) {
      subTools.put("fetch " + i, jobRes);
    }
    status.put(Nutch.STAT_PROGRESS, ++phase / totalPhases);
    if (shouldStop) {
      return results;
    }
    if (!parse) {
      status.put(Nutch.STAT_PHASE, "parse " + i);
      jobRes = runTool(ParserJob.class, args);
      if (jobRes != null) {
        subTools.put("parse " + i, jobRes);
      }
      status.put(Nutch.STAT_PROGRESS, ++phase / totalPhases);
      if (shouldStop) {
        return results;
      }
    }
    status.put(Nutch.STAT_PHASE, "updatedb " + i);
    jobRes = runTool(DbUpdaterJob.class, args);
    if (jobRes != null) {
      subTools.put("updatedb " + i, jobRes);
    }
    status.put(Nutch.STAT_PROGRESS, ++phase / totalPhases);
    if (shouldStop) {
      return results;
    }
  }
  if (solrUrl != null) {
    status.put(Nutch.STAT_PHASE, "index");
    jobRes = runTool(SolrIndexerJob.class, args);
    if (jobRes != null) {
      subTools.put("index", jobRes);
    }
  }
  return results;
}
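Stripped of the Nutch-specific phases, the core of this pattern is quite small: a parent driver holds a result map and a runTool() helper that instantiates each sub-job class via ReflectionUtils and runs it. The sketch below is a hypothetical, simplified rendering of that idea; SimpleParentJob and the SubTool interface are made-up names (in Nutch the role of SubTool is played by NutchTool), and only Configured, Configurable and ReflectionUtils are real Hadoop classes.

import java.util.LinkedHashMap;
import java.util.Map;

import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.util.ReflectionUtils;

public class SimpleParentJob extends Configured {

  // the contract every sub-job implements; in Nutch this role is played by NutchTool
  public interface SubTool extends Configurable {
    Map<String, Object> run(Map<String, Object> args) throws Exception;
  }

  private final Map<String, Object> results = new LinkedHashMap<String, Object>();

  public SimpleParentJob(Configuration conf) {
    super(conf);
  }

  // instantiate the sub-job via reflection (ReflectionUtils injects the Configuration)
  // and run it, just like Nutch's runTool() does
  private Map<String, Object> runTool(Class<? extends SubTool> toolClass,
                                      Map<String, Object> args) throws Exception {
    SubTool tool = ReflectionUtils.newInstance(toolClass, getConf());
    return tool.run(args);
  }

  // the parent job simply runs its sub-jobs in order, collecting each one's result
  public Map<String, Object> run(Map<String, Object> args,
                                 Class<? extends SubTool>... phases) throws Exception {
    results.clear();
    for (Class<? extends SubTool> phase : phases) {
      results.put(phase.getSimpleName(), runTool(phase, args));
    }
    return results;
  }
}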