基于solr实现hbase的二级索引 - oO脾气不坏Oo

标签: solr hbase 索引 | 发表时间:2014-11-16 21:24 | 作者:oO脾气不坏Oo
出处:

一、目的

    了解hbase的都知道,由于hbase基于行健有序存储,在查询时使用行健十分高效,然后想要实现关系型数据库那样可以随意组合的多条件查询、查询总记录数、分页等就比较麻烦了。想要实现这样的功能,我们可以采用两种方法:

  1. 使用hbase提供的filter,
  2. 自己实现二级索引,通过二级索引查询多符合条件的行健,然后再查询hbase。

    第一种方法不多说了,使用起来很方便,但是局限性也很大,hbase的filter是直接扫记录的,如果数据范围很大,会导致查询速度很慢。所以如果能先使用行健把记录缩小到一个较小范围,那么就比较适合,否则就不适用了。此外该方法不能解决获取总数的为。
    第二种是适用范围就比较广泛了,不过根据实现二级索引的方式解决的问题也不同。这里我们选择solr主要是因为solr可以很轻松实现各种查询(本来就是全文检索引擎)。

二、实现方法

    其实hbase结合solr实现方法还是比较简单的,重点在于一些实现细节上。将hbase记录写入solr的关键就在于hbase提供的Coprocessor,Coprocessor提供了两个实现:endpoint和observer,endpoint相当于关系型数据库的存储过程,而observer则相当于触发器。说到这相信大家应该就明白了,我们要利用的就是observer。observer允许我们在记录put前后做一些处理,而我们就是通过postPut将记录同步写入solr(关于Coprocessor具体内容请自行查资料)。

    而写入solr这块就比较简单了,如果是单机就使用ConcurrentUpdateSolrServer,如果是集群就是用CloudSolrServer。不过这里需要注意的是由于CloudSolrServer不像ConcurrentUpdateSolrServer那样内置缓存,默认情况下hbase没写一条数据就会向solr提交一次,这样速度会非常慢(很可能hbase写完很久solr这边还在提交),因此要自己实现一个缓存池,根据hbase的写入速度动态调整,并批量向solr提交。

三、实现代码

    实现方法弄清处置后代码就很容易写了。首先看下Coprocessor的代码:

package com.uboxol.hbase.coprocessor;

import com.uboxol.model.VmMoney;
import com.uboxol.solr.SolrWriter;
import java.io.IOException;

/**
* Created with IntelliJ IDEA.
* User: guojing
* Date: 14-10-24
* Time: 上午11:08
* To change this template use File | Settings | File Templates.
*/
public class SolrIndexCoprocessorObserver extends BaseRegionObserver {
private static Logger log = Logger.getLogger(SolrIndexCoprocessorObserver.class);

@Override
public void postPut(ObserverContext<RegionCoprocessorEnvironment> e, Put put, WALEdit edit, Durability durability) throws IOException {
String rowKey = Bytes.toString(put.getRow());
try {
Cell cellInnerCode = put.get(Bytes.toBytes("data"), Bytes.toBytes("inner_code")).get(0);
String innerCode = new String(CellUtil.cloneValue(cellInnerCode));

Cell cellNodeId = put.get(Bytes.toBytes("data"), Bytes.toBytes("node_id")).get(0);
String nodeId = new String(CellUtil.cloneValue(cellNodeId));

Cell cellPayType = put.get(Bytes.toBytes("data"), Bytes.toBytes("pay_type")).get(0);
String payType = new String(CellUtil.cloneValue(cellPayType));

Cell cellCts = put.get(Bytes.toBytes("data"), Bytes.toBytes("cts")).get(0);
String cts = new String(CellUtil.cloneValue(cellCts));

Cell cellTraSeq = put.get(Bytes.toBytes("data"), Bytes.toBytes("tra_seq")).get(0);
String traSeq = new String(CellUtil.cloneValue(cellTraSeq));

cts=cts.replace("-","");
cts=cts.replace(" ","");
cts=cts.replace(":","");

VmMoney vm = new VmMoney();
vm.setCts(cts);
vm.setId(new Integer(id));
vm.setInnerCode(innerCode);
vm.setNodeId(new Integer(nodeId));
vm.setPayType(new Integer(payType));
vm.setRowKey(rowKey);
vm.setTraSeq(traSeq);

SolrWriter so = new SolrWriter();
so.addDocToCache(vm);
} catch (Exception ex){
log.info("write "+rowKey+" to solr fail:"+ex.getMessage());
ex.printStackTrace();
}
}

@Override
public void postDelete(ObserverContext<RegionCoprocessorEnvironment> e, Delete delete, WALEdit edit, Durability durability) throws IOException {
String rowKey = Bytes.toString(delete.getRow());
try {
SolrWriter so = new SolrWriter();
so.deleteDoc(rowKey);
} catch (Exception ex){
log.info("delete "+rowKey+" from solr fail:"+ex.getMessage());
ex.printStackTrace();
}
}
}

    里边代码很简单,就是在hbase记录写入后和删除后调用SolrWriter进行处理。下边看下SolrWriter的实现:

package com.uboxol.solr;

import com.uboxol.model.VmMoney;
import java.io.IOException;
import java.util.*;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class SolrWriter {
private static Logger log = Logger.getLogger(SolrWriter.class);

public static String urlSolr = ""; //solr地址
private static String defaultCollection = ""; //默认collection
private static int zkClientTimeOut =0 ;//zk客户端请求超时间
private static int zkConnectTimeOut =0;//zk客户端连接超时间
private static CloudSolrServer solrserver = null;

private static int maxCacheCount = 0; //缓存大小,当达到该上限时提交
private static Vector<VmMoney> cache = null; //缓存
public static Lock commitLock =new ReentrantLock(); //在添加缓存或进行提交时加锁

private static int maxCommitTime = 60; //最大提交时间,s

static {
Configuration conf = HBaseConfiguration.create();
urlSolr = conf.get("hbase.solr.zklist", "192.168.12.1:2181,192.168.12.2:2181,192.168.12.3:2181");
defaultCollection = conf.get("hbase.solr.collection","collection1");
zkClientTimeOut = conf.getInt("hbase.solr.zkClientTimeOut", 10000);
zkConnectTimeOut = conf.getInt("hbase.solr.zkConnectTimeOut", 10000);
maxCacheCount = conf.getInt("hbase.solr.maxCacheCount", 10000);
maxCommitTime = conf.getInt("hbase.solr.maxCommitTime", 60*5);

log.info("solr init param"+urlSolr+" "+defaultCollection+" "+zkClientTimeOut+" "+zkConnectTimeOut+" "+maxCacheCount+" "+maxCommitTime);
try {
cache=new Vector<VmMoney>(maxCacheCount);

solrserver = new CloudSolrServer(urlSolr);
solrserver.setDefaultCollection(defaultCollection);
solrserver.setZkClientTimeout(zkClientTimeOut);
solrserver.setZkConnectTimeout(zkConnectTimeOut);

//启动定时任务,第一次延迟10执行,之后每隔指定时间执行一次
Timer timer=new Timer();
timer.schedule(new CommitTimer(),10*1000,maxCommitTime*1000);
} catch (Exception ex){
ex.printStackTrace();
}

}

/**
* 批量提交
*/
public void inputDoc(List<VmMoney> vmMoneyList) throws IOException, SolrServerException {
if (vmMoneyList == null || vmMoneyList.size() == 0) {
return;
}
List<SolrInputDocument> doclist= new ArrayList<SolrInputDocument>(vmMoneyList.size());
for (VmMoney vm : vmMoneyList) {
SolrInputDocument doc = new SolrInputDocument();
doc.addField("id", vm.getId());
doc.addField("node_id", vm.getNodeId());
doc.addField("inner_code", vm.getInnerCode());
doc.addField("pay_type", vm.getPayType());
doc.addField("rowkey", vm.getRowKey());
doc.addField("cts", vm.getCts());
doc.addField("tra_seq", vm.getTraSeq());

doclist.add(doc);
}
solrserver.add(doclist);
}

/**
* 单条提交
*/
public void inputDoc(VmMoney vmMoney) throws IOException, SolrServerException {
if (vmMoney == null) {
return;
}
SolrInputDocument doc = new SolrInputDocument();
doc.addField("id", vmMoney.getId());
doc.addField("node_id", vmMoney.getNodeId());
doc.addField("inner_code", vmMoney.getInnerCode());
doc.addField("pay_type", vmMoney.getPayType());
doc.addField("rowkey", vmMoney.getRowKey());
doc.addField("cts", vmMoney.getCts());
doc.addField("tra_seq", vmMoney.getTraSeq());

solrserver.add(doc);

}

public void deleteDoc(List<String> rowkeys) throws IOException, SolrServerException {
if (rowkeys == null || rowkeys.size() == 0) {
return;
}
solrserver.deleteById(rowkeys);
}

public void deleteDoc(String rowkey) throws IOException, SolrServerException {

solrserver.deleteById(rowkey);
}

/**
* 添加记录到cache,如果cache达到maxCacheCount,则提交
*/
public static void addDocToCache(VmMoney vmMoney) {
commitLock.lock();
try {
cache.add(vmMoney);
log.info("cache commit maxCacheCount:"+maxCacheCount);
if (cache.size() >= maxCacheCount) {
log.info("cache commit count:"+cache.size());
new SolrWriter().inputDoc(cache);
cache.clear();
}
} catch (Exception ex) {
log.info(ex.getMessage());
} finally {
commitLock.unlock();
}
}

/**
* 提交定时器
*/
static class CommitTimer extends TimerTask {
@Override
public void run() {
commitLock.lock();
try {
if (cache.size() > 0) { //大于0则提交
log.info("timer commit count:"+cache.size());
new SolrWriter().inputDoc(cache);
cache.clear();
}
} catch (Exception ex) {
log.info(ex.getMessage());
} finally {
commitLock.unlock();
}
}
}
}

    SolrWriter的重点就在于addDocToCache方法和定时器CommitTimer,addDocToCache会在hbase每次插入数据时将记录插入缓存,并且判断是否达到上限,如果达到则将缓存内所用数据提交到solr,此外CommitTimer 则会每隔一段时间提交一次,以保证缓存内所有数据最终写入solr。
    其他一些辅助代码就不贴了,可自行到github查看: hbase-solr-coprocessor (代码仅作参考,由于业务不同不能直接运行)

