<< 如何中断正在执行IO的 Quartz 作业 | 首页 | 你能用Paper.js做什么? >>

Blur 上手 - 建于Hadoop 和 Lucene上的搜索工具

Getting Started with "Blur" - Search on Top of Hadoop and Lucene.

Blur是一个新的Apache 2.0许可的软件项目,提供了建于Hadoop和Lucene之上一个搜索功能。elasticsearch和Solr已经存在,为什么建立新的东西?虽然这些项目运作良好,不过他们没有与一个坚实的Hadoop生态系统集成。Blur始建专门针对大数据,从一开始考虑到可扩展性,冗余和性能,同时利用Hadoop堆栈中已经存在的所有优势。

 

一年半前,我的项目开始使用Hadoop的数据处理。很早,我们有网络问题,这使我们的HDFS集群网络连接充其量参差不齐。在一个周末,我们逐步失去了在集群的数据节点90中的47个网络连接。当我们在星期一早上的时候,我注意到,MapReduce系统是有点呆滞,但仍在工作。当我检查HDFS中,我看到我们的能力下降了约50%。集群上运行的fsck后,我惊奇地发现,上周末灾难性的失败但似乎文件系统仍然健康。这方面的经验,给我留下了深刻的印象。就在那时,我有个想法,以某种方式利用冗余和HDFS的容错来建立搜索系统的下一个版本,我才刚刚开始(重新)写。

 

我已经写了一个已在生产系统中几年的分片式Lucene的服务器。 Lucene的工作非常出色,做了一切我们需要的搜索工作。我们面临的问题是,它是运行在大铁箱是不是多余的,并不能很容易地扩展。看到Hadoop的一流的的可伸缩特征后,我决定寻找结合已经成熟和令人印象深刻的Lucene的功能设置与内置在Hadoop平台的可扩展性和冗余。因此这个实验项目Blur被创建。

 

Blur解决的最大的技术问题/功能:

 

  • 整个数据集的快速大规模索引
  • 自动分片Server故障转移
  • 通过Lucene的NRT实现近实时更新的兼容性
  • Lucene的FDT的文件压缩,同时保持随机存取性能
  • Lucene的的WAL(预写日志)提供数据的可靠性
  • Lucene直接R/W到HDFS中(seek写的问题)
  • Lucene的目录缓存块的随机存取性能

 

数据模型

 

在Blur的数据存储在包含行的表中。行必须有一个唯一的行ID,并包含一个或多个记录。记录有一个独特的记录ID(行内唯一)和逻辑上弥补了单个记录的列进行分组的列家族。列包含一个名称和一个值,一个记录可以包含多个列具有相同的名称。

 

01.{
02.rowid:"[email protected]",
03.records:[
04.{
05.recordid:"324182347",
06.family:"messages",
07.columns:[
08.{name:"to",value:"[email protected]"},
09.{name:"to",value:"[email protected]"},
10.{name:"subject",value:"important!"},
11.{name:"body",value:"This is a very important email...."}
12.]
13.}, {
14.recordid:"234387219",
15.family:"messages",
16.columns:[
17.{name:"to",value:"[email protected]"},
18.{name:"subject",value:"This is cool!"},
19.{name:"body",value:"Check this out....."}
20.]
21.}, {
22.recordid:"234123412",
23.family:"contacts",
24.columns:[
25.{name:"name",value:"Jon Doe"},
26.{name:"email",value:"[email protected]"}
27.]
28.}
29.]
30.}

 

 

架构

 

Blur使用Hadoop的MapReduce框架索引数据,Hadoop的HDFS文件系统用于存储索引。Thrift 用于所有的进程间通信同时 Zookeeper 被用于了解系统状态和存储元数据。Blur的架构是由两种类型的服务器进程:

 

  • Blur控制服务器
  • Blur分片服务器

分片服务器,从所有当前在线的表提供0个或多个碎片服务。在每个碎片服务器中哪些碎片在线是通过在Zookeeper的状态信息来计算的。如果碎片服务器宕机,通过与Zookeeper的余下的碎片服务器交互检测到故障,并确定他们的丢失的碎片需要从HDFS得到服务。

 

控制服务器提供了集群单一的入口点(逻辑),撒出查询,收集回复,并提供一个单一的响应。控制器和分片服务器暴露相同的 Thrift API,这有助于方便调试。它还允许开发人员启动一个单一的分片服务器,并与它进行交互,以与大型集群同样的方式。许多控制服务器可以(并且应该)冗余运行。控制器作为网关承担服务于分片服务器所有数据。

 

更新/加载数据

目前有两种方法来加载和更新数据。首先是通过MapReduce的大量载入,第二个是用Thrift通过突变调用。

 

大量载入 MapReduce的范例

 

