使用ElasticSearch完成百万级数据查询附近的人功能 - tianyaleixiaowu的专栏 - CSDN博客

标签: | 发表时间:2018-11-08 13:45 | 作者:
出处:https://blog.csdn.net
我们来看一下使用ElasticSearch完成大数据量查询附近的人功能,搜索N米范围的内的数据。

准备环境
本机测试使用了ElasticSearch最新版5.5.1,SpringBoot1.5.4,spring-data-ElasticSearch2.1.4.
新建Springboot项目,勾选ElasticSearch和web。
pom文件如下
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
 
<groupId>com.tianyalei</groupId>
<artifactId>elasticsearch</artifactId>
<version>0.0.1-SNAPSHOT</version>
<packaging>jar</packaging>
 
<name>elasticsearch</name>
<description>Demo project for Spring Boot</description>
 
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>1.5.4.RELEASE</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
 
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<java.version>1.8</java.version>
</properties>
 
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-elasticsearch</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
 
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.sun.jna</groupId>
<artifactId>jna</artifactId>
<version>3.0.9</version>
</dependency>
</dependencies>
 
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
 
 
</project>
新建model类Person
package com.tianyalei.elasticsearch.model;
 
import org.springframework.data.annotation.Id;
import org.springframework.data.elasticsearch.annotations.Document;
import org.springframework.data.elasticsearch.annotations.GeoPointField;
 
import java.io.Serializable;
 
/**
 * model类
 */
@Document(indexName="elastic_search_project",type="person",indexStoreType="fs",shards=5,replicas=1,refreshInterval="-1")
public class Person implements Serializable {
    @Id
    private int id;
 
    private String name;
 
    private String phone;
 
    /**
     * 地理位置经纬度
     * lat纬度,lon经度 "40.715,-74.011"
     * 如果用数组则相反[-73.983, 40.719]
     */
    @GeoPointField
    private String address;
 
    public int getId() {
        return id;
    }
 
    public void setId(int id) {
        this.id = id;
    }
 
    public String getName() {
        return name;
    }
 
    public void setName(String name) {
        this.name = name;
    }
 
    public String getPhone() {
        return phone;
    }
 
    public void setPhone(String phone) {
        this.phone = phone;
    }
 
    public String getAddress() {
        return address;
    }
 
    public void setAddress(String address) {
        this.address = address;
    }
}
我用address字段表示经纬度位置。注意,使用String[]和String分别来表示经纬度时是不同的,见注释。
import com.tianyalei.elasticsearch.model.Person;
import org.springframework.data.elasticsearch.repository.ElasticsearchRepository;
 
public interface PersonRepository extends ElasticsearchRepository<Person, Integer> {
 
}
看一下Service类,完成插入测试数据的功能,查询的功能我放在Controller里了,为了方便查看,正常是应该放在Service里
package com.tianyalei.elasticsearch.service;
 
import com.tianyalei.elasticsearch.model.Person;
import com.tianyalei.elasticsearch.repository.PersonRepository;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.elasticsearch.core.ElasticsearchTemplate;
import org.springframework.data.elasticsearch.core.query.IndexQuery;
import org.springframework.stereotype.Service;
 
import java.util.ArrayList;
import java.util.List;
 
@Service
public class PersonService {
    @Autowired
    PersonRepository personRepository;
    @Autowired
    ElasticsearchTemplate elasticsearchTemplate;
 
    private static final String PERSON_INDEX_NAME = "elastic_search_project";
    private static final String PERSON_INDEX_TYPE = "person";
 
    public Person add(Person person) {
        return personRepository.save(person);
    }
 
    public void bulkIndex(List<Person> personList) {
        int counter = 0;
        try {
            if (!elasticsearchTemplate.indexExists(PERSON_INDEX_NAME)) {
                elasticsearchTemplate.createIndex(PERSON_INDEX_TYPE);
            }
            List<IndexQuery> queries = new ArrayList<>();
            for (Person person : personList) {
                IndexQuery indexQuery = new IndexQuery();
                indexQuery.setId(person.getId() + "");
                indexQuery.setObject(person);
                indexQuery.setIndexName(PERSON_INDEX_NAME);
                indexQuery.setType(PERSON_INDEX_TYPE);
 
                //上面的那几步也可以使用IndexQueryBuilder来构建
                //IndexQuery index = new IndexQueryBuilder().withId(person.getId() + "").withObject(person).build();
 
                queries.add(indexQuery);
                if (counter % 500 == 0) {
                    elasticsearchTemplate.bulkIndex(queries);
                    queries.clear();
                    System.out.println("bulkIndex counter : " + counter);
                }
                counter++;
            }
            if (queries.size() > 0) {
                elasticsearchTemplate.bulkIndex(queries);
            }
            System.out.println("bulkIndex completed.");
        } catch (Exception e) {
            System.out.println("IndexerService.bulkIndex e;" + e.getMessage());
            throw e;
        }
    }
}
注意看bulkIndex方法,这个是批量插入数据用的,bulk也是ES官方推荐使用的批量插入数据的方法。这里是每逢500的整数倍就bulk插入一次。

