MySQL binlog 增量数据解析服务 - 简书

标签: | 发表时间:2018-09-28 09:56 | 作者:
出处:https://www.jianshu.com

MySQL binlog 增量数据解析服务

介绍如何借助于 MySQL 的 Master-slave 协议实现 MySQL 增量数据获取服务

1. 起因

做过后端开发的同学都知道, 经常会遇到如下场景:

  1. 后端程序根据业务逻辑, 更新数据库记录
  2. 过了几天, 业务需求需要更新搜索索引
  3. 又过了几天, 随着数据需求方的增多, 结构改成发送数据到消息中间件(例如 Kafka), 其他系统自行从消息中间件订阅数据
传统程序结构

所有涉及到类似需求的代码中都写了各种发送消息中间件的代码, 冗余, 易错, 而且难以保证一致性. 那么问题来了:

数据都在 MySQL 中, 是否可以实现仅仅更新 MySQL 就实现数据更新和发布逻辑?

2. Linkedin Databus

最早我听说的解决方案是 Linkedin 实现的, 参见

核心思路就是通过数据库的 binary log(简称: binlog) 来实现数据库更新的自动获取. Linkedin 自己实现了 MySQL 版本和 Oracle 版本。

3. 原理

以 MySQL 为例, 数据库为了主从复制结构和容灾,都会有一份提交日志 (commit log),通过解析这份日志,理论上说可以获取到每次数据库的数据更新操作。获取到这份日志有两种方式:

  1. 在 MySQL server 上通过外部程序监听磁盘上的 binlog 日志文件
  2. 借助于 MySQL 的 Master-Slave 结构,使用程序伪装成一个单独的 Slave,通过网络获取到 MySQL 的binlog 日志流

这里有一个注意的点: MySQL 的 binlog 支持三种格式: StatementRowMixed格式:

  • Statement格式就是说日志中记录 Master 执行的 SQL
  • Row格式就是说每次讲更改的数据记录到日志中
  • Mixed格式就是让 Master 自主决定是使用 Row还是 Statement格式

由于伪装成 Slave 的解析程序很难像 MySQL slave 一样通过 Master 执行的 SQL 来获取数据更新,因此要将 MySQL Master 的 binlog 格式调整成 Row格式才方便实现数据更新获取服务

至于 Oracle 的实现,我厂没用 Oracle。。。。

4. 数据增量同步服务拆解

好了, 如果想自己写一个 Databus 服务, 就需要如下几个核心模块:

  • 4.1、MySQL binlog 解析类库
  • 4.2、部署方式
  • 4.3、binlog 状态维护模块
  • 4.4、消息中间件(大多数人会选择 Kafka 吧)
  • 4.5、数据发布策略
  • 4.6、数据序列化方式
    • 将获取到的 binlog 序列化成其他可识别格式
    • AVRO、protocol buffer、JSON,哪个喜欢选哪个,但注意跨平台,别用 Java 原生的序列化 =.=|||
  • 4.7、集群管理服务
  • 4.8、服务监控

4.1、协议解析可选方案

时至今日, 已经有很多大厂开源了自己的 MySQL binlog 解析方案,Java 语言可选的有:

想自己造轮子实现协议的,也可以参考 MySQL 官方文档

4.2、部署方式

由于 binlog 可以通过网络协议获取,也可以直接通过读取磁盘上的 binlog 文件获取, 因此同步服务就有两种部署方式:

  • 通过读取 binlog 文件的话, 就要跟 MySQL Master 部署到同一台服务器
    • 系统隔离性不好,高峰期会不会跟 MySQL master 争抢系统资源
    • 类似 AWS RDS 这种云数据库服务,不允许部署程序到 RDS 节点
  • 通过 relay-log 协议通过网络读取,同步服务就方便部署到任意地方
部署方式

4.3、binlog 状态维护模块

在 MySQL 中, Master-slave 之间只用标识:

  1. serverId:master一般设置为1, 各个 server 之间必须不同
  2. binlog 文件名称:当前读取到了哪一个 binlog 文件
  3. binlog position:当前读取的 binlog 文件的位置

由于同步服务会重启,因此必须自行维护 binlog 的状态。一般存储到 MySQL 或者 Zookeeper 中。当服务重启后,自动根据存储的 binlog 位置,继续同步数据。

4.4、消息中间件可选方案

虽然现在 Kafka 如日中天,大多数情况下大家都会选择 Kafka 作为消息中间件缓冲数据。选择其他的消息中间件也未尝不可。 但有一点注意:

  • MySQL 中的数据更新是有顺序的
  • 数据更新发布到消息中间件中,也建议能够保序,例如事务中经典的转账的例子,试想一下如果消息队列不保序, 其他数据服务消费到不保序的数据是否还能满足业务需求

