如何在 Elasticsearch 中查找并移除重复文档 | Elastic Blog

标签: | 发表时间:2020-06-01 13:58 | 作者:
出处:https://www.elastic.co

将数据导入 Elasticsearch 的很多系统都将利用 Elasticsearch 为新插入的文档自动生成 ID 值。但是,如果数据源将同一文档多次意外发送到 Elasticsearch,并且对于 Elasticsearch 插入的每个文档都使用了这种自动生成的 _id值,那么这个文档就会使用不同的 _id值在 Elasticsearch 中存储多次。如果发生此情况,可能就需要查找并移除此类重复文档。因此,在本篇博文中,我们将介绍如何通过以下两种方法从 Elasticsearch 中检测并移除重复文档:(1) 使用 Logstash;(2) 使用以 Python 语言编写的定制代码。

示例文档结构

就本篇博文而言,我们假设 Elasticsearch 集群中的文档具有以下结构。此结构对应的数据集包含多个表示股市交易的文档。

{
      "_index": "stocks",
      "_type": "doc",
      "_id":"6fo3tmMB_ieLOlkwYclP",
      "_version":1,
      "found": true,
      "_source": {
        "CAC":1854.6,
        "host":"Alexanders-MBP",
        "SMI":2061.7,
        "@timestamp":"2017-01-09T02:30:00.000Z",
        "FTSE":2827.5,
        "DAX":1527.06,
        "time":"1483929000",
        "message":"1483929000,1527.06,2061.7,1854.6,2827.5\r",
        "@version":"1"
      }
    }

鉴于这个示例文档结构,在本篇博文中,我们主观假设:如果多个文档的 ["CAC", "FTSE", "SMI"]字段具有相同的值,则它们相互重复。

使用 Logstash 删除重复的 Elasticsearch 文档

Logstash 可用于从 Elasticsearch 索引中检测并移除重复文档。有关此项技术的描述,请参阅 介绍如何使用 Logstash 处理重复文档的博文,这里我们将仅演示一个应用此方法的具体示例。

在下面的示例中,我编写了一个简单的 Logstash 配置,从 Elasticsearch 集群上的索引中读取文档,然后使用 指纹筛选器根据 ["CAC", "FTSE", "SMI"]字段的哈希为每个文档计算一个唯一的 _id值,最后将每个文档写回到同一 Elasticsearch 集群上的新索引,这样重复的文档将被写入同一 _id,并随之被删除。

此外,只需稍加修改,同样的 Logstash 筛选器还可应用于写入新建索引的未来文档,以确保几乎实时地移除重复文档。为实现这一目的,可以更改以下示例中的输入部分,以接受来自实时输入源的文档,而不是从现有索引中拉取文档。

请注意,使用定制 _id值(即不是由 Elasticsearch 生成的 _id)将会对索引操作的 写入性能造成一定影响

另外值得注意的是,在理论上,根据所使用的哈希算法,这个方法可能会导致 _id值的 哈希冲突数不为零,这可能会使两个不同的文档被映射到同一 _id,从而导致其中一个文档丢失。对于大部分实践用例,发生哈希冲突的可能性极低。对不同哈希函数的详细分析不在本博文讨论范围之内,但应仔细考虑指纹筛选器中使用的哈希函数,因其对采集性能和哈希冲突数都将有影响。

下面介绍了一个使用指纹筛选器对现有索引进行重复文档删除的简单 Logstash 配置。

input {
  # Read all documents from Elasticsearch 
  elasticsearch {
    hosts => "localhost"
    index => "stocks"
    query => '{ "sort": [ "_doc" ] }'
  }
}
# This filter has been updated on February 18, 2019
filter {
    fingerprint {
        key => "1234ABCD"
        method => "SHA256"
        source => ["CAC", "FTSE", "SMI"]
        target => "[@metadata][generated_id]"
        concatenate_sources => true # <-- New line added since original post date
    }
}
output {
    stdout { codec => dots }
    elasticsearch {
        index => "stocks_after_fingerprint"
        document_id => "%{[@metadata][generated_id]}"
    }
}

用于删除 Elasticsearch 重复文档的定制 Python 脚本

一种节省内存的方法

如果不使用 Logstash,则可以使用定制 Python 脚本有效地完成重复文档删除。对于这种方法,我们将计算定义为唯一标识文档的 ["CAC", "FTSE", "SMI"]字段的哈希。然后,我们将此哈希用作 Python 字典中的一个键,其中每个字典条目的关联值将是映射到同一哈希的文档 _ids的数组。

如果多个文档具有相同哈希,则可以删除映射到同一哈希的重复文档。或者,如果担心可能会发生 哈希冲突,则可以检查映射到同一哈希的文档内容,以查看文档是否确实相同,确认后再删除重复文档。

检测算法分析

