elasticsearch和filebeat学习笔记

标签: | 发表时间:2019-08-01 10:52 | 作者:
出处:https://blog.zhangyu.so

elasticsearch安装、维护以及Filebeat module编写相关的笔记,备忘。

安装、配置

CentOS

    rpm --import https://artifacts.elastic.co/GPG-KEY-elasticsearch
echo '[elasticsearch-5.x]
name=Elasticsearch repository for 5.x packages
baseurl=https://artifacts.elastic.co/packages/5.x/yum
gpgcheck=1
gpgkey=https://artifacts.elastic.co/GPG-KEY-elasticsearch
enabled=1
autorefresh=1
type=rpm-md' > /etc/yum.repos.d/elasticsearch.repo
yum install elasticsearch # elasticsearch
yum install filebeat # filebeat

ElasticSearch

ES QueryString

http://127.0.0.1:9200/logstash-2017.10.17/testlog/_search?q=

  1. 全文检索: q=first
  2. 单字段全文检索:q=user:prismcdn
  3. 单字段精确检索:q=user:”prismcdn”
  4. 多个检索条件的组合:NOT、AND、OR、(、),如q=user:(“prismcdn” OR “hello”) AND NOT mesg:first
  5. 字段是否存在:q=_exists_:user,q=_missing_:user
  6. 通配符:单字符?,任意字符*,q=user:pri?m*
  7. 正则:q=mesg:/mes{2}ages?/,正则性能不佳,尽量避免使用
  8. 近似搜索:~标识单词可能有一两个字母写的不对,请ES按照相似度返回结果,如q=user:first~
  9. 范围搜索:对数值和时间都可以使用,[]表示端点数值包含在范围内,{}表示端点数值不包含在范围内,如q=rtt:>300,q=date:[“now-6h” TO “now”}

操作Index

  1. 删除某一个指定的索引
          # 将删除prismcdn/xnode这个module的所有索引字段
    curl -X DELETE http://es-1.prismcdn.internal:9200/_ingest/pipeline/filebeat-5.6.3-prismcdn-xnode-pipeline
    

ElasticSearch的集群化

1. 配置集群自发现

在/etc/elasticsearch/elasticsearch.yml中加入一行配置就可以让你的机器被集群发现。

    discovery.zen.ping.unicast.hosts: ["172.16.1.40", "172.16.1.44"]

正常情况下,还需要配置一下最少nodes数量,用于产生master

    discovery.zen.minimum_master_nodes: 2

2. 冷热数据分离

这里的热数据往往指一天以内的数据,因为Elasticsearch通常按天建立索引,所以我们会把当天的数据作为热数据,保存在SSD这样的快速存储中,以提高写入速度。而一天前的数据则迁移到普通大容量硬盘上,作为冷数据长期保存。

  1. 给SSD和SAS磁盘的节点分别设置“hot”和“stale”的tag,/etc/elasticsearch/elasticsearch.yml中添加设置项 node.attr.tag: hot stale
  2. 给index template设置默认的tag要求为hot,这里我们用filebeat,所以在filebeat的配置文件中指定,方法如下:

    /etc/filebeat/filebeat.yml setup.template.settings: index.routing.allocation.require.tag: hot

  3. 查看index template的设置是否正确
    curl http://es-1.prismcdn.internal:9200/_template python -m json.tool
  4. 如果存在问题,删除索引模板,重新创建

    curl -X DELETE http://es-1.prismcdn.internal:9200/_template/filebeat-6.0.1

  5. 已经创建的索引不会被自动修改,可以将其删掉

    curl -X DELETE http://es-1.prismcdn.internal:9200/filebeat-*

  6. 配置定时任务,将前一天的索引的tag由hot改为stale

    time=`date -d last-day “+%Y.%m.%d”` curl -X PUT -H “Content-Type:application/json” \ http://es-1.prismcdn.internal:9200/*-${time}/_settings?pretty -d’ { “index.routing.allocation.require.tag”: “stale” }’

FileBeat

以json格式推送log给ES

    # /etc/filebeat/filebeat.yml

filebeat.prospectors:

- input_type: log

  paths:
    - /opt/prismcdn/erepd/erep.log

  document_type: erep
  json.keys_under_root: true
  json.add_error_key: true