package com.tianyalei.elasticsearch.controller;
 
import com.tianyalei.elasticsearch.model.Person;
import com.tianyalei.elasticsearch.service.PersonService;
import org.elasticsearch.common.unit.DistanceUnit;
import org.elasticsearch.index.query.GeoDistanceQueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.sort.GeoDistanceSortBuilder;
import org.elasticsearch.search.sort.SortBuilders;
import org.elasticsearch.search.sort.SortOrder;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.domain.PageRequest;
import org.springframework.data.domain.Pageable;
import org.springframework.data.elasticsearch.core.ElasticsearchTemplate;
import org.springframework.data.elasticsearch.core.query.NativeSearchQueryBuilder;
import org.springframework.data.elasticsearch.core.query.SearchQuery;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
 
import java.text.DecimalFormat;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
 
@RestController
public class PersonController {
    @Autowired
    PersonService personService;
    @Autowired
    ElasticsearchTemplate elasticsearchTemplate;
 
    @GetMapping("/add")
    public Object add() {
        double lat = 39.929986;
        double lon = 116.395645;
 
        List<Person> personList = new ArrayList<>(900000);
        for (int i = 100000; i < 1000000; i++) {
            double max = 0.00001;
            double min = 0.000001;
            Random random = new Random();
            double s = random.nextDouble() % (max - min + 1) + max;
            DecimalFormat df = new DecimalFormat("######0.000000");
            // System.out.println(s);
            String lons = df.format(s + lon);
            String lats = df.format(s + lat);
            Double dlon = Double.valueOf(lons);
            Double dlat = Double.valueOf(lats);
 
            Person person = new Person();
            person.setId(i);
            person.setName("名字" + i);
            person.setPhone("电话" + i);
            person.setAddress(dlat + "," + dlon);
 
            personList.add(person);
        }
        personService.bulkIndex(personList);
 
//        SearchQuery searchQuery = new NativeSearchQueryBuilder().withQuery(QueryBuilders.queryStringQuery("spring boot OR 书籍")).build();
//        List<Article> articles = elas、ticsearchTemplate.queryForList(se、archQuery, Article.class);
//        for (Article article : articles) {
//            System.out.println(article.toString());
//        }
 
        return "添加数据";
    }
 
    /**
     *
     geo_distance: 查找距离某个中心点距离在一定范围内的位置
     geo_bounding_box: 查找某个长方形区域内的位置
     geo_distance_range: 查找距离某个中心的距离在min和max之间的位置
     geo_polygon: 查找位于多边形内的地点。
     sort可以用来排序
     */
    @GetMapping("/query")
    public Object query() {
        double lat = 39.929986;
        double lon = 116.395645;
 
        Long nowTime = System.currentTimeMillis();
        //查询某经纬度100米范围内
        GeoDistanceQueryBuilder builder = QueryBuilders.geoDistanceQuery("address").point(lat, lon)
                .distance(100, DistanceUnit.METERS);
 
        GeoDistanceSortBuilder sortBuilder = SortBuilders.geoDistanceSort("address")
                .point(lat, lon)
                .unit(DistanceUnit.METERS)
                .order(SortOrder.ASC);
 
        Pageable pageable = new PageRequest(0, 50);
 
        NativeSearchQueryBuilder builder1 = new NativeSearchQueryBuilder().withFilter(builder).withSort(sortBuilder).withPageable(pageable);
        SearchQuery searchQuery = builder1.build();
 
        //queryForList默认是分页,走的是queryForPage,默认10个
        List<Person> personList = elasticsearchTemplate.queryForList(searchQuery, Person.class);
 
        System.out.println("耗时:" + (System.currentTimeMillis() - nowTime));
        return personList;
    }
}
看Controller类,在add方法中,我们插入90万条测试数据,随机产生不同的经纬度地址。
在查询方法中,我们构建了一个查询100米范围内、按照距离远近排序,分页每页50条的查询条件。如果不指明Pageable的话,ESTemplate的queryForList默认是10条,通过源码可以看到。
启动项目,先执行add,等待百万数据插入,大概几十秒。
然后执行查询,看一下结果。

第一次查询花费300多ms,再次查询后时间就大幅下降,到30ms左右,因为ES已经自动缓存到内存了。
可见,ES完成地理位置的查询还是非常快的。适用于查询附近的人、范围查询之类的功能。

-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
后记,在后来的使用中,Elasticsearch2.3版本时,按上面的写法出现了geo类型无法索引的情况,进入es的为String,而不是标注的geofiled。在此记录一下解决方法,将String类型修改为GeoPoint,且是org.springframework.data.elasticsearch.core.geo.GeoPoint包下的。然后需要在创建index时,显式调用一下mapping方法,才能正确的映射为geofield。
如下
if (!elasticsearchTemplate.indexExists("abc")) {
elasticsearchTemplate.createIndex("abc");
elasticsearchTemplate.putMapping(Person.class);
}

