Hive-UDAF开发指南

标签: hive udaf 开发 | 发表时间:2015-05-21 12:52 | 作者:lixuguang
出处:http://www.iteye.com

refer to: http://www.cnblogs.com/ggjucheng/archive/2013/02/01/2888051.html

 

在用Hive进行ETL的时候,对于一些复杂的数据处理逻辑,往往不能用简单的HQL来解决,这个时候就需要使用UDAF了。

 

对于底层的内容还没有细看,先从应用的角度来说一下吧。

使用UDAF需要实现接口GenericUDAFResolver2,或者继承抽象类AbstractGenericUDAFResolver。

 

UDAF主要分为2个部分,第一个部分是对传入参数进行校验,数据类型的校验。然后根据传入的数据类型不同调用具体的处理逻辑。

比如说,自己写了一个SUM,SUM对于Long类型和Double类型进行求和,没有问题。

但是,如果传入的参数是一个Array呢?这个时候,就需要在Evaluator方法里面,对参数进行校验了。

Java代码   收藏代码
  1. public GenericUDAFEvaluator getEvaluator(TypeInfo[] parameters)  
  2.       throws SemanticException {  
  3.     if (parameters.length != 1) {  
  4.       throw new UDFArgumentTypeException(parameters.length - 1,  
  5.           "Exactly one argument is expected.");  
  6.     }  
  7.   
  8.     if (parameters[0].getCategory() != ObjectInspector.Category.PRIMITIVE) {  
  9.       throw new UDFArgumentTypeException(0,  
  10.           "Only primitive type arguments are accepted but "  
  11.           + parameters[0].getTypeName() + " is passed.");  
  12.     }  
  13.     switch (((PrimitiveTypeInfo) parameters[0]).getPrimitiveCategory()) {  
  14.     case BYTE:  
  15.     case SHORT:  
  16.     case INT:  
  17.     case LONG:  
  18.     case FLOAT:  
  19.     case DOUBLE:  
  20.     case STRING:  
  21.     case TIMESTAMP:  
  22.       return new GenericUDAFAverageEvaluator();  
  23.     case BOOLEAN:  
  24.     default:  
  25.       throw new UDFArgumentTypeException(0,  
  26.           "Only numeric or string type arguments are accepted but "  
  27.           + parameters[0].getTypeName() + " is passed.");  
  28.     }  
  29.   }  

 

这个方法只支持Primitive类型,也就是INT,String,Double,Float这些。

 

UDAF使用一个ObjectInspector来抽象化每一行数据的读取。

上面使用的Primitive类型的数据,所以使用PrimitiveObjectInspector来读取传入的参数。

 

UDAF会根据不同的计算模型,产生不同的阶段。

如:SUM()聚合函数,接受一个原始类型的整型数值,然后创建一个整型的PARTIAL数据,
返回一个固定的整型结果。

如:median() 中位数
可以接受原始整型输入,然后会产生一个中间的整数PARTIAL数据(排序),
然后再返回一个固定的整型结果。

 

注意:

Java代码   收藏代码
  1. 聚合操作会在reduce的环境下执行,然后由一个Java进程的内存大小限制这个操作。  
  2. 因此像排序大结构体的数据,可能会产生对内存不足的异常。  
  3. 一般情况下可以增加内存来解决这个问题。  
  4. <property>  
  5. <name>mapred.child.java.opts</name>  
  6. <value>-Xmx200m</value>  
  7. </property>  

  

在处理逻辑之前,介绍一下UDAF的Mode。

UDAF的Mode,也就是执行阶段。无论怎样的UDAF,最终都会变成MapReduce Job。

Mode是一UDAF的使用类型,主要有4种形势:

因为MapReduce可能是,Map->Reduce也可能是,Map->Reduce->Reduce

