使用logstash同步至ES的几个坑 - 一位帅气的网友的个人空间 - OSCHINA - 中文开源技术交流社区
1.前言
记录使用logstash从sqlserver同步数据到ES中遇到的几点问题。使用的版本是es6.8.3+logstash6.8.3
2.logstash配置文件
2.1input
input {
jdbc {
jdbc_driver_library => "/usr/local/logstash-6.8.3/logstashconfs/sqljdbc4.jar"#sqlserver的驱动jar包jdbc_driver_class => "com.microsoft.sqlserver.jdbc.SQLServerDriver"
jdbc_connection_string => "jdbc:sqlserver://192.168.1.101:1433;databaseName=test;"
jdbc_user => "sa"
jdbc_password => "123456"
jdbc_default_timezone => "Asia/Shanghai"
jdbc_paging_enabled => "true"#分页record_last_run => true#记录上一次运行的值use_column_value => true#使用数据库中的字段追踪tracking_column => "update_time"#追踪的字段名称tracking_column_type => "timestamp"#追踪的字段类型last_run_metadata_path => "/usr/local/logstash-6.8.3/logstashconfs/sync-logs/consumer_statistics_update_time"#上一次运行的值存储的文件地址clean_run => false#使用数据库中的字段追踪statement => "SELECT * FROM v_test WHERE update_time>:sql_last_value and update_time<GETDATE() "#sql语句schedule => "*/5 * * * * *"#每5s执行一次}
}
-
statement
由于要查的数据是表关联的数据,一开始想的是建立多个jdbc,把数据存到es的不同的索引中,利用父子文档进行关联查询,
后来发现这种办法效率差,而且影响ES的性能,所以解决办法就是在sqlserver中建立好多表联查好的视图,
这里的 statement
中的v_test就是创建好的视图.
由于设置了Logstash 增量更新, 必须要使用 update_time>:sql_last_value and update_time<GETDATE()
这种限制条件,这样才可以保证数据不丢失也不重复
具体原因见: 如何使用 Logstash 实现关系型数据库与 ElasticSearch 之间的数据同步
-
schedule
网上的很多教程都说最小间隔是1min,实际上是可以做到秒级的.
schedule => "*/5 * * * * *"
只要在前面再加一个* 单位就是秒,这里就是每5s执行一次
2.2filter
filter {
if ![test]{ruby{code =>'event.set("test","")'}}
mutate{
convert => { "id" => "integer" }
remove_field => ["@timestamp"]
remove_field => ["@version"]
}
}
这里主要是对从sqlserver数据库查出来的数据进行一些处理,我这里删去了大多数的内容,仅保留一些代表性的.
-
if ![test]{ruby{code =>'event.set("test","")'}}
这个的意思是 test字段为null时,使用ruby这个语言进行处理, code =>''
这里面就是写代码的
event.set("test","")
意思就是 设置test字段的内容为""
当然我们也可以先 event.get("test")
,获取test字段的内容,然后在进行一系列处理后,再 event.set
,这样就可以保存处理后的字段的值
ruby语言的具体语法可以参考这个: Ruby教程
-
convert => { "id" => "integer" }
这个的意思就是将id字段的类型转化为integer,如果某个字段是时间类型可以转化为timestamp类型
2.3output
output {
elasticsearch {
hosts => ["htkj101:9200","htkj102:9200","htkj103:9200"]
index => "consumer_statistics"#索引名称document_id => "%{id}"#索引的iddocument_type => "consumer_statistics"#索引的type,这个在6.x版本以后就已经被废弃,可以忽略这个template_name => "consumer_statistics"#索引模板的名称}
}
-
document_id => "%{id}"
文档的id就是导入数据的id,这样设置可以实现幂等性
-
template_name => "consumer_statistics"
索引模板的名称 consumer_statistics
,ES会调用模板名称为 consumer_statistics
创建索引.
当然前提是你得先创建好这个模板
3.索引模板的创建
-
指令
curl -H "Content-Type: application/json" -XPUT http://htkj101:9200/_template/consumer_statistics -d '在这里输入你创建的模板'
-
模板
{
"template": "consumer_statistics",
"order": 2,
"version": 60001,
"index_patterns": ["consumer_statistics"],
"settings": {
"index": {
"refresh_interval": "5s",
"max_result_window": "2147483647"#设置from+size的最大值}
},
"mappings": {
"_default_": {
"dynamic_templates": [{
"message_field": {
"path_match": "message",
"mapping": {
"norms": false,
"type": "text"
},
"match_mapping_type": "string"
}
}, {
"string_fields": {
"mapping": {
"norms": false,
"type": "text",
"fields": {
"keyword": {
"ignore_above": 1024,#设置不被索引的字段长度"type": "keyword"
}
}
},
"match_mapping_type": "string",
"match": "*"
}
}],
"properties": {
"@timestamp": {
"type": "date"
},
"geoip": {
"dynamic": true,
"properties": {
"ip": {
"type": "ip"
},
"latitude": {
"type": "half_float"
},
"location": {
"type": "geo_point"
},
"longitude": {
"type": "half_float"
}
}
},
"@version": {
"type": "keyword"
}
}
}
},
"aliases": {}
}
-
"max_result_window": "2147483647"
在业务处理的过程中往往需要分页,ES的JAVA-API是通过from,size来设置分页数量和每页的数量,
在默认的情况下from+size必须要小于10000,但是如果实际需求大于10000,则必须在这里设置
我这里设置的是 max_result_window
的最大值,实际情况中不需要设置如此之大,
因为ES会在内存中进行排序,如果一次返回的结果过大,可能会导致服务宕机.
-
"ignore_above": 1024
这里默认是256,意思是如果某一个字段的内容超过256字节的话,那么将不会被索引.
也就是说从ES中是能够看到这条数据的存在,但是如果你指定查询条件,是查不出来的.
举个例子,现在ES中有id,test两个字段,一共100条数据
test字段中只有一条数据超过了256字节,现在我查询test字段中包含"1"的数据,
即使这个超过256字节的数据含有1,但是也不会被查询到.
为了能够让他被索引到,这里将256改成1024,即只有超过1024字节才会不被索引.
- 完整命令
curl -H "Content-Type: application/json" -XPUT http://htkj101:9200/_template/consumer_statistics -d '
{"template":"consumer_statistics","order":2,"version":60001,"index_patterns":["consumer_statistics"],"settings":{"index":{"refresh_interval":"5s","max_result_window":"2147483647"}},"mappings":{"_default_":{"dynamic_templates":[{"message_field":{"path_match":"message","mapping":{"norms":false,"type":"text"},"match_mapping_type":"string"}},{"string_fields":{"mapping":{"norms":false,"type":"text","fields":{"keyword":{"ignore_above":1024,"type":"keyword"}}},"match_mapping_type":"string","match":"*"}}],"properties":{"@timestamp":{"type":"date"},"geoip":{"dynamic":true,"properties":{"ip":{"type":"ip"},"latitude":{"type":"half_float"},"location":{"type":"geo_point"},"longitude":{"type":"half_float"}}},"@version":{"type":"keyword"}}}},"aliases":{}}'
在创建模板的过程中,发现总是创建失败,后来发现弄成这样的两行,就不会出错了.