- input_type: log

  paths:
    - /opt/prismcdn/report-srv/logs/trep-*.log

  document_type: trep
  json.keys_under_root: true # json格式,字段添加到root
  json.add_error_key: true

processors:
- drop_fields:
    fields: ["pp.A"] # 不发送pp.A字段

output.elasticsearch:
  hosts: ["es-1.prismcdn.internal:9200"]

处理自定义的文件数据

创建自定义的filebeat module

  1. 安装virtualenv, pip install virtualenv
  2. clone beats的工程,切换到filebeat目录,执行 make create-fileset
  3. 输入module名字、fileset名字
  4. 配置config/xnode.yml,如:
           type: log
    paths:
    \{\{ range $i, $path := .paths }}
    - \{\{$path}}
    \{\{ end }}
    exclude_files: [".gz$"]
    exclude_lines: []
    
  5. 配置ingest/pipeline.json,如:
           {
       "description": "Pipeline for parsing  xnode logs",
       "processors": [{
           "grok": {
               "field": "message",
               "patterns": [
                   "\\[%{DATA:xnode.time}\\]\\[%{DATA:xnode.level}\\]<%{DATA:xnode.type}> %{GREEDYDATA:xnode.message}"
               ],
               "ignore_missing": true
           }
       }, {
           "remove": {
               "field": "message"
           }
       }, {
           "date": {
               "field": "xnode.time",
               "target_field": "@timestamp",
               "formats": ["yyyy-MM-dd'T'HH:mm:ss.SSSZ"]
           }
       }, {
           "remove": {
               "field": "xnode.time"
           }
       }],
       "on_failure" : [{
           "set" : {
               "field" : "error.message",
               "value" : ""
           }
       }]
    }
    

相对复杂的例子可以见nginx access log的

    {
  "description": "Pipeline for parsing Nginx access logs. Requires the geoip and user_agent plugins.",
  "processors": [{
    "grok": {
      "field": "message",
      "patterns":[
        "\"?%{IP_LIST:nginx.access.remote_ip_list} - %{DATA:nginx.access.user_name} \\[%{HTTPDATE:nginx.access.time}\\] \"%{WORD:nginx.access.method} %{DATA:nginx.access.url} HTTP/%{NUMBER:nginx.access.http_version}\" %{NUMBER:nginx.access.response_code} %{NUMBER:nginx.access.body_sent.bytes} \"%{DATA:nginx.access.referrer}\" \"%{DATA:nginx.access.agent}\""
        ],
      "pattern_definitions": {
        "IP_LIST": "%{IP}(\"?,?\\s*%{IP})*"
      },
      "ignore_missing": true
    }
  }, {
    "split": {
      "field": "nginx.access.remote_ip_list",
      "separator": "\"?,?\\s+"
    }
  }, {
    "script": {
      "lang": "painless",
      "inline": "boolean isPrivate(def ip) { try { StringTokenizer tok = new StringTokenizer(ip, '.'); int firstByte = Integer.parseInt(tok.nextToken());      int secondByte = Integer.parseInt(tok.nextToken());      if (firstByte == 10) {        return true;      }      if (firstByte == 192 && secondByte == 168) {        return true;      }      if (firstByte == 172 && secondByte >= 16 && secondByte <= 31) {        return true;      }      if (firstByte == 127) {        return true;      }      return false;    } catch (Exception e) {      return false;    }  }  def found = false;  for (def item : ctx.nginx.access.remote_ip_list) {    if (!isPrivate(item)) {      ctx.nginx.access.remote_ip = item;      found = true;      break;    }  }  if (!found) {    ctx.nginx.access.remote_ip = ctx.nginx.access.remote_ip_list[0];  }"
      }
  }, {
    "remove":{
      "field": "message"
    }
  }, {
    "rename": {
      "field": "@timestamp",
      "target_field": "read_timestamp"
    }
  }, {
    "date": {
      "field": "nginx.access.time",
      "target_field": "@timestamp",
      "formats": ["dd/MMM/YYYY:H:m:s Z"]
    }
  }, {
    "remove": {
      "field": "nginx.access.time"
    }
  }, {
    "user_agent": {
      "field": "nginx.access.agent",
      "target_field": "nginx.access.user_agent"
    }
  }, {
    "remove": {
      "field": "nginx.access.agent"
    }
  }, {
    "geoip": {
      "field": "nginx.access.remote_ip",
      "target_field": "nginx.access.geoip"
    }
  }],
  "on_failure" : [{
    "set" : {
      "field" : "error.message",
      "value" : ""
    }
  }]
}
  • 关于ingest pipeline参见https://www.elastic.co/guide/en/elasticsearch/reference/current/ingest.html
  • grok process见https://www.elastic.co/guide/en/elasticsearch/reference/5.6/grok-processor.html#grok-basics
  • grok内置的pattern可以查看 https://github.com/elastic/elasticsearch/blob/master/libs/grok/src/main/resources/patterns
  • 时间格式 https://www.elastic.co/guide/en/logstash/current/plugins-filters-date.html#plugins-filters-date-match