对于一个大小为 50GB 的索引,如果假设索引包含的文档平均大小为 0.4 KB,则该索引中将有 1.25 亿个文档。在这种情况下,如果使用 128 位 md5 哈希将删除重复的数据结构存储在内存中,则所需内存量约为 128 位 x 1.25 亿 = 2GB 内存,再加上 160 位 _ids还需要 160 位 x 1.25 亿 = 2.5 GB 内存。因此,这一算法将需要约 4.5GB RAM 才能将所有相关数据结构保留在内存中。此时,如果可以应用下节所讨论的方法,则可以大大减少内存占用。

算法增强功能

在这部分中,我们将介绍一个算法增强功能,用于减少内存使用量并持续移除新的重复文档。

如果您存储的是时序数据,并且知道重复文档仅在短时间内彼此重复,则可通过对索引中的文档子集重复执行此算法(其中每个子集对应一个不同的时间范围)来改进其内存占用情况。例如,如果您有一年的数据,则可以使用日期时间字段上的 范围查询(在 筛选器上下文中,用于获得最佳性能),一次一周地遍历您的数据集。这就要求该算法执行 52 次(针对每周数据集执行一次),在这种情况下,这种方法可以将极端情况下的内存占用减少 52 倍。

在上例中,您可能会担心检测不到跨周的重复文档。让我们假设您知道重复文档的间隔不能超过 2 小时,那么,您需要确保每次执行的算法都包含与前一次执行算法分析的最后一组文档重叠 2 小时的文档。因此,在上述的每周示例中,您需要查询 170 小时(1 周 + 2 小时)的时序文档,才可确保不漏掉任何重复文档。

如果要持续定期从索引中清除重复文档,则可以对最近收到的文档执行这个算法。与上述逻辑相同,确保分析中包含最近接收的文档,并且与稍旧文档足够重叠,以确保不会无意中漏掉重复文档。

用于检测重复文档的 Python 代码

以下代码演示了如何高效评估文档,以查看它们是否相同,然后根据需要删除。但是,为了防止意外删除文档,在这个示例中,我们不会实际执行删除操作。包含此功能很容易,

也可以 在 github 中找到用于从 Elasticsearch 中删除重复文档的代码。

#!/usr/local/bin/python3
import hashlib
from elasticsearch import Elasticsearch
es = Elasticsearch(["localhost:9200"])
dict_of_duplicate_docs = {}
# The following line defines the fields that will be
# used to determine if a document is a duplicate
keys_to_include_in_hash = ["CAC", "FTSE", "SMI"]
# Process documents returned by the current search/scroll
def populate_dict_of_duplicate_docs(hits):
    for item in hits:
        combined_key = ""
        for mykey in keys_to_include_in_hash:
            combined_key += str(item['_source'][mykey])
        _id = item["_id"]
        hashval = hashlib.md5(combined_key.encode('utf-8')).digest()
        # If the hashval is new, then we will create a new key
        # in the dict_of_duplicate_docs, which will be
        # assigned a value of an empty array.
        # We then immediately push the _id onto the array.
        # If hashval already exists, then
        # we will just push the new _id onto the existing array
        dict_of_duplicate_docs.setdefault(hashval, []).append(_id)
# Loop over all documents in the index, and populate the
# dict_of_duplicate_docs data structure.
def scroll_over_all_docs():
    data = es.search(index="stocks", scroll='1m',  body={"query": {"match_all": {}}})
    # Get the scroll ID
    sid = data['_scroll_id']
    scroll_size = len(data['hits']['hits'])
    # Before scroll, process current batch of hits
    populate_dict_of_duplicate_docs(data['hits']['hits'])
    while scroll_size > 0:
        data = es.scroll(scroll_id=sid, scroll='2m')
        # Process current batch of hits
        populate_dict_of_duplicate_docs(data['hits']['hits'])
        # Update the scroll ID
        sid = data['_scroll_id']
        # Get the number of results that returned in the last scroll
        scroll_size = len(data['hits']['hits'])
def loop_over_hashes_and_remove_duplicates():
    # Search through the hash of doc values to see if any
    # duplicate hashes have been found
    for hashval, array_of_ids in dict_of_duplicate_docs.items():
      if len(array_of_ids) > 1:
        print("********** Duplicate docs hash=%s **********" % hashval)
        # Get the documents that have mapped to the current hashval
        matching_docs = es.mget(index="stocks", doc_type="doc", body={"ids": array_of_ids})
        for doc in matching_docs['docs']:
            # In this example, we just print the duplicate docs.
            # This code could be easily modified to delete duplicates
            # here instead of printing them
            print("doc=%s\n" % doc)
def main():
    scroll_over_all_docs()
    loop_over_hashes_and_remove_duplicates()
main()

结论

在本篇博文中,我们演示了两种在 Elasticsearch 中删除重复文档的方法。第一种方法使用 Logstash 移除重复文档,第二种方法使用定制 Python 脚本查找并移除重复文档。