Java代码   收藏代码
  1. public static enum Mode {  
  2.     /** 
  3.      * PARTIAL1: 这个是mapreduce的map阶段:从原始数据到部分数据聚合 
  4.      * 将会调用iterate()和terminatePartial() 
  5.      */  
  6.     PARTIAL1,  
  7.         /** 
  8.      * PARTIAL2: 这个是mapreduce的map端的Combiner阶段,负责在map端合并map的数据::从部分数据聚合到部分数据聚合: 
  9.      * 将会调用merge() 和 terminatePartial()  
  10.      */  
  11.     PARTIAL2,  
  12.         /** 
  13.      * FINAL: mapreduce的reduce阶段:从部分数据的聚合到完全聚合  
  14.      * 将会调用merge()和terminate() 
  15.      */  
  16.     FINAL,  
  17.         /** 
  18.      * COMPLETE: 如果出现了这个阶段,表示mapreduce只有map,没有reduce,所以map端就直接出结果了:从原始数据直接到完全聚合 
  19.       * 将会调用 iterate()和terminate() 
  20.      */  
  21.     COMPLETE  
  22. };  

 

 

有的UDAF函数会可以像UDF函数那样使用,有的必须在聚合函数环境下使用,如group by,over(partition by )

而在使用UDAF进行计算的时候,会启用一个init方法。这个init的方法会在买个阶段前面都启动一次。第一次启动的时候,参数指的是读入每一行记录的参数。第二次启动的时候,传入的参数只有1个,指的是中间结果的参数。这里需要特别注意。

Java代码   收藏代码
  1. @Override  
  2.         public ObjectInspector init(Mode m, ObjectInspector[] parameters)  
  3.                 throws HiveException {  
  4.             super.init(m, parameters);  
  5.               
  6.             //init input  
  7.             if (m == Mode.PARTIAL1 || m == Mode.COMPLETE){ //必须得有  
  8.                 LOG.info(" Mode:"+m.toString()+" result has init");  
  9.                 inputOI = (PrimitiveObjectInspector) parameters[0];  
  10.                 inputOI2 = (PrimitiveObjectInspector) parameters[1];  
  11. //              result = new DoubleWritable(0);  
  12.             }  
  13.             //init output  
  14.             if (m == Mode.PARTIAL2 || m == Mode.FINAL) {  
  15.                 outputOI = (PrimitiveObjectInspector) parameters[0];  
  16.                 result = new DoubleWritable(0);  
  17.                 return PrimitiveObjectInspectorFactory.writableDoubleObjectInspector;  
  18.             }else{  
  19.                 result = new DoubleWritable(0);  
  20.                 return PrimitiveObjectInspectorFactory.writableDoubleObjectInspector;  
  21.             }   
  22.               
  23.         }  

 所以我们使用枚举方法,根据init启动阶段的不同,接入不同的参数。

 

实现UDAF的时候,实际就是一个Reducer

 

对于计算过程中的中间结果,会有一个Buffer对象来进行缓冲。

Buffer对象相当于Reducer里面记录结果集的一个内存对象。

 

这里面可以大大的发挥想象,作出你想要的各种数据类型。

另外,在UDAF输出的时候,也可以输出Struct,Array类型的数据。

这一部分等到用到再进行研究吧。

 

最后是完整的UDAF代码。实现一个有条件的SUM,传入2个参数,当第二个参数>1 的时候进行SUM。

 

