Debezium 实现 MySQL 到 Elasticsearch 高效实时同步

标签: dev | 发表时间:2019-02-26 00:00 | 作者:
出处:http://itindex.net/relian

题记

来自Elasticsearch中文社区的问题——

MySQL中表无唯一递增字段,也无唯一递增时间字段,该怎么使用logstash实现MySQL实时增量导数据到es中?

logstash和kafka_connector都仅支持基于自增id或者时间戳更新的方式 增量同步数据。

回到问题本身:如果库表里没有相关字段,该如何处理呢?

本文给出相关探讨和解决方案。

1、 binlog认知

1.1 啥是 binlog?

binlog是Mysql sever层维护的一种 二进制日志,与innodb引擎中的redo/undo log是完全不同的日志;其主要是用来记录对mysql数据更新或潜在发生更新的SQL语句,并以"事务"的形式保存在磁盘中。

作用主要有:

  • 1)复制:达到master-slave数据一致的目的。

  • 2)数据恢复:通过mysqlbinlog工具恢复数据。

  • 3) 增量备份

1.2 阿里的Canal实现了增量Mysql同步


一图胜千言,canal是用java开发的基于数据库增量日志解析、提供增量数据订阅&消费的中间件。

目前,canal主要支持了MySQL的binlog解析,解析完成后才利用canal client 用来处理获得的相关数据。目的: 增量数据订阅&消费

综上,使用binlog可以突破logstash或者kafka-connector没有自增id或者没有时间戳字段的限制,实现增量同步。

2、基于binlog的同步方式

1)基于kafka Connect的Debezium 开源工程,地址:. https://debezium.io/

2)不依赖第三方的独立应用: Maxwell开源项目,地址:http://maxwells-daemon.io/

由于已经部署过conluent(kafka的企业版本,自带zookeeper、kafka、ksql、kafka-connector等),本文仅针对Debezium展开。

3、Debezium介绍

Debezium是捕获数据实时动态变化的开源的分布式同步平台。能实时捕获到数据源(Mysql、Mongo、PostgreSql)的:新增(inserts)、更新(updates)、删除(deletes)操作,实时同步到Kafka,稳定性强且速度非常快。

特点:

  • 1)简单。无需修改应用程序。可对外提供服务。

  • 2)稳定。持续跟踪每一行的每一处变动。

  • 3)快速。构建于kafka之上,可扩展,经官方验证可处理大容量的数据。

4、同步架构

如图,Mysql到ES的同步策略,采取“曲线救国”机制。

步骤1:基Debezium的binlog机制,将Mysql数据同步到Kafka。

步骤2:基于Kafka_connector机制,将kafka数据同步到Elasticsearch。

5、Debezium实现Mysql到ES增删改实时同步

软件版本:

confluent:5.1.2;
Debezium:0.9.2_Final;
Mysql:5.7.x.
Elasticsearch:6.6.1

5.1 Debezium安装

confluent的安装部署参见:http://t.cn/Ef5poZk,不再赘述。

Debezium的安装只需要把debezium-connector-mysql的压缩包解压放到Confluent的解压后的插件目录(share/java)中。

MySQL Connector plugin 压缩包的下载地址:

https://debezium.io/docs/install/

注意 重启一下confluent,以使得Debezium生效。

5.2 Mysql binlog等相关配置。

Debezium使用MySQL的binlog机制实现数据动态变化监测,所以需要Mysql提前配置binlog。

核心配置如下,在Mysql机器的/etc/my.cnf的mysqld下添加如下配置。

   

1[mysqld]
2
3server-id         = 223344
4log_bin           = mysql-bin
5binlog_format     = row
6binlog_row_image  = full
7expire_logs_days  = 10

然后,重启一下Mysql以使得binlog生效。

   

1systemctl start mysqld.service

5.3 配置connector连接器。

配置confluent路径目录 : /etc

创建文件夹命令 :

   

1mkdir kafka-connect-debezium

在mysql2kafka_debezium.json存放connector的配置信息 :

   

1[root@localhost kafka-connect-debezium]# cat mysql2kafka_debezium.json
2{ 
3        "name" : "debezium-mysql-source-0223",
4        "config":
5        {
6             "connector.class" : "io.debezium.connector.mysql.MySqlConnector",
7             "database.hostname" : "192.168.1.22",
8             "database.port" : "3306",
9             "database.user" : "root",
10             "database.password" : "XXXXXX",
11             "database.whitelist" : "kafka_base_db",
12             "table.whitlelist" : "accounts",
13             "database.server.id" : "223344",
14             "database.server.name" : "full",
15             "database.history.kafka.bootstrap.servers" : "192.168.1.22:9092",
16             "database.history.kafka.topic" : "account_topic",
17             "include.schema.changes" : "true" ,
18             "incrementing.column.name" : "id",
19             "database.history.skip.unparseable.ddl" : "true",
20             "transforms": "unwrap,changetopic",
21             "transforms.unwrap.type": "io.debezium.transforms.UnwrapFromEnvelope",
22             "transforms.changetopic.type":"org.apache.kafka.connect.transforms.RegexRouter",
23             "transforms.changetopic.regex":"(.*)",
24             "transforms.changetopic.replacement":"$1-smt"
25        }
26}

