MapReduce调度与执行原理之任务调度

标签: mapreduce 调度 原理 | 发表时间:2013-09-14 00:45 | 作者:jtz_MPP
出处:http://blog.csdn.net
前言:本文旨在理清在Hadoop中一个MapReduce作业(Job)在提交到框架后的整个生命周期过程,权作总结和日后参考,如有问题,请不吝赐教。本文不涉及Hadoop的架构设计,如有兴趣请参考相关书籍和文献。在梳理过程中,我对一些感兴趣的源码也会逐行研究学习,以期强化基础。
作者:Jaytalent
开始日期:2013年9月9日
参考资料:【1】《Hadoop技术内幕--深入解析MapReduce架构设计与实现原理》董西成
                  【2】   Hadoop 1.0.0 源码
                            【3】《Hadoop技术内幕--深入解析Hadoop Common和HDFS架构设计与实现原理》蔡斌 陈湘萍
上一篇文章谈到作业的初始化,经过初始化的作业就可以被调度器调度并开始执行了。这篇文章关注调度器的工作原理。
一个MapReduce作业的生命周期大体分为5个阶段【1】:
1. 作业提交与初始化
2. 任务调度与监控
3. 任务运行环境准备
4. 任务执行
5. 作业完成
我们假设JobTracker已经启动,那么调度器是怎么启动的?JobTracker在启动时有以下代码:
JobTracker tracker = startTracker(new JobConf());
tracker.offerService();
其中offerService方法负责启动JobTracker提供的各个服务,有这样一行代码:
taskScheduler.start();
taskScheduler即为任务调度器。start方法是抽象类TaskScheduler提供的接口,用于启动调度器。每个调度器类都要继承TaskScheduler类。回忆一下,调度器启动时会将各个监听器对象注册到JobTracker,以FIFO调度器JobQueueTaskScheduler为例:
  @Override
  public synchronized void start() throws IOException {
    super.start();
    taskTrackerManager.addJobInProgressListener(jobQueueJobInProgressListener);
    eagerTaskInitializationListener.setTaskTrackerManager(taskTrackerManager);
    eagerTaskInitializationListener.start();
    taskTrackerManager.addJobInProgressListener(
        eagerTaskInitializationListener);
  }
这里注册了两个监听器,其中eagerTaskInitializationListener负责作业初始化,而jobQueueJobInProgressListener则负责作业的执行和监控。当有作业提交到JobTracker时,JobTracker会执行所有订阅它消息的监听器的jobAdded方法。对于eagerTaskInitializationListener来说:
  @Override
  public void jobAdded(JobInProgress job) {
    synchronized (jobInitQueue) {
      jobInitQueue.add(job);
      resortInitQueue();
      jobInitQueue.notifyAll();
    }
  }
提交的作业的JobInProgress对象被添加到作业初始化队列jobInitQueue中,并唤醒初始化线程(若原来没有作业可以初始化):
class JobInitManager implements Runnable {
    public void run() {
      JobInProgress job = null;
      while (true) {
        try {
          synchronized (jobInitQueue) {
            while (jobInitQueue.isEmpty()) {
              jobInitQueue.wait();
            }
            job = jobInitQueue.remove(0);
          }
          threadPool.execute(new InitJob(job));
        } catch (InterruptedException t) {
          LOG.info("JobInitManagerThread interrupted.");
          break;
        } 
      }
      threadPool.shutdownNow();
    }
  }
这种工作方式是一种“生产者-消费者”模式:作业初始化线程是消费者,而监听器eagerTaskInitializationListener是生产者。这里可以有多个消费者线程,放到一个固定资源的线程池中,线程个数通过mapred.jobinit.threads参数配置,默认为4个。
下面我们重点来看调度器中的另一个监听器。jobQueueJobInProgressListener对象在调度器中初始化时连续执行了两个构造器完成初始化:
 public JobQueueJobInProgressListener() {
    this(new TreeMap<JobSchedulingInfo, 
                     JobInProgress>(FIFO_JOB_QUEUE_COMPARATOR));
  }
  /**
   * For clients that want to provide their own job priorities.
   * @param jobQueue A collection whose iterator returns jobs in priority order.
   */
  protected JobQueueJobInProgressListener(Map<JobSchedulingInfo, 
                                          JobInProgress> jobQueue) {
    this.jobQueue = Collections.synchronizedMap(jobQueue);
  }