Debug

    filebeat -e -d "*" -c xxx_config.yml

见  https://www.elastic.co/guide/en/beats/filebeat/current/enable-filebeat-debugging.html

注意 由于ingest中的字段定义不会自动更新,可以通过删除elasticsearch相应的index来重建。

    # 将删除prismcdn/xnode这个module的所有索引字段
curl -X DELETE http://es-1.prismcdn.internal:9200/_ingest/pipeline/filebeat-5.6.3-prismcdn-xnode-pipeline

Curator

安装

    rpm --import https://artifacts.elastic.co/GPG-KEY-elasticsearch
echo '
[curator-5]
name=CentOS/RHEL 7 repository for Elasticsearch Curator 5.x packages
baseurl=http://packages.elastic.co/curator/5/centos/7
gpgcheck=1
gpgkey=http://packages.elastic.co/GPG-KEY-elasticsearch
enabled=1' >> /etc/yum.repos.d/elasticsearch.repo
yum install elasticsearch-curator

配置curator.yml

    # host、port和timeout
client:
  hosts:
    - es-1.prismcdn.internal
  port: 9200
  url_prefix:
  use_ssl: False
  certificate:
  client_cert:
  client_key:
  ssl_no_validate: False
  http_auth:
  timeout: 3600
  master_only: False

logging:
  loglevel: INFO
  logfile:
  logformat: default
  blacklist: ['elasticsearch', 'urllib3']

删除旧数据

    # delete indices older than 14days
/usr/bin/curator_cli --config /opt/prismcdn/curator.yml delete_indices --filter_list \
'[{"filtertype":"age","source":"creation_date","direction":"older","unit":"days","unit_count":14},{"filtertype":"pattern","kind":"prefix","value":"filebeat"}]' \
--ignore_empty_list

合并数据

    # merge indices older than 1 day
/usr/bin/curator_cli --config /opt/prismcdn/curator.yml forcemerge --filter_list \
'[{"filtertype":"age","source":"creation_date","direction":"older","unit":"days","unit_count":1},{"filtertype":"pattern","kind":"prefix","value":"filebeat"}]' \
--ignore_empty_list --max_num_segments 1

关闭索引

    # close indices older than 7 days
/usr/bin/curator_cli --config /opt/prismcdn/curator.yml close --filter_list \
'[{"filtertype":"age","source":"creation_date","direction":"older","unit":"days","unit_count":7},{"filtertype":"pattern","kind":"prefix","value":"filebeat"}]' \
--ignore_empty_list --delete_aliases

监控

https://elkguide.elasticsearch.cn/elasticsearch/monitor/api/health.html

FAQ

  • filebeat报mapper_parsing_exceptionc 400错误

检查一下index template是否需要更新,可以删除老的index template,让filebeat自动调用创建index template步骤

  • kibana更新index fields时报FORBIDDEN/12/index read-only / allow delete (api)

在kibana的dev tools中执行

     PUT .kibana/_settings
{
    "index": {
        "blocks": {
            "read_only_allow_delete": "false"
        }
    }
}

Reference

  1. http://www.jianshu.com/p/f13a6dbb84ed
  2. https://elkguide.elasticsearch.cn/

相关 [elasticsearch filebeat 学习] 推荐:

elasticsearch和filebeat学习笔记

