hadoop/spark关闭钩子研究

标签: hadoop spark 研究 | 发表时间:2016-02-06 09:39 | 作者:distantlight1
出处:http://www.iteye.com

引子:在使用spark和hadoop的时候,遇到一些进程退出时的报错。因此顺便研究了一下jvm以及一些开源框架的关闭钩子的机制。这篇文章不涉及底层native实现,仅限Java层面

 

1.jvm关闭钩子

注册jvm关闭钩子通过Runtime.addShutdownHook(),实际调用ApplicationShutdownHooks.add()。后者维护了一个钩子集合IdentityHashMap<Thread, Thread> hooks

ApplicationShutdownHooks类初始化的时候,会注册一个线程到Shutdown类

 

static {
    try {
        Shutdown.add(1, false,
            new Runnable() {
                public void run() {
                    runHooks();
                }
            }
        );
        hooks = new IdentityHashMap<>();
    } catch (IllegalStateException e) {
        hooks = null;
    }
}

 Shutdown类里也维护了一个钩子集合

private static final Runnable[] hooks = new Runnable[MAX_SYSTEM_HOOKS];

 这个集合是分优先级的(优先级就是下标数值),自定义的钩子优先级默认是1,也就是最先执行。关闭钩子最终触发就是从这个集合进行

 

应用关闭时,以System.exit()为例,依次调用Runtime.exit()、Shutdow.exit()

Shutdown执行jvm退出逻辑,并维护了若干关闭状态

private static final int RUNNING = 0;	// 初始状态,开始关闭
private static final int HOOKS = 1;	// 运行钩子
private static final int FINALIZERS = 2;	// 运行finalizer
private static int state = RUNNING;

static void exit(int status) {
    boolean runMoreFinalizers = false;
    synchronized (lock) {		// 根据退出码status参数做不同处理
        if (status != 0) runFinalizersOnExit = false;	// 只有正常退出才会运行finalizer
        switch (state) {
        case RUNNING:       // 执行钩子并修改状态
            state = HOOKS;
            break;
        case HOOKS:         // 执行钩子
            break;
        case FINALIZERS:    // 执行finalizer
            if (status != 0) {
                halt(status); // 如果是异常退出,直接退出进程。halt()底层是native实现,这时不会执行finalizer
            } else {      // 正常退出则标记是否需要执行finalizer
                runMoreFinalizers = runFinalizersOnExit;
            }
            break;
        }
    }

    if (runMoreFinalizers) { // 如果有需要,就执行finalizer,注意只有state=FINALIZERS会走这个分支
        runAllFinalizers();
        halt(status);
    }

    synchronized (Shutdown.class) {
        // 这里执行state= HOOKS逻辑,包括执行钩子和finalizer
        sequence();
        halt(status);
    }
}

private static void sequence() {
    synchronized (lock) {
        if (state != HOOKS) return;
    }
    runHooks();	// 执行钩子,这里会依次执行hooks数组里的各线程
    boolean rfoe;	// finalizer逻辑
    synchronized (lock) {
        state = FINALIZERS;
        rfoe = runFinalizersOnExit;
    }
    if (rfoe) runAllFinalizers();
}

private static void runHooks() {
    for (int i=0; i < MAX_SYSTEM_HOOKS; i++) {
        try {
            Runnable hook;
            synchronized (lock) {
                currentRunningHook = i;
                hook = hooks[i];
            }
	// 由于之前注册了ApplicationShutdownHooks的钩子线程,这里又会回调ApplicationShutdownHooks.runHooks
            if (hook != null) hook.run();
        } catch(Throwable t) {
            if (t instanceof ThreadDeath) {
                ThreadDeath td = (ThreadDeath)t;
                throw td;
            }
        }
    }
}

static void runHooks() {
    Collection<Thread> threads;
    synchronized(ApplicationShutdownHooks.class) {
        threads = hooks.keySet();
        hooks = null;
    }

    // 注意ApplicationShutdownHooks里的钩子之间是没有优先级的,如果定义了多个钩子,那么这些钩子会并发执行
    for (Thread hook : threads) {
        hook.start();
    }
    for (Thread hook : threads) {
        try {
            hook.join();
        } catch (InterruptedException x) { }
    }
}

 

2.Spring关闭钩子

Spring在AbstractApplicationContext里维护了一个shutdownHook属性,用来关闭Spring上下文。但这个钩子不是默认生效的,需要手动调用ApplicationContext.registerShutdownHook()来开启

