Elasticsearch:深入理解 Dissect ingest processor

标签: elasticsearch 理解 dissect | 发表时间:2021-03-16 12:40 | 作者:Elastic
出处:https://juejin.cn/backend

Grok 处理器类似, dissect 处理器也从文档中的单个文本字段中提取结构化字段。 但是,与 Grok 处理器不同,解析不使用正则表达式。 这使得 Dissect 的语法更加简单,并且在某些情况下比 Grok Processor 更快。

Dissect 将单个文本字段与定义的模式匹配。在我之前的文章 “ Elastic可观测性 - 运用 pipeline 使数据结构化” 中我们已经对 Grok 及 Dissect 处理器做了介绍。在今天的文章中,我们想更深入地了解 dissect 处理器。在今天的讲解中,我将以一些例子来进行展示。

 

动手实践

简单的一个例子

我们先以一个简单的例子啦进行展示:

   POST _ingest/pipeline/_simulate
{
  "pipeline": {
    "description": "Example using dissect processor",
    "processors": [
      {
        "dissect": {
          "field": "message",
          "pattern": "%{@timestamp} [%{loglevel}] %{status}"
        }
      }
    ]
  },
  "docs": [
    {
      "_source": {
        "message": "2019-09-29T00:39:02.912Z [Debug] MyApp stopped"
      }
    }
  ]
}
复制代码

在上面,我们通过 pattern 来对 message 进行提取。在 disssect 中,特别需要注意的是空格的使用。如果空格不匹配,那么也会造成错误。上面的结果是:

   {
  "docs" : [
    {
      "doc" : {
        "_index" : "_index",
        "_type" : "_doc",
        "_id" : "_id",
        "_source" : {
          "@timestamp" : "2019-09-29T00:39:02.912Z",
          "loglevel" : "Debug",
          "message" : "2019-09-29T00:39:02.912Z [Debug] MyApp stopped",
          "status" : "MyApp stopped"
        },
        "_ingest" : {
          "timestamp" : "2020-12-09T04:40:40.894589Z"
        }
      }
    }
  ]
}
复制代码

显然它提取出来 loglevel, message 以及 status。请注意,我们也丢掉了里面的 [ 及 ] 字符。

 

跳过字段

由于 dissect 是一种确切地匹配,但是在实际的使用中,我们可能并不想要某个字段出现在我们的文档中,虽然它可以被结构化。我们看一下如下的一个例子:

   POST _ingest/pipeline/_simulate
{
  "pipeline": {
    "description": "Example using dissect processor",
    "processors": [
      {
        "dissect": {
          "field": "message",
          "pattern": "%{@timestamp} [%{?loglevel}] %{status}"
        }
      }
    ]
  },
  "docs": [
    {
      "_source": {
        "message": "2019-09-29T00:39:02.912Z [Debug] MyApp stopped"
      }
    }    
  ]
}
复制代码

在上面的例子中,我们使用了 %{?loglevel},它表明我们不需要 loglevel  出现在我们的结果中:

   {
  "docs" : [
    {
      "doc" : {
        "_index" : "_index",
        "_type" : "_doc",
        "_id" : "_id",
        "_source" : {
          "@timestamp" : "2019-09-29T00:39:02.912Z",
          "message" : "2019-09-29T00:39:02.912Z [Debug] MyApp stopped",
          "status" : "MyApp stopped"
        },
        "_ingest" : {
          "timestamp" : "2020-12-09T04:47:24.7823Z"
        }
      }
    }
  ]
}
复制代码

显然在这个输出中,没有了之前的 loglevel 这个字段了。

 

处理多个空格

Dissect 处理器是非常严格的。它需要完全匹配的空格,否则解析将不会成功,比如:

   POST _ingest/pipeline/_simulate
{
  "pipeline": {
    "description": "Example using dissect processor",
    "processors": [
      {
        "dissect": {
          "field": "message",
          "pattern": "%{@timestamp} %{status}"
        }
      }
    ]
  },
  "docs": [
    {
      "_source": {
        "message": "2019-09-29  MyApp stopped"
      }
    }    
  ]
}
复制代码