- -
elasticsearch安装、维护以及Filebeat module编写相关的笔记,备忘. 全文检索: q=first. 单字段全文检索:q=user:prismcdn. 单字段精确检索:q=user:”prismcdn”. 多个检索条件的组合:NOT、AND、OR、(、),如q=user:(“prismcdn” OR “hello”) AND NOT mesg:first.

Filebeat + Elasticsearch + Kibana 轻量日志收集与展示系统

- - wzyboy’s blog
有个段子是说现在创业公司招人的如果说自己是「大数据」(Big Data),意思其实是说他们会把日志收集上来,但是从来不看. 段子归段子,近些年所谓「微服务」「容器化」等「热门技术」的发展,的确促进了日志收集等技术的发展. 而 ELK ( Elasticsearch +. Kibana) 也不再是日志收集与展示系统的铁三角了.

filebeat使用elasticsearch的pipeline处理日志内容 | 阿小信的博客

- -
以前使用Logstash时,都是通过logstash来对日志内容做过滤解析等操作,现在6.3.0版本中,可以通过filebeat直接写数据到es中,要对日志内容做处理的话设置对应的pipeline就可以. 以gunicorn的access日志内容为例:. 有以上内容的日志,记录请求发生的时间,发起请求的ip,referer,useragent,status_line, status_code, 进程id, 请求执行时间.

Filebeat Multiline

- - 让一切随风
Filebeat获取的文件可能包含跨多行文本的消息,例如,多行消息在包含Java堆栈跟踪的文件中很常见. 为了正确处理这些多行事件,你需要在filebeat.yml中配置multiline以指定哪一行是单个事件的一部分. 在filebeat.yml的filebeat.inputs区域指定怎样处理跨多行的消息.

Elasticsearch 学习笔记

- - 研发管理 - ITeye博客
安装  Elasticsearch. 1:解压下载的安装包 elasticsearch-1.7.2.zip. 修改  node.name: es(集群状态名字一致). 2:在https://github.com/elasticsearch/elasticsearch-servicewrapper下载该插件后,解压缩.

Filebeat 的 Registry 文件解读

- - IT瘾-dev
你可能没有注意但很重要的filebeat小知识. Filebeat会将自己处理日志文件的进度信息写入到registry文件中,以保证filebeat在重启之后能够接着处理未处理过的数据,而无需从头开始. registry文件内容为一个list,list里的每个元素都是一个字典,字典的格式如下:. source:记录采集日志的完整路径.

利用kibana学习 elasticsearch restful api (DSL) - Ruthless - 博客园

- -
利用kibana学习 elasticsearch restful api (DSL). 1、了解elasticsearch基本概念. PUT 创建索引,eg:PUT /movie_index 新建movie_index索引. GET 用于检索数据,eg:GET movie_index/movie/1.

日志实时收集之FileBeat+Kafka

- - lxw的大数据田地
之前,我们的某一个业务用于实时日志收集处理的架构大概是这样的:. 在日志的产生端(LogServer服务器),都部署了FlumeAgent,实时监控产生的日志,然后发送至Kafka. 经过观察,每一个FlumeAgent都占用了较大的系统资源(至少会占用一颗CPU 50%以上的资源). 而另外一个业务,LogServer压力大,CPU资源尤其紧张,如果要实时收集分析日志,那么就需要一个更轻量级、占用资源更少的日志收集框架,于是我试用了一下Filebeat.

替代ELK:ClickHouse+Kafka+FileBeat才是最绝的

- -
saas 服务未来会面临数据安全、合规等问题. 公司的业务需要沉淀一套私有化部署能力,帮助业务提升行业竞争力. 为了完善平台系统能力、我们需要沉淀一套数据体系帮助运营分析活动效果、提升运营能力. 然而在实际的开发过程中,如果直接部署一套大数据体系,对于使用者来说将是一笔比较大的服务器开销. 为此我们选用折中方案完善数据分析能力.

[译]elasticsearch mapping

- - an74520的专栏
es的mapping设置很关键,mapping设置不到位可能导致索引重建. 请看下面各个类型介绍^_^. 每一个JSON字段可以被映射到一个特定的核心类型. JSON本身已经为我们提供了一些输入,支持 string,  integer/ long,  float/ double,  boolean, and  null..