<<上篇 | 首页 | 下篇>>

hbase用coprocessor实现二级索引 | 邓的博客

HBase在0.92之后引入了coprocessors,提供了一系列的钩子,让我们能够轻易实现访问控制和二级索引的特性。下面简单介绍下两种coprocessors,第一种是Observers,它实际类似于触发器,第二种是Endpoint,它类似与存储过程。由于这里只用到了Observers,所以只介绍Observers,想要更详细的介绍请查阅(https://blogs.apache.org/hbase/entry/coprocessor_introduction)。observers分为三种:

RegionObserver:提供数据操作事件钩子;

WALObserver:提供WAL(write ahead log)相关操作事件钩子;

MasterObserver:提供DDL操作事件钩子。

相关接口请参阅hbase api。

下面给出一个例子,该例子使用RegionObserver实现在写主表之前将索引数据先写到另外一个表:

1 package com.dengchuanhua.testhbase;
2  
3 import java.io.IOException;
4  import java.util.Iterator;
5  import java.util.List;
6  
7 import org.apache.hadoop.conf.Configuration;
8  import org.apache.hadoop.hbase.KeyValue;
9  import org.apache.hadoop.hbase.client.HTable;
10  import org.apache.hadoop.hbase.client.Put;
11  import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
12  import org.apache.hadoop.hbase.coprocessor.ObserverContext;
13  import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
14  import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
15  
16 public class TestCoprocessor extends BaseRegionObserver {
17  
18 @Override
19  public void prePut(final ObserverContext<RegionCoprocessorEnvironment> e,
20  final Put put, final WALEdit edit, final boolean writeToWAL)
21  throws IOException {
22  //set configuration
23  Configuration conf = new Configuration();
24  //need conf.set...
25  
26 HTable table = new HTable(conf, "indexTableName");
27  List<KeyValue> kv = put.get("familyName".getBytes(), "columnName".getBytes());
28  Iterator<KeyValue> kvItor = kv.iterator();
29  while (kvItor.hasNext()) {
30  KeyValue tmp = kvItor.next();
31  Put indexPut = new Put(tmp.getValue());
32  indexPut.add("familyName".getBytes(), "columnName".getBytes(), tmp.getRow());
33  table.put(indexPut);
34  }
35  table.close();
36  }
37  
38 }

写完后要加载到table里面去,先把该文件打包成test.jar并上传到hdfs的/demo路径下,然后操作如下:

1. disable ‘testTable’

2. alter ‘testTable’, METHOD=>’table_att’,’coprocessor’=>’hdfs:///demo/test.jar|com.dengchuanhua.testhbase.TestCoprocessor|1001′

3. enable ‘testTable’

然后往testTable里面插数据就会自动往indexTableName写数据了。

总结:本文主要介绍了一个用coprocessor实现二级索引的例子。

阅读全文……

[HBase] Hbase Coprocessors - 芒果先生Mango的专栏 - 博客频道 - CSDN.NET

本文是笔者学习过程中的简单笔记,日后会逐渐增加内容,主要参考资料是《Hbase The Definitive Guide》。

我们可以通过Filter来减少从Server到Client在网络上传输的数据总量,以提升效率。通过HBase的Coprocessor特性,我们甚至可以将计算(computation)移动到数据所在的节点。

Introduction to Coprocessors

coprocessor使你能够直接在每个region server上执行任意的代码。更精确地说,它提供一些通过事件触发的功能,以region为基础执行code;这很像关系型数据库系统中的procedures(存储过程)。

在使用coprocessor时,你需要基于特定的interface创建专门的类,以jar包的形式提供给region server (如:可以将jar包放到$HBASE_HOME/lib/目录下)。这些coprocessor类可以通过配置文件静态加载,也可以在程序代码中动态加载。

 

corpocessor 框架提供了两种coprocessor基类:

1.Observer

这种coprocessor跟触发器相像:当特定的时间发生时,回调函数就会执行。

RegionObserver

处理数据操纵事件(data manipulationevents),这种coprocessor是和表的region紧密相连的。可以看作DML Coprocessor

MasterObserver

处理数据管理事件,是cluster范围的coprocessor。可以看做DDL Coprocessor

WALObserver

处理 write-ahead log processing 事件

2.Endpoint

 

The Coprocessor Class

所有的coprocessor类必须实现org.apache.hadoop.hbase.Coprocessor接口。

1.属性

PRIORITY_HIGHEST,PRIORITY_SYSTEM,PRIORITY_USER,PRIORITY_LOWEST四个静态常量表示coprocessor的优先级.值越低优先级越高。

2.方法

start(env)  stop(env) :这两个方法在coprocessor开始及退役的时候被调用(these two methods are called when the  coprocessor class is started,and eventually when it is decommissioned)

evn参数用来保存coprocessor整个生命周期的状态。

 

[java] view plaincopy
 
  1. package org.apache.hadoop.hbase;  
  2.   
  3. import java.io.IOException;  
  4.   
  5. /** 
  6.  * Coprocess interface. 
  7.  */  
  8. public interface Coprocessor {  
  9.   static final int VERSION = 1;  
  10.   
  11.   /** Highest installation priority */  
  12.   static final int PRIORITY_HIGHEST = 0;  
  13.   /** High (system) installation priority */  
  14.   static final int PRIORITY_SYSTEM = Integer.MAX_VALUE / 4;  
  15.   /** Default installation priority for user coprocessors */  
  16.   static final int PRIORITY_USER = Integer.MAX_VALUE / 2;  
  17.   /** Lowest installation priority */  
  18.   static final int PRIORITY_LOWEST = Integer.MAX_VALUE;  
  19.   
  20.   /** 
  21.    * Lifecycle state of a given coprocessor instance. 
  22.    */  
  23.   public enum State {  
  24.     UNINSTALLED,  
  25.     INSTALLED,  
  26.     STARTING,  
  27.     ACTIVE,  
  28.     STOPPING,  
  29.     STOPPED  
  30.   }  
  31.   
  32.   // Interface  
  33.   void start(CoprocessorEnvironment env) throws IOException;  
  34.   
  35.   void stop(CoprocessorEnvironment env) throws IOException;  
  36. }  


Coprocessor Loading 加载coprocessor

 

静态加载和动态加载。

静态加载:在hbase-site.xml中做类似下面的配置

 

[html] view plaincopy
 
  1. <property>  
  2.     <name>hbase.coprocessor.region.classes</name>  
  3.     <value>coprocessor.RegionObserverExample,coprocessor.AnotherCoprocessor</value>  
  4. </property>  
  5. <property>  
  6.     <name>hbase.coprocessor.master.classes</name>  
  7.     <value>coprocessor.MasterObserverExample</value>  
  8. </property>  
  9. <property>  
  10.     <name>hbase.coprocessor.wal.classes</name>  
  11.     <value>coprocessor.WALObserverExample,bar.foo.MyWALObserver</value>  
  12. </property>  

 

 

 

动态加载:通过table descriptor提供的接口实现;看下面的例子,创建表testtable,动态加载RegionObserverExample到该表的region

 

 

[java] view plaincopy
 
  1. public class LoadWithTableDescriptorExample {  
  2.           
  3.     public static void main(String[] args) throws IOException  
  4.     {  
  5.         Configuration conf = HBaseConfiguration.create();  
  6.         FileSystem fs = FileSystem.get(conf);  
  7.         //coprocessor所在的jar包的存放路径  
  8.         Path path = new Path(fs.getUri() + Path.SEPARATOR +"test/coprocessor/"+  
  9.                 "test.jar");  
  10.         //HTableDescriptor  
  11.         HTableDescriptor htd = new HTableDescriptor("testtable");  
  12.         //addFamily  
  13.         htd.addFamily(new HColumnDescriptor("colfam1"));  
  14.         //  
  15.         //设置要加载的corpocessor  
  16.         htd.setValue("COPROCESSOR$1", path.toString() +  
  17.                 "|" + RegionObserverExample.class.getCanonicalName() +  
  18.                 "|" + Coprocessor.PRIORITY_USER);  
  19.         //  
  20.         HBaseAdmin admin = new HBaseAdmin(conf);  
  21.           
  22.         //创建表"testtable"  
  23.         admin.createTable(htd);  
  24.           
  25.         System.out.println("end");  
  26.     }  
  27. }  


下面是RegionObserverExample类的实现, 编译通过后,将该类打包成test.jar,并上传到hdfs://master:9000/test/coprocessor目录下

 

 

[java] view plaincopy
 
  1. package coprocessor;  
  2.   
  3. import java.io.IOException;  
  4. import java.sql.Date;  
  5. import java.util.List;  
  6. import org.apache.commons.net.ntp.TimeStamp;  
  7. import org.apache.hadoop.hbase.KeyValue;  
  8. import org.apache.hadoop.hbase.client.Get;  
  9. import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;  
  10. import org.apache.hadoop.hbase.coprocessor.ObserverContext;  
  11. import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;  
  12. import org.apache.hadoop.hbase.util.Bytes;  
  13.   
  14. public class RegionObserverExample extends  
  15.     BaseRegionObserver {  
  16.       
  17.     public static final byte[] FIXED_ROW =  
  18.             Bytes.toBytes("@@@GETTIME@@@");  
  19.     //实现功能:用get查询 "@@@GETTIME@@@"行时,以字节数组形式返回系统时间  
  20.     @Override  
  21.     public void preGet(  
  22.             final ObserverContext<RegionCoprocessorEnvironment> e,  
  23.             final Get get, final List<KeyValue> results) throws  
  24.             IOException {  
  25.                 if (Bytes.equals(get.getRow(), FIXED_ROW)) {  
  26.                     KeyValue kv = new KeyValue(get.getRow(), FIXED_ROW,  
  27.                             FIXED_ROW,  
  28.                             Bytes.toBytes(System.currentTimeMillis()));  
  29.                     results.add(kv);  
  30.                 }  
  31.     }  
  32.       
  33.     public static void main(String args[]){  
  34.           
  35.         System.out.println("complete!");  
  36.     }  
  37. }  

 

 

Endpoints

  前面提到的RegionObserver例子通过已知的row key参数,将列计算功能添加到get请求期间。看起来这足以实现其他功能,比如恩能够返回所有给定列的value的和的聚合函数。然而,RegionObserver并不能实现上述功能,因为row key 决定了由哪个region处理request,这样就只能将计算请求(computation request)发送到单一的server上。

HBase为了克服上述RegionObserver的局限性,由coprocessor框架提供了一个动态调用实现(a dynamic call implementation),称作endpoint concept.

 

The CoprocessorProtocol interface

 

The BaseEndpointCoprocessor class

实现一个endpoint包括以下两个步骤

1.Extend the CoprocessorProtocol interface

2.Extend the BaseEndpointCoprocessor class

下面是一个小例子,实现功能:客户端通过远程调用检索每个region的行数和KeyValue的个数。

 

1.RowCountProtocol interface, code:

[java] view plaincopy
 
  1. public interface RowCountProtocol extends CoprocessorProtocol {  
  2.     //获取行数  
  3.     long getRowCount() throws IOException;  
  4.     //获取应用Filter后的结果集的行数  
  5.     long getRowCount(Filter filter) throws IOException;  
  6.     //获取KeyValue的个数  
  7.     long getKeyValueCount() throws IOException;  
  8. }  

2.RowCountEndPoint class, code:

[java] view plaincopy
 
  1. public class RowCountEndPoint extends BaseEndpointCoprocessor implements  
  2.         RowCountProtocol {  
  3.   
  4.     public RowCountEndPoint() {  
  5.         // TODO Auto-generated constructor stub  
  6.     }  
  7.   
  8.     @Override  
  9.     public long getRowCount() throws IOException {  
  10.         // TODO Auto-generated method stub  
  11.         return this.getRowCount(new FirstKeyOnlyFilter());  
  12.     }  
  13.   
  14.     @Override  
  15.     public long getRowCount(Filter filter) throws IOException {  
  16.         // TODO Auto-generated method stub  
  17.   
  18.           
  19.         return this.getRowCount(filter,false);  
  20.     }  
  21.   
  22.     @Override  
  23.     public long getKeyValueCount() throws IOException {  
  24.         // TODO Auto-generated method stub  
  25.         return this.getRowCount(null,true);  
  26.     }  
  27.       
  28.       
  29.     public long getRowCount(Filter filter,boolean countKeyValue) throws IOException {  
  30.         // TODO Auto-generated method stub  
  31.         Scan scan =new Scan();  
  32.         scan.setMaxVersions(1);  
  33.         if(filter !=null){  
  34.             scan.setFilter(filter);  
  35.         }  
  36.           
  37.         RegionCoprocessorEnvironment environment=  
  38.                 (RegionCoprocessorEnvironment) this.getEnvironment();  
  39.           
  40.         //使用内部scanner做扫描。  
  41.         InternalScanner scanner = environment.getRegion().getScanner(scan);  
  42.         //  
  43.         long result=0;  
  44.           
  45.         //计数  
  46.         try{  
  47.             boolean done=false;  
  48.             List<KeyValue> curValue = new ArrayList<KeyValue>();  
  49.             do{  
  50.                 curValue.clear();  
  51.                 done=scanner.next(curValue);  
  52.                 result+=countKeyValue?curValue.size():1;  
  53.                   
  54.             }while(done);  
  55.               
  56.         }catch(Exception e){  
  57.             e.printStackTrace();  
  58.         }finally{  
  59.             scanner.close();  
  60.         }  
  61.   
  62.         return result;  
  63.     }  
  64.   
  65.     /** 
  66.      * @param args 
  67.      */  
  68.     public static void main(String[] args) {  
  69.         // TODO Auto-generated method stub  
  70.         System.out.println("success!");  
  71.     }  
  72. }  

 

 

3.

3.1将上述类打包到my_coprocessor.jar, copy到各个RegionServer节点的 $HBASE_HOME/lib目录下;

3.2修改$HBASE_HOME/conf/hbase-site.xml配置文件,添加如下信息:

 

[java] view plaincopy
 
  1. <property>  
  2.                 <name>hbase.coprocessor.region.classes</name>  
  3.                 <value>  
  4.                         coprocessor.RegionObserverExample,  
  5.                         coprocessor.RowCountEndPoint  
  6.                 </value>  
  7.         </property>  

3.3 重启HBase Cluster

 

4.通过客户端调用之前定义的EndPoint Coprocessor

 

[java] view plaincopy
 
  1. public class EndPointExample {  
  2.   
  3.     /** 
  4.      * @author mango_song 
  5.      * @param args 
  6.      * @throws IOException  
  7.      */  
  8.     public static void main(String[] args) throws IOException {  
  9.         // TODO Auto-generated method stub  
  10.           
  11.         Configuration conf = HBaseConfiguration.create();  
  12.           
  13.         HTable table =new HTable(conf,"test");  
  14.           
  15.         try {  
  16.             //  
  17.             /*table.coprocessorExec 函数的描述信息: 
  18.              *  <RowCountProtocol, Long> Map<byte[], Long> org.apache.hadoop.hbase.client.HTable.coprocessorExec( 
  19.              *      Class<RowCountProtocol> protocol, 
  20.              *      byte[] startKey, byte[] endKey, 
  21.              *      Call<RowCountProtocol, Long> callable) 
  22.              *           throws IOException, Throwable 
  23.                     Invoke the passed org.apache.hadoop.hbase.client.coprocessor.Batch.Call 
  24.                     against the CoprocessorProtocol instances running in the selected regions. 
  25.                     All regions beginning with the region containing the startKey row,  
  26.                     through to the region containing the endKey row (inclusive) will be used. 
  27.                     If startKey or endKey is null, the first and last regions in the table, 
  28.                     respectively, will be used in the range selection. 
  29.             Specified by: coprocessorExec(...) in HTableInterface 
  30.             Parameters: 
  31.                 protocol the CoprocessorProtocol implementation to call 
  32.                 startKey start region selection with region containing this row 
  33.                 endKey select regions up to and including the region containing this row 
  34.                 callable wraps the CoprocessorProtocol implementation method calls made per-region 
  35.             Returns: 
  36.                 a Map of region names to org.apache.hadoop.hbase.client.coprocessor.Batch.Call.call(Object) return values 
  37.             Throws: 
  38.                 IOException 
  39.                 Throwable 
  40.              */  
  41.             Map<byte[], Long> results=table.coprocessorExec(  
  42.                     RowCountProtocol.class,  
  43.                     null,   
  44.                     null,   
  45.                     new Batch.Call<RowCountProtocol, Long>() {  
  46.   
  47.                         @Override  
  48.                         public Long call(RowCountProtocol instance)  
  49.                                 throws IOException {  
  50.                             // TODO Auto-generated method stub  
  51.                             return instance.getRowCount();  
  52.                         }  
  53.                     }  
  54.             );  
  55.               
  56.             long total =0;  
  57.             //打印出每个region的行数及总行数  
  58.             for(Map.Entry<byte[], Long> entry:results.entrySet() ){  
  59.                 total += entry.getValue();  
  60.                 System.out.println("Region: "+Bytes.toString(entry.getKey()) +  
  61.                         ", Count: "+entry.getValue());  
  62.             }  
  63.               
  64.             System.out.println("Total Count: "+total);  
  65.               
  66.               
  67.         } catch (Throwable e) {  
  68.             // TODO Auto-generated catch block  
  69.             e.printStackTrace();  
  70.         }     
  71.     }  
  72. }  

运行结果如下,可以看出test表共由三个region组成,每个region拥有的行数分别为9,13,78

 

 

[plain] view plaincopy
 
  1. 13/01/26 18:59:53 INFO zookeeper.ClientCnxn: Opening socket connection to server master/172.21.15.21:2181. Will not attempt to authenticate using SASL (无法定位登录配置)  
  2. 13/01/26 18:59:53 INFO zookeeper.ClientCnxn: Socket connection established to master/172.21.15.21:2181, initiating session  
  3. 13/01/26 18:59:53 INFO zookeeper.ClientCnxn: Session establishment complete on server master/172.21.15.21:2181, sessionid = 0x13c6a82639f000c, negotiated timeout = 40000  
  4. Region: test,,1358337586380.f3e04b8b43d073a509e9a374f643277a., Count: 9  
  5. Region: test,209,1358337769870.be5a99319eca6f2881ccd73789bfafb0., Count: 13  
  6. Region: test,222,1358337769870.94685f417a95e91d0c9185a95974f866., Count: 78  
  7. Total Count: 100  


Batch类提供了一个更方便的方法来获取远程endpoint, Batch.forMethod() ,你可以得到一个配置好的Batch.Call实例用来传递到远程的region servers. 下面对EndPointExample做了修改,看起来是不是好看多了~~

[java] view plaincopy
 
  1. Batch.Call call =Batch.forMethod(RowCountEndPoint.class, "getKeyValueCount");  
  2.               
  3.             Map<byte[], Long> results=table.coprocessorExec(  
  4.                     RowCountProtocol.class,  
  5.                     null,   
  6.                     null,   
  7.                     call  
  8.             );  

 

然而,通过直接implementing Batch.Call 更加灵活和强大,(you can perform additional processing on the results ,implementing Batch.call directly will provide  more power and  flexibility.)  下面的例子,同时获取rowCount和keyvalueCount

[java] view plaincopy
 
  1. Map<byte[],Pair<Long,Long>> results=table.coprocessorExec(  
  2.         RowCountProtocol.class,  
  3.         null,  
  4.         null,  
  5.         new Batch.Call<RowCountProtocol,Pair<Long,Long>>() {  
  6.   
  7.             @Override  
  8.             public Pair<Long, Long> call(RowCountProtocol instance)  
  9.                     throws IOException {  
  10.                 // TODO Auto-generated method stub  
  11.                 return new Pair<Long, Long>(  
  12.                         instance.getRowCount(),  
  13.                         instance.getKeyValueCount()  
  14.                         );  
  15.             }     
  16.         }  
  17. );  
  18.           
  19. //  
  20. long totalRows=0;  
  21. long totalKeyValues=0;  
  22. for(Map.Entry<byte[], Pair<Long,Long>> entry:results.entrySet() ){  
  23.       
  24.     totalRows+=entry.getValue().getFirst();  
  25.     totalKeyValues+=entry.getValue().getSecond();  
  26.       
  27.     System.out.println("region="+Bytes.toString(entry.getKey())+  
  28.             "  ,  rowCount="+entry.getValue().getFirst()+  
  29.             "  ,  keyValueCount="+entry.getValue().getSecond());  
  30. }  
  31. System.out.println("totalRows="+totalRows+  
  32.         ",totalKeyValues="+totalKeyValues);  

 

 

当然,我们也可以通过coprocessorProxy()方法获取endpoint的client-side 代理,通过该代理,可以在给定的row key所在的region做你想要的操作 (如果row key不存在,则该对应的region为rowkey范围包含该row key的region)。

[java] view plaincopy
 
  1. RowCountProtocol protocol=table.coprocessorProxy(RowCountProtocol.class, Bytes.toBytes("202"));  
  2.               
  3.             long rowsInRegion = protocol.getRowCount();  
  4.               
  5.             System.out.println("Region Row Count: "+rowsInRegion);  

 

 

另一种动态加载方法,通过modifytable修改表方式:

  1. public static void main(String[] args) throws MasterNotRunningException,  
  2.             Exception {  
  3.         // TODO Auto-generated method stub  
  4.         byte[] tableName = Bytes.toBytes("userinfo");  
  5.         Configuration conf = HBaseConfiguration.create();  
  6.         HBaseAdmin admin = new HBaseAdmin(conf);  
  7.         admin.disableTable(tableName);  
  8.   
  9.         HTableDescriptor htd = admin.getTableDescriptor(tableName);  
  10.         htd.addCoprocessor(AggregateImplementation.class.getName(), new Path("hdfs://master68:8020/sharelib/aggregate.jar"), 1001, null);  
  11.         //htd.removeCoprocessor(RowCountEndpoint.class.getName());  
  12.         admin.modifyTable(tableName, htd);  
  13.         admin.enableTable(tableName);  
  14.         admin.close();  
  15.           
  16.     }  

 

阅读全文……

HBase Coprocessor 剖析与编程实践 - 林场 - 博客园

1.起因(Why HBase  Coprocessor)

HBase作为列族数据库最经常被人诟病的特性包括:无法轻易建立“二级索引”,难以执行求和、计数、排序等操作。比如,在旧版本的(<0.92)Hbase中,统计数据表的总行数,需要使用Counter方法,执行一次MapReduce Job才能得到。虽然HBase在数据存储层中集成了MapReduce,能够有效用于数据表的分布式计算。然而在很多情况下,做一些简单的相加或者聚合计算的时候,如果直接将计算过程放置在server端,能够减少通讯开销,从而获得很好的性能提升。于是,HBase在0.92之后引入了协处理器(coprocessors),实现一些激动人心的新特性:能够轻易建立二次索引、复杂过滤器(谓词下推)以及访问控制等。

2.灵感来源( Source of Inspration)

HBase协处理器的灵感来自于Jeff Dean 09年的演讲( P66-67)。它根据该演讲实现了类似于bigtable的协处理器,包括以下特性:

  • 每个表服务器的任意子表都可以运行代码
  • 客户端的高层调用接口(客户端能够直接访问数据表的行地址,多行读写会自动分片成多个并行的RPC调用)
  • 提供一个非常灵活的、可用于建立分布式服务的数据模型
  • 能够自动化扩展、负载均衡、应用请求路由
HBase的协处理器灵感来自bigtable,但是实现细节不尽相同。HBase建立了一个框架,它为用户提供类库和运行时环境,使得他们的代码能够在HBase region server和master上处理。

3.细节剖析(Implementation)

协处理器分两种类型,系统协处理器可以全局导入region server上的所有数据表,表协处理器即是用户可以指定一张表使用协处理器。协处理器框架为了更好支持其行为的灵活性,提供了两个不同方面的插件。一个是观察者(observer),类似于关系数据库的触发器。另一个是终端(endpoint),动态的终端有点像存储过程。

 3.1观察者(Observer)

观察者的设计意图是允许用户通过插入代码来重载协处理器框架的upcall方法,而具体的事件触发的callback方法由HBase的核心代码来执行。协处理器框架处理所有的callback调用细节,协处理器自身只需要插入添加或者改变的功能。

以HBase0.92版本为例,它提供了三种观察者接口:

  • RegionObserver:提供客户端的数据操纵事件钩子:Get、Put、Delete、Scan等。
  • WALObserver:提供WAL相关操作钩子。
  • MasterObserver:提供DDL-类型的操作钩子。如创建、删除、修改数据表等。

这些接口可以同时使用在同一个地方,按照不同优先级顺序执行.用户可以任意基于协处理器实现复杂的HBase功能层。HBase有很多种事件可以触发观察者方法,这些事件与方法从HBase0.92版本起,都会集成在HBase API中。不过这些API可能会由于各种原因有所改动,不同版本的接口改动比较大,具体参考Java Doc

RegionObserver工作原理,如图1所示。更多关于Observer细节请参见HBaseBook的第9.6.3章节

regionobserver.png

图1 RegionObserver工作原理

 

3.2终端(Endpoint)

终端是动态RPC插件的接口,它的实现代码被安装在服务器端,从而能够通过HBase RPC唤醒。客户端类库提供了非常方便的方法来调用这些动态接口,它们可以在任意时候调用一个终端,它们的实现代码会被目标region远程执行,结果会返回到终端。用户可以结合使用这些强大的插件接口,为HBase添加全新的特性。终端的使用,如下面流程所示:

  1. 定义一个新的protocol接口,必须继承CoprocessorProtocol.
  2. 实现终端接口,该实现会被导入region环境执行。
  3. 继承抽象类BaseEndpointCoprocessor.
  4. 在客户端,终端可以被两个新的HBase Client API调用 。单个region:HTableInterface.coprocessorProxy(Class<T> protocol, byte[] row) 。rigons区域:HTableInterface.coprocessorExec(Class<T> protocol, byte[] startKey, byte[] endKey, Batch.Call<T,R> callable)

整体的终端调用过程范例,如图2所示:

rpc.png

图2 终端调用过程范例

4.编程实践(Code Example)

在该实例中,我们通过计算HBase表中行数的一个实例,来真实感受协处理器 的方便和强大。在旧版的HBase我们需要编写MapReduce代码来汇总数据表中的行数,在0.92以上的版本HBase中,只需要编写客户端的代码即可实现,非常适合用在WebService的封装上。

4.1启用协处理器 Aggregation(Enable Coprocessor Aggregation)

我们有两个方法:1.启动全局aggregation,能过操纵所有的表上的数据。通过修改hbase-site.xml这个文件来实现,只需要添加如下代码:

<property>
   <name>hbase.coprocessor.user.region.classes</name>
   <value>org.apache.hadoop.hbase.coprocessor.AggregateImplementation</value>
 </property>

2.启用表aggregation,只对特定的表生效。通过HBase Shell 来实现。

(1)disable指定表。hbase> disable 'mytable'

(2)添加aggregation hbase> alter 'mytable', METHOD => 'table_att','coprocessor'=>'|org.apache.hadoop.hbase.coprocessor.AggregateImplementation||'

(3)重启指定表 hbase> enable 'mytable'

4.2统计行数代码(Code Snippet)

复制代码
public class MyAggregationClient {    
private static final byte[] TABLE_NAME = Bytes.toBytes("mytable");  
private static final byte[] CF = Bytes.toBytes("vent");  
public static void main(String[] args) throws Throwable {  
Configuration customConf = new Configuration();  
customConf.setStrings("hbase.zookeeper.quorum", "node0,node1,node2"); //提高RPC通信时长
customConf.setLong("hbase.rpc.timeout", 600000); //设置Scan缓存
customConf.setLong("hbase.client.scanner.caching", 1000); 
Configuration configuration = HBaseConfiguration.create(customConf);  
AggregationClient aggregationClient = new AggregationClient( configuration);  
Scan scan = new Scan(); //指定扫描列族,唯一值
scan.addFamily(CF);  
long rowCount = aggregationClient.rowCount(TABLE_NAME, null, scan);  
System.out.println("row count is " + rowCount);   
}  
}
复制代码

 5.参考文献(References)

[1]Lai, et al.,(2012-02-01),"Coprocessor Introduction : Apache HBase".Available:https://blogs.apache.org/hbase/entry/coprocessor_introduction

[2]Apache.(2012-08-10),"The Apache HBase  Reference Guide".Available:http://hbase.apache.org/book.html#coprocessors

阅读全文……