Hive Server 2: Installation, Deployment, and Testing
Hive 0.11 ships both Hive Server 1 and Hive Server 2; Hive Server 1 is kept only for backward compatibility, and in the long run Hive Server 2 is the preferred choice. Setup comes down to two steps: 1. configure the host and port the server listens on; 2. configure Kerberos authentication, so that both the connection from the Thrift client to Hive Server 2 and Hive Server 2's access to HDFS are authenticated by Kerberos.
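An illustrative hive-site.xml fragment (the property names are HiveServer2's; the host, principal, and keytab path are placeholders for your environment):

<property>
  <name>hive.server2.thrift.port</name>
  <value>10000</value>
</property>
<property>
  <name>hive.server2.thrift.bind.host</name>
  <value>hs2-host.example.com</value>
</property>
<property>
  <name>hive.server2.authentication</name>
  <value>KERBEROS</value>
</property>
<property>
  <name>hive.server2.authentication.kerberos.principal</name>
  <value>hive/_HOST@EXAMPLE.COM</value>
</property>
<property>
  <name>hive.server2.authentication.kerberos.keytab</name>
  <value>/etc/hive/conf/hive.keytab</value>
</property>

Executing a statement through the Hive Server Thrift interface involves three calls: execute, fetchN, and clean. The generated client stub for execute: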
public void send_execute(String query) throws org.apache.thrift.TException
{
execute_args args = new execute_args();
args.setQuery(query);
sendBase("execute", args);
}
On the server side the call is handled by HiveServerHandler: its execute method creates a new Driver and calls driver.run(cmd) to run the statement (a condensed sketch follows the next stub). The client then fetches results through send_fetchN:

public void send_fetchN(int numRows) throws org.apache.thrift.TException
{
fetchN_args args = new fetchN_args();
args.setNumRows(numRows);
sendBase("fetchN", args);
}
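As promised, a condensed sketch of that server-side execute path (hypothetical code, not the verbatim Hive source; error handling is trimmed and the driver field is assumed):

// Simplified sketch of HiveServerHandler.execute as described above.
public void execute(String cmd) throws TException {
  driver = new Driver();                        // a fresh Driver per statement
  int ret = driver.run(cmd).getResponseCode();  // compile and execute
  if (ret != 0) {
    throw new TException("Query returned non-zero code: " + ret);
  }
}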
On the server side, fetchN in HiveServerHandler calls driver.getResult(): the FetchOperator inside the QueryPlan's FetchTask resolves the path of the file holding the result set and its InputFormat; with the InputFormat it can call getSplits to obtain the list of InputSplits, then take each InputSplit's RecordReader in turn and iterate next to read key/value pairs, where each value is one row of the result (a sketch of this loop follows the next stub). Finally the client releases the statement through send_clean:

public void send_clean() throws org.apache.thrift.TException
{
clean_args args = new clean_args();
sendBase("clean", args);
}
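Here is that sketch of the result-reading loop, as a hypothetical helper built on the old org.apache.hadoop.mapred API; the method name and parameters are assumptions, not Hive source:

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapred.*;

// Read every row of the result set from its files, split by split.
static List<String> readResults(InputFormat<WritableComparable, Writable> inputFormat,
                                JobConf job) throws IOException {
  List<String> rows = new ArrayList<String>();
  for (InputSplit split : inputFormat.getSplits(job, 1)) {
    RecordReader<WritableComparable, Writable> reader =
        inputFormat.getRecordReader(split, job, Reporter.NULL);
    WritableComparable key = reader.createKey();
    Writable value = reader.createValue();
    while (reader.next(key, value)) {
      rows.add(value.toString());   // each value is one record of the result
    }
    reader.close();
  }
  return rows;
}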
On the server side, clean closes the driver and does some cleanup on the context, deleting the scratch directories the statement produced (both on the local file system and on HDFS):

private void removeScratchDir() {
for (Map.Entry<String, String> entry : fsScratchDirs.entrySet()) {
try {
Path p = new Path(entry.getValue());
p.getFileSystem(conf).delete(p, true);
} catch (Exception e) {
LOG.warn("Error Removing Scratch: "
+ StringUtils.stringifyException(e));
}
}
fsScratchDirs.clear();
}
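Back on the fetch path: FetchOperator.pushRow reads the next row and, if there is one, pushes it into the operator pipeline: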
public boolean pushRow() throws IOException, HiveException {
InspectableObject row = getNextRow();
if (row != null) {
operator.process(row.o, 0);
}
return row != null;
}
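A fetch loop only has to keep calling it until it reports no more rows; a minimal, hypothetical caller:

// fetchOperator is assumed to be the FetchOperator of the current plan.
while (fetchOperator.pushRow()) {
  // each iteration reads one record and forwards it through the
  // downstream operators; the loop ends when the source is exhausted
}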
For a statement like "select c1 from abc where c1 = 1;", each row flows through FetchOperator -> TableScanOperator -> FilterOperator -> SelectOperator -> ListSinkOperator in turn. Back in Driver.run, the command first goes through variable substitution, and a new Context is created with HDFSCleanup enabled:

command = new VariableSubstitution().substitute(conf, command);
ctx = new Context(conf);
ctx.setTryCount(getTryCount());
ctx.setCmd(command);
ctx.setHDFSCleanup(true);
Whenever the Context needs a scratch directory it goes through getScratchDir; with isHDFSCleanup set, the directory is registered for deletion when its FileSystem is closed:

private String getScratchDir(String scheme, String authority, boolean mkdir, String scratchDir) {
String fileSystem = scheme + ":" + authority;
String dir = fsScratchDirs.get(fileSystem);
if (dir == null) {
Path dirPath = new Path(scheme, authority, scratchDir);
if (mkdir) {
try {
FileSystem fs = dirPath.getFileSystem(conf);
dirPath = new Path(fs.makeQualified(dirPath).toString());
if (!fs.mkdirs(dirPath)) {
throw new RuntimeException("Cannot make directory: "
+ dirPath.toString());
}
if (isHDFSCleanup) {
fs.deleteOnExit(dirPath);
}
} catch (IOException e) {
throw new RuntimeException (e);
}
}
dir = dirPath.toString();
fsScratchDirs.put(fileSystem, dir);
}
return dir;
}
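The deleteOnExit registration above only takes effect when the FileSystem instance is eventually closed: FileSystem.close() processes the delete-on-exit list and evicts the instance from the shared cache: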
public void close() throws IOException {
// delete all files that were marked as delete-on-exit.
processDeleteOnExit();
CACHE.remove(this.key, this);
}
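If nothing ever calls close(), the cached instances are closed only by ClientFinalizer, the shutdown hook that FileSystem.Cache registers for JVM exit: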
private class ClientFinalizer implements Runnable {
@Override
public synchronized void run() {
try {
closeAll(true);
} catch (IOException e) {
LOG.info("FileSystem.Cache.closeAll() threw an exception:\n" + e);
}
}
}
In a Kerberos-secured deployment, each request runs through HadoopThriftAuthBridge's TUGIAssumingProcessor, whose process method decides whether to impersonate the end user and, in its finally block, closes the FileSystem handles cached for that user:

try {
  if (useProxy) {
    clientUgi = UserGroupInformation.createProxyUser(
        endUser, UserGroupInformation.getLoginUser());
    remoteUser.set(clientUgi.getShortUserName());
    returnCode = clientUgi.doAs(new PrivilegedExceptionAction<Boolean>() {
      public Boolean run() {
        try {
          return wrapped.process(inProt, outProt);
        } catch (TException te) {
          throw new RuntimeException(te);
        }
      }
    });
  } else {
    // in Hive Server 2, useProxy == false: this branch runs,
    // so clientUgi is never assigned
    remoteUser.set(endUser);
    return wrapped.process(inProt, outProt);
  }
} finally {
  if (clientUgi != null) {
    // close the FileSystem instances cached for this clientUgi
    try { FileSystem.closeAllForUGI(clientUgi); }
    catch (IOException exception) {
      LOG.error("Could not clean up file-system handles for UGI: " + clientUgi, exception);
    }
  }
}
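Because useProxy is false in Hive Server 2, clientUgi stays null and the closeAllForUGI in the finally block never fires: cached FileSystem instances pile up, and their delete-on-exit scratch paths are not processed until the JVM exits. With impersonation enabled, Hive Server 2 instead runs each session's calls under the session UGI through HiveSessionProxy: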
public Object invoke(Object arg0, final Method method, final Object[] args)
throws Throwable {
try {
return ShimLoader.getHadoopShims().doAs(ugi,
new PrivilegedExceptionAction<Object> () {
@Override
public Object run() throws HiveSQLException {
try {
return method.invoke(base, args);
} catch (InvocationTargetException e) {
if (e.getCause() instanceof HiveSQLException) {
throw (HiveSQLException)e.getCause();
} else {
throw new RuntimeException(e.getCause());
}
} catch (IllegalArgumentException e) {
throw new RuntimeException(e);
} catch (IllegalAccessException e) {
throw new RuntimeException(e);
}
}
});
} catch (UndeclaredThrowableException e) {
Throwable innerException = e.getCause();
if (innerException instanceof PrivilegedActionException) {
throw innerException.getCause();
} else {
throw e.getCause();
}
}
}
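and HiveSessionImplwithUGI closes the FileSystem handles cached for that UGI when the session itself is closed: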
public void close() throws HiveSQLException {
try {
acquire();
ShimLoader.getHadoopShims().closeAllForUGI(sessionUgi);
cancelDelegationToken();
} finally {
release();
super.close();
}
}
To avoid the leak without patching Hive, there are two workarounds:

1. Start HiveServer2 with the FileSystem cache disabled, so no shared instances accumulate in the first place:

$HIVE_HOME/bin/hive --service hiveserver2 --hiveconf fs.hdfs.impl.disable.cache=true --hiveconf fs.file.impl.disable.cache=true

2. Call setHDFSCleanup(false) on the Hive Context, so scratch directories are never registered for automatic cleanup; this sidesteps the cached FileSystems but leaves orphaned files behind, so a separate scheduled script has to be deployed to delete them.
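A more direct route is to patch the server so it closes the cached handles itself, as in the following variants of the processor's finally block and of HiveServerHandler's clean():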
finally {
if (!returnCode) {
if (clientUgi != null) {
LOG.info("Start to close filesystem for clientUgi:" + clientUgi.getUserName());
try { FileSystem.closeAllForUGI(clientUgi); }
catch(IOException exception) {
LOG.error("Could not clean up file-system handles for UGI: " + clientUgi, exception);
}
}
}
}
public void clean() {
if (driver != null) {
driver.close();
driver.destroy();
}
SessionState session = SessionState.get();
if (session.getTmpOutputFile() != null) {
session.getTmpOutputFile().delete();
}
pipeIn = null;
try {
LOG.info("Start to close filesystem for ugi:" + UserGroupInformation.getCurrentUser().getUserName());
ShimLoader.getHadoopShims().closeAllForUGI(UserGroupInformation.getCurrentUser());
} catch (IOException ioe) {
ioe.printStackTrace();
}
}