01.public class BlurMapReduce {
02.public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
03.Configuration configuration = new Configuration();
04.String[] otherArgs = new GenericOptionsParser(configuration, args).getRemainingArgs();
05.if (otherArgs.length != 2) {
06.System.err.println("Usage: blurindexer <in> <out>");
07.System.exit(2);
08.}
09. 
10.AnalyzerDefinition ad = new AnalyzerDefinition();
11. 
12.TableDescriptor td = new TableDescriptor();
13.td.setShardCount(16);
14.// Location in HDFS
15.td.setTableUri("hdfs://<;namenode>:<port>/blur/tables/test-table");
16.td.setAnalyzerDefinition(ad);
17. 
18.BlurTask blurTask = new BlurTask();
19.blurTask.setTableDescriptor(td);
20.blurTask.setSpinLockPath("/copy-locks");
21.blurTask.setZookeeperConnectionStr("localhost");
22.blurTask.setMaxNumberOfConcurrentCopies(10);
23. 
24.// The copy locks are used to throttle how many concurrent
25.// copies from the reducers are occuring at the same time.
26.// This is normally needed because the indexing cluster is
27.// typically larger in size than the blur cluster.
28. 
29.Job job = blurTask.configureJob(new Configuration());
30.job.setJarByClass(BlurExampleMapper.class);
31.job.setMapperClass(BlurExampleMapper.class);
32.job.setInputFormatClass(TextInputFormat.class);
33.job.setOutputFormatClass(TextOutputFormat.class);
34. 
35.FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
36.FileOutputFormat.setOutputPath(job, new Path(otherArgs[1], "job-" + System.currentTimeMillis()));
37.System.exit(job.waitForCompletion(true) ? 0 : 1);
38.}
39. 
40.public static class BlurExampleMapper extends BlurMapper<LongWritable, Text> {
41.@Override
42.protected void map(LongWritable k, Text value, Context context) throwsIOException, InterruptedException {
43.// Reset record
44._record.clearColumns();
45. 
46.// Set row id
47._record.setRowId("rowid");
48. 
49.// Set record id
50._record.setRecordId("recordid");
51. 
52.// Set column family
53._record.setColumnFamily("cf1");
54. 
55._record.addColumn("name", "value");
56. 
57.// Set the key which is usual the rowid
58.byte[] bs = _record.getRowId().getBytes();
59._key.set(bs, 0, bs.length);
60.context.write(_key, _record);
61._recordCounter.increment(1);
62.context.progress();
63.}
64.}
65.}

Data Mutation Thrift Example

 

01.import static com.nearinfinity.blur.utils.BlurUtil.*;
02. 
03.public class ThriftMutationExample {
04.public static void main(String[] args) throws BlurException, TException, IOException {
05.final RowMutation mutation = newRowMutation("test-table", "rowid-1234",
06.newRecordMutation("column-family", "recordid-5678",
07.newColumn("columnname", "value")));
08. 
09.BlurClientManager.execute("controller1:40010", new BlurCommand<Void>() {
10.@Override
11.public Void call(Client client) throws BlurException, TException {
12.client.mutate(mutation);
13.return null;
14.}
15.});
16.}
17.}

 

 

 

 

搜索数据

 

任何Blur数据模型中的元素是通过正常的Lucene的语义检索:analyzers。Analyzers 定义在Blur表中。

 

标准Lucene查询语法是搜索 Blur 默认的方式。如果标准语法以外的任何需要,可以直接用Java对象创建一个Lucene的查询,并通过专家查询API提交他们。

 

行内的列家庭分组允许跨列家庭类似什么,你会得到一个内部联接两个表之间共享相同的键(或在rowid)。对于有多个列家族复杂的数据模型,这使得有一个非常强大的搜索能力。

 

下面的示例搜索“value”作为一个完整的全文检索。如果我想在列家族"famB"中的单个字段"colA"中搜索“value”,查询应该类似“famB.colA:value”。

 

01.public class ThriftSearchExample {
02. 
03.public static void main(String[] args) throws BlurException, TException, IOException {
04.BlurResults blurResults = BlurClientManager.execute("controller1:40010",new BlurCommand<BlurResults>() {
05.@Override
06.public BlurResults call(Client client) throws BlurException, TException {
07.BlurQuery blurQuery = new BlurQuery();
08.SimpleQuery simpleQuery = new SimpleQuery();
09.simpleQuery.setQueryStr("value");
10.blurQuery.setSimpleQuery(simpleQuery);
11.blurQuery.setSelector(new Selector());
12.return client.query("test-table", blurQuery);
13.}
14.});
15.for (BlurResult result : blurResults.getResults()) {
16.// do something with the result
17.}
18.}
19.}

 

 

 

读取数据

 

取数可以通过按行或按记录。通过指定的ROWID或recordid创建一个选择的对象,指定列家庭或你想返回的列。如果没有指定,返回整个行或记录。

 

01.public class ThriftFetchExample {
02.public static void main(String[] args) throws BlurException, TException, IOException {
03.Row row = BlurClientManager.execute("controller1:40010", newBlurCommand<Row>() {
04.@Override
05.public Row call(Client client) throws BlurException, TException {
06.Selector selector = new Selector();
07.selector.setRowId("rowid-1234");
08.FetchResult fetchRow = client.fetchRow("test-table", selector);
09.FetchRowResult rowResult = fetchRow.getRowResult();
10.return rowResult.getRow();
11.}
12.});
13.}
14.}

 

 

现状

 

Blur正接近它的第一个版本0.1,是相对稳定的。首次发布的候选应是在未来几周内可供下载。在此期间,可以在github上检查出来:

 

https://github.com/nearinfinity/blur

http://blur.io

来源:英文原文           中文编译:IT瘾       转载请保留原文链接

 

 

标签 : , ,



发表评论 发送引用通报