由于上诉原因,类似 AWS SQS 这样的消息队列就不满足此处对消息队列的需求(参见: AWS SQS 官方文档关于保序方面的解释

4.5、数据发布策略

解析到了数据,现在要做的就是将数据发布到消息中间件中。有一下几个方面需要注意:

4.5.1、topic 策略

一个 MySQL 节点中可以有多个数据库, 每个数据库有多张表,是采用一个节点一个 Kafka Topic,还是一个数据库一个 Topic, 还是一张表一个 Topic?

4.5.2、数据分区策略

Kafka 中数据是根据 key 进行分区, 同一个分区下保证消息的顺序。

如何选择数据的key的限制因素就是看数据消费端是否希望同一个表的同一条数据的更新记录都落到同一个 Kafka 分区上,进而不需要消费端做多进程间的状态维护, 简化消费端逻辑。例如: 一个Kafka Topic 有20个分区,同一个表 table_1 中 ID 为1的数据前后两次更新被发送到了不同的 partition,这就要求消费端必须每个 partition 保持lag一致, 并且及时同步数据状态到其他消费进程可见才可以保证保序; 但如果同一个表 table_1 中 ID 为1的数据前后两次更新被发送到了同一个 partition, 由于 Kafka 保证同一个 partition 保序,消费端就简化了很多。

如下图展示数据乱序问题:

  • 假设 kafka 中 A2为新的数据, A1为同一个 ID 的老数据
  • 由于 慢消费进程数据堆积,导致 A2这个新数据先被消费, 当老数据 A1被消费时有可能覆盖之前的结果
    数据乱序问题

要实现上述的逻辑, 就要求在 Kafka 数据的 Key 的选择上做文章:

  1. 一种方式是使用 table 的名称作为 Kafka 的 key,这样同一张表的数据一定在一个 partition 上保序。 但这样的坏处是,如果数据集中在某一张表频繁更新,会造成某一个 partition 上数据量远大于其他 partition,消费端无法通过并行方式提高扩展性。
  2. 另一种方式就是,在 db 层面保证每张表的第一个 column 是主键,这样采用 binlog 中第一个 column 的数据作为 Kafka 的 key, 数据的平衡性会好很多,易于消费端扩容。

如下图,消息无乱序情况:

  • 数据 AC的每个版本由于 Hash 值 % 分区数量相同,同属于同一个分区, 并且按数据版本保序
  • 数据 BDAC, 数据按修改时间顺序保序但属于不同分区

4.6、数据序列化方式选择

读取到 binlog 数据后, 需要将数据序列化成更简单易用的格式,发送到 Kafka。如果选择 Avro 作为序列化方式的话,可以考虑集成 Kafka 背后的公司 Confluent 提出的一个新的方法: Schema Registry,具体信息参见 Confluent 公司官网。

4.7、集群管理服务

随着业务的扩展,越来越多的 MySQL 接入了数据同步服务。运维管理的压力也就随之而来。因此可能最后系统演变成如下结构:

  • 独立一个集群管理程序,负责管理解析程序节点,分配任务
  • 各个解析程序启动后,首先在 Zookeeper 注册,然后领取同步节点任务,启动解析过程
  • 类似的任务管理结构很常见,比如 Storm 中 Nimbus 节点管理 worker 节点等。
集群管理

4.8 服务监控

服务的监控必不可少。除了基础的进程监控,数据同步服务的关键是 binlog 解析服务与 MySQL master 之间的延迟监控,避免在 MySQL 写入高峰期导致数据延迟,影响后面的数据消费服务。

获取延迟的方法也很简单:

  1. 在 MySQL master 上实行 SHOW MASTER STATUS获取到 Master 节点当前的文件 ID 和 binlog 位置
  2. 获取同步服务当前处理的 binlog 文件 ID 和位置:
  3. 将相减的结果发送到监控服务(例如 open-falcon),后续根据需求报警
    • 一般文件 ID 相减结果 N 大于1, 表示同步服务已经落后 MySQL Master N 个文件,情况比较严重(除非是 MySQL Master 刚刚 rotate 新文件)
    • 文件 ID 相同,binlog 位置相减结果 M 就是相差的 binlog 文件大小, 单位: bytes
    • 此计算公式仅仅为近似估算,建议在差距持续一段时间(比如持续2分钟)的情况下再报警。

5、踩过的坑

Canal Blob 类型字段编码

由于 Canal 将 binlog 中的值序列化成了 String 格式给下游程序,因此在 Blob 格式的数据序列化成 String 时为了节省空间,强制使用了 IOS_8859_0作为编码。因此,在如下情况下会造成中文乱码:

  1. 同步服务 JVM 使用了 UTF-8 编码
  2. BLOB 字段中存储有中文字符

参见:

      // com.alibaba.otter.canal.parse.inbound.mysql.dbsync.LogEventConvert 第541行起:
case Types.BINARY:
case Types.VARBINARY:
case Types.LONGVARBINARY:
    // fixed text encoding
    // https://github.com/AlibabaTech/canal/issues/18
    // mysql binlog中blob/text都处理为blob类型,需要反查table
    // meta,按编码解析text
    if (fieldMeta != null && isText(fieldMeta.getColumnType())) {
        columnBuilder.setValue(new String((byte[]) value, charset));
        javaType = Types.CLOB;
    } else {
        // byte数组,直接使用iso-8859-1保留对应编码,浪费内存
        columnBuilder.setValue(new String((byte[]) value, ISO_8859_1));
        javaType = Types.BLOB;
    }
    break;

总结

通过实现数据同步服务,可以在一定程度上实现数据消费端与后端程序解耦。但凡事皆有成本,是否值得引入到现有系统架构中,还需要架构师自己斟酌。

-- EOF --

相关 [mysql binlog 数据] 推荐:

MySQL binlog 增量数据解析服务 - 简书

- -
MySQL binlog 增量数据解析服务. 介绍如何借助于 MySQL 的 Master-slave 协议实现 MySQL 增量数据获取服务. 做过后端开发的同学都知道, 经常会遇到如下场景:. 后端程序根据业务逻辑, 更新数据库记录. 过了几天, 业务需求需要更新搜索索引. 又过了几天, 随着数据需求方的增多, 结构改成发送数据到消息中间件(例如 Kafka), 其他系统自行从消息中间件订阅数据.

一个重大轮子: 基于mysql数据库binlog的增量订阅&消费

- - agapple
   早期,阿里巴巴B2B公司因为存在杭州和美国双机房部署,存在跨机房同步的业务需求. 不过早期的数据库同步业务,主要是基于trigger的方式获取增量变更,不过从2010年开始,阿里系公司开始逐步的尝试基于数据库的日志解析,获取增量变更进行同步,由此衍生出了增量订阅&消费的业务,从此开启了一段新纪元.

[转][转]基于mysql数据库binlog的增量订阅&消费中间件:Canal

- - heiyeluren的blog(黑夜路人的开源世界)
早期,阿里巴巴B2B公司因为存在杭州和美国双机房部署,存在跨机房同步的业务需求. 不过早期的数据库同步业务,主要是基于trigger的方式获取增量变更,不过从2010年开始,阿里系公司开始逐步的尝试基于数据库的日志解析,获取增量变更进行同步,由此衍生出了增量订阅&消费的业务,从此开启了一段新纪元. 目前内部使用的同步,已经支持mysql5.x和oracle部分版本的日志解析.

MySQL 5.6 的 binlog API 很不错

- khsing - BT的花 blogs
MySQL 5.6 开始,开发者可以通过编程获得 binlog 内容,通过这个 API 可以做很多事情,比如自制备份,比如..... 这样做搜索更加简单了,对于开发者来说只有 SQL 操作;自动建索引这件事. 交给MySQL的某个'伪'复制节点自动完成,不需要再编程了!.

阿里巴巴开源项目: 基于mysql数据库binlog的增量订阅&消费

- - zzm
   早期,阿里巴巴B2B公司因为存在杭州和美国双机房部署,存在跨机房同步的业务需求. 不过早期的数据库同步业务,主要是基于trigger的方式获取增 量变更,不过从2010年开始,阿里系公司开始逐步的尝试基于数据库的日志解析,获取增量变更进行同步,由此衍生出了增量订阅&消费的业务,从此 开启了一段新纪元.

mysql 数据分离

- - 数据库 - ITeye博客
网上看到一个读写分离的帖子,感觉不错. 构建高性能web之路------mysql读写分离实战(转). 一个完整的mysql读写分离环境包括以下几个部分:. 在本次实战中,应用程序client基于c3p0连接后端的database proxy. database proxy负责管理client实际访问database的路由策略,采用开源框架amoeba.

MySQL数据库的修复

- Xin - 博客园-首页原创精华区
找到mysql的安装目录的bin/myisamchk工具,在命令行中输入:. 然后myisamchk 工具会帮助你恢复数据表的索引. 好象也不用重新启动mysql,问题就解决了. 当你试图修复一个被破坏的表的问题时,有三种修复类型. 如果你得到一个错误信息指出一个临时文件不能建立,删除信息所指出的文件并再试一次--这通常是上一次修复操作遗留下来的.

同步mysql数据到hive

- - ITeye博客
地址为:http://archive.cloudera.com/cdh/3/下载相应版本,如sqoop-1.2.0-CDH3B4.tar.gz. 地址为:http://archive.cloudera.com/cdh/3/,版本可以为hadoop-0.20.2-CDH3B4.tar.gz. 3.解压 sqoop-1.2.0-CDH3B4.tar.gz ,hadoop-0.20.2-CDH3B4.tar.gz 到某目录如/home/hadoop/,解压后的目录为.

mysql数据整理命令

- - SQL - 编程语言 - ITeye博客
Analyze Table(修复索引). MySQL 的Optimizer(优化元件)在优化SQL语句时,首先需要收集一些相关信息,其中就包括表的cardinality(可以翻译为“散列程度”),它表示某个索引对应的列包含多少个不同的值——如果cardinality大大少于数据的实际散列程度,那么索引就基本失效了.

mysql 数据文件迁移

- - 数据库 - ITeye博客
1.新mysql存储路径:/data1/mysql/. 2.cp /var/lib/mysql到新存储路径:. 存储文件: cp -rp /var/lib/mysql /data1/mysql/data/. 日志文件: cp -rp /var/log/mysql /data1/mysql/log/.