Java代码   收藏代码
  1. package com.test.udaf;  
  2.   
  3. import org.apache.commons.logging.Log;  
  4. import org.apache.commons.logging.LogFactory;  
  5. import org.apache.hadoop.hive.ql.exec.UDFArgumentTypeException;  
  6. import org.apache.hadoop.hive.ql.metadata.HiveException;  
  7. import org.apache.hadoop.hive.ql.parse.SemanticException;  
  8. import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator;  
  9. import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFParameterInfo;  
  10. import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFResolver2;  
  11. import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;  
  12. import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;  
  13. import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;  
  14. import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption;  
  15. import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;  
  16. import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorUtils;  
  17. import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;  
  18. import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;  
  19. import org.apache.hadoop.hive.serde2.io.DoubleWritable;  
  20. import org.apache.hadoop.util.StringUtils;  
  21.   
  22. public class GenericUdafMemberLevel2 extends AbstractGenericUDAFResolver {  
  23.     private static final Log LOG = LogFactory  
  24.             .getLog(GenericUdafMemberLevel2.class.getName());  
  25.       
  26.     @Override  
  27.       public GenericUDAFEvaluator getEvaluator(TypeInfo[] parameters)  
  28.         throws SemanticException {  
  29.           
  30.         return new GenericUdafMeberLevelEvaluator();  
  31.       }  
  32.       
  33.     public static class GenericUdafMeberLevelEvaluator extends GenericUDAFEvaluator {  
  34.         private PrimitiveObjectInspector inputOI;  
  35.         private PrimitiveObjectInspector inputOI2;  
  36.         private PrimitiveObjectInspector outputOI;  
  37.         private DoubleWritable result;  
  38.   
  39.         @Override  
  40.         public ObjectInspector init(Mode m, ObjectInspector[] parameters)  
  41.                 throws HiveException {  
  42.             super.init(m, parameters);  
  43.               
  44.             //init input  
  45.             if (m == Mode.PARTIAL1 || m == Mode.COMPLETE){ //必须得有  
  46.                 LOG.info(" Mode:"+m.toString()+" result has init");  
  47.                 inputOI = (PrimitiveObjectInspector) parameters[0];  
  48.                 inputOI2 = (PrimitiveObjectInspector) parameters[1];  
  49. //              result = new DoubleWritable(0);  
  50.             }  
  51.             //init output  
  52.             if (m == Mode.PARTIAL2 || m == Mode.FINAL) {  
  53.                 outputOI = (PrimitiveObjectInspector) parameters[0];  
  54.                 result = new DoubleWritable(0);  
  55.                 return PrimitiveObjectInspectorFactory.writableDoubleObjectInspector;  
  56.             }else{  
  57.                 result = new DoubleWritable(0);  
  58.                 return PrimitiveObjectInspectorFactory.writableDoubleObjectInspector;  
  59.             }   
  60.               
  61.         }  
  62.   
  63.         /** class for storing count value. */  
  64.         static class SumAgg implements AggregationBuffer {  
  65.             boolean empty;  
  66.             double value;  
  67.         }  
  68.   
  69.         @Override  
  70.         //创建新的聚合计算的需要的内存,用来存储mapper,combiner,reducer运算过程中的相加总和。  
  71.         //使用buffer对象前,先进行内存的清空——reset  
  72.         public AggregationBuffer getNewAggregationBuffer() throws HiveException {  
  73.             SumAgg buffer = new SumAgg();  
  74.             reset(buffer);  
  75.             return buffer;  
  76.         }  
  77.   
  78.         @Override  
  79.         //重置为0  
  80.         //mapreduce支持mapper和reducer的重用,所以为了兼容,也需要做内存的重用。  
  81.         public void reset(AggregationBuffer agg) throws HiveException {  
  82.             ((SumAgg) agg).value = 0.0;  
  83.             ((SumAgg) agg).empty = true;  
  84.         }  
  85.   
  86.         private boolean warned = false;  
  87.         //迭代  
  88.         //只要把保存当前和的对象agg,再加上输入的参数,就可以了。  
  89.         @Override  
  90.         public void iterate(AggregationBuffer agg, Object[] parameters)  
  91.                 throws HiveException {  
  92.             // parameters == null means the input table/split is empty  
  93.             if (parameters == null) {  
  94.                 return;  
  95.             }  
  96.             try {  
  97.                 double flag = PrimitiveObjectInspectorUtils.getDouble(parameters[1], inputOI2);  
  98.                 if(flag > 1.0)   //参数条件  
  99.                     merge(agg, parameters[0]);   //这里将迭代数据放入combiner进行合并  
  100.               } catch (NumberFormatException e) {  
  101.                 if (!warned) {  
  102.                   warned = true;  
  103.                   LOG.warn(getClass().getSimpleName() + " "  
  104.                       + StringUtils.stringifyException(e));  
  105.                 }  
  106.               }  
  107.   
  108.         }  
  109.   
  110.         @Override  
  111.         //这里的操作就是具体的聚合操作。  
  112.         public void merge(AggregationBuffer agg, Object partial) {  
  113.             if (partial != null) {  
  114.                 // 通过ObejctInspector取每一个字段的数据  
  115.                 if (inputOI != null) {  
  116.                     double p = PrimitiveObjectInspectorUtils.getDouble(partial,  
  117.                             inputOI);  
  118.                     LOG.info("add up 1:" + p);  
  119.                     ((SumAgg) agg).value += p;  
  120.                 } else {  
  121.                     double p = PrimitiveObjectInspectorUtils.getDouble(partial,  
  122.                             outputOI);  
  123.                     LOG.info("add up 2:" + p);  
  124.                     ((SumAgg) agg).value += p;  
  125.                 }  
  126.             }  
  127.         }  
  128.   
  129.   
  130.         @Override  
  131.         public Object terminatePartial(AggregationBuffer agg) {  
  132.                 return terminate(agg);  
  133.         }  
  134.           
  135.         @Override  
  136.         public Object terminate(AggregationBuffer agg){  
  137.             SumAgg myagg = (SumAgg) agg;  
  138.             result.set(myagg.value);  
  139.             return result;  
  140.         }  
  141.     }  
  142. }  

 

 在使用Hive的UDAF,需要使用ADD JAR语句,将UDAF方程上传到Hadoop Distributed Cache,让每一个DataNode都能共享到这个jar包。