在上面,我们故意在 MyApp stopped 之前多加了一个空格,那么上面解析的结果是:

   {
  "docs" : [
    {
      "doc" : {
        "_index" : "_index",
        "_type" : "_doc",
        "_id" : "_id",
        "_source" : {
          "@timestamp" : "2019-09-29",
          "message" : "2019-09-29  MyApp stopped",
          "status" : ""
        },
        "_ingest" : {
          "timestamp" : "2020-12-09T05:01:58.065065Z"
        }
      }
    }
  ]
}
复制代码

从上面的结果中可以看出来,它完全解析不了我们的 message。status 字段显示为空。那么我们该如何处理这个呢?
我们可以使用向右的 padding 修饰符 -> 忽略 padding:

   POST _ingest/pipeline/_simulate
{
  "pipeline": {
    "description": "Example using dissect processor",
    "processors": [
      {
        "dissect": {
          "field": "message",
          "pattern": "%{@timestamp->} %{status}"
        }
      }
    ]
  },
  "docs": [
    {
      "_source": {
        "message": "2019-09-29  MyApp stopped"
      }
    }    
  ]
}
复制代码

上面的运行结果是:

   {
  "docs" : [
    {
      "doc" : {
        "_index" : "_index",
        "_type" : "_doc",
        "_id" : "_id",
        "_source" : {
          "@timestamp" : "2019-09-29",
          "message" : "2019-09-29  MyApp stopped",
          "status" : "MyApp stopped"
        },
        "_ingest" : {
          "timestamp" : "2020-12-09T05:07:23.294188Z"
        }
      }
    }
  ]
}
复制代码

我们也可以使用一个空的键来跳过不想要的空格:

   POST _ingest/pipeline/_simulate
{
  "pipeline": {
    "description": "Example using dissect processor",
    "processors": [
      {
        "dissect": {
          "field": "message",
          "pattern": "[%{@timestamp}]%{->}[%{status}]"
        }
      }
    ]
  },
  "docs": [
    {
      "_source": {
        "message": "[2019-09-29] [MyApp stopped]"
      }
    },
    {
      "_source": {
        "message": "[2019-09-29]  [MyApp stopped]"
      }
    }    
  ]
}
复制代码

在上面我们使用了 %{->} 来匹配不想要的空格。在上面,我们使用了两个文档,一个文档含有一个空格,另外一个文档含有两个空格。运行的结果如下:

   {
  "docs" : [
    {
      "doc" : {
        "_index" : "_index",
        "_type" : "_doc",
        "_id" : "_id",
        "_source" : {
          "@timestamp" : "2019-09-29",
          "message" : "[2019-09-29] [MyApp stopped]",
          "status" : "MyApp stopped"
        },
        "_ingest" : {
          "timestamp" : "2020-12-09T05:21:14.752694Z"
        }
      }
    },
    {
      "doc" : {
        "_index" : "_index",
        "_type" : "_doc",
        "_id" : "_id",
        "_source" : {
          "@timestamp" : "2019-09-29",
          "message" : "[2019-09-29]  [MyApp stopped]",
          "status" : "MyApp stopped"
        },
        "_ingest" : {
          "timestamp" : "2020-12-09T05:21:14.752701Z"
        }
      }
    }
  ]
}
复制代码

 

追加字段

在很多的情况下,我们甚至可以把很多的字段追加到一个字段中去,比如:

   POST _ingest/pipeline/_simulate
{
  "pipeline": {
    "description": "Example using dissect processor",
    "processors": [
      {
        "dissect": {
          "field": "message",
          "pattern": "%{@timestamp} %{+@timestamp} %{+@timestamp} %{loglevel} %{status}",
          "append_separator": " "
        }
      }
    ]
  },
  "docs": [
    {
      "_source": {
        "message": "Oct 29 00:39:02 Debug MyApp stopped"
      }
    }    
  ]
}
复制代码

在上面,我们的时间表达式是 Oct 29 00:39:02。它是由三个字符串组成的。我们通过 %{@timestamp} %{+@timestamp} %{+@timestamp} 来把这三个字符串组合成一个 @timestamp 字段。运行上面的结果是:

   {
  "docs" : [
    {
      "doc" : {
        "_index" : "_index",
        "_type" : "_doc",
        "_id" : "_id",
        "_source" : {
          "@timestamp" : "Oct 29 00:39:02",
          "loglevel" : "Debug",
          "message" : "Oct 29 00:39:02 Debug MyApp stopped",
          "status" : "MyApp stopped"
        },
        "_ingest" : {
          "timestamp" : "2020-12-09T05:27:29.785206Z"
        }
      }
    }
  ]
}
复制代码