四、部署

    这里重点说下hbase的Coprocessor部署的一些问题。部署步骤如下:

  • 将Coprocessor代码打成jar包,拷贝到所有hbase的region server上,注意jdk一定要1.6,高版本可能会导致无法加载
  • 将hbase的hbase.coprocessor.abortonerror设置成true,待确定Coprocessor运行正常后在改为false。此步骤非必要,但是如果Coprocessor有问题会导致所有region无法启动
  • 由于我们实现的Coprocessor是region级的,所以不需要启动,直接通过hbase shell即可加载:
    disable 'tablename'

    alter 'tablename',METHOD => 'table_att','coprocessor'=>'jar包路径,本地使用file:///开头,hdfs上的则用hdfs:///开头|1001|参数,多个逗号隔开'

    enable 'tablename'

五、总结

    这次hbase+solr的部署前后花了不少时间,其实理论方面都很简单,让人感觉轻而易举,但是实际实现的过程中就会遇到不少问题,就比如写入缓存之类的,如果不去测试,就很容易被忽略。


本文链接: 基于solr实现hbase的二级索引,转载请注明。

相关 [solr hbase 索引] 推荐:

基于solr实现hbase的二级索引 - oO脾气不坏Oo

- - 博客园_首页
    了解hbase的都知道,由于hbase基于行健有序存储,在查询时使用行健十分高效,然后想要实现关系型数据库那样可以随意组合的多条件查询、查询总记录数、分页等就比较麻烦了. 想要实现这样的功能,我们可以采用两种方法:. 使用hbase提供的filter,. 自己实现二级索引,通过二级索引查询多符合条件的行健,然后再查询hbase.

[原]自学大数据:基于Solr实现HBase的文本索引

- -
最近接触的项目中,需要针对HBase的数据进行索引查询,主要支持中文查询,分页查询等. 在此情况下,学习了搜索服务器solr. 总结了一些方法和经验,正好可以分享个大家,鼓励自己,共同学习. HBase目前只支持对rowkey的一级索引,对于二级索引还不支持,当然可以把所有要索引的字段都拼接到rowkey中,根据hbase的filter功能进行查询,但是这样操作数据会涉及到全表扫描,效率很低,速度慢,不利于后期扩展.

