Beats:如何避免重复的导入数据

标签: beats 数据 | 发表时间:2021-03-02 09:32 | 作者:Elastic
出处:https://juejin.cn/backend

Beats 框架保证至少一次交付,以确保将事件发送到支持确认的输出(例如 Elasticsearch,Logstash,Kafka 和 Redis)时不会丢失任何数据。 如果一切按计划进行,那就太好了。 但是,如果 Filebeat 在处理过程中关闭,或者在确认事件之前断开了连接,则最终可能会有重复的数据。那么我们该如何避免重复被导入的数据呢?

 

什么原因导致 Elasticsearch 中出现重复项?

当输出被阻止时,Filebeat 中的重试机制将尝试重新发送事件,直到输出确认它们为止。 如果输出接收到事件,但是无法确认事件,则数据可能会多次发送到输出。 由于文档 ID 通常是在 Elasticsearch 从 Beats 接收数据后由其设置的,因此重复事件被索引为新文档。

示例

我们仿照之前文章 “ Beats:使用 Filebeat 导入 JSON 格式的日志文件” 来导入一个如下的 sample.json 文件:

sample.json

   { "id": "1", "user_name": "arthur","verified": false, "event": "logged_in"}
{ "id": "2", "user_name": "arthur", "verified": true, "event": "changed_state"}
复制代码

在上面的文档中,我们假定 "id" 字段为唯一值,也就是说没有任何文档的 "id" 值是和其它文档是一样的。虽然,这里只有两条文档,单足以说明问题。我们按照 “ Beats:使用 Filebeat 导入 JSON 格式的日志文件” 文章中所示的方法把文档导入 Elasticsearch:

   ./filebeat -e -c  filebeat.yml
复制代码

等文档导入完毕后,我们可以通过如下的命令来查看已经被导入的文档:

   GET logs_json/_search
复制代码

或者通过如下的命令来查看多少个文档:

   GET logs_json/_count
复制代码

上面的命令显示:

   {
  "count" : 2,
  "_shards" : {
    "total" : 1,
    "successful" : 1,
    "skipped" : 0,
    "failed" : 0
  }
}
复制代码

它有两个文档在 logs_json 索引中。

在通常的情况下,Filebeat 框架可以保证至少一次的交付,但是在某些特殊的情况下,比如 filebeat 坏掉,而没有收到从 Elasticsearch 发回来的确认信息,那么下次启动 filebeat 时,它会重复发送之前已经发送过的文档。或者确实信息在传输过程中,由于某种原因没有被正确收到,那么 filebeat 也会在规定的时间内重新发送已经发送过的文档。在 Elasticsearch 为多次重复发送的文档分别自动分配唯一的 _id 值。我们可以通过如下的实验来进行验证。首先我们在 filebeat 安装的 registry 目录。

Filebeat 的 registry 文件存储 Filebeat 用于跟踪上次读取位置的状态和位置信息。 

  • data/registry 针对  .tar.gz and  .tgz 归档文件安装
  • /var/lib/filebeat/registry 针对 DEB 及 RPM 安装包
  • c:\ProgramData\filebeat\registry 针对 Windows zip 文件

针对我的本地安装:

   $ pwd
/Users/liuxg/elastic1/filebeat-7.11.0-darwin-x86_64
$ ls
LICENSE.txt            filebeat               module
NOTICE.txt             filebeat.reference.yml modules.d
README.md              filebeat.yml           sample.json
data                   filebeat1.yml
fields.yml             kibana
$ rm -rf data/registry/
复制代码

在 registry 中它保存了曾经已经被导入过文件的所有的信息。在上面我们把这个文件夹删除了,表明之前的记录都不存在,对于曾经导入过的文件,我们再次执行导入的动作,那么之前的文档也会被重新导入。这个模拟了确认信息丢失的情况。重新启动 filebeat 来导入刚才导入的文件:

   ./filebeat -e -c  filebeat.yml
复制代码

然后,我们在 Kibana 中再次获得被导入的文档的个数:

   GET logs_json/_count
