Elasticsearch:rollup - 索引管理_Elastic-CSDN博客_elasticsearch rollup

标签: | 发表时间:2020-06-21 11:14 | 作者:
出处:https://blog.csdn.net

汇总作业 ( rollup jobs)是一项定期任务,它将来自索引模式指定的索引中的数据进行汇总,然后将其汇总到新的索引中。 汇总索引是紧凑存储数月或数年历史数据以供可视化和报告使用的好方法。用到rollup的情况是我们有很多的历史数据,而且通常会比较大。通过使用 rollup 功能,我们可以把很多针对大量数据的统计变为针对经过 rollup 后的索引操作,从而使得数据的统计更加有效。在实际的应用中:

  • 在很多的情况下,保留历史数据不是一种最佳的选择。这是因为时序数据随着时间的推移,它们的价值没有那么高,需要删除这些数据以节省成本
  • 但是我们还是希望能够保留它们的统计数据用于分析

 Elasticsearch 自6.3 版本推出 rollup功能,它可以帮我们:

  • 保留旧数据,并以一种紧凑和聚合的方式来存储
  • 仅仅保留我们感兴趣的数据

在下面,我们来用一个具体的例子来展示如何使用rollup的。

准备数据

我们首先需要找到一个数据比较大一点的索引。我们可以参考我之前的文章 “ Logstash 入门教程 (二)” 。在那篇文章中,我们直接跳到文章的最后一节把整个文件都导入到Elasticticsearch中。

我们可以看到我们的cars索引有达到201M的大小,而且它的总文档数达到 30 万。在接下来我们将使用 rollup 的功能对这个索引进行处理。在数据导入后,我们必须创建一个index pattern,并在Discover中进行显示:

 

 

从上面的图中,我们可以看出来,整个索引的数据是从2019.5.9到2019.6.07进行采集的。

 

创建 rollup job

我们打开 Kibana 界面:

点击 Create rollup job按钮:

从上面我们可以看出来这个 rollup job 针对一个时间系列的索引。把屏幕向下滚动。

从上面我们可以看出来:rollup 是一个在后台不断运行的一个任务。它会周期性地定时做这项工作,以使得最新已有的数据得到处理。在上面,我选择在每个时钟过15分钟时进行一次rollup,比如在1:15分做一次,2:15分做一次,3:15分做一次,依次类推。这个依赖于你自己索引的大小及项目的性质决定的。

 

点击上面的 Next  按钮:

这个是针对 Date histogram 的配置。点击 Next 按钮:

这个是针对 terms 的 选择。点击 Add terms fields 按钮:

我们选择上面的 geoip.country_code2:

按照同样的方法,我们添加 agent.hostname:

点击 Next 按钮:

这个页面时可选项。如果我们感兴趣的话,那么我们点击 Add  histogram fields:

我们选择 Bytes 字段:

点击 Next 按钮:

这个是对 Metrics 的配置。我们点击 Add metrics fields按钮:

我们选择 bytes。由于 Metrics 只是针对数值类型的字段,在上面我们可以看到所有的字段都是数值类型的。

我们接着勾上我们喜欢的 metrics 项。再点击 Next 按钮:

 

在这个页面我们可以看到这个 rollup 的 概览。如果你觉得不太满意,你可以点击 Back 按钮然后再进行重新配置。如果满意的话,我们点击 Save 按钮。如果你还想马上就开始这个 job 的话,勾上 Start job now。我们点击 Save,并勾上 Start job now:

上面的状态显示这个job已经开始工作了。我们可以点击 Manage 按钮来对这个 job 进行管理:

它可以让我们停止这个 job 或者 克隆这个job。目前,我们既不想停止,也不想克隆。我们在这个界面还可以点击上面的几个tab来查看这个job的详细信息。

特别有意思的是我们甚至可以看到这个 job 的JSON 表达方式。

退出这个Dialog:

从上面,我们可以看出来我们的 Job 正在运行中。 

 

在 Kibana 中进行统计

我在Kibana中,通过如下的命令来查看所有的索引:

      GET _cat/indices

结果显示:

我们可以看到一个新的索引:apache_rollup。它的文件显示的非常之小。只有21.8M。是不是觉得不可思议啊。它相比之前的那个apache_elastic_example 来说,小了非常多。

