hbase的cache与batch的理解_梦想还是要有的,万一实现了呢!-CSDN博客
cache:
在默认情况下,如果你需要从hbase中查询数据,在获取结果ResultScanner时,hbase会在你每次调用ResultScanner.next()操作时对返回的每个Row执行一次RPC操作。即使你使用ResultScanner.next(int nbRows)时也只是在客户端循环调用RsultScanner.next()操作,你可以理解为hbase将执行查询请求以迭代器的模式设计,在执行next()操作时才会真正的执行查询操作,而对每个Row都会执行一次 RPC操作。
因此显而易见的就会想如果我对多个Row返回查询结果才执行一次RPC调用,那么就会减少实际的通讯开销。这个就是hbase配置属性“hbase.client.scanner.caching”的由来,设置cache可以在hbase配置文件中显示静态的配置,也可以在程序动态的设置。
cache值得设置并不是越大越好,需要做一个平衡。cache的值越大,则查询的性能就越高,但是与此同时,每一次调用next()操作都需要花费更长的时间,因为获取的数据更多并且数据量大了传输到客户端需要的时间就越长,一旦你超过了maximum heap the client process 拥有的值,就会报outofmemoryException异常。当传输rows数据到客户端的时候,如果花费时间过长,则会抛出ScannerTimeOutException异常。
batch:
在cache的情况下,我们一般讨论的是相对比较小的row,那么如果一个Row特别大的时候应该怎么处理呢?要知道cache的值增加,那么在client process 占用的内存就会随着row的增大而增大。在hbase中同样为解决这种情况提供了类似的操作:Batch。可以这么理解:
cache是面向行的优化处理,batch是面向列的优化处理。
Batch它用来控制每次调用next()操作时会返回多少列,比如你设置setBatch(5),那么每一个Result实例就会返回5列,如果你的列数为17的话,那么就会获得四个Result实例,分别含有5,5,5,2个列。
下面会以表格的形式来帮助理解,假设我们拥有10Row,每个row拥有2个family,每个family拥有10个列。(也就是说每个Row含有20列)
caching | batch | Results | RPCs | Notes |
---|---|---|---|---|
1 | 1 | 200 | 201 | 额外的一个RPC是用来判断scan是否完成 |
200 | 1 | 200 | 2 | |
2000 | 100 | 10 | 1 | 超过的部分没有用处,但是判断scan也在那一个RPC 中完成 |
2 | 100 | 10 | 6 | 10/2 +1 (额外的判断开销) |
2 | 10 | 20 | 11 | |
5 | 100 | 10 | 3 | |
5 | 20 | 10 | 3 | |
10 | 10 | 20 | 3 |
RPCs=(Rows* Cols per Row) / Min(Cols per Row, Batch size) / Scanner caching
上图引用自hbase权威指南,是用来表示一个RPC call的构成。
package com.lxz.hbasemaven;
import java.util.Iterator;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.Before;
import org.junit.Test;
public class HbaseScannerCaching {
private Connection conn;
@Before
public void initConn() throws Exception {
Configuration conf = HBaseConfiguration.create();
conn = ConnectionFactory.createConnection(conf);
}
@Test
public void get() throws Exception {
HTable table = (HTable) conn.getTable(TableName.valueOf("ns1:t1"));
Scan scan = new Scan();
//设置batch和cache
scan.setBatch(3);
scan.setCaching(1000);
scan.addFamily(Bytes.toBytes("f1"));
long start = System.currentTimeMillis();
ResultScanner scanner = table.getScanner(scan);
Iterator<Result> it = scanner.iterator();
while (it.hasNext()) {
Result r = it.next();
String no = Bytes.toString(CellUtil.cloneValue(r.getColumnCells(Bytes.toBytes("f1"),
Bytes.toBytes("no")).get(0)));
String name = Bytes.toString(CellUtil.cloneValue(r.getColumnCells(Bytes.toBytes("f1"),
Bytes.toBytes("name")).get(0)));
int age = Bytes.toInt(CellUtil.cloneValue(r.getColumnCells(Bytes.toBytes("f1"),
Bytes.toBytes("age")).get(0)));
System.out.println(no + "," + name + "," + age);
}
long end = System.currentTimeMillis();
System.out.println("time:" + (end - start));
}
}