使用canal+kafka监听MySQL binlog小实践

标签: canal kafka 监听 | 发表时间:2020-04-11 17:17 | 作者:ZacPark
出处:https://juejin.im/welcome/backend

前言

最近,想对MySQL有进一步的认识,看如何保证缓存与数据库一致性,在负责业务上也需要这方面的优化,有些文章提到使用监听MySQL binlog实现,想试下,本文纯属好奇心驱使。

  • 所用工具:MySQL + Canal + Kafka

1、kafka: kafka.apache.org/quickstart(… 2、Canal: github.com/alibaba/can… 3、MySQL版本如下:

  +-------------------------+-----------------------+
| Variable_name           | Value                 |
+-------------------------+-----------------------+
| innodb_version          | 8.0.12                |
| protocol_version        | 10                    |
| slave_type_conversions  |                       |
| tls_version             | TLSv1,TLSv1.1,TLSv1.2 |
| version                 | 8.0.12                |
| version_comment         | Homebrew              |
| version_compile_machine | x86_64                |
| version_compile_os      | osx10.14              |
| version_compile_zlib    | 1.2.11                |
+-------------------------+-----------------------+
复制代码

MySQL binlog简介

  • binlog是MySQL server层维护的一种二进制日志,与innodb等存储引擎中的redo/undo log是完全不同的日志;主要是用来记录对MySQL数据更新或潜在发生更新的SQL语句,并以“事务”的形式保存在磁盘中。 那么,binlog日志的作用如下:

主从复制 数据恢复 增量备份

  • 查看binlog相关配置: show variables like '%log_bin%';
  +---------------------------------+-----------------------------------+
| Variable_name                   | Value                             |
+---------------------------------+-----------------------------------+
| log_bin                         | ON                                |
| log_bin_basename                | /usr/local/var/mysql/binlog       |
| log_bin_index                   | /usr/local/var/mysql/binlog.index |
| log_bin_trust_function_creators | OFF                               |
| log_bin_use_v1_row_events       | OFF                               |
| sql_log_bin                     | ON                                |
+---------------------------------+-----------------------------------+
复制代码
  • 查看binlog目录: show binary logs;
  +---------------+-----------+
| Log_name      | File_size |
+---------------+-----------+
| binlog.000036 |       155 |
| binlog.000037 |      1066 |
| binlog.000038 |      3075 |
+---------------+-----------+
复制代码
  • 查看binlog的状态: show master status;可查看当前二进制日志文件的状态信息,显示正在写入的二进制文件,以及当前的position。
  | binlog.000038 |     3075 |              |                  |                 |
复制代码
  • 那么,再来查看 binlog.000038日志内容, ./mysqlbinlog /usr/local/var/mysql/binlog.000038
  • ROW级别下,SQL语句需要解码,需要加解码选项, ./mysqlbinlog --base64-output=decode-rows -v /usr/local/var/mysql/binlog.000038
  # at 3311
#200410 17:29:47 server id 1  end_log_pos 3386 CRC32 0xac866698 Write_rows: table id 65 flags: STMT_END_F
### INSERT INTO `zacblog`.`t_zb_article`
### SET
###   @1=5
###   @2=22121
###   @3='dada'
###   @4='dadwad'
###   @5=0
###   @6='2020-04-10 17:29:31'
###   @7=1586510981
复制代码
  • 当然,binlog的格式也是可以设定的,分别有 ROWSTATEMENTMIXED选项。

缓存与数据库一致性

  • 首先,根据CAP理论,一个系统同时满足C、A、P是不可能的,放弃C也不是说放弃一致性,而是放弃强一致性,追求 最终一致性
  • 此前,项目也遇到过,缓存与数据库如何保持一致性的问题,网上形形色色的答案,但我看到有些大神往往引用** Cache-Aside pattern 这篇文章,文章中讲的模式就是先更新数据库,再删除缓存**。

The order of the steps is important. Update the data store before removing the item from the cache. If you remove the cached item first, there is a small window of time when a client might fetch the item before the data store is updated. That will result in a cache miss (because the item was removed from the cache), causing the earlier version of the item to be fetched from the data store and added back into the cache. The result will be stale cache data. 译文:如果先删除缓存,会有短暂的时间窗口,客户端访问数据库的旧值,并且导致该key下的请求全部打到数据库,然后旧值也会重新保存到缓存。

enter image description here

  • 那么,更新完数据库,删除缓存失败(概率不能说没有)时,还是会有问题。。。

1、可以使用MQ重试机制,当remove抛出异常,我们可以利用MQ异步重试删除。 2、利用监控捕捉重试异常。

  • 写到这儿,我都快觉得我跑题了,确实这部分网上各抒己见,上面纯属个人看法。下面就是满足个人好奇心了,有文章说使用MQ监听 MySQL binlog的方式异步删除缓存,删除失败继续放入MQ重试。

Canal简介

  • 为了监听 MySQL binlog我搜到 美团DB数据同步到数据仓库的架构与实践 文章中写到使用 Canal实现 binlogKafka的连接。
  • Canal译意为水道/管道/沟渠,主要用途是基于 MySQL 数据库增量日志解析,提供增量数据订阅和消费。
  • 基于日志增量订阅和消费的业务包括

1、数据库镜像 2、数据库实时备份 3、索引构建和实时维护(拆分异构索引、倒排索引等) 4、业务 cache 刷新 5、带业务逻辑的增量数据处理

  • 然后,我们就要开始根据 QuickStart的命令去启动了,大部分都是根据官网操作的,只是遇到了几个坑。
  • 具体命令,我就不再赘述了,具体顺序为

1、先启动kafka 2、Canal instance.properties和canal.properties,包含db、用户信息、Kafka集群及topic信息 3、根据Canal配置信息,在Kafka创建相应的topic和消费者

踩坑记录

  • canal.properties关键配置
  ...
canal.serverMode = kafka
...
canal.mq.servers = localhost:9092,localhost:9093,localhost:9094
复制代码
  • instance.properties关键配置
  canal.instance.master.address=192.168.1.20:3306
# username/password,数据库的用户名和密码
...
canal.instance.dbUsername = canal
canal.instance.dbPassword = canal
...
# mq config
canal.mq.topic=example
复制代码
  • caching_sha2_password Auth failed
  Caused by: java.io.IOException: connect /127.0.0.1:3306 failure
at com.alibaba.otter.canal.parse.driver.mysql.MysqlConnector.connect(MysqlConnector.java:83) ~[canal.parse.driver-1.1.4.jar:na]
at com.alibaba.otter.canal.parse.inbound.mysql.MysqlConnection.connect(MysqlConnection.java:89) ~[canal.parse-1.1.4.jar:na]
at com.alibaba.otter.canal.parse.inbound.mysql.MysqlEventParser.preDump(MysqlEventParser.java:86) ~[canal.parse-1.1.4.jar:na]
at com.alibaba.otter.canal.parse.inbound.AbstractEventParser$3.run(AbstractEventParser.java:183) ~[canal.parse-1.1.4.jar:na]
at java.lang.Thread.run(Thread.java:748) [na:1.8.0_181]
Caused by: java.io.IOException: caching_sha2_password Auth failed
at com.alibaba.otter.canal.parse.driver.mysql.MysqlConnector.negotiate(MysqlConnector.java:257) ~[canal.parse.driver-1.1.4.jar:na]
at com.alibaba.otter.canal.parse.driver.mysql.MysqlConnector.connect(MysqlConnector.java:80) ~[canal.parse.driver-1.1.4.jar:na]
... 4 common frames omitted
复制代码
  • mysql -hlocalhost -P3306 -ucanal -p123 -Dzacblog登录数据库,重启 canal即可。

最后效果

  • 根据对数据库表,进行修改,Canal会通过Kafka发出消息,在Consumer接收到的消息格式如下。
  • INSERT语句, data为关键字段
  {
    "data":[
        {
            "article_id":"3",
            "author_id":"1121212",
            "article_title":"dadad",
            "article_content":"dadad",
            "status":"0",
            "create_time":"2020-04-10 16:30:53",
            "update_time":"2020-04-10 16:31:00"
        }
    ],
    "database":"zacblog",
    "es":1586507462000,
    "id":1,
    "isDdl":false,
    "mysqlType":{
        "article_id":"bigint(20)",
        "author_id":"bigint(20)",
        "article_title":"varchar(50)",
        "article_content":"text",
        "status":"tinyint(3)",
        "create_time":"datetime",
        "update_time":"timestamp"
    },
    "old":null,
    "pkNames":[
        "article_id"
    ],
    "sql":"",
    "sqlType":{
        "article_id":-5,
        "author_id":-5,
        "article_title":12,
        "article_content":2005,
        "status":-6,
        "create_time":93,
        "update_time":93
    },
    "table":"t_zb_article",
    "ts":1586507463069,
    "type":"INSERT"
}
复制代码
  • UPDATE语句,更新字段用 old的list来表示
  {
...
    "old":[
        {
            "article_title":"dadad",
            "update_time":"2020-04-10 16:31:00"
        }
    ],
...
    "type":"UPDATE"
}
复制代码
  • DELETE语句, data为关键字段
  {
...
    "data":[
        {
            "article_id":"5",
            "author_id":"22121",
            "article_title":"dada",
            "article_content":"dadwad",
            "status":"0",
            "create_time":"2020-04-10 17:29:31",
            "update_time":"2020-04-10 17:29:41",
            "ext":null
        }
    ],
    "database":"zacblog",
    "es":1586517722000,
...
    "type":"DELETE"
}
复制代码
  • ALTER TABLE,直接是语句
  {
    "data":null,
    "database":"zacblog",
    "es":1586511807000,
    "id":6,
    "isDdl":true,
    "mysqlType":null,
    "old":null,
    "pkNames":null,
    "sql":"ALTER TABLE `zacblog`.`t_zb_article` 
ADD COLUMN `ext` varchar(255) NULL AFTER `update_time`",
    "sqlType":null,
    "table":"t_zb_article",
    "ts":1586511808016,
    "type":"ALTER"
}
复制代码

小结

  • 本文从 MySQL binlog出发,途径缓存-数据库一致性的讨论,最后为了满足好奇心,简单动手监听了下 binlog改动的Kafka消息。
  • 其实,在实际开发当中,我们也可以参照这个消息格式,对数据库的增删改,发出对应的业务消息,这个消息格式,我觉得还是有借鉴意义的。

参考文章

相关 [canal kafka 监听] 推荐:

使用canal+kafka监听MySQL binlog小实践

- - 掘金后端
最近,想对MySQL有进一步的认识,看如何保证缓存与数据库一致性,在负责业务上也需要这方面的优化,有些文章提到使用监听MySQL binlog实现,想试下,本文纯属好奇心驱使. 所用工具:MySQL + Canal + Kafka. 1、kafka: kafka.apache.org/quickstart(… 2、Canal: github.com/alibaba/can… 3、MySQL版本如下:.

监听mysql的binlog日志工具分析:canal、Maxwell、Databus、DTS - 程序员大本营

- -
阿里云的数据传输服务DTS. 定位:基于数据库增量日志解析,提供增量数据订阅&消费,目前主要支持了mysql. canal模拟mysql slave的交互协议,伪装自己为mysql slave,向mysql master发送dump协议. mysql master收到dump请求,开始推送binary log给slave(也就是canal).

kafka监控之kafka-run-class.sh

- - 开源软件 - ITeye博客
kafka自带了很多工具类,在源码kafka.tools里可以看到:. 这些类该如何使用呢,kafka的设计者早就为我们考虑到了,在${KAFKA_HOME}/bin下,有很多的脚本,其中有一个kafka-run-class.sh,通过这个脚本,可以调用其中的tools的部分功能,如调用kafka.tools里的ConsumerOffsetChecker.scala,.

数据同步工具之FlinkCDC/Canal/Debezium对比-技术圈

- -
数据准实时复制(CDC)是目前行内实时数据需求大量使用的技术,随着国产化的需求,我们也逐步考虑基于开源产品进行准实时数据同步工具的相关开发,逐步实现对商业产品的替代. 本文把市面上常见的几种开源产品,Canal、Debezium、Flink CDC 从原理和适用做了对比,供大家参考. 本文首发微信公众号《import_bigdata》.

闲扯kafka mq

- - 开源软件 - ITeye博客
本文主要讲解关于kafka mq的设计思想及个人理解. 关于kafka的详细信息,大家可以参考官网的文献 http://kafka.apache.org/documentation.html这是一篇相当不错的文章,值得仔细研读. 第一个问题:消息队列(Message Queue)是干嘛用的. 首先,要对消息队列有一个基本的理解.

Kafka优化

- - ITeye博客
配置优化都是修改server.properties文件中参数值. 1.网络和io操作线程配置优化. # broker处理消息的最大线程数. # broker处理磁盘IO的线程数. 一般num.network.threads主要处理网络io,读写缓冲区数据,基本没有io等待,配置线程数量为cpu核数加1.

Kafka Connect简介

- - 鸟窝
Kafka 0.9+增加了一个新的特性 Kafka Connect,可以更方便的创建和管理数据流管道. 它为Kafka和其它系统创建规模可扩展的、可信赖的流数据提供了一个简单的模型,通过 connectors可以将大数据从其它系统导入到Kafka中,也可以从Kafka中导出到其它系统. Kafka Connect可以将完整的数据库注入到Kafka的Topic中,或者将服务器的系统监控指标注入到Kafka,然后像正常的Kafka流处理机制一样进行数据流处理.

kafka consumer group offset

- - 开源软件 - ITeye博客
     kafka0.9及以前版本kafka offset 保存在zookeeper, 因频繁读写zookeeper性能不高;从0.10开始,主题分区offset存储于kafka独立主题中.     管理监控kafka主题及分区offset至关重要,原网上很开源流行工具KafkaOffsetMonitor、kafka-manager,旧版offset保存于zookeeper,kafka consumer无相应API,从kafka0.10.1.1以后提供相应API读取主题分区offset(也可以调用KafkaClient API,kafka管理API由scala语言编写).

Kafka设计解析(二):Kafka High Availability (上)

- -
Kafka在0.8以前的版本中,并不提供High Availablity机制,一旦一个或多个Broker宕机,则宕机期间其上所有Partition都无法继续提供服务. 若该Broker永远不能再恢复,亦或磁盘故障,则其上数据将丢失. 而Kafka的设计目标之一即是提供数据持久化,同时对于分布式系统来说,尤其当集群规模上升到一定程度后,一台或者多台机器宕机的可能性大大提高,对Failover要求非常高.

GitHub - andreas-schroeder/kafka-health-check: Health Check for Kafka Brokers.

- -
At AutoScout24, to keep the OS up to date of our clusters running on AWS, we perform regular in-place rolling updates. As we run immutable servers, we terminate each broker and replace them with fresh EC2 instances (keeping the previous broker ids).