我们通过对这个索引来对我们所关心的数据进行统计分析。当我们对这个 rollup 的索引进行分析时,参照链接 https://www.elastic.co/guide/en/elasticsearch/reference/master/rollup-search.html,我们可以对它进行如下的方式的搜索:

我们经过 rollup 的处理后,那么对于我们的数据的统计来说有没有什么影响呢?

我们先来做一些检测:

找出最大值

      GET apache_rollup/_rollup_search
{
  "size": 0,
  "aggs": {
    "my_max": {
      "max": {
        "field": "bytes"
      }
    }
  }
}

返回结果:

      {
  "took" : 9,
  "timed_out" : false,
  "terminated_early" : false,
  "_shards" : {
    "total" : 1,
    "successful" : 1,
    "skipped" : 0,
    "failed" : 0
  },
  "hits" : {
    "total" : {
      "value" : 0,
      "relation" : "eq"
    },
    "max_score" : 0.0,
    "hits" : [ ]
  },
  "aggregations" : {
    "my_max" : {
      "value" : 8.2090432E7
    }
  }
}

上面是根据 rollup 的索引得到的结果。我们来使用最原始的索引:

      GET apache_elastic_example/_search
{
  "size": 0,
  "aggs": {
    "my_max": {
      "max": {
        "field": "bytes"
      }
    }
  }
}
      {
  "took" : 0,
  "timed_out" : false,
  "_shards" : {
    "total" : 1,
    "successful" : 1,
    "skipped" : 0,
    "failed" : 0
  },
  "hits" : {
    "total" : {
      "value" : 10000,
      "relation" : "gte"
    },
    "max_score" : null,
    "hits" : [ ]
  },
  "aggregations" : {
    "my_max" : {
      "value" : 8.209043E7
    }
  }
}

显然返回的结果是一样的。

找出最小值

      GET apache_rollup/_rollup_search
{
  "size": 0,
  "aggs": {
    "my_min": {
      "min": {
        "field": "bytes"
      }
    }
  }
}

返回的结果:

      {
  "took" : 7,
  "timed_out" : false,
  "terminated_early" : false,
  "_shards" : {
    "total" : 1,
    "successful" : 1,
    "skipped" : 0,
    "failed" : 0
  },
  "hits" : {
    "total" : {
      "value" : 0,
      "relation" : "eq"
    },
    "max_score" : 0.0,
    "hits" : [ ]
  },
  "aggregations" : {
    "my_min" : {
      "value" : 1.0
    }
  }
}

使用原始的数据:

      GET apache_elastic_example/_search
{
  "size": 0,
  "aggs": {
    "my_min": {
      "min": {
        "field": "bytes"
      }
    }
  }
}
      {
  "took" : 0,
  "timed_out" : false,
  "_shards" : {
    "total" : 1,
    "successful" : 1,
    "skipped" : 0,
    "failed" : 0
  },
  "hits" : {
    "total" : {
      "value" : 10000,
      "relation" : "gte"
    },
    "max_score" : null,
    "hits" : [ ]
  },
  "aggregations" : {
    "my_min" : {
      "value" : 1.0
    }
  }
}

返回的结果是一样的。

找出平均值

      GET apache_rollup/_rollup_search
{
  "size": 0,
  "aggs": {
    "my_avg": {
      "avg": {
        "field": "bytes"
      }
    }
  }
}
      {
  "took" : 13,
  "timed_out" : false,
  "terminated_early" : false,
  "_shards" : {
    "total" : 1,
    "successful" : 1,
    "skipped" : 0,
    "failed" : 0
  },
  "hits" : {
    "total" : {
      "value" : 0,
      "relation" : "eq"
    },
    "max_score" : 0.0,
    "hits" : [ ]
  },
  "aggregations" : {
    "my_avg" : {
      "value" : 2372996.9378802367
    }
  }
}

使用原始的数据:

      GET apache_elastic_example/_search
{
  "size": 0,
  "aggs": {
    "my_avg": {
      "avg": {
        "field": "bytes"
      }
    }
  }
}

返回的结果是:

      {
  "took" : 0,
  "timed_out" : false,
  "_shards" : {
    "total" : 1,
    "successful" : 1,
    "skipped" : 0,
    "failed" : 0
  },
  "hits" : {
    "total" : {
      "value" : 10000,
      "relation" : "gte"
    },
    "max_score" : null,
    "hits" : [ ]
  },
  "aggregations" : {
    "my_avg" : {
      "value" : 2372996.957136324
    }
  }
}

结果的差异是非常之小的。