请注意在上面的例子中,我们使用了 append_separator,并配置它为空字符串。否则在我们的结果中三个字符串将被级联起来,从而变成 Oct2900:39:02。这个在实际的使用中,可能并不是我们想要的结果。

 

提前 key-value

我们可以使用 %{*field} 当做 key,并把 %{&field} 当做 value 来匹配:

   POST _ingest/pipeline/_simulate
{
  "pipeline": {
    "description": "Example using dissect processor key-value",
    "processors": [
      {
        "dissect": {
          "field": "message",
          "pattern": "%{@timestamp} %{*field1}=%{&field1} %{*field2}=%{&field2}"
        }
      }
    ]
  },
  "docs": [
    {
      "_source": {
        "message": "2019009-29T00:39:02.912Z host=AppServer status=STATUS_OK"
      }
    }
  ]
}
复制代码

上面的运行结果是:

   {
  "docs" : [
    {
      "doc" : {
        "_index" : "_index",
        "_type" : "_doc",
        "_id" : "_id",
        "_source" : {
          "@timestamp" : "2019009-29T00:39:02.912Z",
          "host" : "AppServer",
          "message" : "2019009-29T00:39:02.912Z host=AppServer status=STATUS_OK",
          "status" : "STATUS_OK"
        },
        "_ingest" : {
          "timestamp" : "2020-12-09T05:34:30.47561Z"
        }
      }
    }
  ]
}
复制代码

 

挑战自己

从上面的练习中,可能你已经感觉到这个 dissect 处理器是非常有用的,而且也是非常简单易用的。那么我们现在来做一个真正实用的一个例子:

   POST _ingest/pipeline/_simulate
{
  "pipeline": {
    "processors": [
    ]
  },
  "docs": [
    {
      "_source": {
        "message": """Mar 22 01:27:39 localhost haproxy[14415]: Server updates /appServer02 is UP, reason: Layer7 check passed, code:2000, info:"OK", check duration:3ms."""
      }
    }
  ]
}
复制代码

上面是一个 haproxy 的例子。信息很长。我们该如何使用 processor 来处理上面的信息并使之成为一个结构化的文档呢?

我们可以使用 dissect 处理器。按照我们上面所学的东西,我们可以先这么处理:

   POST _ingest/pipeline/_simulate
{
  "pipeline": {
    "processors": [
      {
        "dissect": {
          "field": "message",
          "pattern": "%{timestamp} %{+timestamp} %{+timestamp}"
        }
      }
    ]
  },
  "docs": [
    {
      "_source": {
        "message": """Mar 22 01:27:39 localhost haproxy[14415]: Server updates /appServer02 is UP, reason: Layer7 check passed, code:2000, info:"OK", check duration:3ms."""
      }
    }
  ]
}
复制代码

在上面,我们把前面的三个字符串连接成为一个 timestamp 的字段。运行上面的命令:

   {
  "docs" : [
    {
      "doc" : {
        "_index" : "_index",
        "_type" : "_doc",
        "_id" : "_id",
        "_source" : {
          "message" : """Mar 22 01:27:39 localhost haproxy[14415]: Server updates /appServer02 is UP, reason: Layer7 check passed, code:2000, info:"OK", check duration:3ms.""",
          "timestamp" : """Mar2201:27:39 localhost haproxy[14415]: Server updates /appServer02 is UP, reason: Layer7 check passed, code:2000, info:"OK", check duration:3ms."""
        },
        "_ingest" : {
          "timestamp" : "2020-12-09T05:38:44.674567Z"
        }
      }
    }
  ]
}
复制代码

显然前面的三个字符串连成一个字符串,并且它很贪婪。它把后面所有的字符串都匹配到这个字符串中。我们需要重新进行修改:

   POST _ingest/pipeline/_simulate
{
  "pipeline": {
    "processors": [
      {
        "dissect": {
          "field": "message",
          "pattern": "%{timestamp} %{+timestamp} %{+timestamp} %{host}",
          "append_separator": " "
        }
      }
    ]
  },
  "docs": [
    {
      "_source": {
        "message": """Mar 22 01:27:39 localhost haproxy[14415]: Server updates /appServer02 is UP, reason: Layer7 check passed, code:2000, info:"OK", check duration:3ms."""
      }
    }
  ]
}
复制代码

