apache hive udf 移动平均

标签: 云计算 | 发表时间:2013-03-06 16:21 | 作者:bicloud
出处:http://blog.sina.com.cn/bicloud
某人需要一个计算移动平均的udf
http://www.wangxiao.cn/UserT/GetHtml.aspx?id=/1093/1093-1-4-1-2-2008-83196083.html
移动平均
hive> add jar ./MovingAvgUDF.jar;
Added ./MovingAvgUDF.jar to class path
Added resource: ./MovingAvgUDF.jar
hive>  create temporary function MovingAvgUDF as 'com.bi.udf.MovingAvgUDF';
OK
Time taken: 0.641 seconds

hive> select col1 from a;
Automatically selecting local only mode for query
Total MapReduce jobs = 1
Launching Job 1 out of 1
Number of reduce tasks is set to 0 since there's no reduce operator
Execution log at: /tmp/xxx/hive.log
Job running in-process (local Hadoop)
Hadoop job information for null: number of mappers: 0; number of reducers: 0
2013-03-06 16:07:25,053 null map = 100%,  reduce = 0%
Ended Job = job_local_0001
2013-03-06 04:07:25     End of local task; Time Taken: 3.063 sec.
OK
["12","23","23","21","45","65","23"]
Time taken: 4.574 seconds
hive> select MovingAvgUDF(col1, 5) from a;
Automatically selecting local only mode for query
Total MapReduce jobs = 1
Launching Job 1 out of 1
Number of reduce tasks is set to 0 since there's no reduce operator
Execution log at: /tmp/xxx/hive.log
Job running in-process (local Hadoop)
Hadoop job information for null: number of mappers: 0; number of reducers: 0
2013-03-06 16:06:48,340 null map = 0%,  reduce = 0%
2013-03-06 16:06:49,342 null map = 100%,  reduce = 0%
Ended Job = job_local_0001
2013-03-06 04:06:49     End of local task; Time Taken: 3.99 sec.
OK
[12.0,17.5,19.333333333333332,19.75,24.8,35.4,35.4]
Time taken: 6.623 seconds
hive> select MovingAvgUDF(col1, 2) from a;
Automatically selecting local only mode for query
Total MapReduce jobs = 1
Launching Job 1 out of 1
Number of reduce tasks is set to 0 since there's no reduce operator
Execution log at: /tmp/xxx/hive.log
Job running in-process (local Hadoop)
Hadoop job information for null: number of mappers: 0; number of reducers: 0
2013-03-06 16:33:43,930 null map = 100%,  reduce = 0%
Ended Job = job_local_0001
2013-03-06 04:33:44     End of local task; Time Taken: 2.848 sec.
OK
[12.0,17.5,23.0,22.0,33.0,55.0,44.0]
Time taken: 5.001 seconds

package com.bi.udf;

import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.hive.ql.exec.Description;
import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.exec.UDFArgumentLengthException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
import org.apache.hadoop.hive.serde2.objectinspector.ListObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.IntObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;

@Description(
name = "moving_avg",
value = " return the moving average of a time series for a given timewindow"
)
public class MovingAvgUDF extends GenericUDF {

private ListObjectInspector listInspector;
private IntObjectInspector dayWindowInspector;

private List parseDoubleList(ListobjList) {
List arrList = new ArrayList();
for (Object obj : objList) {

Object dblObj = ((PrimitiveObjectInspector) (listInspector.getListElementObjectInspector())).getPrimitiveJavaObject(obj);
if (dblObj instanceof Number) {
Number dblNum = (Number) dblObj;
arrList.add(dblNum.doubleValue());
} else {
// // Try to coerce it otherwise
String dblStr = (dblObj.toString());
try {
Double dblCoerce = Double.parseDouble(dblStr);
arrList.add(dblCoerce);
} catch (NumberFormatException formatExc) {
formatExc.printStackTrace();
}
}

}
return arrList;
}

public List evaluate(ListtimeSeriesObj, int dayWindow) {

List timeSeries = this.parseDoubleList(timeSeriesObj);

List mvnAvgTimeSeries = new ArrayList(timeSeries.size());
double mvnTotal = 0.0;

for (int i = 0; i < timeSeries.size(); ++i) {
mvnTotal += timeSeries.get(i); 
if (i >= dayWindow) {
mvnTotal -= timeSeries.get(i - dayWindow);
double mvnAvg = mvnTotal / ((double) dayWindow);
mvnAvgTimeSeries.add(mvnAvg);
} else {
if (i > 0) { 
double mvnAvg = mvnTotal / ((double) i + 1.0);
mvnAvgTimeSeries.add(mvnAvg);
} else {
mvnAvgTimeSeries.add(mvnTotal); // /
}
}
}
return mvnAvgTimeSeries;
}

@Override
public Object evaluate(DeferredObject[] arg0) throws HiveException {
List argList = listInspector.getList(arg0[0].get());
int dayWindow = dayWindowInspector.get(arg0[1].get());
if (argList != null)
return evaluate(argList, dayWindow);
else
return null;
}

@Override
public String getDisplayString(String[] arg0) {
return "moving_avg(" + arg0[0] + ", " + arg0[1] + ")";
}

@Override
public ObjectInspector initialize(ObjectInspector[] arg0)
throws UDFArgumentException {
if (arg0.length != 2) {
throw new UDFArgumentLengthException("The function MovingAvgUDF(List, length) needs two arguments.");
}
this.listInspector = (ListObjectInspector) arg0[0];
this.dayWindowInspector = (IntObjectInspector) arg0[1];
return ObjectInspectorFactory.getStandardListObjectInspector(PrimitiveObjectInspectorFactory.javaDoubleObjectInspector);
}
}


  青春就应该这样绽放   游戏测试:三国时期谁是你最好的兄弟!!   你不得不信的星座秘密

