MapReduce调度与执行原理之任务调度
- - CSDN博客云计算推荐文章前言:本文旨在理清在Hadoop中一个MapReduce作业(Job)在提交到框架后的整个生命周期过程,权作总结和日后参考,如有问题,请不吝赐教. 本文不涉及Hadoop的架构设计,如有兴趣请参考相关书籍和文献. 在梳理过程中,我对一些感兴趣的源码也会逐行研究学习,以期强化基础. 作者:Jaytalent.
前言:本文旨在理清在Hadoop中一个MapReduce作业(Job)在提交到框架后的整个生命周期过程,权作总结和日后参考,如有问题,请不吝赐教。本文不涉及Hadoop的架构设计,如有兴趣请参考相关书籍和文献。在梳理过程中,我对一些感兴趣的源码也会逐行研究学习,以期强化基础。
作者:Jaytalent
开始日期:2013年9月9日参考资料:【1】《Hadoop技术内幕--深入解析MapReduce架构设计与实现原理》董西成【2】Hadoop 1.0.0 源码
Job job = new Job(conf, "word count"); job.setJarByClass(WordCount.class); job.setMapperClass(TokenizerMapper.class); job.setCombinerClass(IntSumReducer.class); job.setReducerClass(IntSumReducer.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); FileInputFormat.addInputPath(job, new Path(otherArgs[0])); FileOutputFormat.setOutputPath(job, new Path(otherArgs[1])); System.exit(job.waitForCompletion(true) ? 0 : 1);
JobID jobId = jobSubmitClient.getNewJobId();作业ID时从JobTracker获取的,这是一次RPC调用,方法为getNewJobId,定义在JobSubmissionProtocol接口。
private JobSubmissionProtocol jobSubmitClient;
/** * Allocate a name for the job. * @return a unique job name for submitting jobs. * @throws IOException */ public JobID getNewJobId() throws IOException;用户使用该协议通过JobTracker提交作业,查看作业状态等。
copyAndConfigureFiles(jobCopy, submitJobDir);在配置了提交副本数(mapred.submit.replication,默认为10)等信息后,主要代码分析如下(为了清晰起见,省略了一些日志和异常处理):
// Retrieve command line arguments placed into the JobConf // by GenericOptionsParser. String files = job.get("tmpfiles"); String libjars = job.get("tmpjars"); String archives = job.get("tmparchives");首先,从配置中获取不同类型文件的名称和路径,这些配置在作业提交时从命令行(Hadoop Shell)指定。files表示作业依赖的普通文件,比如文本文件;libjars表示应用程序依赖的第三方jar包;archives表示应用程序使用的多个文件打包而成的压缩文件。
// Create a number of filenames in the JobTracker's fs namespace FileSystem fs = submitJobDir.getFileSystem(job); submitJobDir = fs.makeQualified(submitJobDir); FsPermission mapredSysPerms = new FsPermission(JobSubmissionFiles.JOB_DIR_PERMISSION); FileSystem.mkdirs(fs, submitJobDir, mapredSysPerms); Path filesDir = JobSubmissionFiles.getJobDistCacheFiles(submitJobDir); Path archivesDir = JobSubmissionFiles.getJobDistCacheArchives(submitJobDir); Path libjarsDir = JobSubmissionFiles.getJobDistCacheLibjars(submitJobDir);接下来,在JobTracker的文件系统(通常为HDFS)的命名空间创建一系列文件路径名,其中包括前述三种文件类型。
// add all the command line files/ jars and archive // first copy them to jobtrackers filesystem if (files != null) { FileSystem.mkdirs(fs, filesDir, mapredSysPerms); String[] fileArr = files.split(","); for (String tmpFile: fileArr) { URI tmpURI; tmpURI = new URI(tmpFile); Path tmp = new Path(tmpURI); Path newPath = copyRemoteFiles(fs,filesDir, tmp, job, replication); URI pathURI = getPathURI(newPath, tmpURI.getFragment()); DistributedCache.addCacheFile(pathURI, job); DistributedCache.createSymlink(job); } } if (libjars != null) { FileSystem.mkdirs(fs, libjarsDir, mapredSysPerms); String[] libjarsArr = libjars.split(","); for (String tmpjars: libjarsArr) { Path tmp = new Path(tmpjars); Path newPath = copyRemoteFiles(fs, libjarsDir, tmp, job, replication); DistributedCache.addArchiveToClassPath (new Path(newPath.toUri().getPath()), job, fs); } } if (archives != null) { FileSystem.mkdirs(fs, archivesDir, mapredSysPerms); String[] archivesArr = archives.split(","); for (String tmpArchives: archivesArr) { URI tmpURI; tmpURI = new URI(tmpArchives); Path tmp = new Path(tmpURI); Path newPath = copyRemoteFiles(fs, archivesDir, tmp, job, replication); URI pathURI = getPathURI(newPath, tmpURI.getFragment()); DistributedCache.addCacheArchive(pathURI, job); DistributedCache.createSymlink(job); }注意,MapReduce作业文件的上传和下载是通过DistributedCache工具完成的,它是一个数据分发工具。用户指定的文件会被分发到各个TaskTracker上以运行Task。这里暂不涉及该工具的细节,留待日后讨论。
String originalJarPath = job.getJar(); if (originalJarPath != null) { // copy jar to JobTracker's fs // use jar name if job is not named. if ("".equals(job.getJobName())){ job.setJobName(new Path(originalJarPath).getName()); } Path submitJarFile = JobSubmissionFiles.getJobJar(submitJobDir); job.setJar(submitJarFile.toString()); fs.copyFromLocalFile(new Path(originalJarPath), submitJarFile); fs.setReplication(submitJarFile, replication); fs.setPermission(submitJarFile, new FsPermission(JobSubmissionFiles.JOB_FILE_PERMISSION)); }注意,在每次上传一种类型的文件后,都会将这种文件的路径配置到JobConf对象中,具体的工作由
DistributedCache.addCacheFile(pathURI, job); DistributedCache.addArchiveToClassPath(new Path(newPath.toUri().getPath()), job, fs); DistributedCache.addCacheArchive(pathURI, job); job.setJar(submitJarFile.toString());这四行代码完成。顺便提一句,Path类Hadoop文件系统在java.net.URI的基础上抽象了文件系统中的路径【3】。Java的File类和URL类分别抽象了不同的事物,Path可以说将二者统一起来。
// Create the splits for the job FileSystem fs = submitJobDir.getFileSystem(jobCopy); int maps = writeSplits(context, submitJobDir); jobCopy.setNumMapTasks(maps);jobCopy是一个JobConf对象。其中,writeSplits方法会实际调用InputSplit.getSplits方法生成splits信息,并将splits原始信息和元信息写入HDFS对应的目录和文件中。有关split的生成过程日后研究,这里不展开了。最后,将作业对应的JobConf对象以XML配置文件形式写入到HDFS中:
// Write job file to JobTracker's fs FSDataOutputStream out = FileSystem.create(fs, submitJobFile, new FsPermission(JobSubmissionFiles.JOB_FILE_PERMISSION)); try { jobCopy.writeXml(out); } finally { out.close(); }至此,作业文件上传才算正式完毕。