使用logstash同步至ES的几个坑 - 一位帅气的网友的个人空间 - OSCHINA - 中文开源技术交流社区

标签: | 发表时间:2021-04-26 15:52 | 作者:
出处:https://my.oschina.net

1.前言

记录使用logstash从sqlserver同步数据到ES中遇到的几点问题。使用的版本是es6.8.3+logstash6.8.3

2.logstash配置文件

2.1input

      input {
    jdbc {
        jdbc_driver_library => "/usr/local/logstash-6.8.3/logstashconfs/sqljdbc4.jar"#sqlserver的驱动jar包jdbc_driver_class => "com.microsoft.sqlserver.jdbc.SQLServerDriver"
        jdbc_connection_string => "jdbc:sqlserver://192.168.1.101:1433;databaseName=test;"
        jdbc_user => "sa"
        jdbc_password => "123456"
        jdbc_default_timezone => "Asia/Shanghai"
		jdbc_paging_enabled => "true"#分页record_last_run => true#记录上一次运行的值use_column_value => true#使用数据库中的字段追踪tracking_column => "update_time"#追踪的字段名称tracking_column_type => "timestamp"#追踪的字段类型last_run_metadata_path => "/usr/local/logstash-6.8.3/logstashconfs/sync-logs/consumer_statistics_update_time"#上一次运行的值存储的文件地址clean_run => false#使用数据库中的字段追踪statement => "SELECT * FROM v_test WHERE update_time>:sql_last_value and update_time<GETDATE() "#sql语句schedule => "*/5 * * * * *"#每5s执行一次}
}
  • statement

由于要查的数据是表关联的数据,一开始想的是建立多个jdbc,把数据存到es的不同的索引中,利用父子文档进行关联查询,

后来发现这种办法效率差,而且影响ES的性能,所以解决办法就是在sqlserver中建立好多表联查好的视图,

这里的 statement中的v_test就是创建好的视图.

由于设置了Logstash 增量更新, 必须要使用 update_time>:sql_last_value and update_time<GETDATE()这种限制条件,这样才可以保证数据不丢失也不重复

具体原因见: 如何使用 Logstash 实现关系型数据库与 ElasticSearch 之间的数据同步

  • schedule

网上的很多教程都说最小间隔是1min,实际上是可以做到秒级的.

schedule => "*/5 * * * * *"只要在前面再加一个* 单位就是秒,这里就是每5s执行一次

2.2filter

      filter {
	if ![test]{ruby{code =>'event.set("test","")'}}	
	mutate{
		convert => { "id" => "integer" }
		remove_field => ["@timestamp"]
		remove_field => ["@version"]
	}
}

这里主要是对从sqlserver数据库查出来的数据进行一些处理,我这里删去了大多数的内容,仅保留一些代表性的.

  • if ![test]{ruby{code =>'event.set("test","")'}}

这个的意思是 test字段为null时,使用ruby这个语言进行处理, code =>''这里面就是写代码的

event.set("test","")意思就是 设置test字段的内容为""

当然我们也可以先 event.get("test"),获取test字段的内容,然后在进行一系列处理后,再 event.set,这样就可以保存处理后的字段的值

ruby语言的具体语法可以参考这个: Ruby教程

  • convert => { "id" => "integer" }

这个的意思就是将id字段的类型转化为integer,如果某个字段是时间类型可以转化为timestamp类型

2.3output

      output {
		elasticsearch {
			hosts => ["htkj101:9200","htkj102:9200","htkj103:9200"]
			index => "consumer_statistics"#索引名称document_id => "%{id}"#索引的iddocument_type => "consumer_statistics"#索引的type,这个在6.x版本以后就已经被废弃,可以忽略这个template_name => "consumer_statistics"#索引模板的名称}
}
  • document_id => "%{id}"

文档的id就是导入数据的id,这样设置可以实现幂等性

  • template_name => "consumer_statistics"

索引模板的名称 consumer_statistics,ES会调用模板名称为 consumer_statistics创建索引.

当然前提是你得先创建好这个模板

3.索引模板的创建

  • 指令

      curl -H "Content-Type: application/json" -XPUT http://htkj101:9200/_template/consumer_statistics -d '在这里输入你创建的模板'
  • 模板

      {
	"template": "consumer_statistics",
	"order": 2,
	"version": 60001,
	"index_patterns": ["consumer_statistics"],
	"settings": {
		"index": {
			"refresh_interval": "5s",
			"max_result_window": "2147483647"#设置from+size的最大值}
	},
	"mappings": {
		"_default_": {
			"dynamic_templates": [{
				"message_field": {
					"path_match": "message",
					"mapping": {
						"norms": false,
						"type": "text"
					},
					"match_mapping_type": "string"
				}
			}, {
				"string_fields": {
					"mapping": {
						"norms": false,
						"type": "text",
						"fields": {
							"keyword": {
								"ignore_above": 1024,#设置不被索引的字段长度"type": "keyword"
							}
						}
					},
					"match_mapping_type": "string",
					"match": "*"
				}
			}],
			"properties": {
				"@timestamp": {
					"type": "date"
				},
				"geoip": {
					"dynamic": true,
					"properties": {
						"ip": {
							"type": "ip"
						},
						"latitude": {
							"type": "half_float"
						},
						"location": {
							"type": "geo_point"
						},
						"longitude": {
							"type": "half_float"
						}
					}
				},
				"@version": {
					"type": "keyword"
				}
			}
		}
	},
	"aliases": {}
}
  • "max_result_window": "2147483647"

在业务处理的过程中往往需要分页,ES的JAVA-API是通过from,size来设置分页数量和每页的数量,

在默认的情况下from+size必须要小于10000,但是如果实际需求大于10000,则必须在这里设置

我这里设置的是 max_result_window的最大值,实际情况中不需要设置如此之大,

因为ES会在内存中进行排序,如果一次返回的结果过大,可能会导致服务宕机.

  • "ignore_above": 1024

这里默认是256,意思是如果某一个字段的内容超过256字节的话,那么将不会被索引.

也就是说从ES中是能够看到这条数据的存在,但是如果你指定查询条件,是查不出来的.

举个例子,现在ES中有id,test两个字段,一共100条数据

test字段中只有一条数据超过了256字节,现在我查询test字段中包含"1"的数据,

即使这个超过256字节的数据含有1,但是也不会被查询到.

为了能够让他被索引到,这里将256改成1024,即只有超过1024字节才会不被索引.

  • 完整命令
      curl -H "Content-Type: application/json" -XPUT http://htkj101:9200/_template/consumer_statistics -d '
{"template":"consumer_statistics","order":2,"version":60001,"index_patterns":["consumer_statistics"],"settings":{"index":{"refresh_interval":"5s","max_result_window":"2147483647"}},"mappings":{"_default_":{"dynamic_templates":[{"message_field":{"path_match":"message","mapping":{"norms":false,"type":"text"},"match_mapping_type":"string"}},{"string_fields":{"mapping":{"norms":false,"type":"text","fields":{"keyword":{"ignore_above":1024,"type":"keyword"}}},"match_mapping_type":"string","match":"*"}}],"properties":{"@timestamp":{"type":"date"},"geoip":{"dynamic":true,"properties":{"ip":{"type":"ip"},"latitude":{"type":"half_float"},"location":{"type":"geo_point"},"longitude":{"type":"half_float"}}},"@version":{"type":"keyword"}}}},"aliases":{}}'

在创建模板的过程中,发现总是创建失败,后来发现弄成这样的两行,就不会出错了.

相关 [logstash 同步 es] 推荐:

MYSQL logstash 同步数据到es的几种方案对比以及每种方案数据丢失原因分析。

- -
MYSQL logstash 同步增量数据到ES. 最近一段时间,在使用mysql通过logstash-jdbc同步数据到es,但是总是会有一定程度数据丢失. logstash-jdbc无非是通过sql遍历数据表的所有数据,然后同步到es. 对于表里面的所有字段都需要查出来然后同步到es中去. 数据同步脚本分为全量同步与增量同步.

使用logstash同步至ES的几个坑 - 一位帅气的网友的个人空间 - OSCHINA - 中文开源技术交流社区

- -
记录使用logstash从sqlserver同步数据到ES中遇到的几点问题. 使用的版本是es6.8.3+logstash6.8.3. jdbc_driver_library => "/usr/local/logstash-6.8.3/logstashconfs/sqljdbc4.jar"#sqlserver的驱动jar包jdbc_driver_class => "com.microsoft.sqlserver.jdbc.SQLServerDriver".

使用logstash同步mysql 多表数据到ElasticSearch实践 - 三度 - 博客园

- -
参考样式即可,具体使用配置参数根据实际情况而定. jdbc_connection_string => "jdbc:mysql://localhost/数据库名". jdbc_driver_library => "mysql-connector-java-5.1.45-bin.jar所在位置". type => "数据库表名1".

MySQL如何实时同步数据到ES?试试这款阿里开源的神器!

- - 掘金后端本月最热
SpringBoot实战电商项目mall(40k+star)地址:. mall项目中的商品搜索功能,一直都没有做实时数据同步. 最近发现阿里巴巴开源的 canal可以把MySQL中的数据实时同步到Elasticsearch中,能很好地解决数据同步问题. 今天我们来讲讲 canal的使用,希望对大家有所帮助.

ES优化总结

- - 非技术 - ITeye博客
最近一直在研究ES集群,也看了很多篇前辈们总结的博客,同事借鉴了官方给出的一些建议,做了一下几点总结,希望对后来者有用:. 为了防止ES进程的内存被置换到磁盘上(会导致在检索的时候发生内存交换导致检索速度迟缓)引起性能急速下降. 候可以把config/elasticsearch.yml中的bootstrap.mlockall设置为true就可以了.

es的连接查询

- - 行业应用 - ITeye博客
在一般的关系型数据库中,都支持连接操作. 在ES这种分布式方案中进行连接操作,代价是十分昂贵的. 不过ES也提供了相类似的操作,支持水平任意扩展,实现连接的效果. 其他内容, 参考Elasticsearch官方指南整理. 在ES中支持两种连接方式:嵌套查询 和 has_child、has_parent父子查询.

ES性能优化总结

- - 互联网 - ITeye博客
    Elasticsearch是目前大数据领域最热门的技术栈之一,经过近8年的发展,已从0.0.X版升级至6.X版本,虽然增加了很多的特性和功能,但是在主体架构上,还是没有太多的变化. 下面就把我对于ES使用实践的一些经验总结一下,供大家参考;也请大家拍砖. 如果有条件,尽可能使用SSD硬盘, 不错的CPU.

es近实时搜索原理

- - 企业架构 - ITeye博客
 随着按段(per-segment)搜索的发展, 一个新的文档从索引到可被搜索的延迟显著降低了. 新文档在几分钟之内即可被检索,但这样还是不够快.  提交(Commiting)一个新的段到磁盘需要一个 . fsync 来确保段被物理性地写入磁盘,这样在断电的时候就不会丢失数据. 但是  fsync 操作代价很大; 如果每次索引一个文档都去执行一次的话会造成很大的性能问题.

ELK(ElasticSearch, Logstash, Kibana)搭建实时日志分析平台

- - 编程语言 - ITeye博客
在搜索ELK资料的时候,发现这篇文章比较好,于是摘抄一小段:. 以下内容来自: http://baidu.blog.51cto.com/71938/1676798. 日志主要包括系统日志、应用程序日志和安全日志. 系统运维和开发人员可以通过日志了解服务器软硬件信息、检查配置过程中的错误及错误发生的原因.

Logstash 参考指南(多个管道) - 风继续吹 - SegmentFault 思否

- -
如果需要在同一进程中运行多个管道,Logstash提供了一种通过名为. pipelines.yml的配置文件完成此操作的方法,这个文件必须放在. path.settings文件夹,并遵循此结构:. 该文件在YAML中格式化并包含一个字典列表,其中每个字典描述一个管道,每个键/值对指定该管道的设置. 这个示例展示了两个不同的管道,它们由ID和配置路径描述,对于第一个管道,.