然后才进行调用

Shell代码   收藏代码
  1. hive> add jar /home/daxingyu930/test_sum.jar;  
  2. hive> drop temporary function sum_test;  
  3. hive> create temporary function sum_test as 'com.test.udaf.GenericUdafMemberLevel';  
  4.   
  5. hive> create temporary function sum_test as 'com.test.udaf.GenericUdafMemberLevel2';  
  6.   
  7. hive> select sum_test(height,2.0) from student_height;  

 

附录:关于UDAF流程介绍 

init  当实例化UDAF evaluator的时候执行。
getNewAggregationBuffer  返回一个对象用来保存临时的聚合结果集。
iterate  将一条新的数据处理放到聚合内存块中(aggregation buffer)
terminateParital  返回现有的聚合好的一个持久化的路径,相当于数据对象。这些数据可以通过Hive的数据类型可来访问,这个数据对象可以被Java理解,如Integer,String,或者是Array,Map这种。
相当于第二次MapReduce的map阶段。
merge   将partital数据(分区汇总的数据),于terminateParital数据融合在一起
terminate 返回一个最终的数据聚合结果,是一个结果,或者是一个结果集。

在init阶段,hive会自动检测最终生成的object inspector。
并获取使用聚合函数所处的mode。

iterate和 terminalPartial 都是在map阶段
而terminate和merge 都是在reduce阶段。

merge则用来聚合结果集

注意,无论使用UDF和UDAF,尽可能少地使用new关键字,可以使用静态类。
这样可以减少JVM的GC操作,提高效率。



已有 0 人发表留言,猛击->> 这里<<-参与讨论


ITeye推荐



相关 [hive udaf 开发] 推荐:

Hive-UDAF开发指南

- - 开源软件 - ITeye博客
在用Hive进行ETL的时候,对于一些复杂的数据处理逻辑,往往不能用简单的HQL来解决,这个时候就需要使用UDAF了. 对于底层的内容还没有细看,先从应用的角度来说一下吧. 使用UDAF需要实现接口GenericUDAFResolver2,或者继承抽象类AbstractGenericUDAFResolver.

