Debezium Binlog实时数据采集、落地数据使用的思考总结 - Eric-Ln - 博客园

标签: | 发表时间:2021-10-30 23:01 | 作者:
出处:https://www.cnblogs.com

个人总结原则

 

  首先抛开技术框架的讨论,个人总结Binlog 日志的数据采集主要原则:
  •   原则一 : 与业务数据解耦
  •   原则二 : 与业务数据结构解耦
  •   原则三 : 数据是可回溯的



  分别阐述一下这三个原则的具体含义

 

  原则一

    在数据采集中,数据落地一般都会使用时间分区进行落地,那就需要我们确定一下固定的时间戳作为时间分区的基础时间序列。
在这种情况下看来,业务数据上的时间戳字段,无论是从实际开发中获取此时间戳的角度,还是现实表中都会存在这样的时间戳,都不可能所有表完全满足。
    举一下反例:
    表 :业务时间戳
    table A : create_time,update_time
    table B : create_time
    table C : create_at
    table D : 无

 

    像这样的情况,理论上可以通过限制 RD 和 DBA 的在设计表时规则化表结构来实现时间戳以及命名的统一化、做限制,但是是在实际工作中,这样的情况基本上是做不到的,相信很多读者也会遇到这样的情况。
    可能很多做数据采集的同学会想,我们能不能要求他们去制定标准呢?
    个人的想法是,可以,但是不能把大数据底层数据采集完全依靠这样互相制定的标准。原因有以下三点:
  • 如果只是依靠两个部门或者多个部门制定的口头的或者书面的标准,却没有强制性在coding上面做约束,全部都是人为在约束的话,后期人员增加,迟早会出问题。
  •     大数据部门与后台部门,在于数据情况变更的情况,有时候可能是信息延时的,也就是说,有可能在数据落地后发现异常后,才知道后台部门做出了调整。
  •     也是最重要的一点,大数据部门不能要求在底层数据源去要求数据源去适应大数据的采集,这样要成的后果很有可能是限制后台部门在开发业务功能上的自由度,这样的开发流程也是不合理的。

    所以如果想要使用唯一固定的时间序列,就要和业务的数据剥离开,我们想要的时间戳不受业务数据的变动的影响。

 

  原则二

    在业务数据库中,一定会存在表结构变更的问题,绝大部分情况为增加列,但是也会存在列重命名、列删除这类情况,而其中字段变更的顺序是不可控的。

    此原则想描述的是,导入到数据仓库中的表,要适应数据库表的各种操作,保持其可用性与列数据的正确性。

 

  原则三

    此数据可回溯,其中包括两个方面

  •     数据采集可回溯
  •     数据消费落地可回溯

 

    第一个描述的是,在采集binlog采集端,可以重新按位置采集binlog。

    第二个描述的是,在消费binlog落地的一端,可以重复消费把数据重新落地。

 

  此为笔者个人总结,无论是选择什么样的技术选型进行组合搭建,这几点原则是需要具备的。


实现方案以及具体操作

  技术架构 : Debezium + Confluent + Kafka + OSS/S3 + Hive

 

  基于原则一的解决方案

  Debezium 提供了 New Record State Extraction 的配置选项,相当于提供了一个transform 算子,可以抽取出binlog 中的元数据。
对于 0.10 版本的配置,可以抽取 table,version,connector,name,ts_ms,db,server_id,file,pos,row 等binlog元数据信息。

  其中ts_ms为binlog日志的产生时间,此为binlog元数据,可以应用于所有数据表,而且可以在完全对数据表内部结构不了解的情况下,使用此固定时间戳,完全实现我们的原则一。

  关于Debezium,不同版本之前的配置参数有可能是不同的,如果读者有需要实践的话需要在官方文档上确认相应版本的配置参数。
