一文精通 Flink on YARN

标签: dev | 发表时间:2019-02-20 00:00 | 作者:
出处:http://itindex.net/relian

简介

本文主要是讲解flink on yarn的部署过程,然后yarn-session的基本原理,如何启动多个yarn-session的话如何部署应用到指定的yarn-session上,然后是用户jar的管理配置及故障恢复相关的参数。

交互过程概览

flink on yarn的整个交互过程图,如下:

要使得flink运行于yarn上,flink要能找到hadoop配置,因为要连接到yarn的resourcemanager和hdfs。可以使用下面的策略来指定hadoop配置:

1.会查看YARN_CONF_DIR,HADOOP_CONF_DIR或者HADOOP_CONF_PATH是否设置,按照顺序检查的。然后,假如配置了就会从该文件夹下读取配置。

2.如果上面环境变量都没有配置的话,会使用HADOOP_HOME环境变量。对于hadoop2的话会查找的配置路径是$HADOOP_HOME/etc/hadoop;对于hadoop1会查找的路径是$HADOOP_HOME/conf.

每当常见一个新flink的yarn session的时候,客户端会首先检查要请求的资源(containers和memory)是否可用。然后,将包含flink相关的jar包盒配置上传到hdfs。

接下来就是客户端会向resourcemanager申请一个yarn container用以启动ApplicationMaster。由于客户端已经将配置和jar文件注册为了container的资源,所以nodemanager会直接使用这些资源准备好container(例如,下载文件等)。一旦该过程结束,AM就被启动了。

Jobmanager和AM运行于同一个container。一旦创建成功,AM就知道了Jobmanager的地址。它会生成一个新的flink配置文件,这个配置文件是给将要启动的taskManager用的,该配置文件也会上传到hdfs。另外,AM的container也提供了Flink的web接口。Yarn代码申请的端口都是临时端口,目的是为了让用户并行启动多个Flink YARN Session。 

最后,AM开始申请启动Flink Taskmanager的containers,这些container会从hdfs上下载jar文件和已修改的配置文件。一旦这些步骤完成,flink就可以接受任务了。

部署启动yarn-session

这个就是yarn-session脚本启动的整个过程吧。

默认可以直接执行bin/yarn-session.sh默认启动的配置是

   {masterMemoryMB=1024, taskManagerMemoryMB=1024,numberTaskManagers=1, slotsPerTaskManager=1}

需要自己自定义配置的话,可以使用来查看参数:

   bin/yarn-session.sh–help


比如,我们启动一个yarn-session有10个Taskmanager,8GB内存,32处理slot,那么脚本编写应该是这样的:

   

./bin/yarn-session.sh-n10-tm8192-s32

系统默认使用con/flink-conf.yaml里的配置。Flink onyarn将会覆盖掉几个参数:jobmanager.rpc.address因为jobmanager的在集群的运行位置并不是实现确定的,前面也说到了就是am的地址;taskmanager.tmp.dirs使用yarn给定的临时目录;parallelism.default也会被覆盖掉,如果在命令行里指定了slot数。

如果你想保证conf/flink-conf.yaml仅是全局末日配置,然后针对要启动的每一个yarn-session.sh都设置自己的配置,那么可以考虑使用-D修饰。

这种情况下启动完成yarn-session.sh会在会话窗口结尾

输入stop然后回车就会停掉整个应用。

官网说的是CTRL+C可以会在杀死yarn-session.sh的客户端的时候停止整个应用,max os下实测,不行的。

假如要启动多个需要多个shell会话窗口,那么假如想在启动完yarn-session.sh脚本之后使其退出,那么只需要加上-d或者-detached参数即可。这种情况下,客户端在提交flink到集群之后就会退出,这个时候要停止该yarn-session.sh必须要用yarn的命令了yarn application –kill <appid> 

提交job到yarn-session

启动完yarn-session就是提交应用了,那么一个集群中可以存在多个yarn-session如何提交到自己的yarn-session呢?

其实,前面在讲yarn-session启动的时候应该强调一下那个叫做-nm的参数,这个就是给你的yarn-session起一个名字。比如

   bin/yarn-session.sh-nmtest

 

这样根据你的业务需求特点,可以自己起一个名字,然后就可以确定那个yarn-session可以用来提交job了。 

当然,前面我们也说了

运行bin/flink run –help可以产看flink提交到yarn的相关参数其中有一个叫做

然后就可以提交任务了

   

./bin/flink run./examples/batch/WordCount.jar --input/input/test.txt --output/output/result.txt 

假如只启动了一个yarn-session的话,那么就是他会找到默认的,否则的话就用-m参数指定了。

jobmanager的地址也可以从下面页面查询。


用户依赖与classlpath

用户依赖管理还是有一定的注意事项的,默认情况下当单个job在运行的时候flink会将用户jar包含进系统chasspath内部。该行为也可以通过yarn.per-job-cluster.include-user-jar参数进行控制。

当将该参数设置为 DISABLED,flink会将jar放入到用户classpath里面(这里要强调一下,前面说的是系统classpath,而这里是用户classpath)。

用户jar的在classpath的位置顺序是由该参数的下面几个值决定的:

1).ORDER:(默认)按照字典顺序将jar添加到系统classpath里。

2).FIRST:将jar添加到系统classpath的开始位置。

3).LAST:将jar添加到系统classpath里的结束位置。

故障恢复