我们添加了 append_separator,并使用 %{host} 来匹配后面所有的字符串:

   {
  "docs" : [
    {
      "doc" : {
        "_index" : "_index",
        "_type" : "_doc",
        "_id" : "_id",
        "_source" : {
          "host" : """localhost haproxy[14415]: Server updates /appServer02 is UP, reason: Layer7 check passed, code:2000, info:"OK", check duration:3ms.""",
          "message" : """Mar 22 01:27:39 localhost haproxy[14415]: Server updates /appServer02 is UP, reason: Layer7 check passed, code:2000, info:"OK", check duration:3ms.""",
          "timestamp" : "Mar 22 01:27:39"
        },
        "_ingest" : {
          "timestamp" : "2020-12-09T05:41:53.667182Z"
        }
      }
    }
  ]
}
复制代码

显然这次,我们可以清楚地看到 timestamp 这个字段,但是 host 字段还是一个很长的字符串。我们接着处理:

   POST _ingest/pipeline/_simulate
{
  "pipeline": {
    "processors": [
      {
        "dissect": {
          "field": "message",
          "pattern": "%{timestamp} %{+timestamp} %{+timestamp} %{host} %{process}[%{id}]:%{rest}",
          "append_separator": " "
        }
      }
    ]
  },
  "docs": [
    {
      "_source": {
        "message": """Mar 22 01:27:39 localhost haproxy[14415]: Server updates /appServer02 is UP, reason: Layer7 check passed, code:2000, info:"OK", check duration:3ms."""
      }
    }
  ]
}
复制代码

在上面,我们提取 process 以及其 id,并把其它的内容放入到 %{rest} 中去:

   {
  "docs" : [
    {
      "doc" : {
        "_index" : "_index",
        "_type" : "_doc",
        "_id" : "_id",
        "_source" : {
          "rest" : """ Server updates /appServer02 is UP, reason: Layer7 check passed, code:2000, info:"OK", check duration:3ms.""",
          "process" : "haproxy",
          "host" : "localhost",
          "id" : "14415",
          "message" : """Mar 22 01:27:39 localhost haproxy[14415]: Server updates /appServer02 is UP, reason: Layer7 check passed, code:2000, info:"OK", check duration:3ms.""",
          "timestamp" : "Mar 22 01:27:39"
        },
        "_ingest" : {
          "timestamp" : "2020-12-09T05:46:11.833548Z"
        }
      }
    }
  ]
}
复制代码

从上面的 rest 中,我们可以看出来前面的部分是一个 status,而后面的是一个 kv 类型的数据。我们可以使用 kv processor 来对它进行处理。

我们首先来提取 status:

   POST _ingest/pipeline/_simulate
{
  "pipeline": {
    "processors": [
      {
        "dissect": {
          "field": "message",
          "pattern": "%{timestamp} %{+timestamp} %{+timestamp} %{host} %{process}[%{id}]:%{status}, %{rest}",
          "append_separator": " "
        }
      }
    ]
  },
  "docs": [
    {
      "_source": {
        "message": """Mar 22 01:27:39 localhost haproxy[14415]: Server updates /appServer02 is UP, reason: Layer7 check passed, code:2000, info:"OK", check duration:3ms."""
      }
    }
  ]
}
复制代码

运行上面的命令:

   {
  "docs" : [
    {
      "doc" : {
        "_index" : "_index",
        "_type" : "_doc",
        "_id" : "_id",
        "_source" : {
          "rest" : """reason: Layer7 check passed, code:2000, info:"OK", check duration:3ms.""",
          "process" : "haproxy",
          "host" : "localhost",
          "id" : "14415",
          "message" : """Mar 22 01:27:39 localhost haproxy[14415]: Server updates /appServer02 is UP, reason: Layer7 check passed, code:2000, info:"OK", check duration:3ms.""",
          "status" : " Server updates /appServer02 is UP",
          "timestamp" : "Mar 22 01:27:39"
        },
        "_ingest" : {
          "timestamp" : "2020-12-09T05:50:18.300969Z"
        }
      }
    }
  ]
}
复制代码