hive调优

- - 互联网 - ITeye博客
一、    控制hive任务中的map数: . 1.    通常情况下,作业会通过input的目录产生一个或者多个map任务. 主要的决定因素有: input的文件总个数,input的文件大小,集群设置的文件块大小(目前为128M, 可在hive中通过set dfs.block.size;命令查看到,该参数不能自定义修改);.

hive 优化 tips

- - CSDN博客推荐文章
一、     Hive join优化. 也可以显示声明进行map join:特别适用于小表join大表的时候,SELECT /*+ MAPJOIN(b) */ a.key, a.value FROM a join b on a.key = b.key. 2.     注意带表分区的join, 如:.

Hive中的join

- - CSDN博客云计算推荐文章
select a.* from a join b on a.id = b.id select a.* from a join b on (a.id = b.id and a.department = b.department). 在使用join写查询的时候有一个原则:应该将条目少的表或者子查询放在join操作符的左边.

hive优化(2)

- - 开源软件 - ITeye博客
Hive是将符合SQL语法的字符串解析生成可以在Hadoop上执行的MapReduce的工具. 使用Hive尽量按照分布式计算的一些特点来设计sql,和传统关系型数据库有区别,. 所以需要去掉原有关系型数据库下开发的一些固有思维. 1:尽量尽早地过滤数据,减少每个阶段的数据量,对于分区表要加分区,同时只选择需要使用到的字段.

hive优化

- - 开源软件 - ITeye博客
hive.optimize.cp=true:列裁剪. hive.optimize.prunner:分区裁剪. hive.limit.optimize.enable=true:优化LIMIT n语句. hive.limit.optimize.limit.file=10:最大文件数.   1.job的输入数据大小必须小于参数:hive.exec.mode.local.auto.inputbytes.max(默认128MB).

Hive优化

- - 互联网 - ITeye博客
     使用Hive有一段时间了,目前发现需要进行优化的较多出现在出现join、distinct的情况下,而且一般都是reduce过程较慢.      Reduce过程比较慢的现象又可以分为两类:. 情形一:map已经达到100%,而reduce阶段一直是99%,属于数据倾斜. 情形二:使用了count(distinct)或者group by的操作,现象是reduce有进度但是进度缓慢,31%-32%-34%...一个附带的提示是使用reduce个数很可能是1.

hive bucket 桶

- - CSDN博客推荐文章
对于每一个表(table)或者分区,Hive可以进一步组织成桶. Hive也是针对某一列进行桶的组织. Hive采用对列值哈希,然后除以桶的个数求余的方式决定该条记录存放在哪个桶当中. 采用桶能够带来一些好处,比如JOIN操作. 对于JOIN操作两个表有一个相同的列,如果对这两个表都进行了桶操作. 那么将保存相同列值的桶进行JOIN操作就可以,可以大大较少JOIN的数据量.

hive mapjoin使用

- - 淘剑笑的博客
今天遇到一个hive的问题,如下hive sql:. 该语句中B表有30亿行记录,A表只有100行记录,而且B表中数据倾斜特别严重,有一个key上有15亿行记录,在运行过程中特别的慢,而且在reduece的过程中遇有内存不够而报错. 为了解决用户的这个问题,考虑使用mapjoin,mapjoin的原理:.

hive优化

- - 互联网 - ITeye博客
1:尽量尽早地过滤数据,减少每个阶段的数据量,对于分区表要加分区,同时只选择需要使用到的字段. 2:尽量原子化操作,尽量避免一个SQL包含复杂逻辑. 可以使用中间表来完成复杂的逻辑. 3:单个SQL所起的JOB个数尽量控制在5个以下. 4:慎重使用mapjoin,一般行数小于2000行,大小小于1M(扩容后可以适当放大)的表才能使用,小表要注意放在join的左边(目前TCL里面很多都小表放在join的右边).