找出文档最多的前5个国家的名称

      GET apache_rollup/_rollup_search
{
  "size":0,
  "aggs" : {
    "countries": {
      "terms": {
        "field": "geoip.country_code2.keyword",
        "size": 5
      }
    }
  }
}
      "aggregations" : {
    "countries" : {
      "doc_count_error_upper_bound" : 0,
      "sum_other_doc_count" : 66600,
      "buckets" : [
        {
          "key" : "US",
          "doc_count" : 122800
        },
        {
          "key" : "FR",
          "doc_count" : 19226
        },
        {
          "key" : "DE",
          "doc_count" : 17415
        },
        {
          "key" : "NL",
          "doc_count" : 14720
        },
        {
          "key" : "CN",
          "doc_count" : 14581
        }
      ]
    }
  }

使用最原始的数据:

      GET apache_elastic_example/_search
{
  "size":0,
  "aggs" : {
    "countries": {
      "terms": {
        "field": "geoip.country_code2.keyword",
        "size": 5
      }
    }
  }
}
      "aggregations" : {
    "countries" : {
      "doc_count_error_upper_bound" : 0,
      "sum_other_doc_count" : 109662,
      "buckets" : [
        {
          "key" : "US",
          "doc_count" : 122800
        },
        {
          "key" : "FR",
          "doc_count" : 19226
        },
        {
          "key" : "DE",
          "doc_count" : 17415
        },
        {
          "key" : "NL",
          "doc_count" : 14720
        },
        {
          "key" : "CN",
          "doc_count" : 14581
        }
      ]
    }
  }

这两个的统计结果是一样的。

更为复杂的统计

我们可以尝试一下的统计:

      GET apache_rollup/_rollup_search
{
  "size": 0,
  "query": {
    "term": {
      "agent.hostname.keyword" : {
        "value": "liuxg"
      }
    }
  },
  "aggs" :{
    "daily" : {
      "date_histogram" :{
        "field": "@timestamp",
        "calendar_interval": "1d",
        "time_zone": "UTC"
      },
      "aggs": {
        "avg_byete" : {
          "avg": {
            "field": "bytes"
          }
        }
      }
    }
  }
}