--------------------- 

相关 [elasticsearch 百万 数据] 推荐:

使用ElasticSearch完成百万级数据查询附近的人功能 - tianyaleixiaowu的专栏 - CSDN博客

- -
我们来看一下使用ElasticSearch完成大数据量查询附近的人功能,搜索N米范围的内的数据. 本机测试使用了ElasticSearch最新版5.5.1,SpringBoot1.5.4,spring-data-ElasticSearch2.1.4.. 新建Springboot项目,勾选ElasticSearch和web.

ElasticSearch —修改ES数据

- -
ElasticSearch能够以接近实时的速度提供数据操作和搜索功能. 在默认情况下,从索引/更新/删除数据到出现在搜索结果之间,你可能会感受到有1秒的延迟时间(刷新间隔). 这是与SQL等其他平台的一个重要区别,这些平台在完成事务之后,它们的数据立即可用. 先前,我们已经知道如何索引一个单个的文档.

Elasticsearch 数据备份、迁移

- - 枯惠
在时候我们面临将Elasticsearch的数据进行迁移亦或是数据备份的场景,此时我们可以使用 elasticsearch-dump这个工具来实现:. mappings从production环境复制到staging环境. 数据从production环境复制到staging. template数据导出导入.

通过HBase Observer同步数据到ElasticSearch

- - SegmentFault 最新的文章
Observer希望解决的问题. HBase是一个分布式的存储体系,数据按照RowKey分成不同的Region,再分配给RegionServer管理. 但是RegionServer只承担了存储的功能,如果Region能拥有一部分的计算能力,从而实现一个HBase框架上的MapReduce,那HBase的操作性能将进一步提升.

Elasticsearch、Kibana数据导出实战

- -
以下两个导出问题来自Elastic中文社区. 问题1、kibana怎么导出查询数据. 问题2:elasticsearch数据导出. 就像数据库数据导出一样,elasticsearch可以么. 或者找到它磁盘上存放数据的位置,拷贝出来,放到另一个es服务器上或者转成自己要的数据格式. 实际业务实战中,大家或多或少的都会遇到导入、导出问题.

谈Elasticsearch下分布式存储的数据分布

- - IT瘾-geek
  对于一个分布式存储系统来说,数据是分散存储在多个节点上的. 如何让数据均衡的分布在不同节点上,来保证其高可用性. 所谓均衡,是指系统中每个节点的负载是均匀的,并且在发现有不均匀的情况或者有节点增加/删除时,能及时进行调整,保持均匀状态. 本文将探讨Elasticsearch的数据分布方法,文中所述的背景是Elasticsearch 5.5.

ElasticSearch 亿级数据检索深度优化

- - IT瘾-dev
数据平台已迭代三个版本,从头开始遇到很多常见的难题,终于有片段时间整理一些已完善的文档,在此分享以供所需朋友的实现参考,少走些弯路,在此篇幅中偏重于ES的优化,关于HBase,Hadoop的设计优化估计有很多文章可以参考,不再赘述. 在一业务系统中,部分表每天的数据量过亿,已按天分表,但业务上受限于按天查询,并且DB中只能保留3个月的数据(硬件高配),分库代价较高.

数据同步工具 Elasticsearch-datatran v6.2.9 发布

- - 开源中国-软件更新资讯
Elasticsearch版本兼容性:支持各种Elasticsearch版本(1.x,2.x,5.x,6.x,7.x,+)之间相互数据迁移. 数据同步改进:完善ip2region和geoip数据库热加载机制. Restclient改进:升级httpcliet组件版本到最新的官方版本4.5.13. Restclient改进:升级fastxml jackson databind版本2.9.10.8.

ElasticSearch 双数据中心建设在新网银行的实践

- - IT瘾-dev
本文公众号读者飞熊的投稿,本文主要讲述了ElasticSearch 双数据中心建设在新网银行的实践. 飞熊,目前就职于新网银行大数据中心,主要从事大数据实时计算和平台开发相关工作,对Flink ,Spark 以及ElasticSearch等大数据技术有浓厚兴趣和较深入的理解. 新网银行是作为西部首家互联网银行,一直践行依靠数据和技术驱动业务的发展理念.

什么?!Elasticsearch也可以用作向量数据库?

- -
本文将探讨如何在 Elasticsearch 中优化向量搜索,并提供了一个详细的教程. 一、Elasticsearch 神经搜索流程. Elasticsearch 从 2022 年 2 月发布的 8.0 版本开始,提供了基于向量的搜索和自然语言处理(NLP)功能. 下图清楚地展示了向量搜索引擎的工作原理.