显然,我们可以得到 status 这个字段。在接下来的 rest 字段中显然是一个 key-value 这样的信息。我们可以使用  kv processor 来进行处理:

   POST _ingest/pipeline/_simulate
{
  "pipeline": {
    "processors": [
      {
        "dissect": {
          "field": "message",
          "pattern": "%{timestamp} %{+timestamp} %{+timestamp} %{host} %{process}[%{id}]:%{status}, %{rest}",
          "append_separator": " "
        }
      },
      {
        "kv": {
          "field": "rest",
          "field_split": ", ",
          "value_split": ":"
        }
      }
    ]
  },
  "docs": [
    {
      "_source": {
        "message": """Mar 22 01:27:39 localhost haproxy[14415]: Server updates /appServer02 is UP, reason: Layer7 check passed, code:2000, info:"OK", check duration:3ms."""
      }
    }
  ]
}
复制代码

在上面我们添加了一个叫做 kv 的处理器:

   {
  "docs" : [
    {
      "doc" : {
        "_index" : "_index",
        "_type" : "_doc",
        "_id" : "_id",
        "_source" : {
          "rest" : """reason: Layer7 check passed, code:2000, info:"OK", check duration:3ms.""",
          "reason" : " Layer7 check passed",
          "process" : "haproxy",
          "code" : "2000",
          "check duration" : "3ms.",
          "message" : """Mar 22 01:27:39 localhost haproxy[14415]: Server updates /appServer02 is UP, reason: Layer7 check passed, code:2000, info:"OK", check duration:3ms.""",
          "host" : "localhost",
          "id" : "14415",
          "status" : " Server updates /appServer02 is UP",
          "timestamp" : "Mar 22 01:27:39",
          "info" : "\"OK\""
        },
        "_ingest" : {
          "timestamp" : "2020-12-09T06:00:37.990909Z"
        }
      }
    }
  ]
}
复制代码

从上面的结果中,我们可以看出来我们得到了所有的想要的字段。我们接下来删除那个不想要的 message 及 rest 字段:

   POST _ingest/pipeline/_simulate
{
  "pipeline": {
    "processors": [
      {
        "dissect": {
          "field": "message",
          "pattern": "%{timestamp} %{+timestamp} %{+timestamp} %{host} %{process}[%{id}]:%{status}, %{rest}",
          "append_separator": " "
        }
      },
      {
        "kv": {
          "field": "rest",
          "field_split": ", ",
          "value_split": ":"
        }
      },
      {
        "remove": {
          "field": "message"
        }
      },
      {
        "remove": {
          "field": "rest"
        }
      }
    ]
  },
  "docs": [
    {
      "_source": {
        "message": """Mar 22 01:27:39 localhost haproxy[14415]: Server updates /appServer02 is UP, reason: Layer7 check passed, code:2000, info:"OK", check duration:3ms."""
      }
    }
  ]
}
复制代码

在上面,我运用 remove 处理器删除了 message 以及 rest 字段:

   {
  "docs" : [
    {
      "doc" : {
        "_index" : "_index",
        "_type" : "_doc",
        "_id" : "_id",
        "_source" : {
          "reason" : " Layer7 check passed",
          "process" : "haproxy",
          "code" : "2000",
          "check duration" : "3ms.",
          "host" : "localhost",
          "id" : "14415",
          "status" : " Server updates /appServer02 is UP",
          "timestamp" : "Mar 22 01:27:39",
          "info" : "\"OK\""
        },
        "_ingest" : {
          "timestamp" : "2020-12-09T05:59:44.138394Z"
        }
      }
    }
  ]
}
复制代码

从上面的一步一步的过程中,我们可以看出来如何对一个非结构化的数据进行结构化。

相关 [elasticsearch 理解 dissect] 推荐:

Elasticsearch:深入理解 Dissect ingest processor

- - 掘金 后端
Grok 处理器类似, dissect 处理器也从文档中的单个文本字段中提取结构化字段. 但是,与 Grok 处理器不同,解析不使用正则表达式. 这使得 Dissect 的语法更加简单,并且在某些情况下比 Grok Processor 更快. Dissect 将单个文本字段与定义的模式匹配. 在我之前的文章 “ Elastic可观测性 - 运用 pipeline 使数据结构化” 中我们已经对 Grok 及 Dissect 处理器做了介绍.

