如何编写Hadoop调度器

标签: MapReduce Hadoop调度器,编写Hadoop调度器 | 发表时间:2011-08-14 16:10 | 作者:Dong Guancheng(冠诚)
出处:http://dongxicheng.org

1. 编写目的

在Hadoop中,调度器是一个可插拔的模块,用户可以根据自己的实际应用要求设计调度器,然后在配置文件中指定相应的调度器,这样,当Hadoop集群启动时,便会加载该调度器。当前Hadoop自带了几种调度器,分别是FIFO(默认调度器),Capacity Scheduler和FairScheduler,通常境况下,这些调度器很难满足公司复杂的应用需求,因而往往需要开发自己的调度器。本文介绍了Hadoop调度器的基本编写方法。

2. Hadoop调度框架

Hadoop的调度器是在JobTracker中加载和调用的,用户可以在配置文件mapred-site.xml中的mapred.jobtracker.taskScheduler属性中指定调度器。本节分析了Hadoop调度器的调度框架,实际上分析了两个重要类:TaskScheduler和JobTracker的关系。

(1) TaskScheduler

如果用户要编写自己的调度器,需要继承抽象类TaskScheduler,该类的接口如下:


abstract class TaskScheduler implements Configurable {

protected Configuration conf; //配置文件

protected TaskTrackerManager taskTrackerManager; //一般会设为JobTracker

public Configuration getConf() {

  return conf;

}

public void setConf(Configuration conf) {

  this.conf = conf;

}

public synchronized void setTaskTrackerManager(

TaskTrackerManager taskTrackerManager) {

  this.taskTrackerManager = taskTrackerManager;

}

public void start() throws IOException { //初始化函数,如加载配置文件等

  // do nothing

}

public void terminate() throws IOException { //结束函数

// do nothing

}

//最重要的函数,为该taskTracker分配合适的task

public abstract List<Task> assignTasks(TaskTrackerStatus taskTracker)

throws IOException;

  //根据队列名字获job列表

public abstract Collection<JobInProgress> getJobs(String queueName);

}

(2) JobTracker

JobTracker是Hadoop最核心的组件,它监控整个集群中的作业运行情况并对资源进行管理和调度。

每个TaskTracker每个3s(默认值,可配置)通过heartbeat向JobTracker汇报自己管理的机器的一些基本信息,包括内存使用量,内存剩余量,正在运行的task,空闲的slot数目等,一旦JobTracker发现该TaskTracker出现了空闲的slot,便会调用调度器中的AssignTasks方法为该TaskTracker分配task。

下面分析JobTracker调用TaskScheduler的具体流程:


……

private final TaskScheduler taskScheduler; //声明调度器对象

……

public static JobTracker startTracker(JobConf conf, String identifier) {

  …….

  result = new JobTracker(conf, identifier);

  result.taskScheduler.setTaskTrackerManager(result); //设置调度器的manager

  ……

}

//创建调度器

JobTracker(JobConf conf, String identifier) {

  ……

  // Create the scheduler

  Class<? extends TaskScheduler> schedulerClass

  = conf.getClass("mapred.jobtracker.taskScheduler",

    JobQueueTaskScheduler.class, TaskScheduler.class);

  taskScheduler = (TaskScheduler) ReflectionUtils.newInstance(schedulerClass, conf);

  …..

}

//run forever

public void offerService() {

  ……

  taskScheduler.start(); //启动调度器

  ……

}

。。。。。

HeartbeatResponse heartbeat(TaskTrackerStatus status,

boolean restarted,

boolean initialContact,

boolean acceptNewTasks,

short responseId) {

  …….

  // Check for new tasks to be executed on the tasktracker

  if (recoveryManager.shouldSchedule() && acceptNewTasks && !isBlacklisted) {

    ……

    //使用调度器,为该taskTracker分配作业

    tasks = taskScheduler.assignTasks(taskTrackerStatus);

    ……

  }

}

从上面的分析可以知道,Scheduler和JobTracker之间会相互包含(实际上是组合模式),Scheduler中要包含JobTracker(实际上就是TaskTrackerManager)对象,以便获取整个Hadoop集群的一些信息,如slot总数,QueueManager对象,添加JobInProgressListener以便增加或删除job时,通知Scheduler;JobTracker中要包含Scheduler对象,以便可以对每个TaskTracker分配task。

3. 编写Hadoop调度器