复制代码
   {
  "count" : 4,
  "_shards" : {
    "total" : 1,
    "successful" : 1,
    "skipped" : 0,
    "failed" : 0
  }
}
复制代码

从上面的数值中,我们可以看出来,文档的格式从之前的 2 变为 4,也就是说之前已经被导入的文档也被重新导入了。那么我们该如何解决这个问题呢?我们期望的结果是无论确认信息是否丢失,那么重新导入文档,我们不希望出现同样的文档。

在接下的实验中,我们在 Kibana 中删除刚才已经被导入的索引  logs_json:

   DELETE logs_json
复制代码

 

如何避免重复?

与其允许 Elasticsearch 设置文档 ID,不如在 Beats 中设置 ID。 该 ID 存储在 Beats@metadata._id 字段中,并用于在建立索引期间设置文档 ID。 这样,如果 Beats 将同一事件多次发送到 Elasticsearch,则 Elasticsearch 会覆盖现有文档,而不是创建一个新文档。 @metadata._id 字段随事件一起传递,因此你可以在 Filebeat 发布事件之后,然后在Elasticsearch 接收事件之前,使用它来设置文档 ID。 例如,请参阅 Logstash 管道示例。 有几种方法可以在 Beats 中设置文档 ID:

add_id 处理器:

如果你的数据没有自然键字段,并且你无法从现有字段派生唯一键,请使用 add_id 处理器。 本示例为每个事件生成一个唯一的 ID,并将其添加到 @metadata._id 字段中:

filebeat.yml

   filebeat.inputs:
- type: log
  enabled: true
  paths:
    - /Users/liuxg/data/processors/sample.json
 
processors:
 - decode_json_fields:
     fields: ['message']
     target: ''
     overwrite_keys: true
 
 - drop_fields:
     fields: ["message", "ecs", "agent", "log"]

 - add_id: ~
 
setup.template.enabled: false
setup.ilm.enabled: false
 
output.elasticsearch:
  hosts: ["localhost:9200"]
  index: "logs_json"
  bulk_max_size: 1000
复制代码

我们删除 registry 目录,然后执行如下的命令:

   ./filebeat -e -c  filebeat.yml
复制代码

在上面,我们通过 add_id 这个 processor 为我们的文档设定唯一的 id。最终在 Elasticsearch 中的文档 _id 是由 Filebeat 所生成的。如果我们再次删除 registry 并重新导入文档的话,那么在 Elasticsearch 中将会有重复的文档尽管我们可以通过这样的方式来设定最终文档的 _id。

为了下面的练习,我们在 Kibana 中删除 logs_json 索引,并同时删除 registry 目录。

 

fingerprint 处理器

这个方法和我之前在 Logstash 中介绍的方法是一样的。请详细阅读文章 “ Logstash:运用 fingerprint 过滤器处理重复的文档”。使用 fingerprint 处理器从一个或多个现有字段中派生唯一密钥。本示例使用 id 和 user_name 的值来派生唯一键,并将其添加到 @metadata._id 字段中:

filebeat.yml

   filebeat.inputs:
- type: log
  enabled: true
  paths:
    - /Users/liuxg/data/processors/sample.json
 
processors:
 - decode_json_fields:
     fields: ['message']
     target: ''
     overwrite_keys: true
 
 - drop_fields:
     fields: ["message", "ecs", "agent", "log"]

 - fingerprint:
     fields: ["id", "user_name"]
     target_field: "@metadata._id"
 
setup.template.enabled: false
setup.ilm.enabled: false
 
output.elasticsearch:
  hosts: ["localhost:9200"]
  index: "logs_json"
  bulk_max_size: 1000
复制代码

我们重新运行 filebeat 并导入数据到 Elasticsearch 中:

   ./filebeat -e -c  filebeat.yml
复制代码

我们通过如下的命令查询文档数:

   GET logs_json/_count
复制代码
   {
  "count" : 2,
  "_shards" : {
    "total" : 1,
    "successful" : 1,
    "skipped" : 0,
    "failed" : 0
  }
}
复制代码

