Spark 任务调度

标签: dev | 发表时间:2019-06-07 00:00 | 作者:
出处:http://itindex.net/relian

 Spark的核心是基于RDD来实现的,Spark任务调度就是如何组织任务去处理RDD中每个分区的数据,根据RDD的依赖关系构建DAG,基于DAG划分Stage,然后将每个Stage中的任务(Task)分发到指定的节点去运行得到最终的结果。

先来了解下几个概念:

  • Application:用户编写的Spark应用程序,由一个或多个Job组成。提交到Spark之后,Spark会为Application分配资源,将程序进行转换并执行。

  • Job(作业):由Action算子触发生成的由一个或多个Stage组成的计算作业。

  • Stage(调度阶段):每个Job会根据RDD的宽依赖被切分为多个Stage,每个Stage都包含一个TaskSet。

  • TaskSet(任务集):一组关联的,但相互之间没有shuffle依赖关系的Task集合。一个TaskSet对应的调度阶段。

  • Task(任务):RDD中的一个分区对应一个Task,Task是单个分区上最小的处理流程单元。

 Spark任务调度模块主要包含两大部分:DAGScheduler和TaskScheduler,它们负责将用户提交的计算任务按照DaG划分为不同的阶段并且将不同阶段的计算任务提交到集群进行最终的计算。

  • DAGScheduler:主要负责分析用户提交的应用,并根据计算任务的依赖关系建立DAG,然后将DAG划分为不同的Stage,并以TaskSet的形式把Stage提交给TaskScheduler。其中每个Stage由可以并发执行的一组Task构成,这些Task的执行逻辑完全相同,只是作用于不同的数据。

  • TaskScheduler:负责Application中不同job之间的调度,将TaskSet提交给Worker执行并返回结果,在Task执行失败时启动重试机制,并且为执行速度慢的Task启动备份的任务。

Spark任务调度整个过程如下图所示:

RDD Objects就是在代码中创建的RDD,这些代码逻辑上组成了一个DAG。

DAGScheduler主要负责分析用户提交的应用,并根据计算任务的依赖关系建立DAG,然后将DAG划分为不同的Stage,其中每个Stage由可以并发执行的一组Task组成,这些Task的执行逻辑完全相同,只是作用于不同的数据。在DAGScheduler将这组Task划分完成之后,会将这组Task(TaskSets)提交到TaskScheduler。

TaskScheduler负责Task级的调度,将DAGScheduler提交过来的TaskSet按照指定的调度实现,分别对接到不同的资源管理系统。TaskScheduler会将TaskSets封装成TaskSetManager,并加入到调度的队列中,TaskSetManager负责监控管理同一个Stage中的Tasks,TaskScheduler就是以TaskSetManager为单元来调度任务的。TaskScheduler通过Cluster Manager在集群中的某个Worker的Executor上启动任务。但是在不同的资源管理框架下TaskScheduler的实现方式有一定的差别。

Task在Executor中运行,如果缓存中没有计算结果,那么就需要开始计算,同时,计算的结果会回传到Driver或者保存在本地。

Stage划分

用户提交的计算是一个由RDD构成的DAG,如果RDD在转换的时候需要做Shuffle,没那么这个Shuffle的过程就将这个DAG分为了不同的阶段(Stage)。由于Shuffle的存在,在不同的Stage是不能并行计算的,因为后面Stage的计算需要前面Stage的Shuffle的结果。而一个Stage由一组完全独立的计算任务(即Task)组成,每个Task的运算逻辑完全相同,只不过每个Task都会处理其对应的Partition。其中,Partition的数量和Task的数量是一致的,即一个Partition会被该Stage的一个Task处理。

划分依据:Stage的划分依据就是宽依赖。

核心算法:从触发Action操作的那个RDD开始从后往前推,首先会为最后一个RDD创建一个Stage,然后继续倒推,如果发现对某个RDD是宽依赖,那么就会将宽依赖的那个RDD创建一个新的Stage,那么RDD就是新的Stage的最后一个RDD。然后以此类推,直到所有的RDD全部遍历完成为止。

Stage调度

经过Stage划分之后,会产生一个或者多个互相关联的Stage。其中,真正执行Action算子的rDD所在的Stage被称为Final Stage。DAGScheduler会从这个Final Stage生成作业实例。

在提交Stage时,DAGScheduler会先判断该Stage的父Stage的执行结果是否可用。如果所有父Stage的执行结果都可用,则提交该Stage。如果有任意一个父Stage的结果不可用,则尝试迭代提交该父Stage。

Task调度

TaskScheduler接收到DAGScheduler提交过来的TaskSet,会为每一个收到的TaskSet创建一个TaskSetManager。TaskSetManager负责TaskSet中Task的管理调度工作。

每个TaskScheduler都对应一个SchedulerBackend。其中TaskScheduler负责Application的不同Job之间的调度,在Task执行失败的时候启动重试机制,并且为执行速度慢的Task启动备份的任务。SchedulerBackend负责与Cluster Manager交互,取得该Application分配到的资源,并且将这些资源传给TaskScheduler,由TaskScheduler为Task最终分配计算资源。

Spark调度模式

