实时数据同步服务如何保证消息的顺序性

标签: | 发表时间:2020-08-16 08:48 | 作者:
出处:https://www.cnblogs.com

上一篇 介绍了移山(数据迁移平台)实时数据同步的整体架构; 
本文主要介绍移山(数据迁移平台)实时数据同步是如何保证消息的顺序性。

可以访问  这里 查看更多关于大数据平台建设的原创文章。

一. 什么是消息的顺序性?

  1. 消息生产端将消息发送给同一个MQ服务器的同一个分区,并且按顺序发送;

  2. 消费消费端按照消息发送的顺序进行消费。

二. 为什么要保证消息的顺序性?

在某些业务功能场景下需要保证消息的发送和接收顺序是一致的,否则会影响数据的使用。

需要保证消息有序的场景

移山的实时数据同步使用  canal 组件订阅MySQL数据库的日志,并将其投递至 kafka 中(想了解移山实时同步服务架构设计的可以点  这里); 
kafka 消费端再根据具体的数据使用场景去处理数据(存入 HBase、MySQL 或直接做实时分析); 
由于binlog 本身是有序的,因此写入到mq之后也需要保障顺序。

  1. 假如现在移山创建了一个实时同步任务,然后订阅了一个业务数据库的订单表;

  2. 上游业务,向订单表里插入了一个订单,然后对该订单又做了一个更新操作,则 binlog 里会自动写入插入操作和更新操作的数据,这些数据会被 canal server 投递至 kafka broker 里面;

  3. 如果 kafka 消费端先消费到了更新日志,后消费到插入日志,则在往目标表里做操作时就会因为数据缺失导致发生异常。

三. 移山实时同步服务是怎么保证消息的顺序性

实时同步服务消息处理整体流程如下:

我们主要通过以下两个方面去保障保证消息的顺序性。

1. 将需要保证顺序的消息发送到同一个partition

1.1 kafka的同一个partition内的消息是有序的
  • kafka 的同一个 partition 用一个write ahead log组织, 是一个有序的队列,所以可以保证FIFO的顺序;

  • 因此生产者按照一定的顺序发送消息,broker 就会按照这个顺序把消息写入 partition,消费者也会按照相同的顺序去读取消息;

  • kafka 的每一个 partition 不会同时被两个消费者实例消费,由此可以保证消息消费的顺序性。

1.2 控制同一key分发到同一partition

要保证同一个订单的多次修改到达 kafka 里的顺序不能乱,可以在Producer 往 kafka 插入数据时,控制同一个key (可以采用订单主键key-hash算法来实现)发送到同一 partition,这样就能保证同一笔订单都会落到同一个 partition 内。

1.3 canal 需要做的配置

canal 目前支持的mq有 kafka/rocketmq,本质上都是基于本地文件的方式来支持了分区级的顺序消息的能力。我们只需在配置 instance 的时候开启如下配置即可:

1> canal.properties

# leader节点会等待所有同步中的副本确认之后再确认这条记录是否发送完成
canal.mq.acks = all

备注:

  • 这样只要至少有一个同步副本存在,记录就不会丢失。

2> instance.properties

1 # 散列模式的分区数
2 canal.mq.partitionsNum=2
3 # 散列规则定义 库名.表名: 唯一主键,多个表之间用逗号分隔
4 canal.mq.partitionHash=test.lyf_canal_test:id

备注:

  • 同一条数据的增删改操作 产生的 binlog 数据都会写到同一个分区内;

  • 查看指定topic的指定分区的消息,可以使用如下命令:

    bin/kafka-console-consumer.sh --bootstrap-server serverlist --topic topicname --from-beginning --partition 0

2. 通过日志时间戳和日志偏移量进行乱序处理

将同一个订单数据通过指定key的方式发送到同一个 partition 可以解决大部分情况下的数据乱序问题。

2.1 特殊场景

对于一个有着先后顺序的消息A、B,正常情况下应该是A先发送完成后再发送B。但是在异常情况下:

  • A发送失败了,B发送成功,而A由于重试机制在B发送完成之后重试发送成功了;

  • 这时对于本身顺序为AB的消息顺序变成了BA。

移山的实时同步服务会在将订阅到的数据存入HBase之前再加一层乱序处理 。

2.2 binlog里的两个重要信息

使用  mysqlbinlog 查看 binlog:

/usr/bin/mysqlbinlog --base64-output=decode-rows -v /var/lib/mysql/mysql-bin.000001

执行时间和偏移量:

备注:

  1. 每条数据都会有执行时间和偏移量这两个重要信息, 下边的校验逻辑核心正是借助了这两个值

  2. 执行的sql 语句在 binlog 中是以base64编码格式存储的,如果想查看sql 语句,需要加上: --base64-output=decode-rows -v 参数来解码;

  3. 偏移量:

    • Position 就代表 binlog 写到这个偏移量的地方,也就是写了这么多字节,即当前 binlog 文件的大小;

    • 也就是说后写入数据的 Position 肯定比先写入数据的 Position 大, 因此可以根据 Position 大小来判断消息的顺序。

3.消息乱序处理演示