上面是对 rollup 索引进行的统计:

      "aggregations" : {
    "daily" : {
      "meta" : { },
      "buckets" : [
        {
          "key_as_string" : "2019-05-08T00:00:00.000Z",
          "key" : 1557273600000,
          "doc_count" : 688,
          "avg_byete" : {
            "value" : 126608.41791044777
          }
        },
        {
          "key_as_string" : "2019-05-09T00:00:00.000Z",
          "key" : 1557360000000,
          "doc_count" : 7721,
          "avg_byete" : {
            "value" : 317026.4027212793
          }
        },
        {
          "key_as_string" : "2019-05-10T00:00:00.000Z",
          "key" : 1557446400000,
          "doc_count" : 9044,
          "avg_byete" : {
            "value" : 551951.1379390019
          }
        },
        {
          "key_as_string" : "2019-05-11T00:00:00.000Z",
          "key" : 1557532800000,
          "doc_count" : 9412,
          "avg_byete" : {
            "value" : 261766.84161836826
          }
        },
     ...
}

我们对最原始的索引来进行统计:

      GET apache_elastic_example/_search
{
  "size": 0,
  "query": {
    "term": {
      "agent.hostname.keyword": {
        "value": "liuxg"
      }
    }
  },
  "aggs": {
    "daily": {
      "date_histogram": {
        "field": "@timestamp",
        "calendar_interval": "1d",
        "time_zone": "UTC"
      },
      "aggs": {
        "avg_bytes": {
          "avg": {
            "field": "bytes"
          }
        }
      }
    }
  }
}

显示结果:

      "aggregations" : {
    "daily" : {
      "buckets" : [
        {
          "key_as_string" : "2019-05-08T00:00:00.000Z",
          "key" : 1557273600000,
          "doc_count" : 688,
          "avg_bytes" : {
            "value" : 126608.41940298508
          }
        },
        {
          "key_as_string" : "2019-05-09T00:00:00.000Z",
          "key" : 1557360000000,
          "doc_count" : 7721,
          "avg_bytes" : {
            "value" : 317026.4042642727
          }
        },
        {
          "key_as_string" : "2019-05-10T00:00:00.000Z",
          "key" : 1557446400000,
          "doc_count" : 9044,
          "avg_bytes" : {
            "value" : 551951.1417513863
          }
        },
        {
          "key_as_string" : "2019-05-11T00:00:00.000Z",
          "key" : 1557532800000,
          "doc_count" : 9412,
          "avg_bytes" : {
            "value" : 261766.84351315204
          }
        },
       ...
   }

从上面的结果显示,通过这两种方式进行的统计的结果非常一致。

从某种程度上讲,我们甚至可以通过Index cycle management的方法只保留一段时间的数据(比如最近的6个月的数据),而通过rollup 方法继续可以对之前的数据进行预设的统计。

 

通过 API 的方法实现

上面的方法通过界面非常直观。但是Elastic也提供API的方法来设置。比如:

      PUT _rollup/job/apache_rollup_job2
{
  "index_pattern": "apache_elastic_example*",
  "rollup_index": "apache_rollup2",
  "cron": "*/10 * * * * ?",
  "page_size": 1000,
  "groups": {
    "date_histogram": {
      "field": "@timestamp",
      "fixed_interval": "1h",
      "delay": "1d",
      "time_zone": "UTC"
    },
    "terms": {
      "fields": [
        "geoip.country_code2.keyword",
        "agent.hostname.keyword"
      ]
    }
  },
  "metrics": [
    {
      "field": "bytes",
      "metrics": [
        "min",
        "max",
        "sum",
        "avg"
      ]
    }
  ]
}

在上面,我们设置了几乎和界面一样的实现,只不过在这里,我们每隔10分钟来进行一次 rollup。在这里,我们把数据存于到 apache_rollup2 这个索引中。

这个apache_rollup2 的大小更小。但是它和apache_rollup 是一样的效果。我们可以对这个索引做同样的搜索:

      GET apache_rollup2/_rollup_search
{
  "size": 0,
  "aggs": {
    "my_avg": {
      "avg": {
        "field": "bytes"
      }
    }
  }
}
      "took" : 2,
  "timed_out" : false,
  "terminated_early" : false,
  "_shards" : {
    "total" : 1,
    "successful" : 1,
    "skipped" : 0,
    "failed" : 0
  },
  "hits" : {
    "total" : {
      "value" : 0,
      "relation" : "eq"
    },
    "max_score" : 0.0,
    "hits" : [ ]
  },
  "aggregations" : {
    "my_avg" : {
      "value" : 2372996.9558440386
    }
  }
}

使用最原始的索引:
 

      GET apache_elastic_example/_search
{
  "size": 0,
  "aggs": {
    "my_avg": {
      "avg": {
        "field": "bytes"
      }
    }
  }
}
      {
  "took" : 0,
  "timed_out" : false,
  "_shards" : {
    "total" : 1,
    "successful" : 1,
    "skipped" : 0,
    "failed" : 0
  },
  "hits" : {
    "total" : {
      "value" : 10000,
      "relation" : "gte"
    },
    "max_score" : null,
    "hits" : [ ]
  },
  "aggregations" : {
    "my_avg" : {
      "value" : 2372996.957136324
    }
  }
}

关于其它的比较,我这里就不详述了。你们可以自己来实践。

如果我们想删除一个 rollup 的任务,我们必须先停止,然后,删除它的 rollup 索引,再进行删除的动作。这些有的可以在之前的那个UI里通过 Manage 按钮来实现。通过 API的方式来删除一个 rollup,请参考如下的步骤:

      POST _rollup/job/apache_rollup_job/_stop?wait_for_completion=true&timeout=10s
DELETE apache_rollup
DELETE _rollup/job/apache_rollup_job

 

参考:

【1】 https://www.youtube.com/watch?v=I5-9x_pQ-Y0&t=302s&pbjreload=10

【2】 https://www.elastic.co/guide/en/elasticsearch/reference/current/rollup-get-job.html

【3】 https://www.elastic.co/guide/en/elasticsearch/reference/master/rollup-search.html

相关 [elasticsearch rollup 索引] 推荐:

Elasticsearch:rollup - 索引管理_Elastic-CSDN博客_elasticsearch rollup

- -
rollup jobs)是一项定期任务,它将来自索引模式指定的索引中的数据进行汇总,然后将其汇总到新的索引中. 汇总索引是紧凑存储数月或数年历史数据以供可视化和报告使用的好方法. 用到rollup的情况是我们有很多的历史数据,而且通常会比较大. 通过使用 rollup 功能,我们可以把很多针对大量数据的统计变为针对经过 rollup 后的索引操作,从而使得数据的统计更加有效.

ElasticSearch索引优化

- - 行业应用 - ITeye博客
ES索引的过程到相对Lucene的索引过程多了分布式数据的扩展,而这ES主要是用tranlog进行各节点之间的数据平衡. 所以从上我可以通过索引的settings进行第一优化:. 这两个参数第一是到tranlog数据达到多少条进行平衡,默认为5000,而这个过程相对而言是比较浪费时间和资源的. 所以我们可以将这个值调大一些还是设为-1关闭,进而手动进行tranlog平衡.

ElasticSearch 倒排索引、分词

- - 行业应用 - ITeye博客
es使用称为倒排索引的结构达到快速全文搜索的目的. 一个倒排索引包含一系列不同的单词,这些单词出现在任何一个文档,. 对于每个单词,对应着所有它出现的文档. 比如说,我们有2个文档,每个文档有一个conteng字段. 我们首先对每个字段进行分词,我们称之为terms或者tokens,创建了一些列有序列表,.

elasticsearch java API------批量添加索引

- - 行业应用 - ITeye博客
elasticsearch java API------批量添加索引.         person.setName("张三" + i);  .         person.setSex("男");  .         String index = "user"; // 相当于数据库名  .         String type = "tb_person"; // 相当于表名  .

Elasticsearch 索引设计实战指南

- - IT瘾-dev
随着 Elastic 的上市,ELK Stack 不仅在 BAT 的大公司得到长足的发展,而且在各个中小公司都得到非常广泛的应用,甚至连“婚庆网站”都开始使用 Elasticsearch 了. 随之而来的是 Elasticsearch 相关部署、框架、性能优化的文章早已铺天盖地. 初学者甚至会进入幻觉——“一键部署、导入数据、检索&聚合、动态扩展, So Easy,妈妈再也不用担心我的 Elastic 学习”.

Elasticsearch 索引容量管理实践

- -
作者:gavinyao,腾讯 PCG 后台开发工程师. Elasticsearch 是目前大数据领域最热门的技术栈之一,腾讯云 Elasticsearch Service(ES)是基于开源搜索引擎 Elasticsearch 打造的高可用、可伸缩的云端全托管 Elasticsearch 服务,完善的高可用解决方案,让业务可以放心的把重要数据存储到腾讯云 ES 中.

开源搜索引擎评估:lucene sphinx elasticsearch

- - 鲁塔弗的博客
lucene系,java开发,包括 solr和 elasticsearch. sphinx,c++开发,简单高性能. 搜索引擎程序这个名称不妥当,严格说来应该叫做 索引程序(indexing program),早期主要用来做中文全文搜索,但是随着互联网的深入普及,各家网站规模越来越大,索引程序在 优化网站架构上发挥了更大的作用: 替代mysql数据库 内置的索引.

elasticsearch RESTful搜索引擎-(java jest 使用[入门])

- - zzm
elasticsearch简称ES. 好吧下面我介绍下jest(第三方工具),个人认为还是非常不错的...想对ES用来更好,多多研究源代码吧...迟点,会写一些关于ES的源代码研究文章,现在暂时还是入门的阶段.哈..(不敢,不敢). 它是ES的java客户端,基于http restful.... jest是开源的....其他就不清楚了,看源代码吧..哈..

基于Nutch+Hadoop+Hbase+ElasticSearch的网络爬虫及搜索引擎

- - zzm
网络爬虫架构在Nutch+Hadoop之上,是一个典型的分布式离线批量处理架构,有非常优异的吞吐量和抓取性能并提供了大量的配置定制选项. 由于网络爬虫只负责网络资源的抓取,所以,需要一个分布式搜索引擎,用来对网络爬虫抓取到的网络资源进行实时的索引和搜索. 搜 索引擎架构在ElasticSearch之上,是一个典型的分布式在线实时交互查询架构,无单点故障,高伸缩、高可用.

elasticsearch更改mapping(不停服务重建索引)

- - zzm
Elasticsearch的mapping一旦创建,只能增加字段,而不能修改已经mapping的字段. 但现实往往并非如此啊,有时增加一个字段,就好像打了一个补丁,一个可以,但是越补越多,最后自己都觉得惨不忍睹了. 这里有一个方法修改mapping,那就是重新建立一个index,然后创建一个新的mapping.