ElasticSearch-2.0.0集群安装配置与API使用实践
ElasticSearch是基于全文搜索引擎库Lucene构建的分布式搜索引擎,我们可以直接使用ElasticSearch实现分布式搜索系统的搭建与使用,都知道,Lucene只是一个搜索框架,它提供了搜索引擎操作的基本API,如果要实现一个能够使用的搜索引擎系统,还需要自己基于Lucene的API去实现,工作量很大,而且还需要很好地掌握Lucene的底层实现原理。
ElasticSearch是一个完整的分布式搜索引擎系统,它的一些基本特性包括如下:
- 全文检索
- 提供插件机制,可以共享重用插件的功能
- 分布式文件存储
- 分布式实时索引和搜索
- 实时统计分析
- 可以横向扩展,支持大规模数据的搜索
- 简单易用的RESTful API
- 基于Replication实现了数据的高可用特性
- 与其他系统的集成
- 支持结构化和非结构化数据
- 灵活的Schema设计(Mappings)
- 支持多编程语言客户端
我个人感觉,ElasticSearch尽量屏蔽底层Lucene相关的技术细节,让你根本无从感觉底层Lucene相关的内容,这样你可以省去了了解Lucene 的成本,学习曲线比较平缓,不像Solr,如果想要构造负责的查询(Query),还是要对Lucene有所了解的。另外,在分布式设计方面,ElasticSearch更轻量一些,用起来更简单,而使用Solr的分布式分片功能需要使用SolrCloud,它基于ZooKeeper来实现配置管理,以及Replication功能,而且Solr需要使用Web容器来部署,相对来说有点复杂一些(我个人之前使用的SolrCloud版本大概是3.1~3.5左右,比较早,现在可能更加完善了)。
基本概念
我们熟悉一下ElasticSearch中涉及到的一些基本概念:
- 索引(Index)
索引(Index)是文档的集合,它是根据实际业务逻辑进行划分的,通常会把相对独立且具有相似结构或者性质的数据作为文档,放在一起,形成一个索引,比如,用户相关信息可以作为一个索引,交易相关信息也可应作为另一个索引。
- 类型(Type)
类型(Type)是索引内部的一个逻辑划分,在一个索引内部可以定义多个类型(Type),类型将一个索引在逻辑上划分为多个集合,每个类型包含多个属性(字段)。比如,我们基于手机客户端应用App,创建一个了用户相关信息的索引,然后再在这个索引内部定义多个类型:基本信息类型、设备信息类型、行为信息类型,基本信息类型中包含用户编号、证件号码、名称、手机号码、年龄、出生日期,设备信息类型包括设备类型、设备名称、App版本号、渠道来源、系统版本、IMEI、mac地址,用户行为信息包含用户编号、事件编号、事件类型、时间、浏览页面代码、地区编码,这样有3个类型在一个索引当中。ElasticSearch中类型,与HBase中列簇(Column Family)的概念很相似。
- 文档(Document)
文档(Document)是索引的基本单元,它与关系数据库中的一条记录相类似,包含了一组属性信息,同时包含一个唯一标识这一组属性值的ID,通过该ID可以更新一个文档,也可以删除一个文档。
- 分片(Shards)&副本(Replicas)
一个索引是很多文档的集合,将一个索引进行分割,分成多个片段(一个索引的子集),每一个片段称为一个分片(Shard),这样划分可以很好地管理索引,跨节点存储,为分布式存储于搜索提供了便利。副本(Replica)是为了保证一个分片(Shard)的可用性,冗余复制存储,当一个分片对应的数据无法读取时,可以读取其副本,正常提供搜索服务。
集群安装配置
ElasticSearch集群安装配置非常容易,安装可以执行如下命令行:
wget https://download.elasticsearch.org/elasticsearch/release/org/elasticsearch/distribution/zip/elasticsearch/2.0.0/elasticsearch-2.0.0.zip unzip elasticsearch-2.0.0.zip
拿出集群的一个节点的进行配置,修改配置文件config/elasticsearch.yml的内容,如下所示:
# ======================== Elasticsearch Configuration ========================= # # NOTE: Elasticsearch comes with reasonable defaults for most settings. # Before you set out to tweak and tune the configuration, make sure you # understand what are you trying to accomplish and the consequences. # # The primary way of configuring a node is via this file. This template lists # the most important settings you may want to configure for a production cluster. # # Please see the documentation for further information on configuration options: # <http://www.elastic.co/guide/en/elasticsearch/reference/current/setup-configuration.html> # # ---------------------------------- Cluster ----------------------------------- # # Use a descriptive name for your cluster: # cluster.name: dw_search_engine # # ------------------------------------ Node ------------------------------------ # # Use a descriptive name for the node: # node.name: esnode-01 # # Add custom attributes to the node: # # node.rack: r1 # # ----------------------------------- Paths ------------------------------------ # # Path to directory where to store the data (separate multiple locations by comma): # path.data: /data/dw_search_storage # # Path to log files: # path.logs: /tmp/es/logs # # ----------------------------------- Memory ----------------------------------- # # Lock the memory on startup: # # bootstrap.mlockall: true # # Make sure that the `ES_HEAP_SIZE` environment variable is set to about half the memory # available on the system and that the owner of the process is allowed to use this limit. # # Elasticsearch performs poorly when the system is swapping the memory. # # ---------------------------------- Network ----------------------------------- # # Set the bind adress to a specific IP (IPv4 or IPv6): # network.host: 10.10.2.62 # # Set a custom port for HTTP: # http.port: 9200 # # For more information, see the documentation at: # <http://www.elastic.co/guide/en/elasticsearch/reference/current/modules-network.html> # # ---------------------------------- Gateway ----------------------------------- # # Block initial recovery after a full cluster restart until N nodes are started: # # gateway.recover_after_nodes: 3 # # For more information, see the documentation at: # <http://www.elastic.co/guide/en/elasticsearch/reference/current/modules-gateway.html> # # --------------------------------- Discovery ---------------------------------- # # Elasticsearch nodes will find each other via unicast, by default. # # Pass an initial list of hosts to perform discovery when new node is started: # The default list of hosts is ["127.0.0.1", "[::1]"] # discovery.zen.ping.unicast.hosts: ["es-01", "es-02"] # # Prevent the "split brain" by configuring the majority of nodes (total number of nodes / 2 + 1): # # discovery.zen.minimum_master_nodes: 3 # # For more information, see the documentation at: # <http://www.elastic.co/guide/en/elasticsearch/reference/current/modules-discovery.html> # # ---------------------------------- Various ----------------------------------- # # Disable starting multiple nodes on a single system: # # node.max_local_storage_nodes: 1 # # Require explicit names when deleting indices: # # action.destructive_requires_name: true
其它节点的配置,在保证基本存储目录相同的前提下,可以根据需要修改如下几个参数:
node.name network.host http.port
最后,在每个节点上分别启动ElasticSearch,执行如下命令:
cd elasticsearch-2.0.0 bin/elasticsearch -d
然后可以查看Web管理界面,需要安装插件elasticsearch-head,后面会介绍,Web管理界面,如下所示:
上图中,我们已经创建了一个索引,可以看到节点的状态,及其分片(Shard)的情况。
RESTful API基本操作
尤其是在进行搜索的时候,为了使得其他系统能够与ElasticSearch搜索系统很好地解耦合,使用ElasticSearch提供的RESTful API是一种不错的选择。下面,我们介绍RESTful API的基本操作。
- 插件管理
插件的存放目录为elasticsearch-2.0.0/plugins/,插件都是基于该存储目录进行操作的。
安装插件:
bin/plugin install analysis-icu bin/plugin install mobz/elasticsearch-head
可以从不同的位置安装插件,上面第一个称为Core Elasticsearch plugin,它是Elasticsearch提供的,会从Elasticsearch上下载并安装;上面第一个是从Github上自动下载安装。还有其他的方式安装,如从特定的文件系统等进行安装。
列出插件:
bin/plugin list
删除插件:
bin/plugin remove analysis-icu
安装完一个插件,我们可以查看,例如查看elasticsearch_head插件,查看如下链接:
http://10.10.2.62:9200/_plugin/head/
- 创建索引
curl -XPUT 'http://10.10.2.62:9200/basis_device_info/'
创建的索引名称为basis_device_info,我们也可以不指定一个索引对应的Mappings,而是在索引的时候自动生成Mappings,所以如果没有指定一个索引的Mappings,则这个索引可以支持任何的Mappings。同样可知,一个索引可以自动地增加不同的type,非常灵活。
也可以指定索引的基本配置,如分片(Shard)数目、副本(Replica)数目,如下所示:
curl -XPUT 'http://10.10.2.62:9200/basis_device_info /' -d '{ "settings" : { "index" : { "number_of_shards" : 10, "number_of_replicas" : 1 } } }'
默认是5个分片,不进行复制,上面配置表示索引basis_device_info有10个分片,每个分片1个副本。
下面在创建索引的时候,指定设计的schema,即配置mappings,如下所示:
curl -XPUT 'http://10.10.2.62:9200/basis_device_info/' -d ' { "mappings": { "user": { "_all": { "enabled": false }, "properties": { "installid": { "type": "string" }, "appid": { "type": "string" }, "channel": { "type": "string", "index": "analyzed" }, "version": { "type": "string" }, "osversion": { "type": "string" }, "device_name": { "type": "string", "index": "analyzed" }, "producer": { "type": "string" }, "device_type": { "type": "string" }, "resolution": { "type": "string", "index": "analyzed" }, "screen_size": { "type": "string", "index": "analyzed" }, "mac": { "type": "string", "index": "not_analyzed" }, "idfa": { "type": "string" }, "idfv": { "type": "string", "index": "not_analyzed" }, "imei": { "type": "string", "index": "not_analyzed" }, "create_time": { "type": "date", "format": "yyyy-MM-dd HH:mm:ss", "index": "not_analyzed" } } } } }'
上面创建了索引basis_device_info,同时type为user,有了mappings,我们就知道需要索引的数据的格式了。
- 删除索引
curl -XDELETE 'http://10.10.2.62:9200/basis_device_info/'
删除索引basis_device_info。
- 索引文档
curl -PUT 'http://10.10.2.62:9200/basis_device_info/user/CC49E748588490D41BFB89584007B0FA' -d '{ "installid": "0000000L", "appid": "0", "udid": "CC49E748588490D41BFB89584007B0FA", "channel": "wulei1", "version": "3.1.2", "osversion": "8.1", "device_name": "iPhone Retina4 Simulator", "producer": "apple", "device_type": "1", "resolution": "640*1136", "screen_size": "320*568", "mac": "600308A20C5E", "idfa": "dbbbs-fdsfa-fafda-321saf", "idfv": "4283FAE1-19EB-4FA9-B739-8148F76BC8C3", "imei": "af-sfd0fdsa-fad-ff", "create_time": "2015-01-14 20:32:05" }'
基于我们前面创建的type为user的索引,索引一个文档,文档_id为CC49E748588490D41BFB89584007B0FA,文档内容为一个用户设备信息,使用JSON格式表示。
- 批量索引
批量索引,可以根据自己熟悉的编程语言或者脚本来实现,ElasticSearch也提供了一些客户端库。下面我们首先根据数据文件,构造成ElasticSearch索引支持的JSON格式,导出文件,然后通过curl工具去进行批量索引,实际上使用的是ElasticSearch提供的bulk API来实现的。
首先处理原始带索引数据,代码如下所示:
package org.shirdrn.es; import java.io.BufferedReader; import java.io.BufferedWriter; import java.io.Closeable; import java.io.File; import java.io.FileReader; import java.io.FileWriter; import net.sf.json.JSONObject; import com.google.common.base.Throwables; public class EsIndexingClient { public static void closeQuietly(Closeable... closeables) { if(closeables != null) { for(Closeable closeable : closeables) { try { closeable.close(); } catch (Exception e) { } } } } public static void main(String[] args) { String f = "C:\\Users\\yanjun\\Desktop\\basis_device_info.txt"; String out = "C:\\Users\\yanjun\\Desktop\\basis_device_info.json"; File in = new File(f); BufferedReader reader = null; BufferedWriter writer = null; try { writer = new BufferedWriter(new FileWriter(out)); reader = new BufferedReader(new FileReader(in.getAbsoluteFile())); String line = null; while((line = reader.readLine()) != null) { String[] a = line.split("\t", -1); if(a.length == 16) { String udid = a[2]; JSONObject c = new JSONObject(); c.put("_index", "basis_device_info"); c.put("_type", "user"); c.put("_id", udid); JSONObject index = new JSONObject(); index.put("index", c); JSONObject doc = new JSONObject(); doc.put("installid", a[0]); doc.put("appid", a[1]); doc.put("udid", a[2]); doc.put("channel", a[3]); doc.put("version", a[4]); doc.put("osversion", a[5]); doc.put("device_name", a[6]); doc.put("producer", a[7]); doc.put("device_type", a[8]); doc.put("resolution", a[9]); doc.put("screen_size", a[10]); doc.put("mac", a[11]); doc.put("idfa", a[12]); doc.put("idfv", a[13]); doc.put("imei", a[14]); doc.put("create_time", a[15]); writer.write(index.toString() + "\n"); writer.write(doc.toString() + "\n"); } } } catch (Exception e) { throw Throwables.propagate(e); } finally { closeQuietly(reader, writer); } } }
运行代码,输出的数据文件为basis_device_info.json,该文件的格式了,示例如下所示:
{"index":{"_index":"basis_device_info","_type":"user","_id":"1c207122a4b2c9632212ab86bac10f60"}} {"installid":"00000002","appid":"0","udid":"1c207122a4b2c9632212ab86bac10f60","channel":"itings","version":"3.1.1","osversion":"4.1.2","device_name":"Lenovo P770","producer":"Lenovo","device_type":"0","resolution":"540*960","screen_size":"4.59","mac":"d4:22:3f:83:17:06","idfa":"","idfv":"","imei":"861166023335745","create_time":"2015-01-14 19:39:35"} {"index":{"_index":"basis_device_info","_type":"user","_id":"FA6B1B98E6FF4E6994A1505A996F6102"}} {"installid":"00000003","appid":"0","udid":"FA6B1B98E6FF4E6994A1505A996F6102","channel":"appstore","version":"3.1.1","osversion":"8.1.2","device_name":"iPhone 6Plus","producer":"apple","device_type":"1","resolution":"640*1136","screen_size":"320*568","mac":"020000000000","idfa":"84018625-A3C9-47A8-88D0-C57C12F80520","idfv":"9D1E2514-9DC8-47A8-ABD0-129FC0FB3171","imei":"","create_time":"2015-01-14 19:41:21"} {"index":{"_index":"basis_device_info","_type":"user","_id":"8c5fe70b2408f184abcbe4f34b8f23c3"}} {"installid":"00000004","appid":"0","udid":"8c5fe70b2408f184abcbe4f34b8f23c3","channel":"itings","version":"3.1.1.014","osversion":"4.2.2","device_name":"2014011","producer":"Xiaomi","device_type":"0","resolution":"720*1280","screen_size":"4.59","mac":"0c:1d:af:4f:48:9f","idfa":"","idfv":"","imei":"865763025472173","create_time":"2015-01-14 19:46:37"}
奇数编号行的内容为索引的指令信息,包括索引名称(_index)、类型(_type)、唯一标识(_id),偶数编号行的内容为实际待索引的文档数据。
然后,通过curl命令来进行批量索引,执行如下命令:
curl -s -XPOST http://10.10.2.62:9200/basis_device_info/_bulk --data-binary "@basis_device_info.json"
- 搜索文档
简单的搜索,可以通过GET方式搜索,如下所示:
http://10.10.2.62:9200/basis_device_info/user/CC49E748588490D41BFB89584007B0FA http://10.10.2.62:9200/basis_device_info/user/_search?q=channel:B-hicloud
上面第一个根据唯一的_id进行搜索,结果返回0个或者1个文档;第二个通过指定GET方式参数,其中_search和q是ElasticSearch内置的接口关键字,通过指定字段名称和搜索关键词的方式进行搜索,结果以JSON格式返回。
- Request Body搜索
可以设置请求的body内容,能够支持更加复杂的查询条件然后请求搜索,如下所示:
curl -XGET 'http://10.10.2.245:9200/basis_device_info/user/_search' -d '{ "query" : { "term" : { "udid": "bc0af2ca66a96725b8b0e0056d4213b6" } } }'
结果示例,如下所示:
{"took":11,"timed_out":false,"_shards":{"total":5,"successful":5,"failed":0},"hits":{"total":1,"max_score":9.45967,"hits":[{"_index":"basis_device_info","_type":"user","_id":"bc0af2ca66a96725b8b0e0056d4213b6","_score":9.45967,"_source":{"installid":"00000FPq","appid":"0","udid":"bc0af2ca66a96725b8b0e0056d4213b6","channel":"B-hicloud","version":"3.1.1","osversion":"4.4.2","device_name":"H60-L02","producer":"HUAWEI","device_type":"0","resolution":"720*1184","screen_size":"4.64","mac":"ec:cb:30:c4:93:e3","idfa":"","idfv":"","imei":"864103021536104","create_time":"2015-01-18 01:29:16"}}]}}
- 基于Lucene查询语法搜索
如果熟悉Lucene查询(Query),可以构造通过构造复杂的Term关系字符串来进行搜索,示例如下所示:
curl -XGET 'http://10.10.2.62:9200/basis_device_info/user/_search' -d ' { "query": { "query_string": { "query": "(channel:baidu OR device_name:HUAWEI)" } } }'
查询query字符串的含义是:从channel字段搜索baidu,从device_name字段搜索HUAWEI,然后两者取并集,这实际上一个布尔查询,返回最终结果。
- 使用multi_match搜索
ElasticSearch支持给定搜索关键词,从多个字段中进行搜索,示例如下所示:
curl -XGET 'http://10.10.2.62:9200/basis_device_info/user/_search' -d ' { "query": { "multi_match" : { "query": "HTC", "fields": [ "channel", "device_name" ] } } }'
这样,只要在channel和device_name两个字段中出现关键词HTC,则都返回结果,结果应该是两个字段匹配上的文档集合的并集。
- 支持Filter搜索
可以在制定Filter进行搜索。例如下面是一个按照时间范围进行过滤,得到搜索结果的查询:
curl -XGET 'http://10.10.2.62:9200/basis_device_info/user/_search' -d ' { "query": { "filtered": { "query": { "match_all": {} }, "filter" : { "range" : { "create_time" : { "from" : "2015-01-16 00:00:00", "to" : "2015-01-16 23:59:59" } } } } } }'
- 分页搜索
ElasticSearch支持分页搜索,可以通过在RESTful连接中指定size和from参数,来进行分页搜索,如下所示:
curl -XGET 'http://10.10.2.62:9200/basis_device_info/user/_search?size=10&from=20' -d ' { "query": { "filtered": { "query": { "match_all": {} }, "filter" : { "range" : { "create_time" : { "from" : "2015-01-16 00:00:00", "to" : "2015-01-16 23:59:59" } } } } } }'
上面搜索的含义是:按照时间范围搜索,从第20个文档开始,返回10个文档,相当于一页取10个文档。
Java客户端
如果熟悉Java语言,而不想使用脚本等其他方式操作ElasticSearch搜索集群,则可以使用ElasticSearch提供的Java客户端API来编码实现,能够更加灵活地控制。ElasticSearch提供的Java客户端支持全部常用操作,如更新索引、索引文档、搜索文档、删除索引等等操作,而且还支持其他一些功能,如同步异步模式、explain查询等,下面我们通过代码来了解一下。
如果使用Maven管理Java代码,可以在pom.xml文件中加入如下依赖:
<dependency> <groupId>org.elasticsearch</groupId> <artifactId>elasticsearch</artifactId> <version>2.0.0</version> </dependency>
创建一个ElasticSearch客户端,代码如下所示:
// create & configure client Settings settings = Settings.settingsBuilder() .put("cluster.name", "dw_search_engine") .put("client.transport.sniff", true) .build(); final Client client = TransportClient.builder().settings(settings).build() .addTransportAddress(newAddress("es-01", 9300)) .addTransportAddress(newAddress("es-02", 9300));
可以将你的ElasticSearch集群的节点通过上面的addTransportAddress方法,都与Client对象关联起来,这样在操作ElasticSearch集群中的索引/更新/删除/搜索文档的时候,就能够自动感知。上面newAddress方法如下:
private static InetSocketTransportAddress newAddress(String host, int port) throws UnknownHostException { return new InetSocketTransportAddress(InetAddress.getByName(host), port); }
另外,也可以通过在配置文件elasticsearch.yml中指定相关配置,例如:
cluster.name: dw_search_engine client.transport.sniff: true client.transport.ping_timeout: 10s client.transport.nodes_sampler_interval: 10s
那么,创建客户端需要从配置文件中读取配置内容,具体可以查看官方文档。
- 准备工作
索引的时候,我们是从一个本地文件中读取数据,并构建索引文档需要的格式,然后请求ElasticSearch集群执行索引操作,下面代码是一些基本准备工作:
final String index = "basis_device_info"; final String type = "user"; // index documents String f = "C:\\Users\\yanjun\\Desktop\\basis_device_info.txt"; File in = new File(f);
从文件中,每次读取一行记录,然后构建一个JSON格式字符串,通过XContentBuilder来表示,代码如下所示:
protected static XContentBuilder createSource(String[] a) throws IOException { return jsonBuilder() .startObject() .field("installid", a[0]) .field("appid", a[1]) .field("udid", a[2]) .field("channel", a[3]) .field("version", a[4]) .field("osversion", a[5]) .field("device_name", a[6]) .field("producer", a[7]) .field("device_type", a[8]) .field("resolution", a[9]) .field("screen_size", a[10]) .field("mac", a[11]) .field("idfa", a[12]) .field("idfv", a[13]) .field("imei", a[14]) .field("create_time", a[15]) .endObject(); }
下面我们从API的功能入手,分别详细说明,并附加代码展示用法。
- 创建索引
可以直接通过Java客户端库来创建索引,代码如下所示:
protected static void createIndex(final Client client, String index) { Map<String, Object> indexSettings = Maps.newHashMap(); indexSettings.put("number_of_shards", "4"); indexSettings.put("number_of_replicas", "1"); CreateIndexRequest createIndexRequest = new CreateIndexRequest( index, Settings.settingsBuilder().put(indexSettings).build()); CreateIndexResponse createIndexResponse = client.admin().indices().create(createIndexRequest).actionGet(); System.out.println(createIndexResponse); }
- 创建Mappings
通过Java客户端创建Mappings,相对比较复杂一点,需要拼接对应的JSON字符串,实现代码如下所示:
protected static void createMappings(final Client client, String index) throws IOException, InterruptedException, ExecutionException { XContentBuilder basisInfoMapping = jsonBuilder() .startObject() .startObject("_all") .field("enabled", "false") .endObject() .startObject("properties") .startObject("id") .field("type", "string") .endObject() .startObject("name") .field("type", "string") .field("index", "analyzed") .endObject() .startObject("age") .field("type", "int") .endObject() .startObject("birthday") .field("type", "date") .field("format", "yyyy-MM-dd HH:mm:ss") .field("index", "not_analyzed") .endObject() .endObject() .endObject(); XContentBuilder deviceInfoMapping = jsonBuilder() .startObject() .startObject("_all") .field("enabled", "false") .endObject() .startObject("properties") .startObject("udid") .field("type", "string") .endObject() .startObject("device_name") .field("type", "string") .field("index", "analyzed") .endObject() .startObject("privoder") .field("type", "string") .field("index", "analyzed") .endObject() .startObject("os_version") .field("type", "string") .endObject() .endObject() .endObject(); PutMappingRequest putMappingRequest = Requests.putMappingRequest(index) .type("basic_info") .source(basisInfoMapping) .type("device_info") .source(deviceInfoMapping); System.out.println(putMappingRequest.indicesOptions()); PutMappingResponse putMappingResponse = client.admin().indices().putMapping(putMappingRequest).get(); System.out.println(putMappingResponse); }
上面代码创建了一个名称为app_user_info的索引,该索引具有basic_info和device_info这2个type,可以通过elasticsearch_head插件,在Web管理页面上查看对应的索引信息。
- 索引单个文档
从文件中读取数据,一条记录构造一个文档,然后执行索引,代码如下所示:
protected static void indexDocs(final Client client, final String index, final String type, File in) { BufferedReader reader = null; try { reader = new BufferedReader(new FileReader(in.getAbsoluteFile())); String line = null; while((line = reader.readLine()) != null) { String[] a = line.split("\t", -1); if(a.length == 16) { String udid = a[2]; IndexResponse response = client .prepareIndex(index, type, udid) .setSource(createSource(a)) .get(); System.out.println(response.toString()); } } } catch (Exception e) { throw Throwables.propagate(e); } finally { closeQuietly(reader); } }
- 批量索引
批量索引有多种方式,首先,通过Bulk API进行索引,我们自己控制每一个batch的大小,代码如下所示:
protected static void indexBulk(final Client client, final String index, final String type, File in) { BulkRequestBuilder bulkRequest = client.prepareBulk(); final int batchSize = 100; int counter = 0; BufferedReader reader = null; try { reader = new BufferedReader(new FileReader(in.getAbsoluteFile())); String line = null; while((line = reader.readLine()) != null) { String[] a = line.split("\t", -1); if(a.length == 16) { String udid = a[2]; IndexRequestBuilder indexRequestBuilder = client .prepareIndex(index, type, udid) .setSource(createSource(a)); bulkRequest.add(indexRequestBuilder); if(++counter >= batchSize) { System.out.println(!bulkRequest.get().hasFailures()); counter = 0; bulkRequest = client.prepareBulk(); } } } } catch (Exception e) { throw Throwables.propagate(e); } finally { System.out.println(!bulkRequest.get().hasFailures()); closeQuietly(reader); } }
另一种方式,是根据ElasticSearch提供的Bulk Processor来实现,只需要设置相关参数,就可以实现批量索引,这种方式更加灵活,示例如下所示:
protected static void indexUsingBulkProcessor(final Client client, final String index, final String type, File in) throws InterruptedException { String name = "device_info_processor"; int bulkActions = 1000; ByteSizeValue bulkSize = new ByteSizeValue(100, ByteSizeUnit.MB); TimeValue flushInterval = TimeValue.timeValueSeconds(60); int concurrentRequests = 12; // create bulk processor final BulkProcessor bulkProcessor = BulkProcessor.builder(client, new BulkProcessor.Listener() { public void afterBulk(long id, BulkRequest req, BulkResponse resp) { System.out.println("id=" + id + ", resp=" + resp); } public void afterBulk(long id, BulkRequest req, Throwable cause) { System.out.println("id=" + id + ", req=" + req + ", cause=" + cause); } public void beforeBulk(long id, BulkRequest req) { System.out.println("id=" + id + ", req=" + req); } }) .setName(name) .setBulkActions(bulkActions) .setBulkSize(bulkSize) .setFlushInterval(flushInterval) .setConcurrentRequests(concurrentRequests) .build(); // index documents BufferedReader reader = null; try { reader = new BufferedReader(new FileReader(in.getAbsoluteFile())); String line = null; while((line = reader.readLine()) != null) { String[] a = line.split("\t", -1); if(a.length == 16) { String udid = a[2]; bulkProcessor.add(new IndexRequest(index, type, udid).source(createSource(a))); } } } catch (Exception e) { throw Throwables.propagate(e); } finally { closeQuietly(reader); // close bulk processor bulkProcessor.awaitClose(60, TimeUnit.SECONDS); } }
可以通过实现自定义的BulkProcessor.Listener,它提供了Hook的功能,比如,索引某个文档失败的话,可以在Hook方法中增加处理,实现重试的功能;再比如,如果索引成功,给其他系统服务一个回调,等等。
- 更新文档
更新文档中的某些字段,需要指定id的值,以及需要更新的字段的值,代码如下所示:
protected static void updateDoc(final Client client, final String index, final String type) throws IOException, InterruptedException, ExecutionException { String id = "60e90ddcb1a61622028b8d92112a646c"; UpdateRequest updateRequest = new UpdateRequest(index, type, id); updateRequest.doc(jsonBuilder() .startObject() .field("channel", "h-google") .field("appid", "1") .endObject()); UpdateResponse response = client.update(updateRequest).get(); System.out.println(response); }
如果更新文档的时候,文档不存在,则需要先执行索引操作,再进行更新操作,将这两个操作合并到一起,使用upsert操作,代码如下所示:
protected static void upsertDoc(final Client client, final String index, final String type) throws IOException, InterruptedException, ExecutionException { String id = "fdd5ff7f56b613f0acb2c20a1ebc35e4"; IndexRequest indexRequest = new IndexRequest(index, type, id).source(jsonBuilder() .startObject() .field("installid", "00000BSe") .field("appid", "0") .field("udid", "fdd5ff7f56b613f0acb2c20a1ebc35e4") .field("channel", "A-wandoujia") .field("version", "3.1.1") .field("resolution", "960*540") .field("mac", "00:08:22:be:1b:b7") .field("device_type", "0") .field("device_name", "HTC") .field("producer", "alps") .field("create_time", "2015-01-17 17:15:36") .endObject()); UpdateRequest updateRequest = new UpdateRequest(index, type, id).doc(jsonBuilder() .startObject() .field("resolution", "540*960") .field("channel", "h-baidu") .field("version", "3.1.1") .field("imei", "861622010000056") .endObject()) .upsert(indexRequest); UpdateResponse response = client.update(updateRequest).get(); System.out.println(response); }
- 删除文档
删除文档,需要指定文档的id的值,代码如下所示:
protected static void deleteDoc(final Client client, final String index, final String type) { String id = "60e90ddcb1a61622028b8d92112a646c"; DeleteResponse response = client.prepareDelete(index, type, id).get(); System.out.println(response); }
- 搜索文档
搜索文档,可以根据需要构造指定的查询(Query),可以设置过滤器等等,然后提交搜索,示例代码如下所示:
protected static void searchDocs(final Client client, final String index, final String type) { SearchResponse response = client .prepareSearch(index) .setTypes(type) .setQuery(QueryBuilders.termQuery("device_name", "xiaomi")) .setPostFilter(QueryBuilders.rangeQuery("create_time").from("2015-01-16 00:00:00").to("2015-01-16 23:59:59")) .setFrom(30).setSize(10).setExplain(true) .execute() .actionGet(); System.out.println(response); }
查询(Query)的构造有很多的方式,比如构造布尔查询,指定与、或、非关系,然后提交搜索。执行搜索,可以设置搜索文档的起始偏移位置以及每次取多少个结果文档,这便能实现分页功能。
其他话题
ElasticSearch最经典的软件栈组合就是ELK(ElasticSearch Logstash Kibana),其中ElasticSearch提供了实时查询分析数据的功能,是一个非常通用的搜索引擎系统,而Logstash是一个日志管理工具,能够收集日志,对日志进行管理,Kibana是一个基于页面的前端展示工具,非常方便地使ElasticSearch中的数据可视化,具体使用起来如何,如果感兴趣可以尝试一下。
另外,ElasticSearch也被好多开源大数据系统所拥抱,比如Cloudera的CDH也整合了ElasticSearch作为搜索系统,ElasticSearch也可以和其他系统,如Hadoop、HBase等进行整合,使用领域比较广泛。
参考链接
- https://www.elastic.co/guide/en/elasticsearch/reference/2.0/index.html
- https://www.elastic.co/downloads/elasticsearch
- https://www.elastic.co/guide/en/elasticsearch/reference/2.0/_basic_concepts.html
- https://www.elastic.co/guide/en/elasticsearch/reference/2.0/setup.html
- https://www.elastic.co/guide/en/elasticsearch/plugins/2.0/index.html
- https://www.elastic.co/guide/en/elasticsearch/plugins/2.0/installation.html
- https://www.elastic.co/guide/en/elasticsearch/plugins/2.0/analysis-icu.html
- https://www.elastic.co/guide/en/elasticsearch/reference/2.0/indices.html
- https://www.elastic.co/guide/en/elasticsearch/reference/2.0/indices-create-index.html
- https://www.elastic.co/guide/en/elasticsearch/reference/2.0/indices-delete-index.html
- https://www.elastic.co/guide/en/elasticsearch/reference/2.0/index-modules.html
- https://www.elastic.co/guide/en/elasticsearch/reference/2.0/mapping.html
- https://www.elastic.co/guide/en/elasticsearch/reference/2.0/mapping-params.html
- https://www.elastic.co/guide/en/elasticsearch/reference/2.0/search.html
- https://www.elastic.co/guide/en/elasticsearch/reference/2.0/search-search.html
- https://www.elastic.co/guide/en/elasticsearch/reference/2.0/search-uri-request.html
- https://www.elastic.co/guide/en/elasticsearch/reference/2.0/search-request-body.html
- https://www.elastic.co/guide/en/elasticsearch/reference/2.0/docs.html
- https://www.elastic.co/guide/en/elasticsearch/plugins/2.0/integrations.html
- https://www.elastic.co/guide/en/elasticsearch/reference/2.0/docs-bulk.html
- https://www.elastic.co/guide/en/elasticsearch/reference/2.0/modules-discovery.html
- https://www.elastic.co/guide/en/elasticsearch/reference/2.0/misc-cluster.html
- https://www.elastic.co/guide/en/elasticsearch/reference/2.0/query-filter-context.html
- https://www.elastic.co/guide/en/elasticsearch/reference/2.0/query-dsl-match-all-query.html
- https://www.elastic.co/guide/en/elasticsearch/reference/2.0/query-dsl-dis-max-query.html
- https://www.elastic.co/guide/en/elasticsearch/reference/2.0/query-dsl-bool-query.html
- https://www.elastic.co/guide/en/elasticsearch/reference/2.0/full-text-queries.html
- https://github.com/mobz/elasticsearch-head
- https://www.elastic.co/blog/found-java-clients-for-elasticsearch
- https://www.elastic.co/guide/en/elasticsearch/client/java-api/2.0/client.html
- https://www.elastic.co/guide/en/elasticsearch/client/java-api/2.0/node-client.html
- https://www.elastic.co/guide/en/elasticsearch/client/java-api/2.0/java-docs.html
- https://www.elastic.co/guide/en/elasticsearch/client/java-api/2.0/java-docs-bulk-processor.html