注意如下配置:

  1. "database.server.id",对应Mysql中的server-id的配置。

  2. "database.whitelist" : 待同步的Mysql数据库名。

  3. "table.whitlelist" :待同步的Mysq表名。

  4. 重要:“database.history.kafka.topic”:存储数据库的Shcema的记录信息,而非写入数据的topic、

  5. "database.server.name":逻辑名称,每个connector确保唯一,作为写入数据的kafka topic的前缀名称。

坑一:transforms相关5行配置作用是写入数据格式转换。

如果没有,输入数据会包含:before、after记录修改前对比信息以及元数据信息(source,op,ts_ms等)。

这些信息在后续数据写入Elasticsearch是不需要的。(注意结合自己业务场景)。

格式转换相关原理:http://t.cn/EftoaIi

5.4 启动connector

   

1curl -X POST -H "Content-Type:application/json" 
2--data @mysql2kafka_debezium.json.json 
3http://192.168.1.22:18083/connectors | jq

5.5 验证写入是否成功。

5.5.1  查看kafka-topic
   

1    kafka-topics --list --zookeeper localhost:2181

此处会看到写入数据topic的信息。

注意新写入数据topic的格式:database.schema.table-smt 三部分组成。

本示例topic名称:

full.kafka_base_db.account-smt

5.5.2 消费数据验证写入是否正常
   

1./kafka-avro-console-consumer --topic full.kafka_base_db.account-smt --bootstrap-server 192.168.1.22:9092 --from-beginning

至此,Debezium实现mysql同步kafka完成。

6、kafka-connector实现kafka同步Elasticsearch

6.1、Kafka-connector介绍

见官网:https://docs.confluent.io/current/connect.html

Kafka Connect是一个用于连接Kafka与外部系统(如数据库,键值存储,检索系统索引和文件系统)的框架。 

连接器实现公共数据源数据(如Mysql、Mongo、Pgsql等)写入Kafka,或者Kafka数据写入目标数据库,也可以自己开发连接器。

6.2、kafka到ES connector同步配置

配置路径:

   

1/home/confluent-5.1.0/etc/kafka-connect-elasticsearch/quickstart-elasticsearch.properties

配置内容:

   

1"connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
2"tasks.max": "1",
3"topics": "full.kafka_base_db.account-smt",
4"key.ignore": "true",
5"connection.url": "http://192.168.1.22:9200",
6"type.name": "_doc",
7"name": "elasticsearch-sink-test"

6.3 kafka到ES启动connector

启动命令

   

1confluent load  elasticsearch-sink-test 
2-d /home/confluent-5.1.0/etc/kafka-connect-elasticsearch/quickstart-elasticsearch.properties

6.4 Kafka-connctor RESTFul API查看

Mysql2kafka,kafka2ES的connector详情信息可以借助postman或者浏览器或者命令行查看。

   

1curl -X GET http://localhost:8083/connectors

7、坑复盘。

坑2:同步的过程中可能出现错误,比如:kafka topic没法消费到数据。
排解思路如下:

  • 1)确认消费的topic是否是写入数据的topic;

  • 2)确认同步的过程中没有出错。可以借助connector如下命令查看。

1curl -X GET http://localhost:8083/connectors-xxx/status

坑3:Mysql2ES出现日期格式不能识别。

是Mysql jar包的问题,解决方案:在my.cnf中配置时区信息即可。

坑4:kafka2ES,ES没有写入数据。

排解思路:

  • 1)建议:先创建同topic名称一致的索引,注意:Mapping静态自定义,不要动态识别生成。

  • 2)通过connetor/status排查出错原因,一步步分析。

8、小结

  1. binlog的实现突破了字段的限制,实际上业界的go-mysql-elasticsearch已经实现。

  2. 对比:logstash、kafka-connector,虽然Debezium“曲线救国”两步实现了实时同步,但稳定性+实时性能相对不错。

  3. 推荐大家使用。大家有好的同步方式也欢迎留言讨论交流。

参考:
[1 ]http://t.cn/EftX2p8

[2]  http://t.cn/EftXJU6

[3]  http://t.cn/EftXO8c

[4] http://t.cn/EftXn9M