其中,第一个构造器调用重载的第二个构造器。可以看到,调度器使用一个队列jobQueue来保存提交的作业。这个队列使用一个TreeMap对象实现,TreeMap的特点是底层使用红黑树实现,可以按照键来排序,并且由于是平衡树,效率较高。作为键的是一个JobSchedulingInfo对象,作为值就是提交的作业对应的JobInProgress对象。另外,由于TreeMap本身不是线程安全的,这里使用了集合类的同步方法构造了一个线程安全的Map。使用带有排序功能的数据结构的目的是使作业在队列中按照优先级的大小排列,这样每次调度器只需从队列头部获得作业即可。
作业的顺序由优先级决定,而优先级信息包含在JobSchedulingInfo对象中:
static class JobSchedulingInfo {
    private JobPriority priority;
    private long startTime;
    private JobID id;
    ...
}
该对象包含了作业的优先级、ID和开始时间等信息。在Hadoop中,作业的优先级有以下五种:VERY_HIGH、HIGH、NORMAL、LOW、VERY_LOW。这些字段是通过作业的JobStatus对象初始化的。由于该对象作为TreeMap的键,因此要实现自己的equals方法和hashCode方法:
    @Override
    public boolean equals(Object obj) {
      if (obj == null || obj.getClass() != JobSchedulingInfo.class) {
        return false;
      } else if (obj == this) {
        return true;
      }
      else if (obj instanceof JobSchedulingInfo) {
        JobSchedulingInfo that = (JobSchedulingInfo)obj;
        return (this.id.equals(that.id) && 
                this.startTime == that.startTime && 
                this.priority == that.priority);
      }
      return false;
    }
我们看到,两个JobSchedulingInfo对象相等的条件是类型一致,并且作业ID、开始时间和优先级都相等。hashCode的计算比较简单:
    @Override
    public int hashCode() {
      return (int)(id.hashCode() * priority.hashCode() + startTime);
    }
注意,监听器的第一个构造器有一个比较器参数,用于定义JobSchedulingInfo的比较方式:
    static final Comparator<JobSchedulingInfo> FIFO_JOB_QUEUE_COMPARATOR
    = new Comparator<JobSchedulingInfo>() {
    public int compare(JobSchedulingInfo o1, JobSchedulingInfo o2) {
      int res = o1.getPriority().compareTo(o2.getPriority());
      if (res == 0) {
        if (o1.getStartTime() < o2.getStartTime()) {
          res = -1;
        } else {
          res = (o1.getStartTime() == o2.getStartTime() ? 0 : 1);
        }
      }
      if (res == 0) {
        res = o1.getJobID().compareTo(o2.getJobID());
      }
      return res;
    }
  };
从上面看出,首先比较作业的优先级,若优先级相等则比较开始时间(FIFO),若再相等则比较作业ID。我们在实现自己的调度器时可能要定义自己的作业队列,那么作业在队列中的顺序(即JobSchedulingInfo的比较器)就要仔细定义,这是调度器能够正常运行基础。
Hadoop中的作业调度采用pull方式,即TaskTracker定时向JobTracker发送心跳信息索取一个新的任务,这些信息包括数据结点上作业和任务的运行情况,以及该TaskTracker上的资源使用情况。JobTracker会依据以上信息更新作业队列的状态,并调用调度器选择一个或多个任务以心跳响应的形式返回给TaskTracker。从上面描述可以看出,JobTracker和taskScheduler之间的互相利用关系:前者利用后者为TaskTracker分配任务;后者利用前者更新队列和作业信息。接下来,我们一步步详述该过程。
首先,当一个心跳到达JobTracker时(实际上这是一个来自TaskTracker的远程过程调用heartbeat方法,协议接口是InterTrackerProtocol),会执行两种动作:更新状态和下达命令【1】。下达命令稍后关注。有关更新状态的一些代码片段如下:
    if (!processHeartbeat(status, initialContact, now)) {
      if (prevHeartbeatResponse != null) {
        trackerToHeartbeatResponseMap.remove(trackerName);
      }
      return new HeartbeatResponse(newResponseId, 
                   new TaskTrackerAction[] {new ReinitTrackerAction()});
    }
具体的心跳处理,由私有函数processHeartbeat完成。该函数中有以下两个方法调用:
    updateTaskStatuses(trackerStatus);
    updateNodeHealthStatus(trackerStatus, timeStamp);
