最近在对已有的大数据采集和数据集成工具进行梳理,并考虑进行相关的产品整合工作,经过对已有的产品的测试和验证,已经实际需要的业务场景,初步考虑清楚后续需要进行新增和完善部分的内容。
数据库实时同步和复制
对于数据库实时同步和复制一定会谈到的两款商用产品就是Oracle GoldenGate和Quest
SharePlex,具体的介绍网上也比较多,其核心特点就是支持异构数据库之间的实时数据同步和负责,而且对源数据库本身侵入性很小。两个商用产品基本都是对各种数据库的Log日志文件进行分析,然后进行复制。
那对于这块如果要自研来实现有无可能,对于Mysql来说由于采用Binlog日志方式,类似淘宝的Otter已经可以完整的实现数据库的实体同步复制。如果单纯是Oracle-Oracle数据库之间,我们也可以采用Oracle
DataGuard或者Oracle Stream流复制技术进行复制,还有就是基于Oracle LogMiner进行redo
log日志分析后进行两个数据库之间的同步。因此关键问题还是在异构数据库之间的同步复制上。
对于数据库复制,Oracle当前常用的解决方案主要有:
oracle日志文件,比如LogMiner,OGG,SharePlex
oracle CDC(Change Data Capture)
oracle trigger机制,比如DataBus , SymmetricDS
oracle 物化视图(materialized view)比如淘宝的yugong开源
在这些解决方案里面可以看到有开源的SymmetricDS解决方案,但是是基于触发器机制,侵入性还是相对较大。也有淘宝的yugong可以实现Oracle->mysql的全量或增量复制,但是基于增量物化视图方式,本身会影响到源库数据表的CUD操作。
而实际上最佳的解决方案仍然是基于log日志的实时同步复制,其核心思路包括三个步骤
1. 在源库设置为记录日志或归档模式,源库首先能够记录下日志信息。
2. 实时的能够读取到日志信息,并对日志信息进行解析或适当转换映射,包括和目标库的适配。
3. 在目标数据库直接运行相应解析后的日志SQL语句,实现同步更新。
由于Mysql本身提供可读性很强的Binlog日志,因此可以看到Mysql->Mysql,Mysql->Oracle的实时同步日志问题是可以得到很好解决的。而对于Oracle->Oracle也可以解决,较难的就是Oracle->Mysql或其它异构数据库,这里需要分析Oracle本身的redo
log日志(当前Oracle提供有logminer工具),如果我们自己写一个解析包的话就需要对Oracle redo
log结构有完整的了解。
而结合Oracle
流复制技术,我们可以考虑Oracle首先将变更信息写入到自己的AQ,然后我们从AQ订阅消息后直接处理或者写入到我们自己的消息队列或流处理软件,然后在流处理软件中完成相关的映射转换后写入到目标异构数据库中。
数据库采集同步
对于数据库采集同步,当前谈到比较多的工具主要有Sqoop和结构化数据库间的ETL工具,当然当前对于开源的Kettle和Talend本身也集成了大数据集成内容,可以实现和hdfs,hbase和主流Nosq数据库之间的数据同步和集成。而淘宝的DataX则主要可以实现常见主流的结构化数据库(Oracle,
Mysql,SqlServer)和hdfs之间的数据集成和同步。
对于Sqoop和DataX等当前也支持基于Key关键字段或时间戳的数据增量导入。即我们可以将导入命令行语句或Shell脚本通过任务或调度管理平台(类似开源的Chronos)配置为定时的调度作业,来实现对数据库的定时增量采集。
而对于常规的数据库包括大数据存储之间的采集和集成,再充分考虑性能的情况下,核心思路为:
1. 将源数据库数据进行导出,使用Sql或DB原生的导出命令直接导出为txt文件,字段以分隔符进行分隔。
1.1
可以部署多个代理端,对数据库数据启用多个线程进行导出
1.2
支持基于key值或时间戳的增量数据导出
2. 对导出的数据进行压缩后进行传输(特别是在源和目标库不在同一个数据中心时)
3. 在目标库端基于数据库原生的load命令对数据进行bulk批量导入。
在整个实现里面有两个核心,一个就是可以启用多个代理端和多线程机制并行导出数据库,其次就是导出数据压缩传输,然后在通过load
data原生命令进行数据库的bulk批量装载提升性能。
如果基于以上思路我们可以看到数据采集的重点还是在性能上面,不会去实现ETL工具本身复杂的数据映射和转化,数据聚合等操作。核心只是做异构数据库和Hdfs文件存储之间的数据搬移。而我们完全自己研发的DataPipe产品基本参考上述思路实现,其测试性能对于结构化数据库之间采集和集成是Sqoop或DataX的2-3倍左右,而对于hdfs之间的集成则在5-10倍左右的性能提升。
对于这种采集存在的约束就是不要去处理数据变更的问题,仅仅是做数据的全量同步或者是数据库表数据的简单Append处理,否则性能本身会下降很多。如果有大量数据更新需要同步,最好的方式还是首先Truncate掉目标数据库表,然后再进行全量同步。简单验证对于Mysql数据库间100万数据,180M左右数据量的全量同步整体同步时间在14秒左右即全部完成。
文件采集
对于文件的采集大家谈的比较多的还是flume进行实时的文件采集和处理,当然对于ELK(Elasticsearch、Logstash、Kibana三者的组合)虽然是处理日志,但是也有基于模板配置的完整增量实时文件采集实现。如果是仅仅是做日志的采集和分析,那么用ELK解决方案就完全够用的。
我们谈的文件采集还是是采集文件后进行预处理,然后将采集的文件导入到hdfs库进行存储。对于这种方式即需要实现flume和hdfs的集成。同时我们看到如果对采集的数据要进行实时的处理和分析,则还需要结合kafka和storm来共同完成。这是一个完整的组合,网上也有完整的例子可以参考。
对于文件采集,其核心的实现思路可以概括为如下几个步骤来完成:
1. 实现对服务器源目录的实时监听,同时对文件流实时采集。
2. 实现对采集的文件流进行预处理,如通过flume 定制sink或其它相应插件。
3. 对采集的文件流输出到txt文件再导入到hdfs或者直接通过hdfs api接口增量导入hdfs文件系统。
4. 对于流处理模式则需要将flume接到kafa+storm上,实现流处理,storm处理结果可以存到redis库。