MapReduce调度与执行原理之任务调度
- - CSDN博客云计算推荐文章前言:本文旨在理清在Hadoop中一个MapReduce作业(Job)在提交到框架后的整个生命周期过程,权作总结和日后参考,如有问题,请不吝赐教. 本文不涉及Hadoop的架构设计,如有兴趣请参考相关书籍和文献. 在梳理过程中,我对一些感兴趣的源码也会逐行研究学习,以期强化基础. 作者:Jaytalent.
前言:本文旨在理清在Hadoop中一个MapReduce作业(Job)在提交到框架后的整个生命周期过程,权作总结和日后参考,如有问题,请不吝赐教。本文不涉及Hadoop的架构设计,如有兴趣请参考相关书籍和文献。在梳理过程中,我对一些感兴趣的源码也会逐行研究学习,以期强化基础。
作者:Jaytalent
开始日期:2013年9月9日参考资料:【1】《Hadoop技术内幕--深入解析MapReduce架构设计与实现原理》董西成【2】 Hadoop 1.0.0 源码
JobTracker tracker = startTracker(new JobConf()); tracker.offerService();其中offerService方法负责启动JobTracker提供的各个服务,有这样一行代码:
taskScheduler.start();taskScheduler即为任务调度器。start方法是抽象类TaskScheduler提供的接口,用于启动调度器。每个调度器类都要继承TaskScheduler类。回忆一下,调度器启动时会将各个监听器对象注册到JobTracker,以FIFO调度器JobQueueTaskScheduler为例:
@Override public synchronized void start() throws IOException { super.start(); taskTrackerManager.addJobInProgressListener(jobQueueJobInProgressListener); eagerTaskInitializationListener.setTaskTrackerManager(taskTrackerManager); eagerTaskInitializationListener.start(); taskTrackerManager.addJobInProgressListener( eagerTaskInitializationListener); }这里注册了两个监听器,其中eagerTaskInitializationListener负责作业初始化,而jobQueueJobInProgressListener则负责作业的执行和监控。当有作业提交到JobTracker时,JobTracker会执行所有订阅它消息的监听器的jobAdded方法。对于eagerTaskInitializationListener来说:
@Override public void jobAdded(JobInProgress job) { synchronized (jobInitQueue) { jobInitQueue.add(job); resortInitQueue(); jobInitQueue.notifyAll(); } }提交的作业的JobInProgress对象被添加到作业初始化队列jobInitQueue中,并唤醒初始化线程(若原来没有作业可以初始化):
class JobInitManager implements Runnable { public void run() { JobInProgress job = null; while (true) { try { synchronized (jobInitQueue) { while (jobInitQueue.isEmpty()) { jobInitQueue.wait(); } job = jobInitQueue.remove(0); } threadPool.execute(new InitJob(job)); } catch (InterruptedException t) { LOG.info("JobInitManagerThread interrupted."); break; } } threadPool.shutdownNow(); } }这种工作方式是一种“生产者-消费者”模式:作业初始化线程是消费者,而监听器eagerTaskInitializationListener是生产者。这里可以有多个消费者线程,放到一个固定资源的线程池中,线程个数通过mapred.jobinit.threads参数配置,默认为4个。
public JobQueueJobInProgressListener() { this(new TreeMap<JobSchedulingInfo, JobInProgress>(FIFO_JOB_QUEUE_COMPARATOR)); } /** * For clients that want to provide their own job priorities. * @param jobQueue A collection whose iterator returns jobs in priority order. */ protected JobQueueJobInProgressListener(Map<JobSchedulingInfo, JobInProgress> jobQueue) { this.jobQueue = Collections.synchronizedMap(jobQueue); }其中,第一个构造器调用重载的第二个构造器。可以看到,调度器使用一个队列jobQueue来保存提交的作业。这个队列使用一个TreeMap对象实现,TreeMap的特点是底层使用红黑树实现,可以按照键来排序,并且由于是平衡树,效率较高。作为键的是一个JobSchedulingInfo对象,作为值就是提交的作业对应的JobInProgress对象。另外,由于TreeMap本身不是线程安全的,这里使用了集合类的同步方法构造了一个线程安全的Map。使用带有排序功能的数据结构的目的是使作业在队列中按照优先级的大小排列,这样每次调度器只需从队列头部获得作业即可。
static class JobSchedulingInfo { private JobPriority priority; private long startTime; private JobID id; ... }
@Override public boolean equals(Object obj) { if (obj == null || obj.getClass() != JobSchedulingInfo.class) { return false; } else if (obj == this) { return true; } else if (obj instanceof JobSchedulingInfo) { JobSchedulingInfo that = (JobSchedulingInfo)obj; return (this.id.equals(that.id) && this.startTime == that.startTime && this.priority == that.priority); } return false; }我们看到,两个JobSchedulingInfo对象相等的条件是类型一致,并且作业ID、开始时间和优先级都相等。hashCode的计算比较简单:
@Override public int hashCode() { return (int)(id.hashCode() * priority.hashCode() + startTime); }注意,监听器的第一个构造器有一个比较器参数,用于定义JobSchedulingInfo的比较方式:
static final Comparator<JobSchedulingInfo> FIFO_JOB_QUEUE_COMPARATOR = new Comparator<JobSchedulingInfo>() { public int compare(JobSchedulingInfo o1, JobSchedulingInfo o2) { int res = o1.getPriority().compareTo(o2.getPriority()); if (res == 0) { if (o1.getStartTime() < o2.getStartTime()) { res = -1; } else { res = (o1.getStartTime() == o2.getStartTime() ? 0 : 1); } } if (res == 0) { res = o1.getJobID().compareTo(o2.getJobID()); } return res; } };从上面看出,首先比较作业的优先级,若优先级相等则比较开始时间(FIFO),若再相等则比较作业ID。我们在实现自己的调度器时可能要定义自己的作业队列,那么作业在队列中的顺序(即JobSchedulingInfo的比较器)就要仔细定义,这是调度器能够正常运行基础。
if (!processHeartbeat(status, initialContact, now)) { if (prevHeartbeatResponse != null) { trackerToHeartbeatResponseMap.remove(trackerName); } return new HeartbeatResponse(newResponseId, new TaskTrackerAction[] {new ReinitTrackerAction()}); }具体的心跳处理,由私有函数processHeartbeat完成。该函数中有以下两个方法调用:
updateTaskStatuses(trackerStatus); updateNodeHealthStatus(trackerStatus, timeStamp);分别用来更新任务的状态和结点的健康状态。在第一个方法中有下面代码片段:
TaskInProgress tip = taskidToTIPMap.get(taskId); // Check if the tip is known to the jobtracker. In case of a restarted // jt, some tasks might join in later if (tip != null || hasRestarted()) { if (tip == null) { tip = job.getTaskInProgress(taskId.getTaskID()); job.addRunningTaskToTIP(tip, taskId, status, false); } // Update the job and inform the listeners if necessary JobStatus prevStatus = (JobStatus)job.getStatus().clone(); // Clone TaskStatus object here, because JobInProgress // or TaskInProgress can modify this object and // the changes should not get reflected in TaskTrackerStatus. // An old TaskTrackerStatus is used later in countMapTasks, etc. job.updateTaskStatus(tip, (TaskStatus)report.clone()); JobStatus newStatus = (JobStatus)job.getStatus().clone(); // Update the listeners if an incomplete job completes if (prevStatus.getRunState() != newStatus.getRunState()) { JobStatusChangeEvent event = new JobStatusChangeEvent(job, EventType.RUN_STATE_CHANGED, prevStatus, newStatus); updateJobInProgressListeners(event); } } else { LOG.info("Serious problem. While updating status, cannot find taskid " + report.getTaskID()); }这里的job对象通过从TaskTracker那里得到的task状态信息中抽取出来。注意,这里拷贝了原有作业状态的一个副本,然后修改这个副本的相关信息,调用的是updateJobStatus方法,更新任务的状态信息和JobInProgress的相关信息,如map和reduce任务的进度等,这里不展开了。这些信息的更新可以为调度器的工作提供依据。
// Update the listeners about the job // Assuming JobTracker is locked on entry. private void updateJobInProgressListeners(JobChangeEvent event) { for (JobInProgressListener listener : jobInProgressListeners) { listener.jobUpdated(event); } }这次每个监听器要回调jobUpdated方法,表示作业有更新。对于jobQueueJobInProgressListener来说是这样做的:
@Override public synchronized void jobUpdated(JobChangeEvent event) { JobInProgress job = event.getJobInProgress(); if (event instanceof JobStatusChangeEvent) { // Check if the ordering of the job has changed // For now priority and start-time can change the job ordering JobStatusChangeEvent statusEvent = (JobStatusChangeEvent)event; JobSchedulingInfo oldInfo = new JobSchedulingInfo(statusEvent.getOldStatus()); if (statusEvent.getEventType() == EventType.PRIORITY_CHANGED || statusEvent.getEventType() == EventType.START_TIME_CHANGED) { // Make a priority change reorderJobs(job, oldInfo); } else if (statusEvent.getEventType() == EventType.RUN_STATE_CHANGED) { // Check if the job is complete int runState = statusEvent.getNewStatus().getRunState(); if (runState == JobStatus.SUCCEEDED || runState == JobStatus.FAILED || runState == JobStatus.KILLED) { jobCompleted(oldInfo); } } } }首先,获取作业更新前的状态。然后根据事件的类型,进行相应的处理。比如,如果优先级变化了,则要重新排列队列中作业的顺序。这里直接取出原有作业,重新插入队列。插入后,作业会自动重新排序,体现了TreeMap的优越性。再比如,如果作业状态变为完成,那么就从队列中删除该作业。
private void reorderJobs(JobInProgress job, JobSchedulingInfo oldInfo) { synchronized (jobQueue) { jobQueue.remove(oldInfo); jobQueue.put(new JobSchedulingInfo(job), job); } }下面就是调度器中最关键的一步了:任务选择。此时,作业队列中信息已经更新完毕,可以选择一些任务返回给TaskTracker执行了。heartbeat方法接下来会有这样的代码:
List<Task> tasks = getSetupAndCleanupTasks(taskTrackerStatus); if (tasks == null ) { tasks = taskScheduler.assignTasks(taskTrackers.get(trackerName)); }如果不需要setup和cleanup,就说明需要选择map或reduce任务。调用TaskScheduler的assignTasks方法完成任务选择。由于篇幅限制,我打算将这部分内容放到下一篇文章中,并关注heartbeat中JobTracker下达的命令过程以及JobInProgress和TaskInProgress对调度有影响的一些字段。请看下一篇文章: MapReduce调度与执行原理之任务调度(续)