对于其他框架,例如市面上用的较多的Canal,或者读者有自己需要开发数据采集程序的话,binlog的元数据建议全部抽取出来,在此过程以及后续过程中都可能会被用到。


  基于原则二的解决方案


  对于 Hive ,目前主流的数据存储格式为Parquet,ORC,Json,Avro 这几种。
  抛开数据存储的效率讨论。
  对于前两中数据格式,为列存,也就是说,这两种数据格式的数据读取,会严格依赖于我们数据表中的数据存储的顺序,这样的数据格式,是无法满足数据列灵活增加、删除等操作的。
  Avro 格式为行存,但是它需要依赖于Schema Register服务,考虑Hive的数据表读取完全要依赖一个外部服务,风险过高。
  最后确定使用Json格式进行数据存储,虽然这样的读取和存储效率没有其他格式高,但是这样可以保证业务数据的任何变更都可以在hive中读取出来。

  Debezium 组件采集binlog 的数据就是为json格式,和预期的设计方案是吻合的,可以解决原则二带来的问题。

  对于其他框架,例如市面上用的较多的Canal,可以设置为Json数据格式进行传输,或者读者有自己需要开发数据采集程序的话,也是相同的道理。


  基于原则三的解决方案


  在采集binlog采集端,可以重新按位置采集binlog。
  此方案实现方式在Debezium官方网站上也给出了相应的解决方案,大概描述一下,需要用到 Kafkacat工具。


  对于每一个采集的mysql实例,创建数据采集任务时,Confluent都会相应的创建connector(也就是采集程序)的采集的元数据的topic,
  里面会存储相应的时间戳、文件位置、以及位置,可以通过修改此数据,重置采集binlog日志的位置。
  值得注意的是,此操作的时间节点也是有限制的,和mysql的binlog日志保存周期有关,所以此方式回溯时,需要确认的是mysql日志还存在。


  对于重复消费把数据重新落地。
  此方案因为基于kafka,对于kafka重新制定消费offset消费位点的操作网上有很多方案,此处不再赘述。
  对于读者自己实现的话,需要确认的选择的MQ支持此特性就好了。
 
  https://debezium.io/documentation/faq/#how_to_change_the_offsets_of_the_source_database

 

业务场景影响下的重要操作

  此部分只描述在笔者技术架构下如何实现以下操作,读者可以根据自己选择的技术组件探究不同的技术方案。

  数据库分库分表的情况


    基于Debezium的架构,一个Source 端只能对应一个mysql实例进行采集,对于同一实例上的分表情况,可以使用 Debezium Topic Routing 功能,
    在采集过滤binlog时把相应需要采集的表按照正则匹配写入一个指定的topic中。
    在分库的情况下,还需要在 sink 端 增加 RegexRouter transform算子进行topic 间的合并写入操作。



  数据增量采集与全量采集

    对于采集组件,目前目前的配置都是以增量为默认,所以无论是选择 Debezium 还是 Canal的话,正常配置就好。
    但是有些时候会存在需要采集全表的情况,笔者也给出一下全量的数据采集的方案。
    方案一
      Debezium 本身自带了这样的功能,需要将
      snapshot.mode 参数选型设置为 when_needed,这样可以做表的全量采集操作。
      官方文档中,在此处的参数配置有更加细致的描述。

  https://debezium.io/documentation/reference/0.10/connectors/mysql.html#snapshots

    方案二
      使用sqoop和增量采集同时使用的方式进行。
      此方案适用于表数据已存在很多,而目前binlog数据频率不频繁的情况下,使用此方案。
      值得注意的是有两点:
  •       sqoop数据导入落地为Parquet格式,与增量采集数据合并时,需要做数据格式整合,也就是中间需要有临时表,通过union all的方式把数据merge到全量表中。
  •       sqoop导入的Parquet格式,与 Debezium 处理某些数据类型时会存在不相同的问题,例如datetime类型,sqoop会导出string,Debezium 会转化为bigint。


  离线数据去重条件

    数据落地后,通过json表映射出binlog原始数据,那么问题也就来了,我们如何找到最新的一条数据呢?
    也许我们可以简单的认为,用我们刚刚的抽取的那个ts_ms,然后做倒排不就好了吗?
    大部分情况下这样做确实是可以的。
    但是笔者在实际开发中,发现这样的情况是不能满足所有情况的,因为在binlog中,可能真的会存在 ts_ms 与 PK 相同,但是确实不同的两条数据。
    那我们怎么去解决时间都相同的两条数据呢?
    答案就在上文,我们刚刚建议的把binlog 的元数据都抽取出来。