3.1 在订阅表里插入一条数据,然后再做两次更新操作:
MariaDB [test]> insert into lyf_canal_test (name,status,content) values('demo1',1,'demo1 test');
Query OK, 1 row affected (0.00 sec)
 
MariaDB [test]> update lyf_canal_test set name = 'demo update' where id = 13;
Query OK, 1 row affected (0.00 sec)
Rows matched: 1 Changed: 1 Warnings: 0
 
MariaDB [test]> update lyf_canal_test set name = 'demo update2',content='second update',status=2 where id = 13;
Query OK, 1 row affected (0.00 sec)
3.2 产生三条需要保证顺序的消息

插入,第一次更新,第二次更新这三次操作产生的 binlog 被 canal server 推送至 kafka 中的消息分别称为: 消息A,消息B,消息C

  • 消息A: 

  • 消息B: 

  • 消息C: 

3.3 网络原因造成消息乱序

假设由于不可知的网络原因:

  • kafka broker收到的三条消息分别为: 消息A,消息C,消息B

  • 则 kafka 消费端消费到的这三条消息先后顺序就是: 消息A,消息C,消息B

  • 这样就造成了消息的乱序,因此 订阅到的数据在存入目标表前必须得加乱序校验处理

3.4 消息乱序处理逻辑

我们利用HBase的特性,将数据主键做为目标表的 rowkey。当 kafka 消费端消费到数据时,乱序处理主要流程(摘自禧云数芯大数据平台技术白皮书)如下:

demo的三条消息处理流程如下: 
1> 判断消息A 的主键id做为rowkey在hbase的目标表中不存在,则将消息A的数据直接插入HBase: 

2> 消息C 的主键id做为rowkey,已经在目标表中存在,则这时需要拿消息C 的执行时间和表中存储的执行时间去判断:

  • 如果消息C 中的执行时间小于表中存储的执行时间,则证明消息C 是重复消息或乱序的消息,直接丢弃;

  • 消息C 中的执行时间大于表中存储的执行时间,则直接更新表数据(本demo即符合该种场景): 

  • 消息C 中的执行时间等于表中存储的执行时间,则这时需要拿消息C 的偏移量和表中存储的偏移量去判断:

    • 消息C 中的偏移量小于表中存储的偏移量,则证明消息C 是重复消息,直接丢弃;

    • 消息C 中的偏移量大于等于表中存储的偏移量,则直接更新表数据。

3> 消息B 的主键id做为rowkey,已经在目标表中存在,则这时需要拿消息B 的执行时间和表中存储的执行时间去判断:

  • 由于消息B中的执行时间小于表中存储的执行时间(即消息C 的执行时间),因此消息B 直接丢弃。

3.5 主要代码

kafka 消费端将消费到的消息进行格式化处理和组装,并借助  HBase-client API 来完成对 HBase 表的操作。

1> 使用 Put组装单行数据

/**
* 包名: org.apache.hadoop.hbase.client.Put
* hbaseData 为从binlog订阅到的数据,通过循环,为目标HBase表
* 添加rowkey、列簇、列数据。
* 作用:用来对单个行执行加入操作。
*/
Put put = new Put(Bytes.toBytes(hbaseData.get("id")));
// hbaseData 为从binlog订阅到的数据,通过循环,为目标HBase表添加列簇和列
put.addColumn(Bytes.toBytes("info"), Bytes.toBytes(mapKey), Bytes.toBytes(hbaseData.get(mapKey)));

 

2> 使用  checkAndMutate,更新 HBase表的数据

只有服务端对应rowkey的列数据与预期的值符合期望条件(大于、小于、等于)时,才会将put操作提交至服务端。

// 如果 update_info(列族) execute_time(列) 不存在值就插入数据,如果存在则返回false
boolean res1 = table.checkAndMutate(Bytes.toBytes(hbaseData.get("id")), Bytes.toBytes("update_info")) .qualifier(Bytes.toBytes("execute_time")).ifNotExists().thenPut(put);
 
// 如果存在,则去比较执行时间
if (!res1) {
// 如果本次传递的执行时间大于HBase中的执行时间,则插入put
boolean res2 =table.checkAndPut(Bytes.toBytes(hbaseData.get("id")), Bytes.toBytes("update_info"),
Bytes.toBytes("execute_time"), CompareFilter.CompareOp.GREATER, Bytes.toBytes(hbaseData.get("execute_time")),put);
 
// 执行时间相等时,则去比较偏移量,本次传递的值大于HBase中的值则插入put
if (!res2) {
boolean res3 = table.checkAndPut(Bytes.toBytes(hbaseData.get("id")),
Bytes.toBytes("update_info"), Bytes.toBytes("execute_position"), CompareFilter.CompareOp.GREATER, Bytes.toBytes(hbaseData.get("execute_position")),put);
}
}

 

四.总结

  1. 目前移山的实时同步服务,kafka 消费端是使用一个线程去消费数据;

  2. 如果将来有版本升级需求,将消费端改为多个线程去消费数据时,要考虑到多线程消费时有序的消息会被打乱这种情况的解决办法。


相关 [实时 数据 同步] 推荐:

基于 Flink SQL CDC 的实时数据同步方案 (developer.aliyun.com)

- - IT瘾-jianshu
整理:陈政羽(Flink 社区志愿者). Flink 1.11 引入了 Flink SQL CDC,CDC 能给我们数据和业务间能带来什么变化. 本文由 Apache Flink PMC,阿里巴巴技术专家伍翀 (云邪)分享,内容将从传统的数据同步方案,基于 Flink CDC 同步的解决方案以及更多的应用场景和 CDC 未来开发规划等方面进行介绍和演示.

百万级商品数据实时同步,查询结果秒出

- - IT瘾-dev
前阵子老板安排了一个新任务,要建设一个商家商品搜索系统,能够为用户提供快速、准确的搜索能力,在用户输入搜索内容时,要能从商家名称和商品名称两个维度去搜索,搜索出来的结果,按照准确率排序,并按商家所属商品的关联关系,来组合数据结构,同时提供API给业务系统调用. 我们面临以下几个难题: ①商家数据库和商品数据库是多台不同的服务器,并且数据量达百万级,如何才能实现跨数据库的数据同步呢.

实时数据同步服务如何保证消息的顺序性

- -
上一篇 介绍了移山(数据迁移平台)实时数据同步的整体架构; . 本文主要介绍移山(数据迁移平台)实时数据同步是如何保证消息的顺序性. 这里 查看更多关于大数据平台建设的原创文章. 消息生产端将消息发送给同一个MQ服务器的同一个分区,并且按顺序发送;. 消费消费端按照消息发送的顺序进行消费. 在某些业务功能场景下需要保证消息的发送和接收顺序是一致的,否则会影响数据的使用.

如何基于日志,同步实现数据的一致性和实时抽取?

- - 运维派
目前就职于宜信技术研发中心,任架构师,负责流式计算和大数据业务产品解决方案. 曾任职于Naver china(韩国最大搜索引擎公司)中国研发中心资深工程师,多年从事CUBRID分布式数据库集群开发和CUBRID数据库引擎开发. dbus+wormhole总体架构和技术实现方案. 大家好,我是王东,来自宜信技术研发中心,这是我来社群的第一次分享,如果有什么不足,请大家多多指正、包涵.

利用ogg实现oracle到kafka的增量数据实时同步 | 伦少的博客

- -
ogg即Oracle GoldenGate是Oracle的同步工具,本文讲如何配置ogg以实现Oracle数据库增量数据实时同步到kafka中,其中同步消息格式为json. 下面是我的源端和目标端的一些配置信息:. 注意:源端和目标端的文件不一样,目标端需要下载Oracle GoldenGate for Big Data,源端需要下载Oracle GoldenGate for Oracle具体下载方法见最后的附录截图.

MySQL如何实时同步数据到ES?试试这款阿里开源的神器!

- - 掘金后端本月最热
SpringBoot实战电商项目mall(40k+star)地址:. mall项目中的商品搜索功能,一直都没有做实时数据同步. 最近发现阿里巴巴开源的 canal可以把MySQL中的数据实时同步到Elasticsearch中,能很好地解决数据同步问题. 今天我们来讲讲 canal的使用,希望对大家有所帮助.

多屏时代的App实时同步,BackWire为开发者提供后台跨屏数据同步的BaaS解决方案

- - 互联网的那点事...
移动互联网时代,要想给用户提供一个完整、无缝的产品体验,开发者必须为自己的产品开发支持多个终端的应用. 同时在多终端环境下,还要解决各终端间数据实时同步的问题. 而昨天在InnoSpace 的首届DemoDay 上,就出现了一家为开发者提供App 多终端数据实时同步服务的创业公司: Backwire.

同步mysql数据到hive

- - ITeye博客
地址为:http://archive.cloudera.com/cdh/3/下载相应版本,如sqoop-1.2.0-CDH3B4.tar.gz. 地址为:http://archive.cloudera.com/cdh/3/,版本可以为hadoop-0.20.2-CDH3B4.tar.gz. 3.解压 sqoop-1.2.0-CDH3B4.tar.gz ,hadoop-0.20.2-CDH3B4.tar.gz 到某目录如/home/hadoop/,解压后的目录为.

inotify-rsync实时同步脚本

- lostsnow - 无网不剩
rsync是linux下一款非常强大的同步工具,采用差异同步的方法,只上传文件/文件夹的不同部分,同时可以对上传部分先进行压缩,所以rsync的传输效率是很高的. 但rsync也有缺点,最大的问题就是每次执行rsync命令都会遍历目标目录,当文件不多时,这没什么问题,一旦文件数到了一定规模,那么每次遍历都会消耗很多资源.

Twitter实时同步Google Buzz的方法

- Gene - 月光博客
  Google Buzz和FriendFeed一样支持导入Twitter的信息,不过Buzz和FriendFeed不同的是,它对于Twitter的同步是单向的,你可以看到别人的Twitter,却无法回复到Twitter中,另一个重要的问题是延时,Twitter同步信息到Buzz的延时非常严重,和Friendfeed的实时性同步形成鲜明对比.