[5]  http://t.cn/EftXeOc


推荐阅读:

重磅 | 死磕Elasticsearch方法论认知清单(2019春节更新版)

Elasticsearch基础、进阶、实战第一公众号

相关 [debezium mysql elasticsearch] 推荐:

Debezium 实现 MySQL 到 Elasticsearch 高效实时同步

- - IT瘾-dev
来自Elasticsearch中文社区的问题——. MySQL中表无唯一递增字段,也无唯一递增时间字段,该怎么使用logstash实现MySQL实时增量导数据到es中. logstash和kafka_connector都仅支持基于自增id或者时间戳更新的方式 增量同步数据. 回到问题本身:如果库表里没有相关字段,该如何处理呢.

ElasticSearch 索引 VS MySQL 索引

- - crossoverJie's Blog
这段时间在维护产品的搜索功能,每次在管理台看到 elasticsearch 这么高效的查询效率我都很好奇他是如何做到的. 这甚至比在我本地使用 MySQL 通过主键的查询速度还快. 这类问题网上很多答案,大概意思呢如下:. Lucene 的全文检索引擎,它会对数据进行分词后保存索引,擅长管理大量的索引数据,相对于.

使用logstash同步mysql 多表数据到ElasticSearch实践 - 三度 - 博客园

- -
参考样式即可,具体使用配置参数根据实际情况而定. jdbc_connection_string => "jdbc:mysql://localhost/数据库名". jdbc_driver_library => "mysql-connector-java-5.1.45-bin.jar所在位置". type => "数据库表名1".

debezium 架构和常见使用场景

- -
Debezium是一个开源项目,为捕获数据更改(change data capture,CDC)提供了一个低延迟的流式处理平台. 你可以安装并且配置Debezium去监控你的数据库,然后你的应用就可以消费对数据库的每一个行级别(row-level)的更改. 只有已提交的更改才是可见的,所以你的应用不用担心事务(transaction)或者更改被回滚(roll back).

Flink CDC 核心:Debezium 1.9.0.Beta1 发布!

- - IT瘾-dev
我很高兴地宣布 Debezium  1.9.0.Beta1的发布. 此版本包括 Debezium Server 的许多新功能,包括 Knative Eventing 支持和使用 Redis 接收器的偏移存储管理、SQL Server 连接器的多分区缩放以及各种错误修复和改进. 总体而言,此版本已修复56 个问题.

Clickhouse 在日志存储与分析方面作为 ElasticSearch 和 MySQL 的替代方案

- -
2021年,Clickhouse 在日志存储与分析方面作为 ElasticSearch 和 MySQL 的替代方案. 原文作者:Anton Sidashin. 关于Clickhouse的文章,这段内容在互联网上仍然很流行,甚至被多次翻译. 现在已经过去两年多,同时 Clickhouse 的开发节奏.

CDC (捕获数据变化) Debezium 介绍 | 首席架构师

- -
Debezium是一个分布式平台,它将您现有的数据库转换为事件流,因此应用程序可以看到数据库中的每一个行级更改并立即做出响应. Debezium构建在Apache Kafka之上,并提供Kafka连接兼容的连接器来监视特定的数据库管理系统. Debezium在Kafka日志中记录数据更改的历史,您的应用程序将从这里使用它们.

数据同步工具之FlinkCDC/Canal/Debezium对比-技术圈

- -
数据准实时复制(CDC)是目前行内实时数据需求大量使用的技术,随着国产化的需求,我们也逐步考虑基于开源产品进行准实时数据同步工具的相关开发,逐步实现对商业产品的替代. 本文把市面上常见的几种开源产品,Canal、Debezium、Flink CDC 从原理和适用做了对比,供大家参考. 本文首发微信公众号《import_bigdata》.

使用 Kafka、Debezium 和 Kubernetes 实现应用现代化的模式

- - InfoQ - 促进软件开发领域知识与创新的传播
本文最初发表于 RedHat 的开发者站点,经原作者 Bilgin Ibryam 许可,由 InfoQ 中文站翻译分享. “我们建造计算机的方式与建造城市的方式是一样的,那就是随着时间的推移,依然毫无计划,并且要建造在废墟之上. Ellen Ullman 在 1998 年写下了这样一句话,但它今天依然适用于我们构建现代应用程序的方式,那就是,随着时间的推移,我们要在遗留的软件上构建应用,而且仅仅有短期的计划.

[译]elasticsearch mapping

- - an74520的专栏
es的mapping设置很关键,mapping设置不到位可能导致索引重建. 请看下面各个类型介绍^_^. 每一个JSON字段可以被映射到一个特定的核心类型. JSON本身已经为我们提供了一些输入,支持 string,  integer/ long,  float/ double,  boolean, and  null..