分别用来更新任务的状态和结点的健康状态。在第一个方法中有下面代码片段:
      TaskInProgress tip = taskidToTIPMap.get(taskId);
      // Check if the tip is known to the jobtracker. In case of a restarted
      // jt, some tasks might join in later
      if (tip != null || hasRestarted()) {
        if (tip == null) {
          tip = job.getTaskInProgress(taskId.getTaskID());
          job.addRunningTaskToTIP(tip, taskId, status, false);
        }
        
        // Update the job and inform the listeners if necessary
        JobStatus prevStatus = (JobStatus)job.getStatus().clone();
        // Clone TaskStatus object here, because JobInProgress
        // or TaskInProgress can modify this object and
        // the changes should not get reflected in TaskTrackerStatus.
        // An old TaskTrackerStatus is used later in countMapTasks, etc.
        job.updateTaskStatus(tip, (TaskStatus)report.clone());
        JobStatus newStatus = (JobStatus)job.getStatus().clone();
        // Update the listeners if an incomplete job completes
        if (prevStatus.getRunState() != newStatus.getRunState()) {
          JobStatusChangeEvent event = 
            new JobStatusChangeEvent(job, EventType.RUN_STATE_CHANGED, 
                                     prevStatus, newStatus);
          updateJobInProgressListeners(event);
        }
      } else {
        LOG.info("Serious problem.  While updating status, cannot find taskid " 
                 + report.getTaskID());
      }
这里的job对象通过从TaskTracker那里得到的task状态信息中抽取出来。注意,这里拷贝了原有作业状态的一个副本,然后修改这个副本的相关信息,调用的是updateJobStatus方法,更新任务的状态信息和JobInProgress的相关信息,如map和reduce任务的进度等,这里不展开了。这些信息的更新可以为调度器的工作提供依据。
作业状态的更新是通过updateJobInProgressListeners方法实现,该方法的参数是一个JobStatusChangeEvent对象,表示作业状态变化的事件。这种事件的类型可以是运行状态改变、开始时间改变、优先级改变等等。用户也可以根据需要自定义事件类型。事件对象维护了两个JobStatus对象,分别表示事件发生前后作业的状态。
进入该方法后,我们又看到了熟悉的观察者模式:
  // Update the listeners about the job
  // Assuming JobTracker is locked on entry.
  private void updateJobInProgressListeners(JobChangeEvent event) {
    for (JobInProgressListener listener : jobInProgressListeners) {
      listener.jobUpdated(event);
    }
  }
这次每个监听器要回调jobUpdated方法,表示作业有更新。对于jobQueueJobInProgressListener来说是这样做的:
  @Override
  public synchronized void jobUpdated(JobChangeEvent event) {
    JobInProgress job = event.getJobInProgress();
    if (event instanceof JobStatusChangeEvent) {
      // Check if the ordering of the job has changed
      // For now priority and start-time can change the job ordering
      JobStatusChangeEvent statusEvent = (JobStatusChangeEvent)event;
      JobSchedulingInfo oldInfo =  
        new JobSchedulingInfo(statusEvent.getOldStatus());
      if (statusEvent.getEventType() == EventType.PRIORITY_CHANGED 
          || statusEvent.getEventType() == EventType.START_TIME_CHANGED) {
        // Make a priority change
        reorderJobs(job, oldInfo);
      } else if (statusEvent.getEventType() == EventType.RUN_STATE_CHANGED) {
        // Check if the job is complete
        int runState = statusEvent.getNewStatus().getRunState();
        if (runState == JobStatus.SUCCEEDED
            || runState == JobStatus.FAILED
            || runState == JobStatus.KILLED) {
          jobCompleted(oldInfo);
        }
      }
    }
  }
首先,获取作业更新前的状态。然后根据事件的类型,进行相应的处理。比如,如果优先级变化了,则要重新排列队列中作业的顺序。这里直接取出原有作业,重新插入队列。插入后,作业会自动重新排序,体现了TreeMap的优越性。再比如,如果作业状态变为完成,那么就从队列中删除该作业。
  private void reorderJobs(JobInProgress job, JobSchedulingInfo oldInfo) {
    synchronized (jobQueue) {
      jobQueue.remove(oldInfo);
      jobQueue.put(new JobSchedulingInfo(job), job);
    }
  }
下面就是调度器中最关键的一步了:任务选择。此时,作业队列中信息已经更新完毕,可以选择一些任务返回给TaskTracker执行了。heartbeat方法接下来会有这样的代码:
  List<Task> tasks = getSetupAndCleanupTasks(taskTrackerStatus);
  if (tasks == null ) {
    tasks = taskScheduler.assignTasks(taskTrackers.get(trackerName));
  }
如果不需要setup和cleanup,就说明需要选择map或reduce任务。调用TaskScheduler的assignTasks方法完成任务选择。由于篇幅限制,我打算将这部分内容放到下一篇文章中,并关注heartbeat中JobTracker下达的命令过程以及JobInProgress和TaskInProgress对调度有影响的一些字段。请看下一篇文章: MapReduce调度与执行原理之任务调度(续)















作者:jtz_MPP 发表于2013-9-13 16:45:17 原文链接
阅读:69 评论:0 查看评论