假设我们要编写一个新的调度器,为MyHadoopScheduler,需要进行以下工作:

(1) 用户需要自己实现的类

@ MyHadoopSchedulerConf:配置文件管理类,读取你自己的配置文件,并保存到合适的数据结构中,一般而言,这个类应该支持动态加载配置文件。

@ MyHadoopSchedulerListener:编写自己的JobInProgressListener,并调用JobTracker的addJobInProgressListener(),将之加到系统的Listener队列中,以便系统中添加或删除job后,JobTracker可立刻告诉调度器。

@ MyHadoopScheduler:调度器的核心实现算法

(2) 用户要用到的系统类

@ JobTracker:JobTracker在startTracker函数中,会将MyHadoopScheduler的taskTrackerManager赋值为JobTracker对象,这样,在MyHadoopScheduler中,可调用Jobracker中的所有public方法和成员变量,常用的有:

$ getClusterStatus():获取集群的状态,如tasktracker列表,map slot总数,reduce slot总数,当前正在运行的map/reduce task总数等

$ getQueueManager():如果MyHadoopScheduler支持多队列,那么需要使用该方法获取QueueManager对象,通过该对象,会用可以获取系统的所有队列名称,每个队列的ACL(Access Control List),具体参考:http://hadoop.apache.org/common/docs/current/service_level_auth.html

$ killJob:可以调用该函数杀死某个job

$ killTask:如果调度器支持资源抢占,可调用该函数 杀死某个task以便进行资源抢占。

@ JobInprogress:用户向Hadoop中提交一个job后,Hadoop会为该job创建一个叫JobInProgress的对象,该对象中包含了job相关的基本信息,且它会伴随某个job的一生(与job共存亡)。该对象中包含的job信息有:该job包含的所有task的信息(如:正在运行的task列表,已经完成的task列表,尚未运行的task列表等),作业的优先级,作业的提交时间,开始运行时间,运行结束时间等信息。

在JobInprogress的task列表中,每个task以对象TaskInProgress的形式保存,该对象中包含了每个task的基本信息,包括:task要处理的数据split,task创建时间,task开始执行时间,task结束时间等信息。这些信息肯定会在调度器中使用。

@ JobConf

每个作业的运行参数和配置选项被保存到一个JobConf对象中,该对象包含了配置文件mapred-site.xml,core-site.xml和hdfs-site.xml设置的选项和该作业的特有属性(用户名,InputFormat,Mapper等),一般是以key/value的形式保存,比如:想获取当前用户名,可以这样:


JobConf conf;

…….

String username = conf.get("user.name");

用户也可以通过该对象传递一些自己定义的全局属性,如用户自己定义了一个属性叫mapred.job.deadline(作业的deadline时间),用户可以在提交作业时设定该值:

hadoop jar hadoop-examples.jar wordcount -files cachefile.txt \

-D mapred.job.deadline=100000 \

input output

然后在调度器中这样获取该属性的值:


JobConf conf;

…….

int deadline=conf.getInt("mapred.job.deadline", -1); //获取mapred.job.deadline属性,如果没有设置,则返回-1

4. 总结

调度器是Hadoop的中枢,其重要性可想而知。用户如果要设计Hadoop调度器,需要对Hadoop的整个框架有比较深入的理解,同时需阅读一些很重要的类(如JobTracker和JobInprogress等)的源码,以便利用这些类完成你的调度算法。

Hadoop目前自带了三个比较常用的调度器,分别为JobQueueTaskScheduler (FIFO,但队列调度器),Capacity Scheduler(多队列多用户调度器)和Fair Scheduler(多队列多用户调度器),它们是你学习Hadoop调度器的最好资料。

5. 参考资料

(1) Hadoop-0.20.2源代码

原创文章,转载请注明: 转载自董的博客

本文链接地址: http://dongxicheng.org/mapreduce/how-to-write-hadoop-schedulers/

相关 [hadoop 调度] 推荐:

hadoop调度算法

- - 互联网 - ITeye博客
1 hadoop目前支持以下三种调度器:. FifoScheduler:最简单的调度器,按照先进先出的方式处理应用. 只有一个队列可提交应用,所有用户提交到这个队列. CapacityScheduler:可以看作是FifoScheduler的多队列版本. 但是,队列间的资源分配以使用量作排列依据,使得容量小的队列有竞争优势.

hadoop资源调度器