SELECT*FROM(SELECT*,
row_number()over(partitionBYt.idORDERBYt.`__ts_ms`DESC,t.`__file`DESC,cast(t.`__pos`ASint)DESC)ASorder_byFROMtest tWHEREdt='{pt}'ANDhour='{now_hour}') t1WHEREt1.order_by=1

 


    解释一下这个sql 中row_number的的条件
    __ts_ms : 为binlog中的ts_ms,也就是事件时间。
    __file : 为binlog此条数据所在file name。
    __pos : 为binlog中此数据所在文件中的位置,为数据类型。

    这样的条件组合取出的数据,就是最新的一条。

    也许有读者会问,如果这条数据被删除了怎么办,你这样取出来的数据不就是错的了吗?
    这个Debezium也有相应的操作,有相应的配置选项让你如何选择处理删除行为的binlog数据。
    作为给大家的参考,笔者选择 rewrite 的参数配置,这样在上面的sql最外层只需要判断 “delete = ’false‘“ 就是正确的数据啦。

    https://debezium.io/documentation/reference/0.10/configuration/event-flattening.html


架构上的总结


  在技术选型以及整体与细节的架构中,笔者始终在坚持一个原则——
  流程尽量简约而不简单,数据环节越长,出问题的环节就可能越多。对于后期锁定问题与运维难度也会很高。

  所以笔者在技术选型也曾考虑过Flink + Kafka 的这种方式,但是基于当时的现状,笔者并没有选择这样的技术选型,笔者也阐述一下原因。
  • 笔者的flink环境没有做开发平台化与运维平台化。
  • 场景偏向于数据采集和传输,而不是计算,Flink的优势特性并没有使用到很多。
  • 如果基于一个Mysql 实例开发一个Flink程序,使用原生的Flink steaming,做api式的程序开发,如果因为某些表的数据导致程序挂掉,这个实例的数据都无法采集了,这样的影响范围太大。
  • 如果基于一个一个表或者通过正则的方式匹配一些表,做一个flink程序,这样虽然是保证了灵活度,但是90%的代码都是冗余的,而且会有很多任务,浪费资源。
  • 最后就是开发和维护效率的问题,如果只是写原生的 flink 程序的话,后续的累加开发,会把程序变得越来越重,可能逻辑也会越来越繁琐。


  总结起来,我当时对于Flink的思考,如果Flink没有做开发和运维监控的平台化的情况下,可以作为一个临时方案,但是后期如果一直在这样一个开发流程下缝缝补补,多人开发下很容易出现问题,或者就是大家都这样一个程序框架下造轮子,而且越造越慢。而且后期的主要项目方向并没有把Flink平台化提上日程,所以也是考虑了一部分未来的情况进行的选择。
  所以个人最后确定技术选型的时候,并没有选用Flink。

 

结束语

  此篇文章笔者写的较为理论化,也是对此场景的一个技术理论总结。如果文中有其他不明确的操作的话,可以参考笔者之前的文章,有详细的代码级操作。

  技术架构上的方案多种多样,笔者只是选择了其中一种进行实现,也希望大家有其他的技术方案或者理论进行交流,烦请指正。

  微信:Franki__5

  

 

 

相关 [debezium binlog 实时] 推荐:

基于Binlog的实时同步功能——debezium、canel、databus技术选型 | holmofy

- -
去年的一篇文章大致地讲了我对MQ的一些认识,事实上Kafka在内的现代MQ,功能远不止这些. 后面整理好自己的思路,肯定会再写一篇文章来讲讲. 这篇文章的主角就是与MQ息息相关的CDC技术. CDC全称叫:change data capture,是一种基于数据库数据变更的事件型软件设计模式. 比如有一张订单表trade,订单每一次变更录入到一张trade_change的队列表.

Debezium Binlog实时数据采集、落地数据使用的思考总结 - Eric-Ln - 博客园