接下来,我们再次删除 registry 目录,但是保留 Elasticsearch 中的 logs_json 索引。我们再次运行 filebeat 重新导入 sample.json 文件。

   ./filebeat -e -c  filebeat.yml
复制代码

运行完过后,我们重新在 Kibana 中检查文档的个数:

   GET logs_json/_count
复制代码
   {
  "count" : 2,
  "_shards" : {
    "total" : 1,
    "successful" : 1,
    "skipped" : 0,
    "failed" : 0
  }
}
复制代码

我们会发现这次文档的格式还是2,它表明虽然文档被重新导入,但是由于使用了 fingerprint,它生成了唯一 @metadata_id,这样当同一的文档被导入到 Elasticsearch 中后,它不会生成新的文档,而是更新之前的文档,所以没有新的文档生成。

 

encode_json_fields 处理器

解码包含自然键字段的 JSON 字符串时,请使用 encode_json_fields 处理器中的 document_id 设置。

针对我们的 sample.json 文件,我们假定 "id" 字段为唯一的值。 本示例从 JSON 字符串中获取 id 的值,并将其存储在 @metadata._id 字段中:

filebeat.yml

   filebeat.inputs:
- type: log
  enabled: true
  paths:
    - sample.json
 
processors:
 - decode_json_fields:
     fields: ['message']
     target: ''
     overwrite_keys: true
 
 - drop_fields:
     fields: ["message", "ecs", "agent", "log"]

 - decode_json_fields:
     document_id: "id"
     fields: ["message"]
     max_depth: 1
     target: ""
 
setup.template.enabled: false
setup.ilm.enabled: false
 
output.elasticsearch:
  hosts: ["localhost:9200"]
  index: "logs_json"
  bulk_max_size: 1000
复制代码

同样,我们在实验之前,删除 registry 目录,并删除 logs_json 索引,并再次启动 filebeat 导入数据。我们通过如下的命令获得索引文档的个数:

   GET logs_json/_count
复制代码
   {
  "count" : 2,
  "_shards" : {
    "total" : 1,
    "successful" : 1,
    "skipped" : 0,
    "failed" : 0
  }
}
复制代码

我们再次删除 registry 目录,并同时导入数据。这次我们不删除 logs_json 索引,我们再次查看文档的个数是否增加:

   {
  "count" : 4,
  "_shards" : {
    "total" : 1,
    "successful" : 1,
    "skipped" : 0,
    "failed" : 0
  }
}
复制代码

由于一些原因这种方法可能是由于有 bug,还不能指定文档的 _id。

 

JSON 输入设定

如果你要提取 JSON 格式的数据,并且该数据具有自然键字段,请使用 json.document_id 输入设置。

本示例从 JSON 文档中获取 id 的值,并将其存储在 @metadata._id字段中:

sample.json

   {"id": "1", "user_name": "arthur", "verified": false, "evt": "logged_in"}
{"id": "2", "user_name": "arthur", "verified": true, "evt": "changed_state"}
复制代码

filebeat.yml

   filebeat.inputs:
- type: log
  enabled: true
  tags: ["i", "love", "json"]
  json.message_key: evt
  json.keys_under_root: true
  json.add_error_key: true
  json.document_id: "id"
  fields:
    planet: liuxg
  paths:
    - sample.json
 
output.elasticsearch:
  hosts: ["localhost:9200"]
  index: "json_logs1"
 
setup.ilm.enabled: false
setup.template.name: json_logs1
setup.template.pattern: json_logs1
复制代码

启动 filebeat 来导入数据,并查询索引 json_logs1 的文档个数:

   {
  "count" : 2,
  "_shards" : {
    "total" : 1,
    "successful" : 1,
    "skipped" : 0,
    "failed" : 0
  }
}
复制代码

删除 registry 目录,并重新启动 filebeat 来导入数据:

   {
  "count" : 2,
  "_shards" : {
    "total" : 1,
    "successful" : 1,
    "skipped" : 0,
    "failed" : 0
  }
}
复制代码
   GET json_logs1/_search