在自行维护ApplicationContext(而不是托管给tomcat之类的容器时),注意尽量使用ApplicationContext.registerShutdownHook()或者手动调用ApplicationContext.close()来关闭Spring上下文,否则应用退出时可能会残留资源

	public void registerShutdownHook() {
		if (this.shutdownHook == null) {
			this.shutdownHook = new Thread() {
				@Override
				public void run() {
					// 这里会调用Spring的关闭逻辑,包括资源清理,bean的销毁等
					doClose();
				}
			};
			// 这里会把spring的钩子注册到jvm关闭钩子
			Runtime.getRuntime().addShutdownHook(this.shutdownHook);
		}
	}

 

3.Hadoop关闭钩子

Hadoop客户端初始化时,org.apache.hadoop.util.ShutdownHookManager会向Runtime注册一个钩子线程。ShutdownHookManager是一个单例类,并维护了一个钩子集合

private Set<HookEntry> hooks;

static {
    Runtime.getRuntime().addShutdownHook(
      new Thread() {
        @Override
        public void run() {
          MGR.shutdownInProgress.set(true);		// MGR是本类的单例
          for (Runnable hook: MGR.getShutdownHooksInOrder()) {
            try {
              hook.run();
            } catch (Throwable ex) {
              LOG.warn("ShutdownHook '" + hook.getClass().getSimpleName() +
                       "' failed, " + ex.toString(), ex);
            }
          }
        }
      }
    );
  }

 这里HookEntry是hadoop封装的钩子类,HookEntry是带优先级的,一个priority属性。MGR.getShutdownHooksInOrder()方法会按priority依次(单线程)执行钩子

 

默认挂上的钩子就一个:org.apache.hadoop.fs.FileSystem$Cache$ClientFinalizer(priority=10),这个钩子用来清理hadoop FileSystem缓存以及销毁FileSystem实例。这个钩子是在第一次hadoop IO发生时(如FileSystem.get)lazy加载

 

此外调用FileContext.deleteOnExit()方法也会通过注册钩子

hadoop集群(非客户端)启动时,还会注册钩子清理临时路径

 

4.SparkContext关闭钩子

Spark也有关闭钩子管理类org.apache.spark.util.ShutdownHookManager,结构与hadoop的ShutdownHookManager基本类似

hadoop 2.x开始,spark的ShutdownHookManager会挂一个SparkShutdownHook钩子到hadoop的ShutdownHookManager(priority=40),用来实现SparkContext的清理逻辑。hadoop 1.x没有ShutdownHookManager,所以SparkShutdownHook直接挂在jvm上

def install(): Unit = {
  val hookTask = new Runnable() {
    // 执行钩子的回调进程,根据priority依次执行钩子
    override def run(): Unit = runAll()
  }
  Try(Utils.classForName("org.apache.hadoop.util.ShutdownHookManager")) match {
    case Success(shmClass) =>
      val fsPriority = classOf[FileSystem].getField("SHUTDOWN_HOOK_PRIORITY").get(null).asInstanceOf[Int]
      val shm = shmClass.getMethod("get").invoke(null)
      shm.getClass().getMethod("addShutdownHook", classOf[Runnable], classOf[Int]).invoke(shm, hookTask, Integer.valueOf(fsPriority + 30))

    case Failure(_) =>	// hadoop 1.x 
      Runtime.getRuntime.addShutdownHook(new Thread(hookTask, "Spark Shutdown Hook"));
  }
}

顺便说一下,hadoop的FileSystem实例底层默认是复用的,所以如果执行了两次fileSystem.close(),第二次会报错FileSystem Already Closed异常(即使表面上是对两个实例执行的)

一个典型的场景是同时使用Spark和Hadoop-Api,Spark会创建FileSystem实例,Hadoop-Api也会创建,由于底层复用,两者其实是同一个。因为关闭钩子的存在,应用退出时会执行两次FileSystem.close(),导致报错。解决这个问题的办法是在hdfs-site.xml增加以下配置,关闭FileSystem实例复用

    <property>
        <name>fs.hdfs.impl.disable.cache</name>
        <value>true</value>
    </property>

 

5.总结

以下为相关调用逻辑整理

紫红色箭头表示钩子注册,蓝色箭头表示钩子触发

蓝色、黄色、红色线框分别表示Spring、Hadoop、Spark相关代码



 



已有 0 人发表留言,猛击->> 这里<<-参与讨论


ITeye推荐



相关 [hadoop spark 研究] 推荐:

hadoop/spark关闭钩子研究

- - 开源软件 - ITeye博客
引子:在使用spark和hadoop的时候,遇到一些进程退出时的报错. 因此顺便研究了一下jvm以及一些开源框架的关闭钩子的机制. 这篇文章不涉及底层native实现,仅限Java层面. 注册jvm关闭钩子通过Runtime.addShutdownHook(),实际调用ApplicationShutdownHooks.add().

