HBASE 协处理器入门(转载)

标签: hbase 协处理器 | 发表时间:2014-12-20 11:13 | 作者:zhangxiong0301
出处:http://www.iteye.com

如果要统对hbase中的数据,进行某种统计,比如统计某个字段最大值,统计满足某种条件的记录数,统计各种记录特点,并按照记录特点分类(类似于sql的group by)~

常规的做法就是把hbase中整个表的数据scan出来,或者稍微环保一点,加一个filter,进行一些初步的过滤(对于rowcounter来说,就加了FirstKeyOnlyFilter),但是这么做来说还是会有很大的副作用,比如占用大量的网络带宽(当标级别到达千万级别,亿级别之后)尤为明显,RPC的量也是不容小觑的。

理想的方式应该是怎样?

拿row counter这个简单例子来说,我要统计总行数,如果每个region 告诉我他又多少行,然后把结果告诉我,我再将他们的结果汇总一下,不就行了么?
现在的问题是hbase没有提供这种接口,来统计每个region的行数,那是否我们可以自己来实现一个呢?
没错,正如本文标题所说,我们可以自己来实现一个Endpoint,然后让hbase加载起来,然后我们远程调用即可。

什么是Endpoint?

先弄清楚什么是hbase coprocessor

hbase有两种coprocessor,一种是Observer(观察者),类似于关系数据库的trigger(触发器),另外一种就是EndPoint,类似于关系数据库的存储过程。

观察者这里就多做介绍了,这里介绍Endpoint。

EndPoint是动态RPC插件的接口,它的实现代码被部署在服务器端(regionServer),从而能够通过HBase RPC调用。客户端类库提供了非常方便的方法来调用这些动态接口,它们可以在任意时候调用一个EndPoint,它们的实现代码会被目标region远程执行,结果会返回到终端。用户可以结合使用这些强大的插件接口,为HBase添加全新的特性。

怎么实现一个EndPoint

1. 定义一个新的protocol接口,必须继承CoprocessorProtocol.
2. 实现终端接口,继承抽象类BaseEndpointCoprocessor,改实现代码需要部署到
3. 在客户端,终端可以被两个新的HBase Client API调用 。单个region:HTableInterface.coprocessorProxy(Class<T> protocol, byte[] row) 。rigons区域:HTableInterface.coprocessorExec(Class<T> protocol, byte[] startKey, byte[] endKey, Batch.Call<T,R> callable),这里的region是通过一个row来标示的,就是说,改row落到那个region,RPC就发给哪个region,对于start-end的,[start,end)范围内的region都会受到RPC调用。

如图 71e2816c-c109-475a-9d64-bc6b74e61443

1
2
3
public interface CounterProtocol extends CoprocessorProtocol {
     public long count( byte [] start, byte [] end) throws IOException;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
public class CounterEndPoint extends BaseEndpointCoprocessor implements CounterProtocol {
 