复制代码
   {
  "took" : 1,
  "timed_out" : false,
  "_shards" : {
    "total" : 1,
    "successful" : 1,
    "skipped" : 0,
    "failed" : 0
  },
  "hits" : {
    "total" : {
      "value" : 2,
      "relation" : "eq"
    },
    "max_score" : 1.0,
    "hits" : [
      {
        "_index" : "json_logs1",
        "_type" : "_doc",
        "_id" : "1",
        "_score" : 1.0,
        "_source" : {
          "@timestamp" : "2021-02-24T02:24:56.084Z",
          "evt" : "logged_in",
          "input" : {
            "type" : "log"
          },
          "fields" : {
            "planet" : "liuxg"
          },
          "agent" : {
            "id" : "e2b7365d-8953-453c-87b5-7e8a65a5bc07",
            "name" : "liuxg",
            "type" : "filebeat",
            "version" : "7.11.0",
            "hostname" : "liuxg",
            "ephemeral_id" : "7b309ac8-48d1-46d4-839f-70948fddd428"
          },
          "ecs" : {
            "version" : "1.6.0"
          },
          "host" : {
            "name" : "liuxg"
          },
          "log" : {
            "offset" : 0,
            "file" : {
              "path" : "/Users/liuxg/elastic1/filebeat-7.11.0-darwin-x86_64/sample.json"
            }
          },
          "user_name" : "arthur",
          "verified" : false,
          "tags" : [
            "i",
            "love",
            "json"
          ]
        }
      },
      {
        "_index" : "json_logs1",
        "_type" : "_doc",
        "_id" : "2",
        "_score" : 1.0,
        "_source" : {
          "@timestamp" : "2021-02-24T02:24:56.084Z",
          "fields" : {
            "planet" : "liuxg"
          },
          "input" : {
            "type" : "log"
          },
          "host" : {
            "name" : "liuxg"
          },
          "agent" : {
            "ephemeral_id" : "7b309ac8-48d1-46d4-839f-70948fddd428",
            "id" : "e2b7365d-8953-453c-87b5-7e8a65a5bc07",
            "name" : "liuxg",
            "type" : "filebeat",
            "version" : "7.11.0",
            "hostname" : "liuxg"
          },
          "evt" : "changed_state",
          "user_name" : "arthur",
          "verified" : true,
          "log" : {
            "file" : {
              "path" : "/Users/liuxg/elastic1/filebeat-7.11.0-darwin-x86_64/sample.json"
            },
            "offset" : 74
          },
          "tags" : [
            "i",
            "love",
            "json"
          ],
          "ecs" : {
            "version" : "1.6.0"
          }
        }
      }
    ]
  }
}
复制代码

上面显示这种方法是可以行的。还是两个文档。也就是说明了我们可以通过这种方法来设置 Elasticsearch 中的文档 id。

相关 [beats 数据] 推荐:

Beats:如何避免重复的导入数据

- - 掘金 后端
Beats 框架保证至少一次交付,以确保将事件发送到支持确认的输出(例如 Elasticsearch,Logstash,Kafka 和 Redis)时不会丢失任何数据. 如果一切按计划进行,那就太好了. 但是,如果 Filebeat 在处理过程中关闭,或者在确认事件之前断开了连接,则最终可能会有重复的数据.

双核Beats音效 HTC Sensation XE解析

- 洞箫 - cnBeta.COM
目前,智能手机的发展可谓日新月异,进入2011年,主流旗舰级手机进入了双核时代,作为Android平台的先头部队,HTC也在年初发布了旗下首款双核手机Sensation,由于精致的做工和出色的性能受到了很多用户的青睐. 如今,HTC又推出了这款手机的升级版产品――HTC Sensation XE,并将其处理器主频提升至1.5GHz,性能将更加强劲.

4.7+1.5GHz HTC Beats手机上手体验