相关 [mapreduce 调度 原理] 推荐:

MapReduce调度与执行原理之任务调度

- - CSDN博客云计算推荐文章
前言:本文旨在理清在Hadoop中一个MapReduce作业(Job)在提交到框架后的整个生命周期过程,权作总结和日后参考,如有问题,请不吝赐教. 本文不涉及Hadoop的架构设计,如有兴趣请参考相关书籍和文献. 在梳理过程中,我对一些感兴趣的源码也会逐行研究学习,以期强化基础. 作者:Jaytalent.

MapReduce原理

- - C++博客-牵着老婆满街逛
       MapReduce 是由Google公司的Jeffrey Dean 和 Sanjay Ghemawat 开发的一个针对大规模群组中的海量数据处理的分布式编程模型. MapReduce实现了两个功能. Map把一个函数应用于集合中的所有成员,然后返回一个基于这个处理的结果集. 而Reduce是把从两个或更多个Map中,通过多个线程,进程或者独立系统并行执行处理的结果集进行分类和归纳.

MapReduce调度与执行原理之作业提交

- - CSDN博客云计算推荐文章
前言:本文旨在理清在Hadoop中一个MapReduce作业(Job)在提交到框架后的整个生命周期过程,权作总结和日后参考,如有问题,请不吝赐教. 本文不涉及Hadoop的架构设计,如有兴趣请参考相关书籍和文献. 在梳理过程中,我对一些感兴趣的源码也会逐行研究学习,以期强化基础. 作者:Jaytalent.

MapReduce工作原理图文详解

- - ITeye博客
                                                                      MapReduce工作原理图文详解. 前段时间我们云计算团队一起学习了hadoop相关的知识,大家都积极地做了、学了很多东西,收获颇丰. 可是开学后,大家都忙各自的事情,云计算方面的动静都不太大.

Mapreduce小结

- MAGI-CASPER/Peter Pan - 博客园-唯有前进值得敬仰
读完mapreduce论文小结一下. 1.MapReduce是一个编程模型,封装了并行计算、容错、数据分布、负载均衡等细节问题. 输入是一个key-value对的集合,中间输出也是key-value对的集合,用户使用两个函数:Map和Reduce. Map函数接受一个输入的key-value对,然后产生一个中间key-value 对的集合.

Hadoop MapReduce技巧

- - 简单文本
我在使用Hadoop编写MapReduce程序时,遇到了一些问题,通过在Google上查询资料,并结合自己对Hadoop的理解,逐一解决了这些问题. Hadoop对MapReduce中Key与Value的类型是有要求的,简单说来,这些类型必须支持Hadoop的序列化. 为了提高序列化的性能,Hadoop还为Java中常见的基本类型提供了相应地支持序列化的类型,如IntWritable,LongWritable,并为String类型提供了Text类型.

MapReduce优化

- - 行业应用 - ITeye博客
相信每个程序员在 编程时都会问自己两个问题“我如何完成这个任务”,以及“怎么能让程序运行得更快”. 同样,MapReduce计算模型的多次优化也是为了更好地解答这两个问题. MapReduce计算模型的优化涉及了方方面面的内容,但是主要集中在两个方面:一是计算性能方面的优化;二是I/O操作方面的优化.

Spark与Mapreduce?

- - 崔永键的博客
我本人是类似Hive平台的系统工程师,我对MapReduce的熟悉程度是一般,它是我的底层框架. 我隔壁组在实验Spark,想将一部分计算迁移到Spark上. 年初的时候,看Spark的评价,几乎一致表示,Spark是小数据集上处理复杂迭代的交互系统,并不擅长大数据集,也没有稳定性. 但是最近的风评已经变化,尤其是14年10月他们完成了Peta sort的实验,这标志着Spark越来越接近替代Hadoop MapReduce了.

《Hadoop技术内幕:深入解析MapReduce架构设计与实现原理》电子版下载

- - 董的博客
Dong | 新浪微博: 西成懂 | 可以转载, 但必须以超链接形式标明文章原始出处和作者信息及 版权声明. 网址: http://dongxicheng.org/mapreduce-nextgen/hadoop-internals-mapreduce/. 本博客的文章集合: http://dongxicheng.org/recommend/.

Google Percolator替代MapReduce

- Hao - Solidot
Google在新一代内容索引系统中放弃了MapReduce,替代者是尚不为人知的分布式数据处理系统Percolator. The Register报道,Percolator是一种增量处理平台,它能持续更新索引系统,无需从头重新处理一遍整个系统. Google的工程师计划在下个月举行的年度USENIX Symposium 会议上公布Percolator相关论文.