Reading and writing HBase from Hive UDFs

Tags: hive udf hbase | Published: 2015-08-04 01:34 | Author: zzuiezhangqihui
Source: http://blog.csdn.net

In big data development we frequently need to write results computed in Hive into HBase, and writing a dedicated Java program every time wastes a lot of effort. Our approach is to do it with a Hive UDF: a single UDF is called with the target table name, the column names, and the value of each column as its arguments, and it writes the row straight into HBase. This saves a great deal of development time and improves efficiency. You can follow the same idea to build whatever functionality you need; a few simple examples are listed here for reference. The details are as follows:
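
To make the idea concrete, the sketch below shows the general calling pattern from the Hive side. The function name, ZooKeeper quorum, and table/column names are placeholders for illustration; the actual UDF classes are listed later in this article.

CREATE TEMPORARY FUNCTION hive2hbase AS 'com.paic.pad.dp.hbase.udf.UDFHbaseMerge';

SELECT hive2hbase('zk1,zk2,zk3', 'hbase_table', 'F', rowkey_col, 'c1,c2', c1, c2)
FROM hive_result_table;

Each selected row becomes one HBase Put, so exporting a whole Hive result set is just a single SELECT statement.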

1. Required JAR dependencies
commons-codec-1.7.jar
commons-collections-3.2.1.jar
commons-configuration-1.6.jar
commons-lang-2.6.jar
commons-logging-1.1.1.jar
hadoop-core-1.2.1.jar
hbase-client-0.98.6-cdh5.3.0.jar
hbase-common-0.98.6-cdh5.3.0.jar
hbase-protocol-0.98.6-cdh5.3.0.jar
hive-exec-0.13.1.jar
htrace-core-2.04.jar
log4j-1.2.17.jar
netty-3.6.6.Final.jar
protobuf-java-2.5.0.jar
slf4j-api-1.6.4.jar
slf4j-log4j12-1.6.4.jar
zookeeper-3.4.6.jar
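
Before registering any of the functions below, these jars (together with the jar that contains the UDF classes themselves) have to be visible to the Hive session. A minimal sketch, with placeholder paths:

add jar /path/to/lib/hbase-client-0.98.6-cdh5.3.0.jar;
add jar /path/to/lib/hbase-common-0.98.6-cdh5.3.0.jar;
add jar /path/to/lib/hbase-protocol-0.98.6-cdh5.3.0.jar;
add jar /path/to/lib/htrace-core-2.04.jar;
add jar /path/to/lib/my-hbase-udf.jar;

Alternatively, the jars can be listed in hive.aux.jars.path so that every session picks them up automatically.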

2. The functions in detail:
(1) Write function:
package com.paic.pad.dp.hbase.udf;

import static org.apache.hadoop.hbase.util.Bytes.toBytes;

import java.io.IOException;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.client.HConnectionManager;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hive.ql.exec.Description;
import org.apache.hadoop.hive.ql.exec.UDFArgumentTypeException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.UDFType;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
import org.apache.hadoop.hive.serde.serdeConstants;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
import org.apache.hadoop.io.Text;

/**
 * Import Hive data into an HBase table. Register this function with:
 * CREATE TEMPORARY FUNCTION pad_put_HBase
 * AS 'com.paic.pad.dp.hbase.udf.UDFHbaseMerge';
 */
@Description(name = "pad_put_HBase", value = "_FUNC_(zookeeperQuorum, hbaseTable, CF, rowKey, 'name1,name2,name3', c1, c2, c3, ...) - read data from hive and import it to hbase, "
+ "returns success of the import.", extended = "The first argument is zookeeperQuorum, "
+ "the second argument is the hbase table, "
+ "the third argument is the CF, "
+ "the fourth argument is the rowKey, "
+ "the other args should be a map, separated by ','. "
+ "example: select _FUNC_('zookeeperQuorum', 'tableName', 'columnFamily', key, 'columnName1,columnName2', columnName1value, columnName2value) from dual;")
@UDFType(deterministic = false)
public class UDFHbaseMerge extends GenericUDF {
private static final Log LOG = LogFactory.getLog(UDFHbaseMerge.class
.getName());

protected transient ObjectInspector[] argumentOI;
protected transient String hbaseTable;
protected HTable table;
protected HConnection connection;
protected static String cf = "F";
protected static String[] cols;
protected final static String NULL_FLAG = "";
protected final Text result = new Text();
protected String zookeeperQuorum;

@Override
public ObjectInspector initialize(ObjectInspector[] arguments)
        throws UDFArgumentTypeException {
    argumentOI = arguments;
    for (int i = 0; i < 3; i++) {
        if (arguments[i].getCategory() == ObjectInspector.Category.PRIMITIVE) {
            PrimitiveObjectInspector poi = ((PrimitiveObjectInspector) arguments[i]);
            if (!(poi.getPrimitiveCategory() == PrimitiveObjectInspector.PrimitiveCategory.STRING)) {
                throw new UDFArgumentTypeException(i,
                        "The argument of function  should be \""
                                + serdeConstants.STRING_TYPE_NAME
                                + "\", but \"" + arguments[i].getTypeName()
                                + "\" is found");
            }
        }
    }
    for (int i = 3; i < arguments.length; i++) {

        if (arguments[i].getCategory() != ObjectInspector.Category.PRIMITIVE) {

            throw new UDFArgumentTypeException(i,
                    "The argument of function should be primative"
                            + ", but \"" + arguments[i].getTypeName()
                            + "\" is found");
        }
    }

    return PrimitiveObjectInspectorFactory.writableStringObjectInspector;
}

@Override
public Object evaluate(DeferredObject[] arguments) {
    try {
        if (table == null) {
            zookeeperQuorum = getDeferredObject(arguments, 0);
            hbaseTable = getDeferredObject(arguments, 1);
            cf = getDeferredObject(arguments, 2);
            cols = getDeferredObject(arguments, 4).split(",");
            Configuration conf = HBaseConfiguration.create();
            conf.set("hbase.zookeeper.quorum", zookeeperQuorum);
            conf.set("hbase.zookeeper.property.clientPort", "2181");
            conf.set("mapred.task.timeout", "3600000"); // ms
            conf.set("dfs.socket.timeout", "3600000");
            conf.set("dfs.datanode.socket.write.timeout", "3600000");

            connection = HConnectionManager.createConnection(conf);
            table = (HTable) connection.getTable(hbaseTable);
            table.setAutoFlush(false, false);
        }
        Put put = getPut(arguments);

        try {
            table.put(put);
        } catch (IOException e) {
            LOG.error(Bytes.toString(table.getTableName())+ "  put error " + e.getMessage());
        }

        result.set("success");
    } catch (Exception ex) {
        LOG.error(ex);
        result.set(ex.toString());
        this.close();
    }
    return result;
}

@Override
public String getDisplayString(String[] children) {
    StringBuilder sb = new StringBuilder();
    sb.append("pad_default_hive2HBase(");
    if (children.length > 0) {
        sb.append(children[0]);
        for (int i = 1; i < children.length; i++) {
            sb.append(",");
            sb.append(children[i]);
        }
    }
    sb.append(")");
    return sb.toString();
}

@Override
public void close() {
    try {
        super.close();
        if (table != null) {
            table.flushCommits();
            table.close();
            connection.close();
        }
    } catch (Exception e) {
        LOG.error(Bytes.toString(table.getTableName()) + "  close  error " + e.getMessage());
    }

}

@Override
public String[] getRequiredFiles() {
    return super.getRequiredFiles();
}

protected String getDeferredObject(DeferredObject[] arguments, int index)
        throws HiveException {
    if (arguments[index].get() == null) {
        return NULL_FLAG;
    }
    return ((PrimitiveObjectInspector) argumentOI[index])
            .getPrimitiveJavaObject(arguments[index].get()).toString();
}

protected Put getPut(DeferredObject[] arguments) throws Exception {
    String rowKey = getDeferredObject(arguments, 3);
    Put put = new Put(toBytes(rowKey));
    for (int i = 0; i < cols.length; i++) {
        put.add(toBytes(cf), toBytes(cols[i]), toBytes(getDeferredObject(
                arguments, i + 5)));
    }
    return put;
}

}
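
A usage sketch for the write UDF above; the quorum, database, table, and column names are made-up examples. Note that setAutoFlush(false, false) means puts are buffered on the client and are only guaranteed to reach HBase when flushCommits() runs in close(), i.e. at the end of the task.

CREATE TEMPORARY FUNCTION pad_put_HBase AS 'com.paic.pad.dp.hbase.udf.UDFHbaseMerge';

-- args: zookeeperQuorum, hbase table, column family, row key,
--       'col1,col2,...', then one value per listed column
SELECT pad_put_HBase('zk1,zk2,zk3', 'user_profile', 'F',
       user_id, 'name,age', name, age)
FROM dw.user_profile_result;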

(2) Delete function:
package com.paic.pad.dp.hbase.udf;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.client.HConnectionManager;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hive.ql.exec.Description;
import org.apache.hadoop.hive.ql.exec.UDFArgumentTypeException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.UDFType;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
import org.apache.hadoop.hive.serde.serdeConstants;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
import org.apache.hadoop.io.Text;

/**
 * Delete rows from an HBase table using row keys produced by a Hive query.
 * Register this function with:
 * CREATE TEMPORARY FUNCTION pad_delete_HBase
 * AS 'com.paic.pad.dp.hbase.udf.UDFHbaseDelete';
 */
@Description(name = "pad_delete_HBase", value = "_FUNC_(zookeeperQuorum, hbaseTable, rowKey) - read row keys from hive and delete the matching rows in HBase, "
+ "returns success of the delete.", extended = "The first argument is zookeeperQuorum, "
+ "the second argument is the hbase table, "
+ "the third argument is the rowKey. "
+ "example: select _FUNC_('zookeeperQuorum', 'tableName', key) from dual;")
@UDFType(deterministic = false)
public class UDFHbaseDelete extends GenericUDF {
private static final Log LOG = LogFactory.getLog(UDFHbaseDelete.class
        .getName());

protected transient ObjectInspector[] argumentOI;
protected transient String hbaseTable;
protected HTable table;
protected HConnection connection;
protected final static String NULL_FLAG = "";
protected final Text result = new Text();
protected String zookeeperQuorum;

@Override
public ObjectInspector initialize(ObjectInspector[] arguments)
        throws UDFArgumentTypeException {
    argumentOI = arguments;
    for (int i = 0; i < 3; i++) {
        if (arguments[i].getCategory() == ObjectInspector.Category.PRIMITIVE) {
            PrimitiveObjectInspector poi = ((PrimitiveObjectInspector) arguments[i]);
            if (!(poi.getPrimitiveCategory() == PrimitiveObjectInspector.PrimitiveCategory.STRING)) {
                throw new UDFArgumentTypeException(i,
                        "The argument of function  should be \""
                                + serdeConstants.STRING_TYPE_NAME
                                + "\", but \"" + arguments[i].getTypeName()
                                + "\" is found");
            }
        }
    }
    for (int i = 3; i < arguments.length; i++) {

        if (arguments[i].getCategory() != ObjectInspector.Category.PRIMITIVE) {

            throw new UDFArgumentTypeException(i,
                    "The argument of function should be primative"
                            + ", but \"" + arguments[i].getTypeName()
                            + "\" is found");
        }
    }

    return PrimitiveObjectInspectorFactory.writableStringObjectInspector;
}

@Override
public Object evaluate(DeferredObject[] arguments) {
    try {
        if (table == null) {
            zookeeperQuorum = getDeferredObject(arguments, 0);
            hbaseTable = getDeferredObject(arguments, 1);
            Configuration conf = HBaseConfiguration.create();
            conf.set("hbase.zookeeper.quorum", zookeeperQuorum);
            conf.set("hbase.zookeeper.property.clientPort", "2181");
            conf.set("mapred.task.timeout", "3600000"); // ms
            conf.set("dfs.socket.timeout", "3600000");
            conf.set("dfs.datanode.socket.write.timeout", "3600000");

            connection = HConnectionManager.createConnection(conf);
            table = (HTable) connection.getTable(hbaseTable);
            table.setAutoFlush(false, false);
        }
        Delete d = new Delete(Bytes.toBytes(getDeferredObject(arguments, 2)));
        table.delete(d);
        result.set("success");
    } catch (Exception ex) {
        LOG.error(ex);
        result.set(ex.toString());
        this.close();
    }
    return result;
}

@Override
public String getDisplayString(String[] children) {
    StringBuilder sb = new StringBuilder();
    sb.append("pad_default_hive2HBase(");
    if (children.length > 0) {
        sb.append(children[0]);
        for (int i = 1; i < children.length; i++) {
            sb.append(",");
            sb.append(children[i]);
        }
    }
    sb.append(")");
    return sb.toString();
}

@Override
public void close() {
    try {
        super.close();
        if (table != null) {
            table.flushCommits();
            table.close();
            connection.close();
        }
    } catch (Exception e) {
        LOG.error(Bytes.toString(table.getTableName())
                + "  close  error " + e.getMessage());
    }

}

@Override
public String[] getRequiredFiles() {
    return super.getRequiredFiles();
}

protected String getDeferredObject(DeferredObject[] arguments, int index)
        throws HiveException {
    if (arguments[index].get() == null) {
        return NULL_FLAG;
    }
    return ((PrimitiveObjectInspector) argumentOI[index])
            .getPrimitiveJavaObject(arguments[index].get()).toString();
}

}
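
A usage sketch for the delete UDF; the quorum and table names are placeholders. Every row key produced by the query is deleted from the target HBase table.

CREATE TEMPORARY FUNCTION pad_delete_HBase AS 'com.paic.pad.dp.hbase.udf.UDFHbaseDelete';

-- args: zookeeperQuorum, hbase table, row key
SELECT pad_delete_HBase('zk1,zk2,zk3', 'user_profile', user_id)
FROM dw.user_ids_to_remove;
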
(3) Query function:
package com.paic.pad.dp.hbase.udf;

import static org.apache.hadoop.hbase.util.Bytes.toBytes;

import java.io.IOException;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.client.HConnectionManager;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hive.ql.exec.Description;
import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.exec.UDFArgumentTypeException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.UDFType;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
import org.apache.hadoop.hive.serde.serdeConstants;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
import org.apache.hadoop.io.Text;

/**
 * Look up a cell value in an HBase table from a Hive query.
 * Register this function with:
 * CREATE TEMPORARY FUNCTION pad_get_HBase
 * AS 'com.paic.pad.dp.hbase.udf.UDFHbaseSelect';
 */
@Description(name = "pad_get_HBase", value = "_FUNC_(zookeeperQuorum, hbaseTable, CF, rowKey, columnName) - read a cell value from hbase for the given row key and column, "
+ "returns the value of the requested column.", extended = "The first argument is zookeeperQuorum, "
+ "the second argument is the hbase table, "
+ "the third argument is the CF, "
+ "the fourth argument is the rowKey, "
+ "the fifth argument is the column name. "
+ "example: select _FUNC_('zookeeperQuorum', 'tableName', 'columnFamily', key, 'columnName') from dual;")
@UDFType(deterministic = false)
public class UDFHbaseSelect extends GenericUDF {
private static final Log LOG = LogFactory.getLog(UDFHbaseSelect.class
        .getName());

protected transient ObjectInspector[] argumentOI;
protected transient String hbaseTable;
protected HTable table;
protected HConnection connection;
protected static String cf = "F";
protected static String[] cols;
protected final static String NULL_FLAG = "";
protected final Text result = new Text();
protected String zookeeperQuorum;

@Override
public Object evaluate(DeferredObject[] arguments) throws HiveException {
    String cv="";
    try {
        if (table == null) {
            zookeeperQuorum = getDeferredObject(arguments, 0);
            hbaseTable = getDeferredObject(arguments, 1);
            cf = getDeferredObject(arguments, 2);
            cols = getDeferredObject(arguments, 4).split(",");
            Configuration conf = HBaseConfiguration.create();
            conf.set("hbase.zookeeper.quorum", zookeeperQuorum);
            conf.set("hbase.zookeeper.property.clientPort", "2181");
            conf.set("mapred.task.timeout", "3600000"); // ms
            conf.set("dfs.socket.timeout", "3600000");
            conf.set("dfs.datanode.socket.write.timeout", "3600000");

            connection = HConnectionManager.createConnection(conf);
            table = (HTable) connection.getTable(hbaseTable);
            table.setAutoFlush(false, false);
        }



        try {
            cv=getColumnValue(arguments);
        } catch (IOException e) {
            LOG.error(Bytes.toString(table.getTableName()) + "  put error "
                    + e.getMessage());
        }

        result.set(cv);
    } catch (Exception ex) {
        LOG.error(ex);
        result.set(ex.toString());
        this.close();
    }
    return result;
}

@Override
public ObjectInspector initialize(ObjectInspector[] arguments)
        throws UDFArgumentException {
    argumentOI = arguments;
    for (int i = 0; i < 3; i++) {
        if (arguments[i].getCategory() == ObjectInspector.Category.PRIMITIVE) {
            PrimitiveObjectInspector poi = ((PrimitiveObjectInspector) arguments[i]);
            if (!(poi.getPrimitiveCategory() == PrimitiveObjectInspector.PrimitiveCategory.STRING)) {
                throw new UDFArgumentTypeException(i,
                        "The argument of function  should be \""
                                + serdeConstants.STRING_TYPE_NAME
                                + "\", but \"" + arguments[i].getTypeName()
                                + "\" is found");
            }
        }
    }
    for (int i = 3; i < arguments.length; i++) {

        if (arguments[i].getCategory() != ObjectInspector.Category.PRIMITIVE) {

            throw new UDFArgumentTypeException(i,
                    "The argument of function should be primative"
                            + ", but \"" + arguments[i].getTypeName()
                            + "\" is found");
        }
    }

    return PrimitiveObjectInspectorFactory.writableStringObjectInspector;
}

public String getDisplayString(String[] children) {
    StringBuilder sb = new StringBuilder();
    sb.append("pad_default_hive2HBase(");
    if (children.length > 0) {
        sb.append(children[0]);
        for (int i = 1; i < children.length; i++) {
            sb.append(",");
            sb.append(children[i]);
        }
    }
    sb.append(")");
    return sb.toString();
}

@Override
public void close() {
    try {
        super.close();
        if (table != null) {
            table.flushCommits();
            table.close();
            connection.close();
        }
    } catch (Exception e) {
        LOG.error(Bytes.toString(table.getTableName()) + "  close  error "
                + e.getMessage());
    }

}

@Override
public String[] getRequiredFiles() {
    return super.getRequiredFiles();
}

protected String getDeferredObject(DeferredObject[] arguments, int index)
        throws HiveException {
    if (arguments[index].get() == null) {
        return NULL_FLAG;
    }
    return ((PrimitiveObjectInspector) argumentOI[index])
            .getPrimitiveJavaObject(arguments[index].get()).toString();
}

protected Put getPut(DeferredObject[] arguments) throws Exception {
    String rowKey = getDeferredObject(arguments, 3);
    Put put = new Put(toBytes(rowKey));
    for (int i = 0; i < cols.length; i++) {
        put.add(toBytes(cf), toBytes(cols[i]), toBytes(getDeferredObject(
                arguments, i + 5)));
    }
    return put;
}

protected String getColumnValue(DeferredObject[] arguments)
        throws Exception {
    StringBuffer columnValues = new StringBuffer();
    String rowKey = getDeferredObject(arguments, 3);
    Get get = new Get(toBytes(rowKey));
    org.apache.hadoop.hbase.client.Result dbResult = table.get(get);

    if (dbResult.size() > 0) {
        columnValues.append(Bytes.toString(dbResult.getValue(Bytes
                .toBytes(getDeferredObject(arguments, 2)), Bytes
                .toBytes(getDeferredObject(arguments, 4)))));
    }

    return columnValues.toString();
}

}
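
A usage sketch for the query UDF; the quorum, table, and column names are illustrative. Since each call returns a single cell value, the UDF can be used to enrich a Hive query with attributes stored only in HBase.

CREATE TEMPORARY FUNCTION pad_get_HBase AS 'com.paic.pad.dp.hbase.udf.UDFHbaseSelect';

-- args: zookeeperQuorum, hbase table, column family, row key, column name
SELECT user_id,
       pad_get_HBase('zk1,zk2,zk3', 'user_profile', 'F', user_id, 'age') AS age
FROM dw.user_ids;
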
(4) Truncate table function:
package com.paic.pad.dp.hbase.udf;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.client.HConnectionManager;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hive.ql.exec.Description;
import org.apache.hadoop.hive.ql.exec.UDFArgumentTypeException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.UDFType;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
import org.apache.hadoop.hive.serde.serdeConstants;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
import org.apache.hadoop.io.Text;

@Description(name = "pad_truncate_hbase", value = "_FUNC_(zookeeperQuorum, table, split) - truncate the given HBase table, "
+ "returns success if the truncate succeeds.", extended = "The first argument is the zookeeper quorum, "
+ "the second argument is the HBase table name, "
+ "the third argument is the HBase split keys, separated by ','. "
+ "example: select _FUNC_('zookeeperQuorum', 'tableName', '1,2,3,4,5,6,7,8,9,0') from dual;")
@UDFType(deterministic = false)
public class UDFHbaseTruncate extends GenericUDF {
private static final Log LOG = LogFactory.getLog(UDFHbaseTruncate.class
.getName());

protected transient ObjectInspector[] argumentOI;
protected transient String hbaseTable;
protected HConnection connection;
protected static String[] cols;
protected final static String NULL_FLAG = "";
protected final Text result = new Text();
protected String zookeeperQuorum;
protected String split;
protected byte[][] splits = null;

@Override
public ObjectInspector initialize(ObjectInspector[] arguments)
        throws UDFArgumentTypeException {
    argumentOI = arguments;
    for (int i = 0; i < 2; i++) {
        if (arguments[i].getCategory() == ObjectInspector.Category.PRIMITIVE) {
            PrimitiveObjectInspector poi = ((PrimitiveObjectInspector) arguments[i]);
            if (!(poi.getPrimitiveCategory() == PrimitiveObjectInspector.PrimitiveCategory.STRING)) {
                throw new UDFArgumentTypeException(i,
                        "The argument of function  should be \""
                                + serdeConstants.STRING_TYPE_NAME
                                + "\", but \"" + arguments[i].getTypeName()
                                + "\" is found");
            }
        }
    }

    return PrimitiveObjectInspectorFactory.writableStringObjectInspector;
}

@Override
public Object evaluate(DeferredObject[] arguments) {
    try {
        zookeeperQuorum = getDeferredObject(arguments, 0);
        hbaseTable = getDeferredObject(arguments, 1);
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", zookeeperQuorum);
        // conf.set("hbase.zookeeper.property.clientPort", "2181");
        conf.set("mapred.task.timeout", "3600000"); // ms
        conf.set("dfs.socket.timeout", "3600000");
        conf.set("dfs.datanode.socket.write.timeout", "3600000");

        connection = HConnectionManager.createConnection(conf);
        HBaseAdmin hBaseAdmin = new HBaseAdmin(conf);
        HTableDescriptor td = hBaseAdmin.getTableDescriptor(Bytes
                .toBytes(hbaseTable));
        hBaseAdmin.disableTable(Bytes.toBytes(hbaseTable));
        hBaseAdmin.deleteTable(Bytes.toBytes(hbaseTable));

        LOG.error("\n\n\n\n\n\nLength:" + arguments.length + "\n\n\n\n");

        if(arguments.length > 2){
            split = getDeferredObject(arguments, 2);
            if (null != split && !split.equals(NULL_FLAG)) {
                String[] strs = split.split(",");
                splits = new byte[strs.length][];

                for (int i = 0; i < splits.length; i++) {
                    splits[i] = Bytes.toBytes(strs[i].trim());
                }
            }
        }

        hBaseAdmin.createTable(td, splits);
        result.set("success");
    } catch (Exception ex) {
        LOG.error(ex);
        result.set(ex.toString());
        this.close();
    }
    return result;
}

@Override
public String getDisplayString(String[] children) {
    StringBuilder sb = new StringBuilder();
    sb.append("pad_truncate_hbase(");
    if (children.length > 0) {
        sb.append(children[0]);
        for (int i = 1; i < children.length; i++) {
            sb.append(",");
            sb.append(children[i]);
        }
    }
    sb.append(")");
    return sb.toString();
}

@Override
public void close() {
    try {
        super.close();
        connection.close();
    } catch (Exception e) {
        LOG.error("  close  error " + e.getMessage());
    }
}

protected String getDeferredObject(DeferredObject[] arguments, int index)
        throws HiveException {
    if (arguments[index].get() == null) {
        return NULL_FLAG;
    }
    return ((PrimitiveObjectInspector) argumentOI[index])
            .getPrimitiveJavaObject(arguments[index].get()).toString();
}

}
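
A usage sketch for the truncate UDF, following the article's own "from dual" convention (dual stands for any one-row table, since evaluate() runs once per input row); the quorum and split keys are illustrative. Internally the UDF disables and drops the table, then recreates it with the saved descriptor and the supplied split keys, so the table ends up empty but pre-split.

CREATE TEMPORARY FUNCTION pad_truncate_hbase AS 'com.paic.pad.dp.hbase.udf.UDFHbaseTruncate';

-- args: zookeeperQuorum, hbase table, optional comma-separated split keys
SELECT pad_truncate_hbase('zk1,zk2,zk3', 'user_profile', '1,2,3,4,5,6,7,8,9,0')
FROM dual;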
