hive中udf读写hbase
在大数据开发过程中,经常需要将 Hive 中处理后的结果写入 HBase。如果每次都单独编写 Java 程序,会非常浪费时间,因此我们想了一个办法:用 Hive 的 UDF 来实现。只需调用同一个 UDF,将表名、字段名以及每个字段的值作为 UDF 的参数传入,就可以完成对 HBase 的写入。这样大大地节省了开发时间,提升了开发效率。大家可以按照这种思路编写自己需要的功能,这里只简单地列举几个供大家参考,具体操作如下:
一、依赖的jar包
commons-codec-1.7.jar
commons-collections-3.2.1.jar
commons-configuration-1.6.jar
commons-lang-2.6.jar
commons-logging-1.1.1.jar
hadoop-core-1.2.1.jar
hbase-client-0.98.6-cdh5.3.0.jar
hbase-common-0.98.6-cdh5.3.0.jar
hbase-protocol-0.98.6-cdh5.3.0.jar
hive-exec-0.13.1.jar
htrace-core-2.04.jar
log4j-1.2.17.jar
netty-3.6.6.Final.jar
protobuf-java-2.5.0.jar
slf4j-api-1.6.4.jar
slf4j-log4j12-1.6.4.jar
zookeeper-3.4.6.jar
二、具体功能:
(1)写入功能:
package com.paic.pad.dp.hbase.udf;
import static org.apache.hadoop.hbase.util.Bytes.toBytes;
import java.io.IOException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.client.HConnectionManager;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hive.ql.exec.Description;
import org.apache.hadoop.hive.ql.exec.UDFArgumentTypeException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.UDFType;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
import org.apache.hadoop.hive.serde.serdeConstants;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
import org.apache.hadoop.io.Text;
/**
* import hive data to hbase table create this function: CREATE TEMPORARY
* FUNCTION upa_default_hive2HBase as
* ‘com.unionpay.upa.hive.udf.UDFDefaultHiveOutputToHbase’;
*/
@Description(name = “pad_put_HBase”, value = “ FUNC(zookeeperQuorum, hbaseTable, CF, rowKey, ‘name1, name2, name3’, c1, c2, c3, …) - read data from hive and import it to hbase, ”
+ “returns success of the import.”, extended = “The first argument is zookeeperQuorum, ”
+ “the second argument is the hbase table, ”
+ “the Third argument is the CF, ”
+ “the fourth argument is the rowKey, ”
+ “the other args should be a map, seprated by ‘,’ .”
+ “example: select FUNC(‘zookeeperQuorum’, ‘tableName’, ‘columFamily’, key, ‘columnName1,columnName2’, columnName1value,columnName2value) from dual;”)
@UDFType(deterministic = false)
public class UDFHbaseMerge extends GenericUDF {
private static final Log LOG = LogFactory.getLog(UDFHbaseMerge.class
.getName());
protected transient ObjectInspector[] argumentOI;
protected transient String hbaseTable;
protected HTable table;
protected HConnection connection;
protected static String cf = "F";
protected static String[] cols;
protected final static String NULL_FLAG = "";
protected final Text result = new Text();
protected String zookeeperQuorum;
@Override
public ObjectInspector initialize(ObjectInspector[] arguments)
throws UDFArgumentTypeException {
argumentOI = arguments;
for (int i = 0; i < 3; i++) {
if (arguments[i].getCategory() == ObjectInspector.Category.PRIMITIVE) {
PrimitiveObjectInspector poi = ((PrimitiveObjectInspector) arguments[i]);
if (!(poi.getPrimitiveCategory() == PrimitiveObjectInspector.PrimitiveCategory.STRING)) {
throw new UDFArgumentTypeException(i,
"The argument of function should be \""
+ serdeConstants.STRING_TYPE_NAME
+ "\", but \"" + arguments[i].getTypeName()
+ "\" is found");
}
}
}
for (int i = 3; i < arguments.length; i++) {
if (arguments[i].getCategory() != ObjectInspector.Category.PRIMITIVE) {
throw new UDFArgumentTypeException(i,
"The argument of function should be primative"
+ ", but \"" + arguments[i].getTypeName()
+ "\" is found");
}
}
return PrimitiveObjectInspectorFactory.writableStringObjectInspector;
}
@Override
public Object evaluate(DeferredObject[] arguments) {
try {
if (table == null) {
zookeeperQuorum = getDeferredObject(arguments, 0);
hbaseTable = getDeferredObject(arguments, 1);
cf = getDeferredObject(arguments, 2);
cols = getDeferredObject(arguments, 4).split(",");
Configuration conf = HBaseConfiguration.create();
conf.set("hbase.zookeeper.quorum", zookeeperQuorum);
conf.set("hbase.zookeeper.property.clientPort", "2181");
conf.set("mapred.task.timeout", "3600000"); // ms
conf.set("dfs.socket.timeout", "3600000");
conf.set("dfs.datanode.socket.write.timeout", "3600000");
connection = HConnectionManager.createConnection(conf);
table = (HTable) connection.getTable(hbaseTable);
table.setAutoFlush(false, false);
}
Put put = getPut(arguments);
try {
table.put(put);
} catch (IOException e) {
LOG.error(Bytes.toString(table.getTableName())+ " put error " + e.getMessage());
}
result.set("success");
} catch (Exception ex) {
LOG.error(ex);
result.set(ex.toString());
this.close();
}
return result;
}
@Override
public String getDisplayString(String[] children) {
StringBuilder sb = new StringBuilder();
sb.append("pad_default_hive2HBase(");
if (children.length > 0) {
sb.append(children[0]);
for (int i = 1; i < children.length; i++) {
sb.append(",");
sb.append(children[i]);
}
}
sb.append(")");
return sb.toString();
}
@Override
public void close() {
try {
super.close();
if (table != null) {
table.flushCommits();
table.close();
connection.close();
}
} catch (Exception e) {
LOG.error(Bytes.toString(table.getTableName()) + " close error " + e.getMessage());
}
}
@Override
public String[] getRequiredFiles() {
return super.getRequiredFiles();
}
protected String getDeferredObject(DeferredObject[] arguments, int index)
throws HiveException {
if (arguments[index].get() == null) {
return NULL_FLAG;
}
return ((PrimitiveObjectInspector) argumentOI[index])
.getPrimitiveJavaObject(arguments[index].get()).toString();
}
protected Put getPut(DeferredObject[] arguments) throws Exception {
String rowKey = getDeferredObject(arguments, 3);
Put put = new Put(toBytes(rowKey));
for (int i = 0; i < cols.length; i++) {
put.add(toBytes(cf), toBytes(cols[i]), toBytes(getDeferredObject(
arguments, i + 5)));
}
return put;
}
}
(2)删除功能:
package com.paic.pad.dp.hbase.udf;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.client.HConnectionManager;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hive.ql.exec.Description;
import org.apache.hadoop.hive.ql.exec.UDFArgumentTypeException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.UDFType;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
import org.apache.hadoop.hive.serde.serdeConstants;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
import org.apache.hadoop.io.Text;
/**
* import hive data to hbase table create this function: CREATE TEMPORARY
* FUNCTION upa_default_hive2HBase as
* ‘com.unionpay.upa.hive.udf.UDFDefaultHiveOutputToHbase’;
*/
@Description(name = “pad_delete_HBase”, value = “ FUNC(zookeeperQuorum,hbaseTable, CF, rowKey, c1, c2, c3, …) - read data from hive and delete same date at HBase, ”
+ “returns success of the import.”, extended = “The first argument is zookeeperQuorum, ”
+ “the second argument is the hbase table, ”
+ “the third argument is the CF, ”
+ “the fourth argument is the rowKey, ”
+ “example: select FUNC(‘zookeeperQuorum’, ‘tableName’, ‘key’) from dual;”)
@UDFType(deterministic = false)
public class UDFHbaseDelete extends GenericUDF {
private static final Log LOG = LogFactory.getLog(UDFHbaseMerge.class
.getName());
protected transient ObjectInspector[] argumentOI;
protected transient String hbaseTable;
protected HTable table;
protected HConnection connection;
protected final static String NULL_FLAG = "";
protected final Text result = new Text();
protected String zookeeperQuorum;
@Override
public ObjectInspector initialize(ObjectInspector[] arguments)
throws UDFArgumentTypeException {
argumentOI = arguments;
for (int i = 0; i < 3; i++) {
if (arguments[i].getCategory() == ObjectInspector.Category.PRIMITIVE) {
PrimitiveObjectInspector poi = ((PrimitiveObjectInspector) arguments[i]);
if (!(poi.getPrimitiveCategory() == PrimitiveObjectInspector.PrimitiveCategory.STRING)) {
throw new UDFArgumentTypeException(i,
"The argument of function should be \""
+ serdeConstants.STRING_TYPE_NAME
+ "\", but \"" + arguments[i].getTypeName()
+ "\" is found");
}
}
}
for (int i = 3; i < arguments.length; i++) {
if (arguments[i].getCategory() != ObjectInspector.Category.PRIMITIVE) {
throw new UDFArgumentTypeException(i,
"The argument of function should be primative"
+ ", but \"" + arguments[i].getTypeName()
+ "\" is found");
}
}
return PrimitiveObjectInspectorFactory.writableStringObjectInspector;
}
@Override
public Object evaluate(DeferredObject[] arguments) {
try {
if (table == null) {
zookeeperQuorum = getDeferredObject(arguments, 0);
hbaseTable = getDeferredObject(arguments, 1);
Configuration conf = HBaseConfiguration.create();
conf.set("hbase.zookeeper.quorum", zookeeperQuorum);
conf.set("hbase.zookeeper.property.clientPort", "2181");
conf.set("mapred.task.timeout", "3600000"); // ms
conf.set("dfs.socket.timeout", "3600000");
conf.set("dfs.datanode.socket.write.timeout", "3600000");
connection = HConnectionManager.createConnection(conf);
table = (HTable) connection.getTable(hbaseTable);
table.setAutoFlush(false, false);
}
Delete d = new Delete(Bytes.toBytes(getDeferredObject(arguments, 2)));
table.delete(d);
result.set("success");
} catch (Exception ex) {
LOG.error(ex);
result.set(ex.toString());
this.close();
}
return result;
}
@Override
public String getDisplayString(String[] children) {
StringBuilder sb = new StringBuilder();
sb.append("pad_default_hive2HBase(");
if (children.length > 0) {
sb.append(children[0]);
for (int i = 1; i < children.length; i++) {
sb.append(",");
sb.append(children[i]);
}
}
sb.append(")");
return sb.toString();
}
@Override
public void close() {
try {
super.close();
if (table != null) {
table.flushCommits();
table.close();
connection.close();
}
} catch (Exception e) {
System.out.println(Bytes.toString(table.getTableName())
+ " close error " + e.getMessage());
}
}
@Override
public String[] getRequiredFiles() {
return super.getRequiredFiles();
}
protected String getDeferredObject(DeferredObject[] arguments, int index)
throws HiveException {
if (arguments[index].get() == null) {
return NULL_FLAG;
}
return ((PrimitiveObjectInspector) argumentOI[index])
.getPrimitiveJavaObject(arguments[index].get()).toString();
}
}
(3)查询
package com.paic.pad.dp.hbase.udf;
import static org.apache.hadoop.hbase.util.Bytes.toBytes;
import java.io.IOException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.client.HConnectionManager;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hive.ql.exec.Description;
import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.exec.UDFArgumentTypeException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.UDFType;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
import org.apache.hadoop.hive.serde.serdeConstants;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
import org.apache.hadoop.io.Text;
/**
* import hive data to hbase table create this function: CREATE TEMPORARY
* FUNCTION upa_default_hive2HBase as
* ‘com.unionpay.upa.hive.udf.UDFDefaultHiveOutputToHbase’;
*/
@Description(name = “pad_put_HBase”, value = “ FUNC(zookeeperQuorum, hbaseTable, CF, rowKey, ‘name1, name2, name3’, c1, c2, c3, …) - read data from hive and import it to hbase, ”
+ “returns success of the import.”, extended = “The first argument is zookeeperQuorum, ”
+ “the second argument is the hbase table, ”
+ “the Third argument is the CF, ”
+ “the fourth argument is the rowKey, ”
+ “the other args should be a map, seprated by ‘,’ .”
+ “example: select FUNC(‘zookeeperQuorum’, ‘tableName’, ‘columFamily’, key, ‘columnName’) from dual;”)
@UDFType(deterministic = false)
public class UDFHbaseSelect extends GenericUDF {
private static final Log LOG = LogFactory.getLog(UDFHbaseMerge.class
.getName());
protected transient ObjectInspector[] argumentOI;
protected transient String hbaseTable;
protected HTable table;
protected HConnection connection;
protected static String cf = "F";
protected static String[] cols;
protected final static String NULL_FLAG = "";
protected final Text result = new Text();
protected String zookeeperQuorum;
@Override
public Object evaluate(DeferredObject[] arguments) throws HiveException {
// TODO Auto-generated method stub
String cv="";
try {
if (table == null) {
zookeeperQuorum = getDeferredObject(arguments, 0);
hbaseTable = getDeferredObject(arguments, 1);
cf = getDeferredObject(arguments, 2);
cols = getDeferredObject(arguments, 4).split(",");
Configuration conf = HBaseConfiguration.create();
conf.set("hbase.zookeeper.quorum", zookeeperQuorum);
conf.set("hbase.zookeeper.property.clientPort", "2181");
conf.set("mapred.task.timeout", "3600000"); // ms
conf.set("dfs.socket.timeout", "3600000");
conf.set("dfs.datanode.socket.write.timeout", "3600000");
connection = HConnectionManager.createConnection(conf);
table = (HTable) connection.getTable(hbaseTable);
table.setAutoFlush(false, false);
}
try {
cv=getColumnValue(arguments);
} catch (IOException e) {
LOG.error(Bytes.toString(table.getTableName()) + " put error "
+ e.getMessage());
}
result.set(cv);
} catch (Exception ex) {
LOG.error(ex);
result.set(ex.toString());
this.close();
}
return result;
}
@Override
public ObjectInspector initialize(ObjectInspector[] arguments)
throws UDFArgumentException {
// TODO Auto-generated method stub
argumentOI = arguments;
for (int i = 0; i < 3; i++) {
if (arguments[i].getCategory() == ObjectInspector.Category.PRIMITIVE) {
PrimitiveObjectInspector poi = ((PrimitiveObjectInspector) arguments[i]);
if (!(poi.getPrimitiveCategory() == PrimitiveObjectInspector.PrimitiveCategory.STRING)) {
throw new UDFArgumentTypeException(i,
"The argument of function should be \""
+ serdeConstants.STRING_TYPE_NAME
+ "\", but \"" + arguments[i].getTypeName()
+ "\" is found");
}
}
}
for (int i = 3; i < arguments.length; i++) {
if (arguments[i].getCategory() != ObjectInspector.Category.PRIMITIVE) {
throw new UDFArgumentTypeException(i,
"The argument of function should be primative"
+ ", but \"" + arguments[i].getTypeName()
+ "\" is found");
}
}
return PrimitiveObjectInspectorFactory.writableStringObjectInspector;
}
public String getDisplayString(String[] children) {
StringBuilder sb = new StringBuilder();
sb.append("pad_default_hive2HBase(");
if (children.length > 0) {
sb.append(children[0]);
for (int i = 1; i < children.length; i++) {
sb.append(",");
sb.append(children[i]);
}
}
sb.append(")");
return sb.toString();
}
@Override
public void close() {
try {
super.close();
if (table != null) {
table.flushCommits();
table.close();
connection.close();
}
} catch (Exception e) {
LOG.error(Bytes.toString(table.getTableName()) + " close error "
+ e.getMessage());
}
}
@Override
public String[] getRequiredFiles() {
return super.getRequiredFiles();
}
protected String getDeferredObject(DeferredObject[] arguments, int index)
throws HiveException {
if (arguments[index].get() == null) {
return NULL_FLAG;
}
return ((PrimitiveObjectInspector) argumentOI[index])
.getPrimitiveJavaObject(arguments[index].get()).toString();
}
protected Put getPut(DeferredObject[] arguments) throws Exception {
String rowKey = getDeferredObject(arguments, 3);
Put put = new Put(toBytes(rowKey));
for (int i = 0; i < cols.length; i++) {
put.add(toBytes(cf), toBytes(cols[i]), toBytes(getDeferredObject(
arguments, i + 5)));
}
return put;
}
protected String getColumnValue(DeferredObject[] arguments)
throws Exception {
StringBuffer columnValues = new StringBuffer();
String rowKey = getDeferredObject(arguments, 3);
Get get = new Get(toBytes(rowKey));
org.apache.hadoop.hbase.client.Result dbResult = table.get(get);
if (dbResult.size() >= 0) {
columnValues.append(Bytes.toString(dbResult.getValue(Bytes
.toBytes(getDeferredObject(arguments, 2)), Bytes
.toBytes(getDeferredObject(arguments, 4)))));
}
return columnValues.toString();
}
}
(4)truncate表
package com.paic.pad.dp.hbase.udf;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.client.HConnectionManager;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hive.ql.exec.Description;
import org.apache.hadoop.hive.ql.exec.UDFArgumentTypeException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.UDFType;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
import org.apache.hadoop.hive.serde.serdeConstants;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
import org.apache.hadoop.io.Text;
@Description(name = “pad_truncate_hbase”, value = “ FUNC(evn, table, split) - truncate HBase table, ”
+ “returns success of the truncate success.”, extended = “The first argument is zookeeper, ”
+ “the second argument is the HBase table name, ”
+ “the Third argument is the HBase split, seprated by ‘,’ . ”
+ “example: select FUNC(‘zookeeperQuorum’,’tableName’, ‘1,2,3,4,5,6,7,8,9,0’) from dual;”)
@UDFType(deterministic = false)
public class UDFHbaseTruncate extends GenericUDF {
private static final Log LOG = LogFactory.getLog(UDFHbaseTruncate.class
.getName());
protected transient ObjectInspector[] argumentOI;
protected transient String hbaseTable;
protected HConnection connection;
protected static String[] cols;
protected final static String NULL_FLAG = "";
protected final Text result = new Text();
protected String zookeeperQuorum;
protected String split;
protected byte[][] splits = null;
@Override
public ObjectInspector initialize(ObjectInspector[] arguments)
throws UDFArgumentTypeException {
argumentOI = arguments;
for (int i = 0; i < 2; i++) {
if (arguments[i].getCategory() == ObjectInspector.Category.PRIMITIVE) {
PrimitiveObjectInspector poi = ((PrimitiveObjectInspector) arguments[i]);
if (!(poi.getPrimitiveCategory() == PrimitiveObjectInspector.PrimitiveCategory.STRING)) {
throw new UDFArgumentTypeException(i,
"The argument of function should be \""
+ serdeConstants.STRING_TYPE_NAME
+ "\", but \"" + arguments[i].getTypeName()
+ "\" is found");
}
}
}
return PrimitiveObjectInspectorFactory.writableStringObjectInspector;
}
@Override
public Object evaluate(DeferredObject[] arguments) {
try {
zookeeperQuorum = getDeferredObject(arguments, 0);
hbaseTable = getDeferredObject(arguments, 1);
Configuration conf = HBaseConfiguration.create();
conf.set("hbase.zookeeper.quorum", zookeeperQuorum);
// conf.set("hbase.zookeeper.property.clientPort", "2181");
conf.set("mapred.task.timeout", "3600000"); // ms
conf.set("dfs.socket.timeout", "3600000");
conf.set("dfs.datanode.socket.write.timeout", "3600000");
connection = HConnectionManager.createConnection(conf);
HBaseAdmin hBaseAdmin = new HBaseAdmin(conf);
HTableDescriptor td = hBaseAdmin.getTableDescriptor(Bytes
.toBytes(hbaseTable));
hBaseAdmin.disableTable(Bytes.toBytes(hbaseTable));
hBaseAdmin.deleteTable(Bytes.toBytes(hbaseTable));
LOG.error("\n\n\n\n\n\nLength:" + arguments.length + "\n\n\n\n");
if(arguments.length > 2){
split = getDeferredObject(arguments, 2);
if (null != split && !split.equals(NULL_FLAG)) {
String[] strs = split.split(",");
splits = new byte[strs.length][];
for (int i = 0; i < splits.length; i++) {
splits[i] = Bytes.toBytes(strs[i].trim());
}
}
}
hBaseAdmin.createTable(td, splits);
result.set("success");
} catch (Exception ex) {
LOG.error(ex);
result.set(ex.toString());
this.close();
}
return result;
}
@Override
public String getDisplayString(String[] children) {
StringBuilder sb = new StringBuilder();
sb.append("pad_truncate_hbase(");
if (children.length > 0) {
sb.append(children[0]);
for (int i = 1; i < children.length; i++) {
sb.append(",");
sb.append(children[i]);
}
}
sb.append(")");
return sb.toString();
}
@Override
public void close() {
try {
super.close();
connection.close();
} catch (Exception e) {
LOG.error(" close error " + e.getMessage());
}
}
protected String getDeferredObject(DeferredObject[] arguments, int index)
throws HiveException {
if (arguments[index].get() == null) {
return NULL_FLAG;
}
return ((PrimitiveObjectInspector) argumentOI[index])
.getPrimitiveJavaObject(arguments[index].get()).toString();
}
}