- - ITeye博客
         hadoop支持多用户环境,在生产环境中,往往一个大的hadoop集群,供多个应用如Hive,Mahout等之类的使用. 在多用户环境下,有的用户提交的工作量很大、很频繁,而有的很少,还有的优先级很高,那么如何保证“按需”来为各个用户分配资源(内存、CPU、带宽、IO、磁盘)呢.         这里,我对hadoop的资源调度做个简单的归纳总结:.

如何编写Hadoop调度器

- Guancheng(冠诚) - 董的博客
在Hadoop中,调度器是一个可插拔的模块,用户可以根据自己的实际应用要求设计调度器,然后在配置文件中指定相应的调度器,这样,当Hadoop集群启动时,便会加载该调度器. 当前Hadoop自带了几种调度器,分别是FIFO(默认调度器),Capacity Scheduler和FairScheduler,通常境况下,这些调度器很难满足公司复杂的应用需求,因而往往需要开发自己的调度器.

Hadoop中Speculative Task调度策略

- - 董的博客
Dong | 可以转载, 但必须以超链接形式标明文章原始出处和作者信息及 版权声明. 网址: http://dongxicheng.org/mapreduce/hadoop-speculative-task/. Speculative Task思路是以空间换时间的,同时启动多个相同task,哪个完成的早,则采用哪个task的结果,这样明显可以提高任务计算速度,但是,这样却会占用更多的资源,在集群资源紧缺的情况下,合理的控制Speculative Task,可在多用少量资源情况下,减少大作业的计算时间.

Hadoop计算能力调度器算法解析

- Roger - 董的博客
本文描述了hadoop中的计算能力调度器(Capacity Scheduler)的实现算法,计算能力调度器是由Yahoo贡献的,主要是解决HADOOP-3421中提出的,在调度器上完成HOD(Hadoop On Demand)功能,克服已有HOD的性能低效的缺点. 它适合于多用户共享集群的环境的调度器.

分布式系统Hadoop源码阅读与分析(一):作业调度器实现机制

- otter - 博客园-首页原创精华区
上一篇博文对Hadoop的作业调度器进行了介绍,我们知道,JobTracker和TaskTracker是Hadoop作业调度过程中最核心的两个部分,前者负责map/reduce作业的调度与分派,后者负责map/reduce作业的实际执行,它们之间通过RPC机制进行通讯. 下面将对Hadoop 0.20.2版本中作业调度相关源码进行分析,至于JobTracker和TaskTracker中与作业调度无关的源码部分,并未进行详细介绍.

Hadoop Streaming 编程

- - 学着站在巨人的肩膀上
Hadoop Streaming是Hadoop提供的一个编程工具,它允许用户使用任何可执行文件或者脚本文件作为Mapper和Reducer,例如:. 采用shell脚本语言中的一些命令作为mapper和reducer(cat作为mapper,wc作为reducer). 本文安排如下,第二节介绍Hadoop Streaming的原理,第三节介绍Hadoop Streaming的使用方法,第四节介绍Hadoop Streaming的程序编写方法,在这一节中,用C++、C、shell脚本 和python实现了WordCount作业,第五节总结了常见的问题.

Hadoop使用(一)

- Pei - 博客园-首页原创精华区
Hadoop使用主/从(Master/Slave)架构,主要角色有NameNode,DataNode,secondary NameNode,JobTracker,TaskTracker组成. 其中NameNode,secondary NameNode,JobTracker运行在Master节点上,DataNode和TaskTracker运行在Slave节点上.

Hadoop MapReduce技巧

- - 简单文本
我在使用Hadoop编写MapReduce程序时,遇到了一些问题,通过在Google上查询资料,并结合自己对Hadoop的理解,逐一解决了这些问题. Hadoop对MapReduce中Key与Value的类型是有要求的,简单说来,这些类型必须支持Hadoop的序列化. 为了提高序列化的性能,Hadoop还为Java中常见的基本类型提供了相应地支持序列化的类型,如IntWritable,LongWritable,并为String类型提供了Text类型.

Hadoop TaskScheduler浅析

- - kouu&#39;s home
TaskScheduler,顾名思义,就是MapReduce中的任务调度器. 在MapReduce中,JobTracker接收JobClient提交的Job,将它们按InputFormat的划分以及其他相关配置,生成若干个Map和Reduce任务. 然后,当一个TaskTracker通过心跳告知JobTracker自己还有空闲的任务Slot时,JobTracker就会向其分派任务.