Spark是否会替代Hadoop?

- - CSDN博客推荐文章
我经常会从客户或者网上听到这个问题,尤其是最近几年. 那么关于spark哪些被我们神化了,哪些又是真实的,以及它在“大数据”的生态系统中又是怎样的. 说实话,其实我把这个问题作为标题是有问题的,但是我们经常会这样问. Hadoop并不是一个单独的产品而是一个生态系统,而spark也是一样的. 目前Hadoop生态系统主要包括:.

Spark和RDD模型研究

- - CSDN博客云计算推荐文章
现今分布式计算框架像MapReduce和Dryad都提供了高层次的原语,使用户不用操心任务分发和错误容忍,非常容易地编写出并行计算程序. 然而这些框架都缺乏对分布式内存的抽象和支持,使其在某些应用场景下不够高效和强大. RDD(Resilient Distributed Datasets弹性分布式数据集)模型的产生动机主要来源于两种主流的应用场景:.

Spark:比Hadoop更强大的分布式数据计算项目

- - 标点符
Spark是一个由加州大学伯克利分校(UC Berkeley AMP)开发的一个分布式数据快速分析项目. 它的核心技术是弹性分布式数据集(Resilient distributed datasets),提供了比Hadoop更加丰富的MapReduce模型,可以快速在内存中对数据集进行多次迭代,来支持复杂的数据挖掘算法和图计算算法.

如何用 Hadoop/Spark 构建七牛数据平台

- - leejun_2005的个人页面
数据平台在大部分公司都属于支撑性平台,做的不好立刻会被吐槽,这点和运维部门很像. 所以在技术选型上优先考虑现成的工具,快速出成果,没必要去担心有技术负担. 早期,我们走过弯路,认为没多少工作量,收集存储和计算都自己研发,发现是吃力不讨好. 去年上半年开始,我们全面拥抱开源工具,搭建自己的数据平台. 公司的主要数据来源是散落在各个业务服务器上的半结构化日志,比如系统日志、程序日志、访问日志、审计日志等.

ElasticSearch位置搜索 - Spring , Hadoop, Spark , BI , ML - CSDN博客

- -
在ElasticSearch中,地理位置通过. geo_point这个数据类型来支持. 地理位置的数据需要提供经纬度信息,当经纬度不合法时,ES会拒绝新增文档. 这种类型的数据支持距离计算,范围查询等. mapping为city:. geo_point类型必须显示指定,ES无法从数据中推断. 在ES中,位置数据可以通过对象,字符串,数组三种形式表示,分别如下:.

hadoop深入研究:(一)——hdfs介绍

- - CSDN博客云计算推荐文章
转载请注明出处: http://blog.csdn.net/lastsweetop/article/details/8992505. 这里的非常大是指几百MB,GB,TB.雅虎的hadoop集群已经可以存储PB级别的数据.  hdfs的高可用是用软件来解决,因此不需要昂贵的硬件来保障高可用性,各个生产商售卖的pc或者虚拟机即可.

Spark概览

- - 简单文本
Spark具有先进的DAG执行引擎,支持cyclic data flow和内存计算. 因此,它的运行速度,在内存中是Hadoop MapReduce的100倍,在磁盘中是10倍. 这样的性能指标,真的让人心动啊. Spark的API更为简单,提供了80个High Level的操作,可以很好地支持并行应用.

Spark与Mapreduce?

- - 崔永键的博客
我本人是类似Hive平台的系统工程师,我对MapReduce的熟悉程度是一般,它是我的底层框架. 我隔壁组在实验Spark,想将一部分计算迁移到Spark上. 年初的时候,看Spark的评价,几乎一致表示,Spark是小数据集上处理复杂迭代的交互系统,并不擅长大数据集,也没有稳定性. 但是最近的风评已经变化,尤其是14年10月他们完成了Peta sort的实验,这标志着Spark越来越接近替代Hadoop MapReduce了.

Spark迷思

- - ITeye博客
目前在媒体上有很大的关于Apache Spark框架的声音,渐渐的它成为了大数据领域的下一个大的东西. 证明这件事的最简单的方式就是看google的趋势图:. 上图展示的过去两年Hadoop和Spark的趋势. Spark在终端用户之间变得越来越受欢迎,而且这些用户经常在网上找Spark相关资料. 这给了Spark起了很大的宣传作用;同时围绕着它的也有误区和思维错误,而且很多人还把这些误区作为银弹,认为它可以解决他们的问题并提供比Hadoop好100倍的性能.