Hive Server 2 Installation, Deployment, and Testing
Hive 0.11 ships with both Hive Server 1 and Hive Server 2; Hive Server 1 is kept only for backward compatibility, and in the long run Hive Server 2 is the preferred choice. Setting it up involves two steps:

1. Configure the port and host the server listens on.
2. Configure Kerberos authentication, so that both the connection from the Thrift client to Hive Server 2 and the connection from Hive Server 2 to HDFS are authenticated by Kerberos.
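As a quick smoke test after deployment, a JDBC client can connect through the HiveServer2 Thrift port. A minimal sketch, assuming the server listens on hs2host:10000 and runs as the Kerberos principal hive/hs2host@EXAMPLE.COM (both placeholders for your deployment):

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

public class Hs2SmokeTest {
  public static void main(String[] args) throws Exception {
    // the HiveServer2 JDBC driver, shipped in hive-jdbc
    Class.forName("org.apache.hive.jdbc.HiveDriver");
    // with Kerberos enabled, the server principal goes into the URL
    Connection conn = DriverManager.getConnection(
        "jdbc:hive2://hs2host:10000/default;principal=hive/hs2host@EXAMPLE.COM");
    Statement stmt = conn.createStatement();
    ResultSet rs = stmt.executeQuery("show tables");
    while (rs.next()) {
      System.out.println(rs.getString(1));
    }
    rs.close();
    stmt.close();
    conn.close();
  }
}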
On the client side, the Thrift stub simply serializes the query string and sends an execute call:

public void send_execute(String query) throws org.apache.thrift.TException {
  execute_args args = new execute_args();
  args.setQuery(query);
  sendBase("execute", args);
}

On the server side the call is handled by HiveServerHandler: its execute method creates a new Driver and calls driver.run(cmd) to execute the statement.
public void send_fetchN(int numRows) throws org.apache.thrift.TException {
  fetchN_args args = new fetchN_args();
  args.setNumRows(numRows);
  sendBase("fetchN", args);
}

On the server side, fetchN in HiveServerHandler calls driver.getResult(). The FetchOperator inside the QueryPlan's FetchTask resolves the path of the file holding the result set and its InputFormat. With the InputFormat it can call getSplits to obtain the list of InputSplits, fetch a RecordReader for each InputSplit in turn, and iterate next to read key/value pairs, where each value is one row (record) of the result set.
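This split-and-iterate pattern is just the classic org.apache.hadoop.mapred API. A minimal standalone sketch of the same flow, assuming a plain-text result file under the made-up path /tmp/hive-results:

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.TextInputFormat;

public class FetchSketch {
  public static void main(String[] args) throws Exception {
    JobConf job = new JobConf();
    FileInputFormat.setInputPaths(job, new Path("/tmp/hive-results")); // placeholder path
    TextInputFormat inputFormat = new TextInputFormat();
    inputFormat.configure(job);
    // getSplits -> one RecordReader per split -> iterate next() over the records
    for (InputSplit split : inputFormat.getSplits(job, 1)) {
      RecordReader<LongWritable, Text> reader =
          inputFormat.getRecordReader(split, job, Reporter.NULL);
      LongWritable key = reader.createKey();
      Text value = reader.createValue();
      while (reader.next(key, value)) {
        System.out.println(value); // each value is one result row
      }
      reader.close();
    }
  }
}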
The clean call goes through the same kind of stub:

public void send_clean() throws org.apache.thrift.TException {
  clean_args args = new clean_args();
  sendBase("clean", args);
}

On the server side, clean closes the Driver and tidies up the Context, deleting the scratch directories the statement created (both on the local file system and on HDFS):
private void removeScratchDir() {
  for (Map.Entry<String, String> entry : fsScratchDirs.entrySet()) {
    try {
      Path p = new Path(entry.getValue());
      p.getFileSystem(conf).delete(p, true);
    } catch (Exception e) {
      LOG.warn("Error Removing Scratch: " + StringUtils.stringifyException(e));
    }
  }
  fsScratchDirs.clear();
}
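Putting the three calls together, a Hive Server 1 Thrift session executes the statement, fetches rows in batches, then cleans up. A minimal client sketch, assuming the server listens on localhost:10000 and using the HiveClient wrapper from org.apache.hadoop.hive.service (check the constructor signature against your Hive version):

import java.util.List;

import org.apache.hadoop.hive.service.HiveClient;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.transport.TSocket;

public class ThriftSessionSketch {
  public static void main(String[] args) throws Exception {
    TSocket transport = new TSocket("localhost", 10000); // placeholder host/port
    transport.open();
    HiveClient client = new HiveClient(new TBinaryProtocol(transport));
    client.execute("select c1 from abc where c1 = 1"); // server runs driver.run(cmd)
    List<String> rows = client.fetchN(100);            // server calls driver.getResult()
    for (String row : rows) {
      System.out.println(row);
    }
    client.clean();  // server closes the Driver and removes scratch dirs
    transport.close();
  }
}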
When results are fetched, FetchOperator pushes each row into the operator pipeline:

public boolean pushRow() throws IOException, HiveException {
  InspectableObject row = getNextRow();
  if (row != null) {
    operator.process(row.o, 0);
  }
  return row != null;
}

For example, the statement "select c1 from abc where c1 = 1;" passes each row through FetchOperator -> TableScanOperator -> FilterOperator -> SelectOperator -> ListSinkOperator in turn.
In Driver.run(), the command first goes through variable substitution, and a new Context is created with HDFSCleanup enabled:

command = new VariableSubstitution().substitute(conf, command);
ctx = new Context(conf);
ctx.setTryCount(getTryCount());
ctx.setCmd(command);
ctx.setHDFSCleanup(true);
Scratch directories are created lazily, one per file system, in Context.getScratchDir(); with HDFSCleanup enabled, each directory is also registered for deletion via fs.deleteOnExit():

private String getScratchDir(String scheme, String authority, boolean mkdir, String scratchDir) {
  String fileSystem = scheme + ":" + authority;
  String dir = fsScratchDirs.get(fileSystem);
  if (dir == null) {
    Path dirPath = new Path(scheme, authority, scratchDir);
    if (mkdir) {
      try {
        FileSystem fs = dirPath.getFileSystem(conf);
        dirPath = new Path(fs.makeQualified(dirPath).toString());
        if (!fs.mkdirs(dirPath)) {
          throw new RuntimeException("Cannot make directory: " + dirPath.toString());
        }
        if (isHDFSCleanup) {
          fs.deleteOnExit(dirPath);
        }
      } catch (IOException e) {
        throw new RuntimeException(e);
      }
    }
    dir = dirPath.toString();
    fsScratchDirs.put(fileSystem, dir);
  }
  return dir;
}
Those deleteOnExit paths are processed when the FileSystem instance is closed, at which point it is also evicted from the shared cache:

public void close() throws IOException {
  // delete all files that were marked as delete-on-exit.
  processDeleteOnExit();
  CACHE.remove(this.key, this);
}
The cache also registers a JVM shutdown hook, ClientFinalizer, which closes every cached FileSystem on process exit:

private class ClientFinalizer implements Runnable {
  @Override
  public synchronized void run() {
    try {
      closeAll(true);
    } catch (IOException e) {
      LOG.info("FileSystem.Cache.closeAll() threw an exception:\n" + e);
    }
  }
}
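The root of the problem is that FileSystem.get() hands out one shared, cached instance per (scheme, authority, ugi) combination, so deleteOnExit registrations and close() calls made on behalf of different sessions interfere with each other. A minimal sketch of the behavior (the path is a placeholder):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class FsCacheSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // with fs.<scheme>.impl.disable.cache=false (the default), both calls
    // return the very same cached object
    FileSystem fs1 = FileSystem.get(conf);
    FileSystem fs2 = FileSystem.get(conf);
    System.out.println(fs1 == fs2); // true
    fs1.deleteOnExit(new Path("/tmp/scratch-a")); // placeholder scratch path
    // closing the shared instance anywhere (e.g. closeAllForUGI or the
    // shutdown hook) runs processDeleteOnExit() and deletes the registered path
    fs2.close();
  }
}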
On the secure Thrift path, the processor's finally block closes all FileSystem handles associated with the client UGI once the request finishes:

finally {
  if (clientUgi != null) {
    // close all FileSystem handles cached for this clientUgi
    try {
      FileSystem.closeAllForUGI(clientUgi);
    } catch (IOException exception) {
      LOG.error("Could not clean up file-system handles for UGI: " + clientUgi, exception);
    }
  }
}
// in Hive Server 2, useProxy = false
if (useProxy) {
  clientUgi = UserGroupInformation.createProxyUser(
      endUser, UserGroupInformation.getLoginUser());
  remoteUser.set(clientUgi.getShortUserName());
  returnCode = clientUgi.doAs(new PrivilegedExceptionAction<Boolean>() {
    public Boolean run() {
      try {
        return wrapped.process(inProt, outProt);
      } catch (TException te) {
        throw new RuntimeException(te);
      }
    }
  });
} else {
  remoteUser.set(endUser);
  return wrapped.process(inProt, outProt);
}
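The useProxy branch is the standard Hadoop impersonation pattern: the server authenticates with its own login credentials but executes the request as the end user. A minimal sketch (the user name "alice" is illustrative, and the cluster must whitelist the server via the hadoop.proxyuser.* settings):

import java.security.PrivilegedExceptionAction;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.security.UserGroupInformation;

public class ProxyUserSketch {
  public static void main(String[] args) throws Exception {
    UserGroupInformation loginUser = UserGroupInformation.getLoginUser();
    // impersonate the end user on top of the server's own credentials
    UserGroupInformation proxy =
        UserGroupInformation.createProxyUser("alice", loginUser);
    proxy.doAs(new PrivilegedExceptionAction<Void>() {
      public Void run() throws Exception {
        // any FileSystem acquired here is cached under the proxy UGI
        FileSystem fs = FileSystem.get(new Configuration());
        System.out.println(fs.getUri());
        return null;
      }
    });
  }
}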
Since useProxy is false there, Hive Server 2 instead wraps each session behind a dynamic proxy that runs every method call under the session's UGI:

public Object invoke(Object arg0, final Method method, final Object[] args) throws Throwable {
  try {
    return ShimLoader.getHadoopShims().doAs(ugi, new PrivilegedExceptionAction<Object>() {
      @Override
      public Object run() throws HiveSQLException {
        try {
          return method.invoke(base, args);
        } catch (InvocationTargetException e) {
          if (e.getCause() instanceof HiveSQLException) {
            throw (HiveSQLException) e.getCause();
          } else {
            throw new RuntimeException(e.getCause());
          }
        } catch (IllegalArgumentException e) {
          throw new RuntimeException(e);
        } catch (IllegalAccessException e) {
          throw new RuntimeException(e);
        }
      }
    });
  } catch (UndeclaredThrowableException e) {
    Throwable innerException = e.getCause();
    if (innerException instanceof PrivilegedActionException) {
      throw innerException.getCause();
    } else {
      throw e.getCause();
    }
  }
}
When the session is closed, the FileSystem handles cached for the session UGI are released and the delegation token is cancelled:

public void close() throws HiveSQLException {
  try {
    acquire();
    ShimLoader.getHadoopShims().closeAllForUGI(sessionUgi);
    cancelDelegationToken();
  } finally {
    release();
    super.close();
  }
}
There are a couple of ways to work around the shared-cache problem:

1. Start HiveServer2 with the FileSystem cache disabled:

$HIVE_HOME/bin/hive --service hiveserver2 --hiveconf fs.hdfs.impl.disable.cache=true --hiveconf fs.file.impl.disable.cache=true

2. Call setHDFSCleanup(false) on the Hive Context, so that scratch directories are no longer cleaned up automatically. This avoids the problem but leaves orphaned files behind, so a separate scheduled script has to be deployed to delete them proactively.
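For option 2, the companion cleanup job only needs to list the scratch root and delete entries older than some threshold. A minimal sketch, assuming the scratch root is /tmp/hive (check hive.exec.scratchdir for the real location) and a one-day retention window:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class ScratchDirCleaner {
  public static void main(String[] args) throws Exception {
    FileSystem fs = FileSystem.get(new Configuration());
    long cutoff = System.currentTimeMillis() - 24L * 60 * 60 * 1000; // one day
    // delete any scratch entry not modified within the retention window
    for (FileStatus stat : fs.listStatus(new Path("/tmp/hive"))) { // placeholder root
      if (stat.getModificationTime() < cutoff) {
        fs.delete(stat.getPath(), true);
      }
    }
  }
}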
A third option is to patch the server so that the per-UGI FileSystem handles are closed as soon as each request finishes, for example in the Thrift processor:

finally {
  if (!returnCode) {
    if (clientUgi != null) {
      LOG.info("Start to close filesystem for clientUgi:" + clientUgi.getUserName());
      try {
        FileSystem.closeAllForUGI(clientUgi);
      } catch (IOException exception) {
        LOG.error("Could not clean up file-system handles for UGI: " + clientUgi, exception);
      }
    }
  }
}
and in HiveServerHandler.clean(), which can likewise close the handles for the current user when the session is cleaned up:

public void clean() {
  if (driver != null) {
    driver.close();
    driver.destroy();
  }
  SessionState session = SessionState.get();
  if (session.getTmpOutputFile() != null) {
    session.getTmpOutputFile().delete();
  }
  pipeIn = null;
  try {
    LOG.info("Start to close filesystem for ugi:" + UserGroupInformation.getCurrentUser().getUserName());
    ShimLoader.getHadoopShims().closeAllForUGI(UserGroupInformation.getCurrentUser());
  } catch (IOException ioe) {
    ioe.printStackTrace();
  }
}