hive0.11的hive server实现kerberos认证和impersonation中碰到的问题

标签: hive0 hive server | 发表时间:2013-10-23 22:12 | 作者:lalaguozhe
出处:http://blog.csdn.net
背景
最近在做hive0.9升级到0.11的工作,其中一个步骤就是将之前apply到0.9的patch re-apply到0.11中,有一个patch( https://github.com/lalaguozhe/hive/commit/f2892f9e4706f3ea04117cbc7e7f54ff6af1e415)参考了hive metastore service的实现,对hive server增加了sasl kerberos认证,支持impersonate成client ugi的身份来启动作业(默认方式会以起hive service daemon的用户身份来执行,导致所有query共用一个用户启动作业)。

发现的问题
不过在re-apply这个patch后发现,用jdbc client访问hive server对于某些语句返回的是空结果集(HiveQueryResultSet中的fetchedRows是个空集合),中间也没有任何报错。非常奇怪,通过多次尝试定位出只有一种case的语句会正常返回结果,即类似“select * from xxx where yyy”这种不会起MapReduce Job的语句,其他“show tables/databases”,“select a from xxx”等语句都返回为空结果集。

调研
Hive jdbc client(底层是thrift client)在提交一条语句的时候会经历如下过程:
1. 构建execute_args对象,里面封装了要执行的语句,发送远程方法名execute和execute_args对象,接收execute返回结果,这时候client已经获取了column names和column types信息。
    public void send_execute(String query) throws org.apache.thrift.TException
    {
      execute_args args = new execute_args();
      args.setQuery(query);
      sendBase("execute", args);
    }
server端由HiveServerHandler来处理,对应的execute方法会new一个driver,调用driver.run(cmd)来执行具体的语句

2. 多次发送远程方法名fetchN和最大返回记录数numRows,返回的结果集会放在List<String> fetchedRows中。比如一共要返回90条record,每次fetchN最多返回50条,则一共调用了两次fetchN
    public void send_fetchN(int numRows) throws org.apache.thrift.TException
    {
      fetchN_args args = new fetchN_args();
      args.setNumRows(numRows);
      sendBase("fetchN", args);
    }
server端HiveServerHandler中的fetchN会调用driver.getResult(), 由QueryPlan的FetchTask中的FetchOperator获取存放结果集的文件路径,得到InputFormat信息,有了InputFormat就可以调用getSplits方法获取一串InputSplit,依次获取每一个InputSplit的RecordReader,迭代next获取key/value值(value代表每行结果record)

3. HiveConnection close,发送clean远程方法名
    public void send_clean() throws org.apache.thrift.TException
    {
      clean_args args = new clean_args();
      sendBase("clean", args);
    }
server端执行clean方法,close掉driver并对context做一些清理工作,删除语句产生的scratch directories (local file system和hdfs上的都会清除)
Context.java的removeScratchDir方法
  private void removeScratchDir() {
    for (Map.Entry<String, String> entry : fsScratchDirs.entrySet()) {
      try {
        Path p = new Path(entry.getValue());
        p.getFileSystem(conf).delete(p, true);
      } catch (Exception e) {
        LOG.warn("Error Removing Scratch: "
                 + StringUtils.stringifyException(e));
      }
    }
    fsScratchDirs.clear();
  }

具体定位语句可以分为三种情况:
1. 针对”select * from xxx“这种不起MR Job的语句,server端是直接通过MetastoreClient拿到了表对应hdfs的存放路径用FetchTask读取出来的。这边有点要注意的是hive 0.11中新增加了一个配置项”hive.fetch.task.conversion“,由jira HIVE-887引入,默认值是minimal,此外可以设置成more,minimal模式下对于”SELECT STAR, FILTER on partition columns, LIMIT only“不会起MR Job,more模式下对于”SELECT, FILTER, LIMIT only (TABLESAMPLE, virtual columns)“这种没有子查询,聚合操作和distinct的语句也不会起MR Job,大大降低了query latency,观察实现代码,其实它是将TableScanOperator, FilterOperator, SelectOperator作为FetchOperator的子Operator将数据拿到client端(即hive server端)来做filter或者projection
FetchOperator中保存了Operator Tree信息,类似深度遍历调用operator.process()方法。
FetchOperator.java
  public boolean pushRow() throws IOException, HiveException {
    InspectableObject row = getNextRow();
    if (row != null) {
      operator.process(row.o, 0);
    }
    return row != null;
  }
比如对于语句”select c1 from abc where c1 = 1;“,会依次调用Fetch Operator -> TableScanOperator -> FilterOperator -> SelectOperator -> ListSinkOperator


2. 类似”show tables/databases“这种DDL/DML语句,这种语句会先在本地FileSystem上创建一个scratch目录(由hive.exec.local.scratchdir配置),将计算结果写到本地scratch目录下,再通过FetchTask读取

3. 类似”select count(1) from tblname“这种会起MR Job的语句,会先在HDFS上创建scratch目录(由hive.exec.scratchdir配置),计算结果写到hdfs scratch目录下,再通过FetchTask读出来。

尝试多次后发现第一种类型的语句能返回结果,第二种第三种类型语句返回为空集合,而两者区别就在于是直接读取原表数据路径还是从scratch目录中读取。

HIVE在Compile环节会设置环境Context,创建local/hdfs scratch目录。在0.10版本之前,会存在一个问题,如果用户强制kill掉正在执行的语句,那么这些scratch dir就变成orphaned dir,未被清理。HIVE在0.10中加入了HIVE-3251来解决这个问题。
Driver中设置Context的HDFSCleanUp为true
      command = new VariableSubstitution().substitute(conf,command);
      ctx = new Context(conf);
      ctx.setTryCount(getTryCount());
      ctx.setCmd(command);
      ctx.setHDFSCleanup(true);

获取和创建Scratch dir的时候将scratch dir path加入filesystem instance内部的deleteOnExit集合中
  private String getScratchDir(String scheme, String authority, boolean mkdir, String scratchDir) {
    String fileSystem =  scheme + ":" + authority;
    String dir = fsScratchDirs.get(fileSystem);
    if (dir == null) {
      Path dirPath = new Path(scheme, authority, scratchDir);
      if (mkdir) {
        try {
          FileSystem fs = dirPath.getFileSystem(conf);
          dirPath = new Path(fs.makeQualified(dirPath).toString());
          if (!fs.mkdirs(dirPath)) {
            throw new RuntimeException("Cannot make directory: "
                                       + dirPath.toString());
          }
          if (isHDFSCleanup) {
            fs.deleteOnExit(dirPath);
          }
        } catch (IOException e) {
          throw new RuntimeException (e);
        }
      }
      dir = dirPath.toString();
      fsScratchDirs.put(fileSystem, dir);
    }

filesystem close的时候会先删除所有mark为deleteOnExit的files
  public void close() throws IOException {
    // delete all files that were marked as delete-on-exit.
    processDeleteOnExit();
    CACHE.remove(this.key, this);
  }

同时我们知道FileSystem抽象类内部有个静态final成员变量Cache,以schema, authority, ugi, unique的组合为key缓存了filesystem instance,内部还有一个ClientFinalizer对象(实现了Runnable),注册到JVM的shutdown hook中,在JVM关闭的时候,会启动ClientFinalizer线程,依次关闭所有Cache中的filesystem,通过这种方式来清理删除与filesystem挂钩的资源文件,在Hive中这些挂钩的文件就是local/hdfs scratch dir
    private class ClientFinalizer implements Runnable {
      @Override
      public synchronized void run() {
        try {
          closeAll(true);
        } catch (IOException e) {
          LOG.info("FileSystem.Cache.closeAll() threw an exception:\n" + e);
        }
      }
    }

回到前面的问题,第二第三种类型语句通过在hive server执行端execute方法中打log能够确认数据是已经dump到scratch目录下了,但是在一进入fetchN方法的时候,发现这些文件就莫名奇妙消失了,从而导致读出来的是空数据。排查了很久发现是由于HIVE 0.10中引入了JIRA HIVE-3098(解决FileSystem Cache内存泄露的问题)所引起的。

每一个function call都会由一个HadoopThriftAuthBridge20S中的TUGIAssumingProcessor来处理,在process方法中会先创建一个proxyUser UGI,用clientUgi.doAs来执行具体的逻辑,这样daemon user会impersonate成client user,具体逻辑代码里面如果要创建一个filesystem对象,会通过UserGroupInformation.getCurrentUser()(即clientUgi)来作为FileSystem Cache Key的一部分加入Cache中。

HIVE-3098增加了process方法的finally中清除clientUGI在FileSystem.Cache中对应的filesystem instance
finally {
 if (clientUgi != null) {
 // 清除与此clientUgi相关的filesystem
 try { FileSystem.closeAllForUGI(clientUgi); }
 catch(IOException exception) {
 LOG.error("Could not clean up file-system handles for UGI: " + clientUgi, exception);
 }
}

正是由于第一个execute方法在finally中调用FileSystem.closeAllForUGI(clientUgi),close掉相关filesystem对象,同时也删除了绑定的scratch目录,第二个fetchN方法才没有数据可读。但是为什么同样实现了kerberos认证和impersonation的hive server 2没有碰到这个问题呢? 其实hive server 2在开启impersonation(set hive.server2.enable.doAs=true)后并不是在thrift processor level而是在hive session level做impersonation的,从而不会在process finally中清理filesystem
           // hive server 2中useProxy = false;
           if (useProxy) {
             clientUgi = UserGroupInformation.createProxyUser(
               endUser, UserGroupInformation.getLoginUser());
             remoteUser.set(clientUgi.getShortUserName());
             returnCode = clientUgi.doAs(new PrivilegedExceptionAction<Boolean>() {
                 public Boolean run() {
                   try {
                     return wrapped.process(inProt, outProt);
                   } catch (TException te) {
                     throw new RuntimeException(te);
                   }
                 }
               });
           } else {
             remoteUser.set(endUser);
             return wrapped.process(inProt, outProt);
           }

在HiveSessionProxy(代理HiveSessionImplwithUGI)中用ugi doAs执行
  public Object invoke(Object arg0, final Method method, final Object[] args)
      throws Throwable {
    try {
      return ShimLoader.getHadoopShims().doAs(ugi,
        new PrivilegedExceptionAction<Object> () {
          @Override
          public Object run() throws HiveSQLException {
            try {
              return method.invoke(base, args);
            } catch (InvocationTargetException e) {
              if (e.getCause() instanceof HiveSQLException) {
                throw (HiveSQLException)e.getCause();
              } else {
                throw new RuntimeException(e.getCause());
              }
            } catch (IllegalArgumentException e) {
              throw new RuntimeException(e);
            } catch (IllegalAccessException e) {
              throw new RuntimeException(e);
            }
          }
        });
    } catch (UndeclaredThrowableException e) {
      Throwable innerException = e.getCause();
      if (innerException instanceof PrivilegedActionException) {
        throw innerException.getCause();
      } else {
        throw e.getCause();
      }
    }
  }

client端调用HiveConnection.close后,最终server端会调用HiveSessionImplwithUGI.close();关闭UGI相对应的filesystem对象
  public void close() throws HiveSQLException {
    try {
    acquire();
    ShimLoader.getHadoopShims().closeAllForUGI(sessionUgi);
    cancelDelegationToken();
    } finally {
      release();
      super.close();
    }
  }

解决方法
了解实现原理后,解决的方式有三种:
1. 启动Hive Server的时候关闭FileSystem Cache
$HIVE_HOME/bin/hive --service hiveserver2 --hiveconf fs.hdfs.impl.disable.cache=true --hiveconf fs.file.impl.disable.cache=true​
2. Hive Context中设置setHDFSCleanup(false),从而不会自动清除scratch目录,但是会有orphaned files问题,需要另外部署一个定时脚本去主动删除
3. thrift processor中根据每个function call的返回值来判断是否close filesystem,并且在最后connection close的时候,主动close filesystem

我们最终采用了第三种方案:
将clientUgi.doAs返回的结果保存下来,在finally环节判断如果返回值为false,也就是执行结果fail的时候可以closeAllForUGI
         finally {
           if (!returnCode) {
             if (clientUgi != null) {
               LOG.info("Start to close filesystem for clientUgi:" + clientUgi.getUserName());
               try { FileSystem.closeAllForUGI(clientUgi); }
                 catch(IOException exception) {
                   LOG.error("Could not clean up file-system handles for UGI: " + clientUgi, exception);
                 }
             }
           }
         }

同时在HiveServerHandler的clean方法中(即关闭一个Hive Coonection的时候)加入对于filesystem清理的逻辑
    public void clean() {
      if (driver != null) {
        driver.close();
        driver.destroy();
      }
      SessionState session = SessionState.get();
      if (session.getTmpOutputFile() != null) {
        session.getTmpOutputFile().delete();
      }
      pipeIn = null;
      try {
        LOG.info("Start to close filesystem for ugi:" + UserGroupInformation.getCurrentUser().getUserName());
        ShimLoader.getHadoopShims().closeAllForUGI(UserGroupInformation.getCurrentUser());
      } catch (IOException ioe) {
        ioe.printStackTrace();
      }
    }

修改上述代码后重新编译,之前三种case语句都能正常返回结果了,就这个问题折腾了一天,hive的bug不是一般的多啊,所以时不时会踩到坑,不过在发现问题到debug再到解决问题的过程中也学习到了很多。

作者:lalaguozhe 发表于2013-10-23 14:12:50 原文链接
阅读:113 评论:0 查看评论

相关 [hive0 hive server] 推荐:

Hive Server 2 安装部署测试

- - CSDN博客推荐文章
Hive 0.11 包含了Hive Server 1 和 Hive Server 2,还包含1的原因是为了做到向下兼容性. 从长远来看都会以Hive Server 2作为首选. 1. 配置hive server监听端口和Host. 2. 配置kerberos认证,这样thrift client与hive server 2, hive server 2和hdfs 都由kerberos作认证.

hive0.11的hive server实现kerberos认证和impersonation中碰到的问题

- - CSDN博客云计算推荐文章
不过在re-apply这个patch后发现,用jdbc client访问hive server对于某些语句返回的是空结果集(HiveQueryResultSet中的fetchedRows是个空集合),中间也没有任何报错. 非常奇怪,通过多次尝试定位出只有一种case的语句会正常返回结果,即类似“select * from xxx where yyy”这种不会起MapReduce Job的语句,其他“show tables/databases”,“select a from xxx”等语句都返回为空结果集.

SQL Server--索引

- - CSDN博客推荐文章
         1,概念:  数据库索引是对数据表中一个或多个列的值进行排序的结构,就像一本书的目录一样,索引提供了在行中快速查询特定行的能力..             2.1优点:  1,大大加快搜索数据的速度,这是引入索引的主要原因..                             2,创建唯一性索引,保证数据库表中每一行数据的唯一性..

SQL Server 面试

- - SQL - 编程语言 - ITeye博客
在SQL语言中,一个SELECT…FROM…WHERE语句称为一个查询块,将一个查询块嵌套在另一个查询块的WHERE子句中的查询称为子查询. 子查询分为嵌套子查询和相关子查询两种. 嵌套子查询的求解方法是由里向外处理,即每个子查询在其上一级查询处理之前求解,子查询的结果作为其父查询的查询条件. 子查询只执行一次,且可以单独执行;.

hive调优

- - 互联网 - ITeye博客
一、    控制hive任务中的map数: . 1.    通常情况下,作业会通过input的目录产生一个或者多个map任务. 主要的决定因素有: input的文件总个数,input的文件大小,集群设置的文件块大小(目前为128M, 可在hive中通过set dfs.block.size;命令查看到,该参数不能自定义修改);.

hive 优化 tips

- - CSDN博客推荐文章
一、     Hive join优化. 也可以显示声明进行map join:特别适用于小表join大表的时候,SELECT /*+ MAPJOIN(b) */ a.key, a.value FROM a join b on a.key = b.key. 2.     注意带表分区的join, 如:.

Hive中的join

- - CSDN博客云计算推荐文章
select a.* from a join b on a.id = b.id select a.* from a join b on (a.id = b.id and a.department = b.department). 在使用join写查询的时候有一个原则:应该将条目少的表或者子查询放在join操作符的左边.

hive优化(2)

- - 开源软件 - ITeye博客
Hive是将符合SQL语法的字符串解析生成可以在Hadoop上执行的MapReduce的工具. 使用Hive尽量按照分布式计算的一些特点来设计sql,和传统关系型数据库有区别,. 所以需要去掉原有关系型数据库下开发的一些固有思维. 1:尽量尽早地过滤数据,减少每个阶段的数据量,对于分区表要加分区,同时只选择需要使用到的字段.

hive优化

- - 开源软件 - ITeye博客
hive.optimize.cp=true:列裁剪. hive.optimize.prunner:分区裁剪. hive.limit.optimize.enable=true:优化LIMIT n语句. hive.limit.optimize.limit.file=10:最大文件数.   1.job的输入数据大小必须小于参数:hive.exec.mode.local.auto.inputbytes.max(默认128MB).

Hive优化

- - 互联网 - ITeye博客
     使用Hive有一段时间了,目前发现需要进行优化的较多出现在出现join、distinct的情况下,而且一般都是reduce过程较慢.      Reduce过程比较慢的现象又可以分为两类:. 情形一:map已经达到100%,而reduce阶段一直是99%,属于数据倾斜. 情形二:使用了count(distinct)或者group by的操作,现象是reduce有进度但是进度缓慢,31%-32%-34%...一个附带的提示是使用reduce个数很可能是1.