- -
  首先抛开技术框架的讨论,个人总结Binlog 日志的数据采集主要原则:.   原则一 : 与业务数据解耦.   原则二 : 与业务数据结构解耦.   原则三 : 数据是可回溯的.   分别阐述一下这三个原则的具体含义.     在数据采集中,数据落地一般都会使用时间分区进行落地,那就需要我们确定一下固定的时间戳作为时间分区的基础时间序列.

Debezium 实现 MySQL 到 Elasticsearch 高效实时同步

- - IT瘾-dev
来自Elasticsearch中文社区的问题——. MySQL中表无唯一递增字段,也无唯一递增时间字段,该怎么使用logstash实现MySQL实时增量导数据到es中. logstash和kafka_connector都仅支持基于自增id或者时间戳更新的方式 增量同步数据. 回到问题本身:如果库表里没有相关字段,该如何处理呢.

debezium 架构和常见使用场景

- -
Debezium是一个开源项目,为捕获数据更改(change data capture,CDC)提供了一个低延迟的流式处理平台. 你可以安装并且配置Debezium去监控你的数据库,然后你的应用就可以消费对数据库的每一个行级别(row-level)的更改. 只有已提交的更改才是可见的,所以你的应用不用担心事务(transaction)或者更改被回滚(roll back).

Flink CDC 核心:Debezium 1.9.0.Beta1 发布!

- - IT瘾-dev
我很高兴地宣布 Debezium  1.9.0.Beta1的发布. 此版本包括 Debezium Server 的许多新功能,包括 Knative Eventing 支持和使用 Redis 接收器的偏移存储管理、SQL Server 连接器的多分区缩放以及各种错误修复和改进. 总体而言,此版本已修复56 个问题.

MySQL 5.6 的 binlog API 很不错

- khsing - BT的花 blogs
MySQL 5.6 开始,开发者可以通过编程获得 binlog 内容,通过这个 API 可以做很多事情,比如自制备份,比如..... 这样做搜索更加简单了,对于开发者来说只有 SQL 操作;自动建索引这件事. 交给MySQL的某个'伪'复制节点自动完成,不需要再编程了!.

Binlog 的三个业务应用场景

- - IT瘾-dev
02、binlog的业务应用. 01、什么是binlog. binlog是mysql的一种二进制日志文件,用来记录数据的变化. mysql使用binlog进行主从复制,如图:. 客户端向master的mysql sever写入数据. 当数据发生变化时,master将变更的数据记录写入到二进制文件中,即binlog.

CDC (捕获数据变化) Debezium 介绍 | 首席架构师

- -
Debezium是一个分布式平台,它将您现有的数据库转换为事件流,因此应用程序可以看到数据库中的每一个行级更改并立即做出响应. Debezium构建在Apache Kafka之上,并提供Kafka连接兼容的连接器来监视特定的数据库管理系统. Debezium在Kafka日志中记录数据更改的历史,您的应用程序将从这里使用它们.

数据同步工具之FlinkCDC/Canal/Debezium对比-技术圈

- -
数据准实时复制(CDC)是目前行内实时数据需求大量使用的技术,随着国产化的需求,我们也逐步考虑基于开源产品进行准实时数据同步工具的相关开发,逐步实现对商业产品的替代. 本文把市面上常见的几种开源产品,Canal、Debezium、Flink CDC 从原理和适用做了对比,供大家参考. 本文首发微信公众号《import_bigdata》.

使用 Kafka、Debezium 和 Kubernetes 实现应用现代化的模式

- - InfoQ - 促进软件开发领域知识与创新的传播
本文最初发表于 RedHat 的开发者站点,经原作者 Bilgin Ibryam 许可,由 InfoQ 中文站翻译分享. “我们建造计算机的方式与建造城市的方式是一样的,那就是随着时间的推移,依然毫无计划,并且要建造在废墟之上. Ellen Ullman 在 1998 年写下了这样一句话,但它今天依然适用于我们构建现代应用程序的方式,那就是,随着时间的推移,我们要在遗留的软件上构建应用,而且仅仅有短期的计划.