     @Override
     public long count( byte [] start, byte []end) throws IOException {
         // aggregate at each region
         Scan scan = new Scan();
         long numRow = 0 ;
 
         InternalScanner scanner = ((RegionCoprocessorEnvironment) getEnvironment()).getRegion()
                 .getScanner(scan);
         try {
             List<KeyValue> curVals = new ArrayList<KeyValue>();
             boolean hasMore = false ;
             do {
                 curVals.clear();
                 hasMore = scanner.next(curVals);
                 if (Bytes.compareTo(curVals.get( 0 ).getRow(), start)< 0 ) {
                     continue ;
                 }
                 if (Bytes.compareTo(curVals.get( 0 ).getRow(), end)>= 0 ) {
                     break ;
                 }
                 numRow++;
             } while (hasMore);
         } finally {
             scanner.close();
         }
         return numRow;
     }
 
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public class CounterEndPointDemo {
     public static void main(String[] args) throws IOException, Throwable {
         final String startRow = args[ 0 ];
         final String endRow = args[ 1 ];
 
         @SuppressWarnings ( "resource" )
         HTableInterface table = new HTable(HBaseConfiguration.create(), "tc" );
         Map< byte [], Long> results;
 
         // scan: for all regions
         results = table.coprocessorExec(CounterProtocol. class , startRow.getBytes(),
                 endRow.getBytes(), new Batch.Call<CounterProtocol, Long>() {
                     public Long call(CounterProtocol instance) throws IOException {
                         return instance.count(startRow.getBytes(), endRow.getBytes());
                     }
                 });
 
         long total = 0 ;
         for (Map.Entry< byte [], Long> e : results.entrySet()) {
             System.out.println(e.getValue());
             total += e.getValue();
         }
 
         System.out.println( "total:" + total);
     }
}

整个程序的框架其实又是另外一个mapreduce,只是运行在region server上面,reduce运行在客户端,其中map计算量较大,reduce计算量很小!

另外需要提醒的是:
protocol的返回类型,可以是基本类型。
如果是一个自定义的类型需要实现org.apache.hadoop.io.Writable接口。
关于详细的支持类型,请参考代码hbase源码:org.apache.hadoop.hbase.io.HbaseObjectWritable

怎么部署?

1. 通过hbase-site.xml增加

1
2
3
4
< property >
   < name >hbase.coprocessor.region.classes</ name >
   < value >xxxx.CounterEndPoint </ value >
</ property >
  1. 如果要配置多个,就用逗号(,)分割。
  2. 包含此类的jar必须位于hbase的classpath
  3. 这种coprocessor是作用于所有的表,如果你只想作用于部分表,请使用下面一种方式。

2. 通过shell方式
增加:

1
2
3
4
5
6
hbase(main):005:0> alter 't1' , METHOD => 'table_att' ,
'coprocessor' => ' hdfs:///foo.jar|com.foo.FooRegionObserver|1001|arg1=1,arg2=2'
Updating all regions with the new schema...
1 /1 regions updated.
Done.
0 row(s) in 1.0730 seconds

coprocessor格式为:
[FilePath]|ClassName|Priority|arguments
arguments: k=v[,k=v]+

  1. 其中FilePath是hdfs路径,例如/tmp/zhenhe/cp/zhenhe-1.0.jar
  2. ClassNameEndPoint实现类的全名
  3. Priority为,整数,框架会根据这个数据决定多个cp的执行顺序
  4. Arguments,传给cp的参数
  5. 如果hbase的classpath包含改类,FilePath可以留空

卸载:

  1. 先describe “tableName‘,查看你要卸载的cp的编号
  2. 然后alter 't1', METHOD => 'table_att_unset', NAME=> 'coprocessor$3',coprocessor$3可变。

应用场景

这是一个最简单的例子,另外还有很多统计场景,可以用在这种方式实现,有如下好处:

  1. 节省网络带宽
  2. 减少RPC调用(scan的调用随着CacheSzie的变小而线性增加),减轻hbase压力
  3. 可以提高统计效率,那我之前写过的一个groupby类型的例子来说,大约可以提高50%以上的统计速度。

其他应用场景?