Spark可以采用两种调度模式:

  • FIFO:先进先出调度模式(默认)。FIFO调度会根据StageID和JobID的大小来调度,数值较小的任务优先被调度。FIFO调度方式存在一个缺点:当遇到一个耗时较长的任务时,后续任务必须等待这个耗时任务执行完成才能得到可用的计算资源。

  • FAIR:公平调度模式。FAIR模式下每个计算任务具有相等的优先级,Spark以轮询的方式为每个任务分配计算资源。FAIR不像FIFO那样必须等待前面耗时任务完成后后续任务才能执行。在FAIR模式下,无论是耗时短任务还是耗时长任务、无论是先提交的任务还是后提交的任务都可以公平的获得资源执行,这样就提高了耗时短的任务的响应时间。FAIR比FIFO更加灵活,FAIR模式为用户提供了一个调度池的概念,用户可以将重要的计算任务放入一个调度池Pool中,通过设置该调度池的权重来使该调度池中的计算任务获得较高的优先级。

关注获得更多分享

 


好文推荐:


相关 [spark 任务 调度] 推荐:

Spark 任务调度

- - IT瘾-dev
 Spark的核心是基于RDD来实现的,Spark任务调度就是如何组织任务去处理RDD中每个分区的数据,根据RDD的依赖关系构建DAG,基于DAG划分Stage,然后将每个Stage中的任务(Task)分发到指定的节点去运行得到最终的结果. Application:用户编写的Spark应用程序,由一个或多个Job组成.

Spark 性能相关参数配置详解-任务调度篇

- - ITeye博客
随着Spark的逐渐成熟完善, 越来越多的可配置参数被添加到Spark中来, 本文试图通过阐述这其中部分参数的工作原理和配置思路, 和大家一起探讨一下如何根据实际场合对Spark进行配置优化. 由于篇幅较长,所以在这里分篇组织,如果要看最新完整的网页版内容,可以戳这里: http://spark-config.readthedocs.org/,主要是便于更新内容.

spark之路第四课——提交spark任务 | uohzoaix

- -
spark是使用spark-submit这个命令来提交任务的. --class:一个spark任务的入口方法,一般指main方法. 如:org.apache.spark.examples.SparkPi). -master:集群的master URL. 如spark://23.195.26.187:7077.

使用Quartz和Obsidian来调度任务

- - Java译站
在介绍使用到的Quartz和Obsidian的API之前,首先我得声明一下,一般来说使用API并不是调度任务的最佳方式. Quartz提供了一个通过XML来配置作业的机制,而Obsidian则为你提供了一套完整的管理和监控的WEB应用. 然而,有一些使用场景还是强烈推荐使用API的,我们来看一下吧.

MapReduce调度与执行原理之任务调度

- - CSDN博客云计算推荐文章
前言:本文旨在理清在Hadoop中一个MapReduce作业(Job)在提交到框架后的整个生命周期过程,权作总结和日后参考,如有问题,请不吝赐教. 本文不涉及Hadoop的架构设计,如有兴趣请参考相关书籍和文献. 在梳理过程中,我对一些感兴趣的源码也会逐行研究学习,以期强化基础. 作者:Jaytalent.

Spark概览

- - 简单文本
Spark具有先进的DAG执行引擎,支持cyclic data flow和内存计算. 因此,它的运行速度,在内存中是Hadoop MapReduce的100倍,在磁盘中是10倍. 这样的性能指标,真的让人心动啊. Spark的API更为简单,提供了80个High Level的操作,可以很好地支持并行应用.

Spark与Mapreduce?

- - 崔永键的博客
我本人是类似Hive平台的系统工程师,我对MapReduce的熟悉程度是一般,它是我的底层框架. 我隔壁组在实验Spark,想将一部分计算迁移到Spark上. 年初的时候,看Spark的评价,几乎一致表示,Spark是小数据集上处理复杂迭代的交互系统,并不擅长大数据集,也没有稳定性. 但是最近的风评已经变化,尤其是14年10月他们完成了Peta sort的实验,这标志着Spark越来越接近替代Hadoop MapReduce了.

Spark迷思

- - ITeye博客
目前在媒体上有很大的关于Apache Spark框架的声音,渐渐的它成为了大数据领域的下一个大的东西. 证明这件事的最简单的方式就是看google的趋势图:. 上图展示的过去两年Hadoop和Spark的趋势. Spark在终端用户之间变得越来越受欢迎,而且这些用户经常在网上找Spark相关资料. 这给了Spark起了很大的宣传作用;同时围绕着它的也有误区和思维错误,而且很多人还把这些误区作为银弹,认为它可以解决他们的问题并提供比Hadoop好100倍的性能.

Spark 优化

- - CSDN博客推荐文章
提到Spark与Hadoop的区别,基本最常说的就是Spark采用基于内存的计算方式,尽管这种方式对数据处理的效率很高,但也会往往引发各种各样的问题,Spark中常见的OOM等等. 效率高的特点,注定了Spark对性能的严苛要求,那Spark不同程序的性能会碰到不同的资源瓶颈,比如:CPU,带宽、内存.

Java Spring注解任务调度并实现AOP监控任务执行情况

- - 极客521 | 极客521
本文讲的是通过Spring注解的方式实现任务调度. 只要引入了spring-context包就能够在项目中使用注解方式的任务调度. 需要在Spring配置文件中加入task的schema. 然后在代码中就可以直接用了,要定时执行的方法必须是void的,并且没有任何参数的. cron表达式请自行问百度,下面只列出几个从网上找的例子.