Flink的yarn客户端有一些配置可以控制在containers失败的情况下应该怎么做。可以在conf/flink-conf.yaml或者启动YARN session以-D形式指定。

yarn.reallocate-failed: 默认值是true,该参数控制flink是否会重新申请失败的taskmanager的container。

yarn.maximum-failed-containers: 在整个yarn-session挂掉之前,ApplicationMaster最大接受失败containers的数目。默认是最初请求的taskmanager数(-n)

yarn.application-attempts: yarn的applicationMaster失败后尝试的次数,如果此值设置为1,默认值,则当AM失败时,整个yarn session就失败了,所以该值可以设置为一个较大的值。

推荐阅读:

浪尖原创flink视频第一弹

flink 有状态udf 引起血案一

会了也要看的flink安装部署|适合阅读

2019与近550位球友一起进步~

相关 [flink on yarn] 推荐:

一文精通 Flink on YARN

- - IT瘾-dev
本文主要是讲解flink on yarn的部署过程,然后yarn-session的基本原理,如何启动多个yarn-session的话如何部署应用到指定的yarn-session上,然后是用户jar的管理配置及故障恢复相关的参数. flink on yarn的整个交互过程图,如下:. 要使得flink运行于yarn上,flink要能找到hadoop配置,因为要连接到yarn的resourcemanager和hdfs.

如何收集 Yarn/K8s 集群中的 Flink 任务日志?

- - zhisheng的博客
从一个小需求(任务异常或者失败时排查问题不便)出发,开始调研业界常见的解决方案进行解决我们的需求,接着有对应的代码实现和效果展示. 熟悉 Flink On Yarn 的应该知道 Flink 任务运行结束/失败后,只能去 Yarn UI 看到任务的 Jobmanager 日志,对于 TaskManager 日志这些是看不到的,这对于有时候想排查下任务失败的原因日志会比较困难(不过大多数任务挂掉的原因日志都会在 Jobmanager 存在).

flink-watermark

- - ITeye博客
     当我们统计用户点击的时候,有时候会因为各种情况数据延迟,我们需要一个允许最大的延迟范围进行统计.        模拟初始数据:早上10:00 11.10 用户点击了一次,但是延迟到10:00 11.15 才发送过来,允许最大延迟5秒, 5秒窗口统计. /** 实际时间-偏移量 偏移后的时间*/.

YARN/MRv2 NodeManager整体架构

- - 董的博客
Dong | 可以转载, 但必须以超链接形式标明文章原始出处和作者信息及 版权声明. 网址: http://dongxicheng.org/mapreduce-nextgen/nodemanager-architecture/. (注:本文章主要翻译自Hortonworks官方博客的 “Apache Hadoop YARN – NodeManager”,红色部分为我的注解.

文章: Arun Murthy谈Apache YARN

- - InfoQ cn
Apache Hadoop YARN是一种新的Hadoop资源管理器,前不久被提升为高层次的Hadoop子项目. InfoQ有幸在Hortonworks与YARN的创始人和架构师Arun Murthy进行了讨论. 黑客马拉松•杭州 12月15-16日-Hacking Different,名额有限,请速报名.

Flink SQL 编程实践

- - Jark's Blog
注: 本教程实践基于 Ververica 开源的. sql-training 项目. 基于 Flink 1.7.2. 本文将通过五个实例来贯穿 Flink SQL 的编程实践,主要会涵盖以下几个方面的内容. 如何使用 SQL CLI 客户端. 如何在流上运行 SQL 查询. 运行 window aggregate 与 non-window aggregate,理解其区别.

谈谈 Flink Shuffle 演进

- - 时间与精神的小屋
在分布式计算中,Shuffle 是非常关键但常常容易被忽视的一环. 比如著名的 MapReduce 的命名跳过 Shuffle ,只包含其前后的 Map 跟 Reduce. 背后原因一方面是 Shuffle 是底层框架在做的事情,用户基本不会感知到其存在,另一方面是 Shuffle 听起来似乎是比较边缘的基础服务.

Yarn(MR2)上的应用汇总

- - BlogJava-首页技术区
Yarn做为hadoop下一代集群资源管理和调度平台, 其上能支持多种计算框架, 本文就简要介绍一下这些计算框架.. 首先是大家熟悉的mapreduce, 在MR2之前, hadoop包括HDFS和mapreduce, 做为hadoop上唯一的分布式计算框架, 其优点是用户可以很方便的编写分布式计算程序, 并支持许多的应用, 如hive, mahout, pig等.

YARN/MRv2 中基本术语介绍

- - 董的博客
YARN/MRv2是下一代MapReduce框架(见 Hadoop-0.23.0),该框架完全不同于当前的MapReduce框架,它在扩展性,容错性和通用性等方面更出色,据统计,Yarn有超过150000行代码,完全是重写编写的. 本文介绍了YARN/MRv2中基本术语的含义,帮助有兴趣的程序员们对YARN有一个初步的理解.

Hadoop YARN安装部署初探

- - 董的博客
Dong | 新浪微博: 西成懂 | 可以转载, 但必须以超链接形式标明文章原始出处和作者信息及 版权声明. 网址: http://dongxicheng.org/mapreduce-nextgen/hadoop-yarn-install/. 本文主要介绍了在实验环境下,能使YARN(以CDH4为例,Apache版本安装方法类似)正常工作的最简单的配置部署方法.