ElasticSearch的JAVA客户端操作

标签: elasticsearch java 客户端 | 发表时间:2016-12-24 09:57 | 作者:diegolau
出处:http://www.iteye.com

一、java操作ES有两种客户端:

1、TransportClient:轻量级的Client,使用Netty线程池,Socket连接到ES集群。本身不加入到集群,只作为请求的处理。

2、Node Client:客户端节点本身也是ES节点,加入到集群,和其他ElasticSearch节点一样。频繁的开启和关闭这类Node Clients会在集群中产生“噪音”。

 

二、TransportClient的基本使用

1、创建Client

public ElasticSearchService(String ipAddress, int port) {

        client = new TransportClient()

                .addTransportAddress(new InetSocketTransportAddress(ipAddress,

                        port));

 

    }

 

2、创建/删除Index和Type信息

// 创建索引

    public void createIndex() {

        client.admin().indices().create(new CreateIndexRequest(IndexName))

                .actionGet();

    }

 

    // 清除所有索引

    public void deleteIndex() {

        IndicesExistsResponse indicesExistsResponse = client.admin().indices()

                .exists(new IndicesExistsRequest(new String[] { IndexName }))

                .actionGet();

        if (indicesExistsResponse.isExists()) {

            client.admin().indices().delete(new DeleteIndexRequest(IndexName))

                    .actionGet();

        }

    }

    

    // 删除Index下的某个Type

    public void deleteType(){

        client.prepareDelete().setIndex(IndexName).setType(TypeName).execute().actionGet();

    }

 

    // 定义索引的映射类型

    public void defineIndexTypeMapping() {

        try {

            XContentBuilder mapBuilder = XContentFactory.jsonBuilder();

            mapBuilder.startObject()

            .startObject(TypeName)

                .startObject("properties")

                    .startObject(IDFieldName).field("type", "long").field("store", "yes").endObject()

                    .startObject(SeqNumFieldName).field("type", "long").field("store", "yes").endObject()

                    .startObject(IMSIFieldName).field("type", "string").field("index", "not_analyzed").field("store", "yes").endObject()

                    .startObject(IMEIFieldName).field("type", "string").field("index", "not_analyzed").field("store", "yes").endObject()

                    .startObject(DeviceIDFieldName).field("type", "string").field("index", "not_analyzed").field("store", "yes").endObject()

                    .startObject(OwnAreaFieldName).field("type", "string").field("index", "not_analyzed").field("store", "yes").endObject()

                    .startObject(TeleOperFieldName).field("type", "string").field("index", "not_analyzed").field("store", "yes").endObject()

                    .startObject(TimeFieldName).field("type", "date").field("store", "yes").endObject()

                .endObject()

            .endObject()

            .endObject();

 

            PutMappingRequest putMappingRequest = Requests

                    .putMappingRequest(IndexName).type(TypeName)

                    .source(mapBuilder);

            client.admin().indices().putMapping(putMappingRequest).actionGet();

        } catch (IOException e) {

            log.error(e.toString());

        }

 

    }

这里自定义了某个Type的索引映射(Mapping),默认ES会自动处理数据类型的映射:针对整型映射为long,浮点数为double,字符串映射为string,时间为date,true或false为boolean。

 

注意:针对字符串,ES默认会做“analyzed”处理,即先做分词、去掉stop words等处理再index。如果你需要把一个字符串做为整体被索引到,需要把这个字段这样设置:field("index", "not_analyzed")。

 

详情参考:https://www.elastic.co/guide/en/elasticsearch/guide/current/mapping-intro.html

 

 

3、索引数据