相关 [apache hive udf] 推荐:

apache hive udf 移动平均

- - 冰火岛
某人需要一个计算移动平均的udf.   青春就应该这样绽放   游戏测试:三国时期谁是你最好的兄弟.

hive中udf读写hbase

- - CSDN博客推荐文章
在大数据开发过程中经常会遇到,将hive中处理后的结果写入hbase中,每次都要写java程序会非常浪费时间,我们就想了一个办法 ,用hive的udf来实现. 只需要调用同一个udf,将表名字段名以及每一个字段的值作为udf的参数,就可以实现写hbase了. 这样大大的节省了开发时间,提升了开发效率.

hive调优

- - 互联网 - ITeye博客
一、    控制hive任务中的map数: . 1.    通常情况下,作业会通过input的目录产生一个或者多个map任务. 主要的决定因素有: input的文件总个数,input的文件大小,集群设置的文件块大小(目前为128M, 可在hive中通过set dfs.block.size;命令查看到,该参数不能自定义修改);.

hive 优化 tips

- - CSDN博客推荐文章
一、     Hive join优化. 也可以显示声明进行map join:特别适用于小表join大表的时候,SELECT /*+ MAPJOIN(b) */ a.key, a.value FROM a join b on a.key = b.key. 2.     注意带表分区的join, 如:.

Hive中的join

- - CSDN博客云计算推荐文章
select a.* from a join b on a.id = b.id select a.* from a join b on (a.id = b.id and a.department = b.department). 在使用join写查询的时候有一个原则:应该将条目少的表或者子查询放在join操作符的左边.

hive优化(2)

- - 开源软件 - ITeye博客
Hive是将符合SQL语法的字符串解析生成可以在Hadoop上执行的MapReduce的工具. 使用Hive尽量按照分布式计算的一些特点来设计sql,和传统关系型数据库有区别,. 所以需要去掉原有关系型数据库下开发的一些固有思维. 1:尽量尽早地过滤数据,减少每个阶段的数据量,对于分区表要加分区,同时只选择需要使用到的字段.

hive优化

- - 开源软件 - ITeye博客
hive.optimize.cp=true:列裁剪. hive.optimize.prunner:分区裁剪. hive.limit.optimize.enable=true:优化LIMIT n语句. hive.limit.optimize.limit.file=10:最大文件数.   1.job的输入数据大小必须小于参数:hive.exec.mode.local.auto.inputbytes.max(默认128MB).

Hive优化

- - 互联网 - ITeye博客
     使用Hive有一段时间了,目前发现需要进行优化的较多出现在出现join、distinct的情况下,而且一般都是reduce过程较慢.      Reduce过程比较慢的现象又可以分为两类:. 情形一:map已经达到100%,而reduce阶段一直是99%,属于数据倾斜. 情形二:使用了count(distinct)或者group by的操作,现象是reduce有进度但是进度缓慢,31%-32%-34%...一个附带的提示是使用reduce个数很可能是1.

hive bucket 桶

- - CSDN博客推荐文章
对于每一个表(table)或者分区,Hive可以进一步组织成桶. Hive也是针对某一列进行桶的组织. Hive采用对列值哈希,然后除以桶的个数求余的方式决定该条记录存放在哪个桶当中. 采用桶能够带来一些好处,比如JOIN操作. 对于JOIN操作两个表有一个相同的列,如果对这两个表都进行了桶操作. 那么将保存相同列值的桶进行JOIN操作就可以,可以大大较少JOIN的数据量.

hive mapjoin使用

- - 淘剑笑的博客
今天遇到一个hive的问题,如下hive sql:. 该语句中B表有30亿行记录,A表只有100行记录,而且B表中数据倾斜特别严重,有一个key上有15亿行记录,在运行过程中特别的慢,而且在reduece的过程中遇有内存不够而报错. 为了解决用户的这个问题,考虑使用mapjoin,mapjoin的原理:.