基于cdh5.7.5集群的solr(4.10.3-cdh5.7.5)+hbase(1.2.0-cdh5.7.5)构建二级索引方案

- - 行业应用 - ITeye博客
作为开发人员,在阅读该篇博客之前最好已具备以下基本技能:. 了解Linux命令、HBase Shell、Solr schema.xml文件的常用的标签含义、CDH集群基本操作(所需服务安装、卸载、配置). 在Hbase中,表的RowKey 按照字典排序, Region按照RowKey设置split point进行shard,通过这种方式实现的全局、分布式索引.

Solr与HBase架构设计 - aitanjupt

- - 博客园_首页
摘要:本篇是本人在做一个大数据项目. ,对于系统架构总结的一点想法,如何在保证存储量的情况下,又能保证数据的检索速度. 前提:      Solr、SolrCloud提供了一整套的数据检索方案,HBase提供了完善的大数据存储机制. 需求:      1、对于添加到HBase中的结构化数据,能够检索出来.

基于Solr的HBase多条件查询测试

- - 互联网 - ITeye博客
某电信项目中采用HBase来存储用户终端明细数据,供前台页面即时查询. HBase无可置疑拥有其优势,但其本身只对rowkey支持毫秒级 的快 速检索,对于多字段的组合查询却无能为力. 针对HBase的多条件查询也有多种方案,但是这些方案要么太复杂,要么效率太低,本文只对基于Solr的 HBase多条件查询方案进行测试和验证.

5分钟搞定solr索引主从同步

- - 鲁塔弗的博客
solr配置主从同步非常简单,比mysql/redis 要简单的多,5分钟搞定. solr索引同步的是core对core,以core为基本同步单元. 一个solr instance可以装载多个core,每个core拥有不同的schema,core与实例的关系等同于mysql中table和database的关系.

主流全文索引工具的比较( Lucene, Sphinx, solr, elastic search)

- - 企业架构 - ITeye博客
前几天的调研(  Rails3下的 full text search (全文本搜索, 全文匹配. ) ), 我发现了两个不错的候选: . lucene  (solr, elasticsearch 都是基于它) . 把看到的有价值的文章记录在这里: . 回答1.  Result relevance ranking is the default.

hbase中二级索引的实现--ihbase

- - ITeye博客
一般来说,对数据库建立索引,往往需要单独的数据结构来存储索引的数据.在为hbase建立索引时,可以另外建立一张索引表,查询时先查询索引表,然后用查询结果查询数据表. 这个图左边表示索引表,右边是数据表. 但是对于hbase这种分布式的数据库来说,最大的问题是解决索引表和数据表的本地性问题,hbase很容易就因为负载均衡,表split等原因把索引表和数据表的数据分布到不同的region server上,比如下图中,数据表和索引表就出现在了不同的region server上.

全文检索引擎Solr系列—–全文检索基本原理

- - ImportNew
场景:小时候我们都使用过新华字典,妈妈叫你翻开第38页,找到“坑爹”所在的位置,此时你会怎么查呢. 毫无疑问,你的眼睛会从38页的第一个字开始从头至尾地扫描,直到找到“坑爹”二字为止. 这种搜索方法叫做 顺序扫描法. 对于少量的数据,使用顺序扫描是够用的. 但是妈妈叫你查出坑爹的“坑”字在哪一页时,你要是从第一页的第一个字逐个的扫描下去,那你真的是被坑了.

全文检索引擎Solr系列——整合中文分词组件IKAnalyzer

- - ImportNew
IK Analyzer是一款结合了词典和文法分析算法的中文分词组件,基于字符串匹配,支持用户词典扩展定义,支持细粒度和智能切分,比如:. 张三 | 说的 | 确实 | 在理. 张三 | 三 | 说的 | 的确 | 的 | 确实 | 实在 | 在理. 整合IK Analyzer比mmseg4j要简单很多, 下载解压缩IKAnalyzer2012FF_u1.jar放到目录:E:\solr-4.8.0\example\solr-webapp\webapp\WEB-INF\lib,修改配置文件schema.xml,添加代码:.