如果对 Elasticsearch 重复文档删除或任何其他 Elasticsearch 相关主题有疑问,请在 讨论论坛中查看各种宝贵见解和信息。

相关 [elasticsearch 文档 elastic] 推荐:

如何在 Elasticsearch 中查找并移除重复文档 | Elastic Blog

- -
将数据导入 Elasticsearch 的很多系统都将利用. Elasticsearch 为新插入的文档自动生成 ID 值. 但是,如果数据源将同一文档多次意外发送到 Elasticsearch,并且对于 Elasticsearch 插入的每个文档都使用了这种自动生成的. _id值,那么这个文档就会使用不同的.

ES事务日志的持久化变更 | Elasticsearch: 权威指南 | Elastic

- -
translog 也被用来提供实时 CRUD. 当你试着通过ID查询、更新、删除一个文档,它会在尝试从相应的段中检索之前, 首先检查 translog 任何最近的变更. 这意味着它总是能够实时地获取到文档的最新版本. 如果没有用  fsync 把数据从文件系统缓存刷(flush)到硬盘,我们不能保证数据在断电甚至是程序正常退出之后依然存在.

Elasticsearch 5.x 字段折叠的使用实现去重分页 - Elastic 中文社区

- -
 在 Elasticsearch 5.x 有一个字段折叠(Field Collapsing,. #22337)的功能非常有意思,在这里分享一下,. 字段折叠是一个很有历史的需求了,可以看这个 issue,编号. #256,最初是2010年7月提的issue,也是讨论最多的帖子之一(240+评论),熬了6年才支持的特性,你说牛不牛,哈哈.

elasticsearch文档-字段的mapping

- - 开源软件 - ITeye博客
elasticsearch文档-字段的mapping. Mapping是指定义如何将document映射到搜索引擎的过程,比如一个字段是否可以查询以及如何分词等,一个索引可以存储含有不同"mapping types"的documents,ES允许每个mapping type关联多个mapping定义.

elasticsearch 文档 - 轩脉刃

- - 博客园_首页
elasticsearch 文档. 索引中最基本的单元叫做文档 document. "content": "汽车常见故障的解决办法有哪些. } 文档中下划线开头的是es自带的字段. _id 代表文档id,如果插入文档的时候没有设置id的话,那么es会自动生成一个唯一id. _score 这个不是文档自带的,而是进行搜索的时候返回的,代表这个文档和搜索的相关匹配分值.

Elasticsearch自定义文档得分并排序

- - JenkinWang's Blog
大多数情况下,我们需要对查询结果排序,比方说按最新时间降序、按金额降序等. 我们只需要对相应的字段 sort 即可. 但有时候也会出现一些复杂的情况,比方说有A、B、C、D、E类数据,他想让你给这类数据重新定义优先级,按照B、E、D、A、C的顺序展示,并且每类数据内部按时间降序. 然而最近我们也提出了一个类似这样的需求,查阅相关文档后,发现Elasticsearch里的 function_socre函数可以实现这一功能, 遂将此学习内容做一个记录.

熬夜爆肝整理的一份elasticsearch中文文档手册

- - SegmentFault 最新的文章
由于本文篇幅较长,想要获取PDF,请关注‘公众号-菜鸟成长学习笔记’回复"es手册"即可领取文件. Elaticsearch,简称为 ES, ES 是一个开源的高扩展的分布式全文搜索引擎,Elasticsearch 是面向文档型数据库,一条数据在这里就是一个文档. ES是一个文档型数据库,在与传统的关系型数据库上,存在着一定的差异.

主流全文索引工具的比较( Lucene, Sphinx, solr, elastic search)

- - 企业架构 - ITeye博客
前几天的调研(  Rails3下的 full text search (全文本搜索, 全文匹配. ) ), 我发现了两个不错的候选: . lucene  (solr, elasticsearch 都是基于它) . 把看到的有价值的文章记录在这里: . 回答1.  Result relevance ranking is the default.

Elastic-Job - 分布式定时任务框架

- - 企业架构 - ITeye博客
摘要: Elastic-Job是ddframe中dd-job的作业模块中分离出来的分布式弹性作业框架. 去掉了和dd-job中的监控和ddframe接入规范部分. 该项目基于成熟的开源产品Quartz和Zookeeper及其客户端Curator进行二次开发. ddframe其他模块也有可独立开源的部分,之前当当曾开源过dd-soa的基石模块DubboX.

[译]elasticsearch mapping

- - an74520的专栏
es的mapping设置很关键,mapping设置不到位可能导致索引重建. 请看下面各个类型介绍^_^. 每一个JSON字段可以被映射到一个特定的核心类型. JSON本身已经为我们提供了一些输入,支持 string,  integer/ long,  float/ double,  boolean, and  null..