// 批量索引数据

    public void indexHotSpotDataList(List<Hotspotdata> dataList) {

        if (dataList != null) {

            int size = dataList.size();

            if (size > 0) {

                BulkRequestBuilder bulkRequest = client.prepareBulk();

                for (int i = 0; i < size; ++i) {

                    Hotspotdata data = dataList.get(i);

                    String jsonSource = getIndexDataFromHotspotData(data);

                    if (jsonSource != null) {

                        bulkRequest.add(client

                                .prepareIndex(IndexName, TypeName,

                                        data.getId().toString())

                                .setRefresh(true).setSource(jsonSource));

                    }

                }

 

                BulkResponse bulkResponse = bulkRequest.execute().actionGet();

                if (bulkResponse.hasFailures()) {

                    Iterator<BulkItemResponse> iter = bulkResponse.iterator();

                    while (iter.hasNext()) {

                        BulkItemResponse itemResponse = iter.next();

                        if (itemResponse.isFailed()) {

                            log.error(itemResponse.getFailureMessage());

                        }

                    }

                }

            }

        }

    }

 

    // 索引数据

    public boolean indexHotspotData(Hotspotdata data) {

        String jsonSource = getIndexDataFromHotspotData(data);

        if (jsonSource != null) {

            IndexRequestBuilder requestBuilder = client.prepareIndex(IndexName,

                    TypeName).setRefresh(true);

            requestBuilder.setSource(jsonSource)

                    .execute().actionGet();

            return true;

        }

 

        return false;

    }

 

    // 得到索引字符串

    public String getIndexDataFromHotspotData(Hotspotdata data) {

        String jsonString = null;

        if (data != null) {

            try {

                XContentBuilder jsonBuilder = XContentFactory.jsonBuilder();

                jsonBuilder.startObject().field(IDFieldName, data.getId())

                        .field(SeqNumFieldName, data.getSeqNum())

                        .field(IMSIFieldName, data.getImsi())

                        .field(IMEIFieldName, data.getImei())

                        .field(DeviceIDFieldName, data.getDeviceID())

                        .field(OwnAreaFieldName, data.getOwnArea())

                        .field(TeleOperFieldName, data.getTeleOper())

                        .field(TimeFieldName, data.getCollectTime())

                        .endObject();

                jsonString = jsonBuilder.string();

            } catch (IOException e) {

                log.equals(e);

            }

        }

 

        return jsonString;

 

    }

ES支持批量和单个数据索引。

 

4、查询获取数据

// 获取少量数据100个

    private List<Integer> getSearchData(QueryBuilder queryBuilder) {

        List<Integer> ids = new ArrayList<>();

        SearchResponse searchResponse = client.prepareSearch(IndexName)

                .setTypes(TypeName).setQuery(queryBuilder).setSize(100)

                .execute().actionGet();

        SearchHits searchHits = searchResponse.getHits();

        for (SearchHit searchHit : searchHits) {

            Integer id = (Integer) searchHit.getSource().get("id");

            ids.add(id);

        }

        return ids;

    }

 

    // 获取大量数据

    private List<Integer> getSearchDataByScrolls(QueryBuilder queryBuilder) {

        List<Integer> ids = new ArrayList<>();

        // 一次获取100000数据

        SearchResponse scrollResp = client.prepareSearch(IndexName)

                .setSearchType(SearchType.SCAN).setScroll(new TimeValue(60000))

                .setQuery(queryBuilder).setSize(100000).execute().actionGet();

        while (true) {

            for (SearchHit searchHit : scrollResp.getHits().getHits()) {

                Integer id = (Integer) searchHit.getSource().get(IDFieldName);

                ids.add(id);

            }

            scrollResp = client.prepareSearchScroll(scrollResp.getScrollId())

                    .setScroll(new TimeValue(600000)).execute().actionGet();

            if (scrollResp.getHits().getHits().length == 0) {

                break;

            }

        }

 

        return ids;

 

    }

这里的QueryBuilder是一个查询条件,ES支持分页查询获取数据,也可以一次性获取大量数据,需要使用Scroll Search。

5、聚合(Aggregation Facet)查询 

// 得到某段时间内设备列表上每个设备的数据分布情况<设备ID,数量>

    public Map<String, String> getDeviceDistributedInfo(String startTime,

            String endTime, List<String> deviceList) {

 

        Map<String, String> resultsMap = new HashMap<>();

 

        QueryBuilder deviceQueryBuilder = getDeviceQueryBuilder(deviceList);

        QueryBuilder rangeBuilder = getDateRangeQueryBuilder(startTime, endTime);

        QueryBuilder queryBuilder = QueryBuilders.boolQuery()

                .must(deviceQueryBuilder).must(rangeBuilder);

 

        TermsBuilder termsBuilder = AggregationBuilders.terms("DeviceIDAgg").size(Integer.MAX_VALUE)

                .field(DeviceIDFieldName);

        SearchResponse searchResponse = client.prepareSearch(IndexName)

                .setQuery(queryBuilder).addAggregation(termsBuilder)

                .execute().actionGet();

        Terms terms = searchResponse.getAggregations().get("DeviceIDAgg");

        if (terms != null) {

            for (Terms.Bucket entry : terms.getBuckets()) {

                resultsMap.put(entry.getKey(),

                        String.valueOf(entry.getDocCount()));

            }

        }

        return resultsMap;

 

    }

Aggregation查询可以查询类似统计分析这样的功能:如某个月的数据分布情况,某类数据的最大、最小、总和、平均值等。

详情参考:https://www.elastic.co/guide/en/elasticsearch/client/java-api/current/java-aggs.html

 

 

本文参考:http://www.cnblogs.com/luxiaoxun/p/4869509.html



已有 0 人发表留言,猛击->> 这里<<-参与讨论