  1. 一个保存着用户信息的表,可以统计每个用户信息(counter job)
  2. 统计最大值,最小值,平均值,参考:https://issues.apache.org/jira/browse/HBASE-1512
  3. 批量删除记录,批量删除某个时间戳的记录

参考:

1. http://blogs.apache.org/hbase/entry/coprocessor_introduction
2. https://issues.apache.org/jira/browse/HBASE-1512



已有 0 人发表留言,猛击->> 这里<<-参与讨论


ITeye推荐



相关 [hbase 协处理器] 推荐:

HBASE 协处理器入门(转载)

- - 数据库 - ITeye博客
如果要统对hbase中的数据,进行某种统计,比如统计某个字段最大值,统计满足某种条件的记录数,统计各种记录特点,并按照记录特点分类(类似于sql的group by)~. 常规的做法就是把hbase中整个表的数据scan出来,或者稍微环保一点,加一个filter,进行一些初步的过滤(对于rowcounter来说,就加了FirstKeyOnlyFilter),但是这么做来说还是会有很大的副作用,比如占用大量的网络带宽(当标级别到达千万级别,亿级别之后)尤为明显,RPC的量也是不容小觑的.

hbase介绍

- AreYouOK? - 淘宝数据平台与产品部官方博客 tbdata.org
hbase是bigtable的开源山寨版本. 是建立的hdfs之上,提供高可靠性、高性能、列存储、可伸缩、实时读写的数据库系统. 它介于nosql和RDBMS之间,仅能通过主键(row key)和主键的range来检索数据,仅支持单行事务(可通过hive支持来实现多表join等复杂操作). 主要用来存储非结构化和半结构化的松散数据.

Riak对比HBase

- - NoSQLFan
文章来自 Riak官方wiki,是一篇Riak与HBase的对比文章. Riak官方的对比通常都做得很中肯,并不刻意偏向自家产品. 对比的Riak版本是1.1.x,HBase是0.94.x. Riak 与 HBase 都是基于 Apache 2.0 licensed 发布. Riak 的实现是基于 Amazon 的 Dynamo 论文,HBase 是基于 Google 的 BigTable.

[转]HBase简介

- - 小鸥的博客
   Hbase是一个分布式开源数据库,基于Hadoop分布式文件系统,模仿并提供了基于Google文件系统的Bigtable数据库的所有功能. 其目标是处理非常庞大的表,可以用普通的计算机处理超过10亿行数据,并且有数百万列元素组成的数据表. Hbase可以直接使用本地文件系统或者Hadoop作为数据存储方式,不过为了提高数据可靠性和系统的健壮性,发挥Hbase处理大数据量等功能,需要使用Hadoop作为文件系统.

HBase表设计

- - 互联网 - ITeye博客
默认情况下,在创建HBase表的时候会自动创建一个region分区,当导入数据的时候,所有的HBase客户端都向这一个region写数据, 直到这 个region足够大了才进行切分. 一种可以加快批量写入速度的方法是通过预先创建一些空的regions,这样当数据写入HBase时,会按 照 region分区情况,在集群内做数据的负载均衡.

HBase Memstore配置

- - 行业应用 - ITeye博客
HBase Memstore配置. 本文为翻译,原英文地址:http://blog.sematext.com/2012/07/16/hbase-memstore-what-you-should-know/.     当regionserver(以下简称RS)收到一个写请求,会将这个请求定位到某个特定的region.

hbase原理

- - CSDN博客云计算推荐文章
1.hbase利用hdfs作为其文件存储系统,利用mapreduce来处理数据,利用zookeeper作为协调工具. 2.行键(row key),类似于主键,但row key是表自带的. 3.列族(column family) ,列(也称作标签/修饰符)的集合,定义表的时候指定的,列是在插入记录的时候动态增加的.

hbase锁机制

- - 数据库 - ITeye博客
博文说明:1、研究版本hbase0.94.12;2、贴出的源代码可能会有删减,只保留关键的代码.   hbase的锁是采用jdk的ReentrantReadWriteLock类实现.   一、HRegion有两种锁:lock、updatesLock,这两种锁均是ReentrantReadWriteLock类的实例,基本上所有的region操作均需要获取lock的read共享锁,在获取了lock的read锁后,如果是增加或者删除等影响数据内容的操作则还需要获取updatesLock的read锁.

Hbase入门

- - CSDN博客云计算推荐文章
Hbase 全称是Hadoop DataBase ,是一种开源的,可伸缩的,高可靠,高性能,面向列的分布式存储系统. 类似于Google的BigTable,其分布式计算采用MapReduce,通过MapReduce完成大块数据加载和全表扫描操作. 文件存储系统是HDFS,通过Zookeeper来完成状态管理协同服务.

[原]HBase StoreFile Compaction

- - 芒果先生Mango的专栏
Store File的合并策略比较复杂,涉及多个参数,合并策略的好坏,直接影响HBase的读写性能. 发现这篇博文:http://blog.csdn.net/azhao_dn/article/details/8867036 对Compaction描述的言简意赅:. hbase为了防止小文件(被刷到磁盘的menstore)过多,以保证保证查询效率,hbase需要在必要的时候将这些小的store file合并成相对较大的store file,这个过程就称之为compaction.