Spark动态资源分配-Dynamic Resource Allocation – lxw的大数据田地

标签: | 发表时间:2018-01-10 17:00 | 作者:
出处:http://lxw1234.com

关键字:spark、资源分配、dynamic resource allocation

Spark中,所谓资源单位一般指的是executors,和Yarn中的Containers一样,在Spark On Yarn模式下,通常使用–num-executors来指定Application使用的executors数量,而–executor-memory和–executor-cores分别用来指定每个executor所使用的内存和虚拟CPU核数。相信很多朋友至今在提交Spark应用程序时候都使用该方式来指定资源。

假设有这样的场景,如果使用Hive,多个用户同时使用hive-cli做数据开发和分析,只有当用户提交执行了Hive SQL时候,才会向YARN申请资源,执行任务,如果不提交执行,无非就是停留在Hive-cli命令行,也就是个JVM而已,并不会浪费YARN的资源。现在想用Spark-SQL代替Hive来做数据开发和分析,也是多用户同时使用,如果按照之前的方式,以yarn-client模式运行spark-sql命令行( http://lxw1234.com/archives/2015/08/448.htm),在启动时候指定–num-executors 10,那么每个用户启动时候都使用了10个YARN的资源(Container),这10个资源就会一直被占用着,只有当用户退出spark-sql命令行时才会释放。

spark-sql On Yarn,能不能像Hive一样,执行SQL的时候才去申请资源,不执行的时候就释放掉资源呢,其实从Spark1.2之后,对于On Yarn模式,已经支持动态资源分配(Dynamic Resource Allocation),这样,就可以根据Application的负载(Task情况),动态的增加和减少executors,这种策略非常适合在YARN上使用spark-sql做数据开发和分析,以及将spark-sql作为长服务来使用的场景。

本文以Spark1.5.0和hadoop-2.3.0-cdh5.0.0,介绍在spark-sql On Yarn模式下,如何使用动态资源分配策略。

YARN的配置

首先需要对YARN的NodeManager进行配置,使其支持Spark的Shuffle Service。

  • 修改每台NodeManager上的yarn-site.xml:

##修改
<property>
<name>yarn.nodemanager.aux-services</name>
<value>mapreduce_shuffle, spark_shuffle</value>
</property>
##增加
<property>
<name>yarn.nodemanager.aux-services.spark_shuffle.class</name>
<value>org.apache.spark.network.yarn.YarnShuffleService</value>
</property>
<property>
<name>spark.shuffle.service.port</name>
<value>7337</value>
</property>

  • 将$SPARK_HOME/lib/spark-1.5.0-yarn-shuffle.jar拷贝到每台NodeManager的${HADOOP_HOME}/share/hadoop/yarn/lib/下。
  • 重启所有NodeManager。

Spark的配置

配置$SPARK_HOME/conf/spark-defaults.conf,增加以下参数:

spark.shuffle.service.enabled true   //启用External shuffle Service服务
spark.shuffle.service.port 7337 //Shuffle Service服务端口,必须和yarn-site中的一致
spark.dynamicAllocation.enabled true  //开启动态资源分配
spark.dynamicAllocation.minExecutors 1  //每个Application最小分配的executor数
spark.dynamicAllocation.maxExecutors 30  //每个Application最大并发分配的executor数
spark.dynamicAllocation.schedulerBacklogTimeout 1s 
spark.dynamicAllocation.sustainedSchedulerBacklogTimeout 5s
  • 动态资源分配策略:

开启动态分配策略后,application会在task因没有足够资源被挂起的时候去动态申请资源,这种情况意味着该application现有的executor无法满足所有task并行运行。spark一轮一轮的申请资源,当有task挂起或等待spark.dynamicAllocation.schedulerBacklogTimeout(默认1s)时间的时候,会开始动态资源分配;之后会每隔spark.dynamicAllocation.sustainedSchedulerBacklogTimeout(默认1s)时间申请一次,直到申请到足够的资源。每次申请的资源量是指数增长的,即1,2,4,8等。
之所以采用指数增长,出于两方面考虑:其一,开始申请的少是考虑到可能application会马上得到满足;其次要成倍增加,是为了防止application需要很多资源,而该方式可以在很少次数的申请之后得到满足。

  • 资源回收策略

当application的executor空闲时间超过spark.dynamicAllocation.executorIdleTimeout(默认60s)后,就会被回收。

使用spark-sql On Yarn执行SQL,动态分配资源

./spark-sql --master yarn-client \
--executor-memory 1G \
-e "SELECT COUNT(1) FROM ut.t_ut_site_log where pt >= '2015-12-09' and pt <= '2015-12-10'"

spark

该查询需要123个Task。

spark

从AppMaster的WEB界面可以看到,总共有31个Executors,其中一个是Driver,既有30个Executors并发执行,而30,正是在spark.dynamicAllocation.maxExecutors参数中配置的最大并发数。如果一个查询只有10个Task,那么只会向Yarn申请10个executors的资源。

需要注意:
如果使用
./spark-sql –master yarn-client –executor-memory 1G
进入spark-sql命令行,在命令行中执行任何SQL查询,都不会执行,原因是spark-sql在提交到Yarn时候,已经被当成一个Application,而这种,除了Driver,是不会被分配到任何executors资源的,所有,你提交的查询因为没有executor而不能被执行。

而这个问题,我使用Spark的ThriftServer(HiveServer2)得以解决。

使用Thrift JDBC方式执行SQL,动态分配资源

首选以yarn-client模式,启动Spark的ThriftServer服务,也就是HiveServer2.

  • 配置ThriftServer监听的端口号和地址
vi $SPARK_HOME/conf/spark-env.sh
export HIVE_SERVER2_THRIFT_PORT=10000
export HIVE_SERVER2_THRIFT_BIND_HOST=0.0.0.0
  • 以yarn-client模式启动ThriftServer
cd $SPARK_HOME/sbin/
./start-thriftserver.sh \
--master yarn-client \
--conf spark.driver.memory=3G \
--conf spark.shuffle.service.enabled=true \
--conf spark.dynamicAllocation.enabled=true \
--conf spark.dynamicAllocation.minExecutors=1 \
--conf spark.dynamicAllocation.maxExecutors=30 \
--conf spark.dynamicAllocation.sustainedSchedulerBacklogTimeout=5s

启动后,ThriftServer会在Yarn上作为一个长服务来运行:

spark

  • 使用beeline通过JDBC连接spark-sql
cd $SPARK_HOME/bin
./beeline -u jdbc:hive2://localhost:10000 -n lxw1234

spark

执行查询:
select count(1) from ut.t_ut_site_log where pt = ‘2015-12-10′;
该任务有64个Task:

spark

而监控页面上的并发数仍然是30:

spark

执行完后,executors数只剩下1个,应该是缓存数据,其余的全部被回收:

spark

这样,多个用户可以通过beeline,JDBC连接到Thrift Server,执行SQL查询,而资源也是动态分配的。

需要注意的是,在启动ThriftServer时候指定的spark.dynamicAllocation.maxExecutors=30,是整个ThriftServer同时并发的最大资源数,如果多个用户同时连接,则会被多个用户共享竞争,总共30个。

 

这样,也算是解决了多用户同时使用spark-sql,并且动态分配资源的需求了。

Spark动态资源分配官方文档:http://spark.apache.org/docs/1.5.0/job-scheduling.html#dynamic-resource-allocation

您可以关注 lxw的大数据田地,或者 加入邮件列表,随时接收博客更新的通知邮件。

 

 

如果觉得本博客对您有帮助,请 赞助作者

转载请注明: lxw的大数据田地» Spark动态资源分配-Dynamic Resource Allocation

喜欢 (15) 分享 (0)

相关 [spark 资源 dynamic] 推荐:

Spark动态资源分配-Dynamic Resource Allocation – lxw的大数据田地

- -
关键字:spark、资源分配、dynamic resource allocation. Spark中,所谓资源单位一般指的是executors,和Yarn中的Containers一样,在Spark On Yarn模式下,通常使用–num-executors来指定Application使用的executors数量,而–executor-memory和–executor-cores分别用来指定每个executor所使用的内存和虚拟CPU核数.

Dynamic Proxy (动态代理)

- - CSDN博客推荐文章
动态代理主要有一个 Proxy类 和一个 InvocationHandler接口. 真实主题角色(实现了抽象主题接口). 动态代理主题角色(实现了 InvocationHandler接口,并实现了 invoke()方法). Proxy 要调用 newProxyInstance方法. 1.抽象主题角色 SubjectDemo.java.

Dynamic Code Evolution for Java dcevm 原理 - redcreen

- - 博客园_redcreen的专栏
在 hostswap dcevm中我们对Dynamic Code Evolution VM有了一个简单的了解,这篇文章将介绍Dynamic Code Evolution VM的实现原理. Dynamic Code Evolution (下文简称DCE):泛指java在运行时修改程序的技术.例如aop等.

Spark概览

- - 简单文本
Spark具有先进的DAG执行引擎,支持cyclic data flow和内存计算. 因此,它的运行速度,在内存中是Hadoop MapReduce的100倍,在磁盘中是10倍. 这样的性能指标,真的让人心动啊. Spark的API更为简单,提供了80个High Level的操作,可以很好地支持并行应用.

Spark与Mapreduce?

- - 崔永键的博客
我本人是类似Hive平台的系统工程师,我对MapReduce的熟悉程度是一般,它是我的底层框架. 我隔壁组在实验Spark,想将一部分计算迁移到Spark上. 年初的时候,看Spark的评价,几乎一致表示,Spark是小数据集上处理复杂迭代的交互系统,并不擅长大数据集,也没有稳定性. 但是最近的风评已经变化,尤其是14年10月他们完成了Peta sort的实验,这标志着Spark越来越接近替代Hadoop MapReduce了.

Spark迷思

- - ITeye博客
目前在媒体上有很大的关于Apache Spark框架的声音,渐渐的它成为了大数据领域的下一个大的东西. 证明这件事的最简单的方式就是看google的趋势图:. 上图展示的过去两年Hadoop和Spark的趋势. Spark在终端用户之间变得越来越受欢迎,而且这些用户经常在网上找Spark相关资料. 这给了Spark起了很大的宣传作用;同时围绕着它的也有误区和思维错误,而且很多人还把这些误区作为银弹,认为它可以解决他们的问题并提供比Hadoop好100倍的性能.

Spark 优化

- - CSDN博客推荐文章
提到Spark与Hadoop的区别,基本最常说的就是Spark采用基于内存的计算方式,尽管这种方式对数据处理的效率很高,但也会往往引发各种各样的问题,Spark中常见的OOM等等. 效率高的特点,注定了Spark对性能的严苛要求,那Spark不同程序的性能会碰到不同的资源瓶颈,比如:CPU,带宽、内存.

使用Mono.Cecil辅助ASP.NET MVC使用dynamic类型Model

- wang - 老赵点滴 - 追求编程之美
这也是之前在珠三角技术沙龙上的示例之一,解决的是在ASP.NET MVC使用dynamic类型Model时遇到的一个真实问题. C# 4编译器支持dynamic类型,因此在编写页面模板的时候自然就可以把它作为视图的Model类型. 表现层的需求很容易改变,因此dynamic类型的Model可以减少我们反复修改强类型Model的麻烦,再配合匿名类型的使用,可谓是动静相宜,如鱼得水.

CRM的客户数据模型:Microsoft Dynamic CRM 2011 (II)

- - CSDN博客推荐文章
图1展示了Microsoft Dynamic CRM 2011的概念客户数据模型,来自以下信息源:. 在Microsoft Dynamics CRM 2011中, 使用三个主要的实体:lead(线索), contact(联络人) 和account(账户,本文不用引起混淆的“客户”). Customer Address (客户地址)用来存储客户的各类地址信息,比如shipping address (寄送地址) 和billingaddress(账单地址).

基于系统负载的动态限流组件 dynamic-limiter

- -
 宋鹏玉,2014年加入 Qunar,目前在国内机票报价组,热爱技术,欢迎交流. 一个系统的处理能力是有限的,当请求量超过处理能力时,通常会引起排队,造成响应时间迅速提升. 如果对服务占用的资源量没有约束,还可能因为系统资源占用过多而宕机. 因此,为了保证系统在遭遇突发流量时,能够正常运行,需要为你的服务加上限流.