ITeye推荐



相关 [elasticsearch java 客户端] 推荐:

ElasticSearch的JAVA客户端操作

- - 编程语言 - ITeye博客
一、java操作ES有两种客户端:. 1、TransportClient:轻量级的Client,使用Netty线程池,Socket连接到ES集群. 本身不加入到集群,只作为请求的处理. 2、Node Client:客户端节点本身也是ES节点,加入到集群,和其他ElasticSearch节点一样. 频繁的开启和关闭这类Node Clients会在集群中产生“噪音”.

Elasticsearch:在 Java 客户端中使用 truststore 来创建 HTTPS 连接

- - 掘金 后端
在我之前的文章 “ Elasticsearch:使用 Elasticsearch Java client 8.0 来连接带有 HTTPS 的集群” 里,我使用了两种方法,一直是使用 token 的方式,另外一种是使用 PEM 证书来进行连接的. 在实际的使用中,有时我们的证书是以 PCKS12 格式的 truststore 格式的,那么我们该如何建立这种 HTTPS 的连接呢.

elasticsearch java API------批量添加索引

- - 行业应用 - ITeye博客
elasticsearch java API------批量添加索引.         person.setName("张三" + i);  .         person.setSex("男");  .         String index = "user"; // 相当于数据库名  .         String type = "tb_person"; // 相当于表名  .

elasticsearch RESTful搜索引擎-(java jest 使用[入门])

- - zzm
elasticsearch简称ES. 好吧下面我介绍下jest(第三方工具),个人认为还是非常不错的...想对ES用来更好,多多研究源代码吧...迟点,会写一些关于ES的源代码研究文章,现在暂时还是入门的阶段.哈..(不敢,不敢). 它是ES的java客户端,基于http restful.... jest是开源的....其他就不清楚了,看源代码吧..哈..

[转]Elasticsearch Java虚拟机配置详解(转)

- - 云端分布式搜索技术
Java6(Mustang),是2006年早些时候出来的,至今仍然应用在众多生产环境中,现在终于走到了尽头. 已经没有什么理由阻止迁移到Java7(Dolphin)上了. 这也促使我想写一篇关于在ElasticSearch上配置Java6和7的细微差异的博文. Elasticsearch对Java虚拟机进行了预先的配置.

Elasticsearch过滤与聚合的先后顺序java实现

- - 开源软件 - ITeye博客
一、Elasticsearch的聚合. ES的聚合相当于关系型数据库里面的group by,例如查找在性别字段男女人数的多少并且按照人数的多少进行排序,在使用mysql的时候,可以使用如下的句子. 在ES里面想要实现这种的语句,就叫做聚合,比如这种的聚合使用DSL语句的话如下所示:. 这样就可以实现最以上例子中的group by的功能,当然这只是最简单的聚合的使用,在ES里面的聚合有多重多样的,比如说有度量聚合,可以用来计算某一个字段的平均值最大值等,在此给出一个简单的度量聚合的例子.

JAVA的Corba客户端 - 服务端 通信实现

- - 博客园_首页
先在Eclipse建立两个Java Project. **这里注意, } 后面的 ";"一定要写上,不然会编译错误. 右键这个HelloApp.idl 文件. 点击,Change to parent folder in Shell. 点击OK , Eclipse会打开Shell视图,是命令行视图,其实和CMD是一个作用..

Windows7+Eclipse环境下Hbase Java客户端的开发

- - zzm
Centos 下Hbase0.98.10-hadoop2 集群的配置. 在Eclipse中创建Maven的工程. 将集群的hbase-site.xml文件放到工程的classes目录下. C:\windows\system32\drivers\etc文件,将Hbase集群的IP以及域名配置到该文件中.

java实现JT809协议数据对接客户端

- - Java - 编程语言 - ITeye博客
最近使用JT809协议进行数据对接,遇到了不少问题,度娘谷歌都不好使,找不到很好的客户端实现代码的例子,只能苦逼的自己闷头弄,现在特意写篇帖子,希望能帮助一些人. 背景:跟某公司做数据对接,将本公司的一些信息推送到接收端. 要求:建立tcp链接,使用接收端提供的用户名密码等信息 先登录,登录成功后推送数据,数据采用JT809标准协议.

[原]Jaeger的客户端采样配置(Java版)

- - 程序员欣宸的博客
这里分类和汇总了欣宸的全部原创(含配套源码): https://github.com/zq2599/blog_demos. 采样很好理解:使用Jaeger时,未必需要将所有请求都上报到Jaeger,有时候只要抽取其中一部分观察即可,这就是按照一定策略进行采样;. Jaeger采样配置分为客户端和服务端两种配置,默认用的是服务端配置.