干货 |《深入理解Elasticsearch》读书笔记

- - CSDN博客推荐文章
由于之前已经梳理过Elasticsearch基础概念且在项目中实战过Elasticsearch的增删改查、聚类、排序等相关操作,对ES算是有了一定的认知. 但是,仍然对于一些底层的原理认知模糊,特买来《深入理解Elasticsearch》过了一遍,将书中一些细节知识点结合官网文档梳理如下. 1——4章偏应用,跟着敲一遍代码基本就能理解原理.

[译]elasticsearch mapping

- - an74520的专栏
es的mapping设置很关键,mapping设置不到位可能导致索引重建. 请看下面各个类型介绍^_^. 每一个JSON字段可以被映射到一个特定的核心类型. JSON本身已经为我们提供了一些输入,支持 string,  integer/ long,  float/ double,  boolean, and  null..

Elasticsearch as Database - taowen - SegmentFault

- -
【北京上地】滴滴出行基础平台部招聘 Elasticsearch 与 Mysql binlog databus 开发工程师. 内推简历投递给: [email protected]. 推销Elasticsearch. 时间序列数据库的秘密(1)—— 介绍. 时间序列数据库的秘密(2)——索引.

ElasticSearch 2 的节点调优(ElasticSearch性能)

- - 行业应用 - ITeye博客
一个ElasticSearch集群需要多少个节点很难用一种明确的方式回答,但是,我们可以将问题细化成一下几个,以便帮助我们更好的了解,如何去设计ElasticSearch节点的数目:. 打算建立多少索引,支持多少应用. elasticsearch版本: elasticsearch-2.x. 需要回答的问题远不止以上这些,但是第五个问题往往是容易被我们忽视的,因为单个ElasticSearch集群有能力支持多索引,也就能支持多个不同应用的使用.

Elasticsearch:使用 Elasticsearch 进行语义搜索

- - 掘金 后端
在数字时代,搜索引擎在通过浏览互联网上的大量可用信息来检索数据方面发挥着重要作用. 此方法涉及用户在搜索栏中输入特定术语或短语,期望搜索引擎返回与这些确切关键字匹配的结果. 虽然关键字搜索对于简化信息检索非常有价值,但它也有其局限性. 主要缺点之一在于它对词汇匹配的依赖. 关键字搜索将查询中的每个单词视为独立的实体,通常会导致结果可能与用户的意图不完全一致.

elasticsearch的javaAPI之query

- - CSDN博客云计算推荐文章
elasticsearch的javaAPI之query API. the Search API允许执行一个搜索查询,返回一个与查询匹配的结果(hits). 它可以在跨一个或多个index上执行, 或者一个或多个types. 查询可以使用提供的 query Java API 或filter Java API.

Elasticsearch基础教程

- - 开源软件 - ITeye博客
转自:http://blog.csdn.net/cnweike/article/details/33736429.     Elasticsearch有几个核心概念. 从一开始理解这些概念会对整个学习过程有莫大的帮助.     接近实时(NRT).         Elasticsearch是一个接近实时的搜索平台.

ElasticSearch索引优化

- - 行业应用 - ITeye博客
ES索引的过程到相对Lucene的索引过程多了分布式数据的扩展,而这ES主要是用tranlog进行各节点之间的数据平衡. 所以从上我可以通过索引的settings进行第一优化:. 这两个参数第一是到tranlog数据达到多少条进行平衡,默认为5000,而这个过程相对而言是比较浪费时间和资源的. 所以我们可以将这个值调大一些还是设为-1关闭,进而手动进行tranlog平衡.

elasticsearch集群搭建

- - zzm
之前对于CDN的日志处理模型是从 . logstash agent==>>redis==>>logstash index==>>elasticsearch==>>kibana3,对于elasticsearch集群搭建,可以把索引进行分片存储,一个索引可以分成若干个片,分别存储到集群里面,而对于集群里面的负载均衡,副本分配,索引动态均衡(根据节点的增加或者减少)都是elasticsearch自己内部完成的,一有情况就会重新进行分配.