- xing - cnBeta.COM
作为Sensation这条产品线的第三款产品,HTC Sensation XL同时也成为了HTC与Beats合作之后的第二款手机. 经过了Sensation XE与Titan之后,Sensation XL面向更加细分的消费人群,更大的屏幕、超薄的机身、当然还是Beats耳机,音乐发烧友请HOLD住.

HTC Sensation XE 是第一款正式的 Beats 智能手机

- SotongDJ - 谷安——谷奥Android专题站
HTC 重金投到 Beats 公司之后(HTC 之前收购了 Beats Audio 公司)就不断有传言说 HTC 在炮制采用 Betas 技术的产品,而现据称基于 Android 的 HTC Sensation XE 将成为首款采用 Beats 技术的智能手机. 这个 HTC Sensation XE 不只是整合了 Beats 技术,它(在 HTC Sensation 的基础上)还有几方面的变化:1.5GHz 双核处理器,4.3 英寸 qHD 显示屏,1750 mAh 电池以及 8GB 或 16GB microSD 卡.

开发者成功移植 Beats Audio 到 HTC Evo 3D [ROM]

- SotongDJ - 谷安——谷奥Android专题站
XDA 的开发者 Virus 成功为 Evo 3D 手机炮制出了首个 Beats Audio ROM. 这个 ROM 本质上是从 Sensation XE 的 ROM 移植过来并使其兼容 Sprint 的 Evo 3D 的. 这也是第一个带有 HTC 新的 Beats 技术的 EVO 3D ROM,相信很多用户肯定想尝试一番.

HTC Sensation XE 首款 Beats Audio 音樂手機 上市價 19,900 元

- Cary - T客邦
HTC 在今年八月策略投資 Beat Electronics,引起業界的震撼,代表著未來即將推出具有 Beats 音效的手機,才過不到一個月,HTC 就發表首款 Beats 手機 Sensation XE,然而,九月底在台灣就舉辦了發表會,消費者十月中上旬在各經銷商就可以買到 Sensation XE,十一月也能夠在中華電信使用資費方案購買,搶先享受 Beats 所帶來的音效享受.

HTC 将向开发者提供 Beats OpenSense 的音效 API 接口

- SINCERE - Engadget 中国版
在 8 月份的时候,HTC 和 Beats Electronics 走在了一起,之后我们看到了搭配 Beats Audio 技术的机型推出,在这之前,HTC 提供了众多的 SDK 给开发者. 现在是时候端出 Beats Audio 的 API 开发工具了,HTC 官方确认即将推出,这个工具允许第三方开发者挖掘 Beats Audio 的潜力,用以增强应用软件的音效体验.

HTC Desire HD 收到 Beats Audio 功能 Rom,给 Sensation XE 意外一击

- yi - 谷安——谷奥Android专题站
我们都知道 HTC 收购了 Beats Audio 并试图给 HTC Android 手机带来更好的音质. 在过去数周我们已经听到过关于这方面的很多消息以及刚刚宣布的带 Beats Audio 功能的 HTC Sensation XE 手机. 看起来开发者给了 HTC 意外的一击,因为他们为 Desire HD 带来了一个集成有 Beats Audio 功能的 ROM.

HTC Vigor 規格再次流出:720p 螢幕、1.5GHz 雙核心處理器、Beats Audio

- Adam - Engadget 中文版
雖然我們已經非常肯定,在這世界的某處是有著傳說中 HTC Vigor 的存在,但這次我們總算得到一些比較可靠的資料了(對,上次那個是假的. 不過在更確切的證據前,我們還是先相信一半好了...(尤其希望這個機背不是最終版本的).

三个技巧,活用 Twitter 搜索的高级功能 - 爱范儿 · Beats of Bits - 发现创新价值的科技媒体

- 大白Two - www.ifanr.com
Twitter 是信息的洪流,世界的脉搏. 大家都关注 Twitter 上“现在发生了什么”,可惜在协议到期后, Google 的实时搜索服务终止了对 Twitter 的支持. 那么,就好好挖掘 Twitter 自己的搜索功能吧,下面几个技巧会帮助你. 1.在搜索结果中去掉链接:“xxx -filter:links”.