利用ogg实现oracle到kafka的增量数据实时同步 | 伦少的博客

标签: | 发表时间:2020-02-01 17:34 | 作者:
出处:https://dongkelun.com

前言

ogg即Oracle GoldenGate是Oracle的同步工具,本文讲如何配置ogg以实现Oracle数据库增量数据实时同步到kafka中,其中同步消息格式为json。
下面是我的源端和目标端的一些配置信息:

- 版本 OGG版本 ip 别名
源端 OracleRelease 11.2.0.1.0 Oracle GoldenGate 11.2.1.0.3 for Oracle on Linux x86-64 192.168.44.128 master
目标端 kafka_2.11-1.1.0 Oracle GoldenGate for Big Data 12.3.1.1.1 on Linux x86-64 192.168.44.129 slave1

1、下载

可在 这里旧版本查询下载
注意:源端和目标端的文件不一样,目标端需要下载Oracle GoldenGate for Big Data,源端需要下载Oracle GoldenGate for Oracle具体下载方法见最后的附录截图。

2、源端(Oracle)配置

注意:源端是安装了oracle的机器,oracle环境变量之前都配置好了

2.1 解压

先建立ogg目录

1             
2
mkdir -p /opt/ogg             
unzip V34339-01.zip

解压后得到一个tar包,再解压这个tar

1             
2
tar xf fbo_ggs_Linux_x64_ora11g_64bit.tar -C /opt/ogg             
chown -R oracle:oinstall /opt/ogg (使oracle用户有ogg的权限,后面有些需要在oracle用户下执行才能成功)

2.2 配置ogg环境变量

为了简单方便起见,我在/etc/profile里配置的,建议在生产中配置oracle的环境变量文件/home/oracle/.bash_profile里配置,为了怕出问题,我把OGG_HOME等环境变量在/etc/profile配置了一份,不知道这是否是必须的。

1             
vim /etc/profile             

1            
2
3
export OGG_HOME=/opt/ogg            
export LD_LIBRARY_PATH=$ORACLE_HOME/lib:/usr/lib
export PATH=$OGG_HOME:$PATH

使之生效

1             
source/etc/profile             

测试一下ogg命令

1             
ggsci             

如果命令成功即可进行下一步,不成功请检查前面的步骤。

2.3 oracle打开归档模式

1            
2
su - oracle            
sqlplus / as sysdba

执行下面的命令查看当前是否为归档模式

1             
archive log list             

1            
2
3
4
5
6
SQL> archive log list            
Database log mode No Archive Mode
Automatic archival Disabled
Archive destination USE_DB_RECOVERY_FILE_DEST
Oldest online log sequence 12
Current log sequence 14

若为Disabled,手动打开即可

1             
2
3
4
5
6
conn / as sysdba (以DBA身份连接数据库)             
shutdown immediate (立即关闭数据库)
startup mount (启动实例并加载数据库,但不打开)
alterdatabasearchivelog; (更改数据库为归档模式)
alterdatabaseopen; (打开数据库)
altersystemarchivelogstart; (启用自动归档)

再执行一下

1             
archive log list             

1            
2
3
4
5
6
Database log mode	       Archive Mode            
Automatic archival Enabled
Archive destination USE_DB_RECOVERY_FILE_DEST
Oldest online log sequence 12
Next log sequence to archive 14
Current log sequence 14

可以看到为Enabled,则成功打开归档模式。

2.4 Oracle打开日志相关

OGG基于辅助日志等进行实时传输,故需要打开相关日志确保可获取事务内容,通过下面的命令查看该状态

1             
selectforce_logging, supplemental_log_data_minfromv$database;             

1            
2
3
FORCE_ SUPPLEMENTAL_LOG            
------ ----------------
NO NO

若为NO,则需要通过命令修改

1             
2
alterdatabaseforcelogging;             
alterdatabaseaddsupplementallogdata;

再查看一下为YES即可

1             
2
3
4
5
SQL> select force_logging, supplemental_log_data_min from v$database;             

FORCE_ SUPPLEMENTAL_LOG
------ ----------------
YES YES

2.5 oracle创建复制用户

首先root用户建立相关文件夹,并赋予权限

1             
2
mkdir -p /u01/app/oracle/oggdata/orcl             
chown -R oracle:oinstall /u01/app/oracle/oggdata/orcl

然后执行下面sql

1             
2
3
4
5
6
7
8
9
10
11
SQL> create tablespace oggtbs datafile '/u01/app/oracle/oggdata/orcl/oggtbs01.dbf' size 1000M autoextend on;             

Tablespace created.

SQL> create user ogg identified by ogg default tablespace oggtbs;

User created.

SQL> grant dba to ogg;

Grantsucceeded.

2.6 OGG初始化

1            
2
ggsci            
create subdirs
1            
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
ggsci            

Oracle GoldenGate Command InterpreterforOracle
Version 11.2.1.0.3 14400833 OGGCORE_11.2.1.0.3_PLATFORMS_120823.1258_FBO
Linux, x64, 64bit (optimized), Oracle 11g on Aug 23 2012 20:20:21

Copyright (C) 1995, 2012, Oracle and/or its affiliates. All rights reserved.



GGSCI (ambari.master.com) 1> create subdirs

Creating subdirectories under current directory /root

Parameter files /root/dirprm: created
Report files /root/dirrpt: created
Checkpoint files /root/dirchk: created
Process status files /root/dirpcs: created
SQL script files /root/dirsql: created
Database definitions files /root/dirdef: created
Extract data files /root/dirdat: created
Temporary files /root/dirtmp: created
Stdout files /root/dirout: created


GGSCI (ambari.master.com) 2>

2.7 Oracle创建测试表

创建一个用户,在该用户下新建测试表,用户名、密码、表名均为 test_ogg。

1             
2
3
4
createusertest_oggidentifiedbytest_oggdefaulttablespaceusers;             
grantdbatotest_ogg;
conn test_ogg/test_ogg;
createtabletest_ogg(idint,namevarchar(20),primarykey(id));

3 目标端(kafka)配置

1            
2
3
mkdir -p /opt/ogg            
unzip 123111_ggs_Adapters_Linux_x64.zip
tar xf ggs_Adapters_Linux_x64.tar -C /opt/ogg/

3.2 环境变量

1            
vim /etc/profile            
1            
2
3
export OGG_HOME=/opt/ogg            
export LD_LIBRARY_PATH=$JAVA_HOME/jre/lib/amd64:$JAVA_HOME/jre/lib/amd64/server:$JAVA_HOME/jre/lib/amd64/libjsig.so:$JAVA_HOME/jre/lib/amd64/server/libjvm.so:$OGG_HOME/lib
export PATH=$OGG_HOME:$PATH
1            
source/etc/profile            

同样测试一下ogg命令

1             
ggsci             

3.3 初始化目录

1            
create subdirs            

4、OGG源端配置

4.1 配置OGG的全局变量

先切换到oracle用户下

1             
2
3
su oracle             
cd/opt/ogg
ggsci

1            
2
3
4
GGSCI (ambari.master.com) 1> dblogin userid ogg password ogg            
Successfully logged into database.

GGSCI (ambari.master.com) 2> edit param ./globals

然后和用vim编辑一样添加

1             
oggschema ogg             

4.2 配置管理器mgr

1            
2
3
4
5
GGSCI (ambari.master.com) 3> edit param mgr            
PORT 7809
DYNAMICPORTLIST 7810-7909
AUTORESTART EXTRACT *,RETRIES 5,WAITMINUTES 3
PURGEOLDEXTRACTS ./dirdat/*,usecheckpoints, minkeepdays 3

说明:PORT即mgr的默认监听端口;DYNAMICPORTLIST动态端口列表,当指定的mgr端口不可用时,会在这个端口列表中选择一个,最大指定范围为256个;AUTORESTART重启参数设置表示重启所有EXTRACT进程,最多5次,每次间隔3分钟;PURGEOLDEXTRACTS即TRAIL文件的定期清理

4.3 添加复制表

1            
2
3
4
5
6
7
8
9
GGSCI (ambari.master.com) 4> add trandata test_ogg.test_ogg            

Logging of supplemental redo data enabledfortable TEST_OGG.TEST_OGG.

GGSCI (ambari.master.com) 5> info trandata test_ogg.test_ogg

Logging of supplemental redologdata is enabledfortable TEST_OGG.TEST_OGG.

Columns supplementally loggedfortable TEST_OGG.TEST_OGG: ID

4.4 配置extract进程

1            
2
3
4
5
6
7
8
GGSCI (ambari.master.com) 6> edit param extkafka            
extract extkafka
dynamicresolution
SETENV (ORACLE_SID ="orcl")
SETENV (NLS_LANG ="american_america.AL32UTF8")
userid ogg,password ogg
exttrail /opt/ogg/dirdat/to
table test_ogg.test_ogg;

说明:第一行指定extract进程名称;dynamicresolution动态解析;SETENV设置环境变量,这里分别设置了Oracle数据库以及字符集;userid ggs,password ggs即OGG连接Oracle数据库的帐号密码,这里使用2.5中特意创建的复制帐号;exttrail定义trail文件的保存位置以及文件名,注意这里文件名只能是2个字母,其余部分OGG会补齐;table即复制表的表名,支持*通配,必须以;结尾

添加extract进程:

1            
2
GGSCI (ambari.master.com) 16> add extract extkafka,tranlog,begin now            
EXTRACT added.

(注:若报错

1             
ERROR: Could not create checkpoint file /opt/ogg/dirchk/EXTKAFKA.cpe (error 2, No such file or directory).             

执行下面的命令再重新添加即可。

1             
create subdirs             

)

添加trail文件的定义与extract进程绑定:

1            
2
GGSCI (ambari.master.com) 17> add exttrail /opt/ogg/dirdat/to,extract extkafka            
EXTTRAIL added.

4.5 配置pump进程

pump进程本质上来说也是一个extract,只不过他的作用仅仅是把trail文件传递到目标端,配置过程和extract进程类似,只是逻辑上称之为pump进程

1             
2
3
4
5
6
7
8
GGSCI (ambari.master.com) 18> edit param pukafka             
extract pukafka
passthru
dynamicresolution
userid ogg,password ogg
rmthost 192.168.44.129 mgrport 7809
rmttrail /opt/ogg/dirdat/to
table test_ogg.test_ogg;

说明:第一行指定extract进程名称;passthru即禁止OGG与Oracle交互,我们这里使用pump逻辑传输,故禁止即可;dynamicresolution动态解析;userid ogg,password ogg即OGG连接Oracle数据库的帐号密码rmthost和mgrhost即目标端(kafka)OGG的mgr服务的地址以及监听端口;rmttrail即目标端trail文件存储位置以及名称。

分别将本地trail文件和目标端的trail文件绑定到extract进程:

1             
2
3
4
GGSCI (ambari.master.com) 1> add extract pukafka,exttrailsource /opt/ogg/dirdat/to             
EXTRACT added.
GGSCI (ambari.master.com) 2> add rmttrail /opt/ogg/dirdat/to,extract pukafka
RMTTRAIL added.

4.6 配置define文件

Oracle与MySQL,Hadoop集群(HDFS,Hive,kafka等)等之间数据传输可以定义为异构数据类型的传输,故需要定义表之间的关系映射,在OGG命令行执行:

1             
2
3
4
GGSCI (ambari.master.com) 3> edit param test_ogg             
defsfile /opt/ogg/dirdef/test_ogg.test_ogg
userid ogg,password ogg
table test_ogg.test_ogg;

在OGG主目录下执行(oracle用户):

1             
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
./defgen paramfile dirprm/test_ogg.prm             

***********************************************************************
Oracle GoldenGate Table Definition GeneratorforOracle
Version 11.2.1.0.3 14400833 OGGCORE_11.2.1.0.3_PLATFORMS_120823.1258
Linux, x64, 64bit (optimized), Oracle 11g on Aug 23 2012 16:58:29

Copyright (C) 1995, 2012, Oracle and/or its affiliates. All rights reserved.


Starting at 2018-05-23 05:03:04
***********************************************************************

Operating System Version:
Linux
Version#1 SMP Wed Apr 12 15:04:24 UTC 2017, Release 3.10.0-514.16.1.el7.x86_64
Node: ambari.master.com
Machine: x86_64
softlimithardlimit
Address Space Size : unlimited unlimited
Heap Size : unlimited unlimited
File Size : unlimited unlimited
CPU Time : unlimited unlimited

Process id: 13126

***********************************************************************
** Running with the following parameters **
***********************************************************************
defsfile /opt/ogg/dirdef/test_ogg.test_ogg
userid ogg,password ***
table test_ogg.test_ogg;
Retrieving definitionforTEST_OGG.TEST_OGG



Definitions generatedfor1 tablein/opt/ogg/dirdef/test_ogg.test_ogg

将生成的/opt/ogg/dirdef/test_ogg.test_ogg发送的目标端ogg目录下的dirdef里:

1             
scp -r /opt/ogg/dirdef/test_ogg.test_ogg [email protected]:/opt/ogg/dirdef/             

5、OGG目标端配置

5.1 开启kafka服务

1            
2
3
cd/opt/kafka_2.11-1.1.0/            
bin/zookeeper-server-start.sh config/zookeeper.properties
bin/kafka-server-start.sh config/server.properties

5.2 配置管理器mgr

1            
2
3
4
5
GGSCI (ambari.slave1.com) 1>  edit param mgr            
PORT 7809
DYNAMICPORTLIST 7810-7909
AUTORESTART EXTRACT *,RETRIES 5,WAITMINUTES 3
PURGEOLDEXTRACTS ./dirdat/*,usecheckpoints, minkeepdays 3

5.3 配置checkpoint

checkpoint即复制可追溯的一个偏移量记录,在全局配置里添加checkpoint表即可。

1             
2
edit  param  ./GLOBALS             
CHECKPOINTTABLE test_ogg.checkpoint

5.4 配置replicate进程

1            
2
3
4
5
6
7
GGSCI (ambari.slave1.com) 4> edit param rekafka            
REPLICAT rekafka
sourcedefs /opt/ogg/dirdef/test_ogg.test_ogg
TARGETDB LIBFILE libggjava.so SET property=dirprm/kafka.props
REPORTCOUNT EVERY 1 MINUTES, RATE
GROUPTRANSOPS 10000
MAP test_ogg.test_ogg, TARGET test_ogg.test_ogg;

说明:REPLICATE rekafka定义rep进程名称;sourcedefs即在4.6中在源服务器上做的表映射文件;TARGETDB LIBFILE即定义kafka一些适配性的库文件以及配置文件,配置文件位于OGG主目录下的dirprm/kafka.props;REPORTCOUNT即复制任务的报告生成频率;GROUPTRANSOPS为以事务传输时,事务合并的单位,减少IO操作;MAP即源端与目标端的映射关系

5.5 配置kafka.props

1            
2
cd/opt/ogg/dirprm/            
vim kafka.props
1            
2
3
4
5
6
7
gg.handlerlist=kafkahandler //handler类型            
gg.handler.kafkahandler.type=kafka
gg.handler.kafkahandler.KafkaProducerConfigFile=custom_kafka_producer.properties //kafka相关配置
gg.handler.kafkahandler.topicMappingTemplate=test_ogg //kafka的topic名称,无需手动创建
gg.handler.kafkahandler.format=json //传输文件的格式,支持json,xml等
gg.handler.kafkahandler.mode=op //OGGforBig Data中传输模式,即op为一次SQL传输一次,tx为一次事务传输一次
gg.classpath=dirprm/:/opt/kafka_2.11-1.1.0/libs/*:/opt/ogg/:/opt/ogg/lib/*
1            
vim custom_kafka_producer.properties            
1            
2
3
4
5
6
7
8
bootstrap.servers=192.168.44.129:9092//kafkabroker的地址            
acks=1
compression.type=gzip //压缩类型
reconnect.backoff.ms=1000//重连延时
value.serializer=org.apache.kafka.common.serialization.ByteArraySerializer
key.serializer=org.apache.kafka.common.serialization.ByteArraySerializer
batch.size=102400
linger.ms=10000

其中需要将后面的注释去掉,ogg不识别注释,如果不去掉会报错

5.6 添加trail文件到replicate进程

1            
2
GGSCI (ambari.slave1.com) 2> add replicat rekafka exttrail /opt/ogg/dirdat/to,checkpointtable test_ogg.checkpoint            
REPLICAT added.

6、测试

6.1 启动所有进程

在源端和目标端的OGG命令行下使用start [进程名]的形式启动所有进程。
启动顺序按照源mgr——目标mgr——源extract——源pump——目标replicate来完成。
全部需要在ogg目录下执行ggsci目录进入ogg命令行。
源端依次是

1             
2
3
start mgr             
start extkafka
start pukafka

目标端

1             
2
start mgr             
start rekafka

可以通过info all 或者info [进程名] 查看状态,所有的进程都为RUNNING才算成功
源端

1             
2
3
4
5
6
7
GGSCI (ambari.master.com) 5> info all             

Program Status Group Lag at Chkpt Time Since Chkpt

MANAGER RUNNING
EXTRACT RUNNING EXTKAFKA 04:50:21 00:00:03
EXTRACT RUNNING PUKAFKA 00:00:00 00:00:03

目标端

1             
2
3
4
5
6
GGSCI (ambari.slave1.com) 3> info all             

Program Status Group Lag at Chkpt Time Since Chkpt

MANAGER RUNNING
REPLICAT RUNNING REKAFKA 00:00:00 00:00:01

6.2 异常解决

如果有不是RUNNING可通过查看日志的方法检查解决问题,具体通过下面两种方法

1             
vim ggser.log             

或者ogg命令行,以rekafka进程为例

1            
GGSCI (ambari.slave1.com) 2> view report rekafka            

列举其中我遇到的一个问题:
异常信息

1             
2
3
SEVERE: Unable tosetproperty on handler'kafkahandler'(oracle.goldengate.handler.kafka.KafkaHandler). Failed tosetproperty: TopicName:="test_ogg"(class: oracle.goldengate.handler.kafka.KafkaHandler).             
oracle.goldengate.util.ConfigException: Failed tosetproperty: TopicName:="test_ogg"(class: oracle.goldengate.handler.kafka.KafkaHandler).
at ......

具体原因是网上的教程是旧版的,设置topicName的属性为:

1             
gg.handler.kafkahandler.topicName=test_ogg             

新版的这样设置

1             
gg.handler.kafkahandler.topicMappingTemplate=test_ogg             

大家可根据自己的版本进行设置,附上stackoverflow原答案

1             
2
3
4
5
6
7
8
I tried to move data from Oracle Database to Kafka using Golden gate adapter Version 12.3.0.1.0             

In new version there is no topicname

The following resolves the topic name using the short table name
gg.handler.kafkahandler.topicMappingTemplate=test

In previous version we have gg.handler.kafkahandler.topicName=test

6.3 测试同步更新效果

现在源端执行sql语句

1             
2
3
4
5
6
7
conn test_ogg/test_ogg             
insert into test_ogg values(1,'test');
commit;
update test_oggsetname='zhangsan'whereid=1;
commit;
delete test_oggwhereid=1;
commit;

查看源端trail文件状态

1             
2
ls -l /opt/ogg/dirdat/to*             
-rw-rw-rw- 1 oracle oinstall 1464 May 23 10:31 /opt/ogg/dirdat/to000000

查看目标端trail文件状态

1             
2
ls -l /opt/ogg/dirdat/to*             
-rw-r----- 1 root root 1504 May 23 10:31 /opt/ogg/dirdat/to000000

查看kafka是否自动建立对应的主题

1             
bin/kafka-topics.sh --list --zookeeper localhost:2181             

在列表中显示有test_ogg则表示没问题
通过消费者看是否有同步消息

1             
2
3
4
bin/kafka-console-consumer.sh --bootstrap-server 192.168.44.129:9092 --topic test_ogg --from-beginning             
{"table":"TEST_OGG.TEST_OGG","op_type":"I","op_ts":"2018-05-23 10:31:28.000078","current_ts":"2018-05-23T10:36:48.525000","pos":"00000000000000001093","after":{"ID":1,"NAME":"test"}}
{"table":"TEST_OGG.TEST_OGG","op_type":"U","op_ts":"2018-05-23 10:31:36.000073","current_ts":"2018-05-23T10:36:48.874000","pos":"00000000000000001233","before":{},"after":{"ID":1,"NAME":"zhangsan"}}
{"table":"TEST_OGG.TEST_OGG","op_type":"D","op_ts":"2018-05-23 10:31:43.000107","current_ts":"2018-05-23T10:36:48.875000","pos":"00000000000000001376","before":{"ID":1}}

显然,Oracle的数据已准实时同步到Kafka,格式为json,其中op_type代表操作类型,这个可配置,我没有配置则按默认的来,默认为

1             
2
3
gg.handler.kafkahandler.format.insertOpKey = I             
gg.handler.kafkahandler.format.updateOpKey = U
gg.handler.kafkahandler.format.deleteOpKey = D

before代表操作之前的数据,after代表操作后的数据,现在已经可以从kafka获取到同步的json数据了,后面可以用SparkStreaming和Storm等解析然后存到hadoop等大数据平台里

6.4 SparkStreaming测试消费同步消息

具体代码可参考 Spark Streaming连接Kafka入门教程
下面附上消费成功的结果图

7、更新:后续遇到的问题

在后面的使用过程中发现上面同步到kafka的json数据中少一些我们想要的一些,下面讲一下我是如何解决的
首先建表:

1             
2
3
4
5
6
7
8
9
CREATETABLE"TCLOUD"."T_OGG2"             
("ID"NUMBER(*,0),
"TEXT_NAME"VARCHAR2(20),
"AGE"NUMBER(*,0),
"ADD"VARCHAR2(100),
"IDD"VARCHAR2(100),
CONSTRAINT"T_OGG2_PK"PRIMARYKEY("ID","IDD")

)

为什么不用之前建的表,主要是之前的字段太少,不容易看出问题,现在主要是增加几个字段,然后id,idd是联合主键。
看一下按照之前的配置,同步到kafka的数据(截取部分数据)

1             
2
3
4
5
{"table":"TCLOUD.T_OGG2","op_type":"I","op_ts":"2018-05-31 11:46:09.512672","current_ts":"2018-05-31T11:46:15.292000","pos":"00000000000000001903","after":{"ID":4,"TEXT_NAME":null,"AGE":0,"ADD":null,"IDD":"8"}}             
{"table":"TCLOUD.T_OGG2","op_type":"U","op_ts":"2018-05-31 11:49:10.514549","current_ts":"2018-05-31T11:49:16.450000","pos":"00000000000000002227","before":{},"after":{"ID":4,"TEXT_NAME":"lisi","IDD":"7"}}
{"table":"TCLOUD.T_OGG2","op_type":"U","op_ts":"2018-05-31 11:49:48.514869","current_ts":"2018-05-31T11:49:54.481000","pos":"00000000000000002373","before":{"ID":4,"IDD":"7"},"after":{"ID":1,"IDD":"7"}}

{"table":"TCLOUD.T_OGG2","op_type":"D","op_ts":"2018-05-31 11:52:38.516877","current_ts":"2018-05-31T11:52:45.633000","pos":"00000000000000003161","before":{"ID":1,"IDD":"7"}}

现在只有insert的数据是全的,update更新非主键字段before是没有数据的,更新主键before只有主键的数据,delete只有before的主键字段,也就是update和delete的信息是不全的,且没有主键信息(程序里是不能判断哪一个是主键的),这样对于程序自动解析同步数据是不利的(不同的需求可能不一样),具体自己可以分析,就不啰嗦了,这里主要解决,有需要before和after全部信息和主键信息的需求。

7.1 添加before

在源端extract里添加下面几行

1             
2
3
4
GGSCI (ambari.master.com) 33> edit param extkafka             
GETUPDATEBEFORES
NOCOMPRESSDELETES
NOCOMPRESSUPDATES

重启 extkafka

1             
2
stop extkafka             
start extkafka

然后测试

1             
2
3
{"table":"TCLOUD.T_OGG2","op_type":"U","op_ts":"2018-05-31 14:48:55.630340","current_ts":"2018-05-31T14:49:01.709000","pos":"00000000000000003770","before":{"ID":1,"AGE":20,"IDD":"1"},"after":{"ID":1,"AGE":1,"IDD":"1"}}             
{"table":"TCLOUD.T_OGG2","op_type":"U","op_ts":"2018-05-31 14:48:55.630340","current_ts":"2018-05-31T14:49:01.714000","pos":"00000000000000004009","before":{"ID":1,"AGE":20,"IDD":"2"},"after":{"ID":1,"AGE":1,"IDD":"2"}}
{"table":"TCLOUD.T_OGG2","op_type":"U","op_ts":"2018-05-31 14:48:55.630340","current_ts":"2018-05-31T14:49:01.715000","pos":"00000000000000004248","before":{"ID":1,"AGE":20,"IDD":"8"},"after":{"ID":1,"AGE":1,"IDD":"8"}}

发现update之后before里有数据即可,但是现在before和after的数据都不全(只有部分字段)

网上有的说只添加GETUPDATES即可,但我测试了没有成功,关于每个配置项什么含义可以参考 https://blog.csdn.net/linucle/article/details/13505939(有些配置的含义里面也没有给出)
参考: http://www.itpub.net/thread-2083473-1-1.html

7.2 添加主键

在kafka.props添加

1            
gg.handler.kafkahandler.format.includePrimaryKeys=true            

重启 rekafka

1             
2
stop rekafka             
start rekafka

测试:

1             
{"table":"TCLOUD.T_OGG2","op_type":"U","op_ts":"2018-05-31 14:58:57.637035","current_ts":"2018-05-31T14:59:03.401000","pos":"00000000000000004510","primary_keys":["ID","IDD"],"before":{"ID":1,"AGE":1,"IDD":"1"},"after":{"ID":1,"AGE":20,"IDD":"1"}}             

发现有primary_keys,不错~

参考: http://blog.51cto.com/lyzbg/2088409

7.3 补全全部字段

如果字段补全应该是Oracle没有开启全列补充日志

1             
2
3
4
5
SQL> select supplemental_log_data_all from v$database;             

SUPPLE
------
NO

通过以下命令开启

1             
2
3
4
5
6
7
8
9
10
11
SQL> alter database add supplemental log data(all) columns;             

Database altered.

SQL> select supplemental_log_data_all from v$database;

SUPPLE
------
YES

SQL>

测试一下

1             
2
3
{"table":"TCLOUD.T_OGG2","op_type":"U","op_ts":"2018-05-31 15:27:45.655518","current_ts":"2018-05-31T15:27:52.891000","pos":"00000000000000006070","primary_keys":["ID","IDD"],"before":{"ID":1,"TEXT_NAME":null,"AGE":1,"ADD":null,"IDD":"1"},"after":{"ID":1,"TEXT_NAME":null,"AGE":20,"ADD":null,"IDD":"1"}}             
{"table":"TCLOUD.T_OGG2","op_type":"U","op_ts":"2018-05-31 15:27:45.655518","current_ts":"2018-05-31T15:27:52.893000","pos":"00000000000000006341","primary_keys":["ID","IDD"],"before":{"ID":1,"TEXT_NAME":null,"AGE":1,"ADD":null,"IDD":"2"},"after":{"ID":1,"TEXT_NAME":null,"AGE":20,"ADD":null,"IDD":"2"}}
{"table":"TCLOUD.T_OGG2","op_type":"U","op_ts":"2018-05-31 15:27:45.655518","current_ts":"2018-05-31T15:27:52.895000","pos":"00000000000000006612","primary_keys":["ID","IDD"],"before":{"ID":1,"TEXT_NAME":null,"AGE":1,"ADD":null,"IDD":"8"},"after":{"ID":1,"TEXT_NAME":null,"AGE":20,"ADD":null,"IDD":"8"}}

到现在json信息里的内容已经很全了,基本满足了我想要的,附图:

启发我发现和Oracle全列补充日志没有开启有关的博客: https://blog.csdn.net/huoshuyinhua/article/details/79013387
开启命令参考: https://blog.csdn.net/aaron8219/article/details/16825963

注:博客上讲到,开启全列补充日志会导致磁盘快速增长,LGWR进程繁忙,不建议使用。大家可根据自己的情况使用。

8、关于通配

如果想通配整个库的话,只需要把上面的配置所有表名的改为 ,如test_ogg.test_ogg改为 test_ogg.,但是kafka的topic不能通配,所以需要把所有的表的数据放在一个topic即可,后面再用程序解析表名即可。

9、附录

目标端在 这里,下载下来后文件名123111_ggs_Adapters_Linux_x64.zip

源端在 旧版本查询下载,下载后文件名为V34339-01.zip

参考资料

基于OGG的Oracle与Hadoop集群准实时同步介绍

本文由  董可伦 发表于  伦少的博客 ,采用 署名-非商业性使用-禁止演绎 3.0进行许可。

非商业转载请注明作者及出处。商业转载请联系作者本人。

本文标题: 利用ogg实现oracle到kafka的增量数据实时同步

本文链接: https://dongkelun.com/2018/05/23/oggOracle2Kafka/


相关 [利用 ogg oracle] 推荐:

利用ogg实现oracle到kafka的增量数据实时同步 | 伦少的博客

- -
ogg即Oracle GoldenGate是Oracle的同步工具,本文讲如何配置ogg以实现Oracle数据库增量数据实时同步到kafka中,其中同步消息格式为json. 下面是我的源端和目标端的一些配置信息:. 注意:源端和目标端的文件不一样,目标端需要下载Oracle GoldenGate for Big Data,源端需要下载Oracle GoldenGate for Oracle具体下载方法见最后的附录截图.

使用OGG,两个Oracle库之间单向同步数据

- - CSDN博客数据库推荐文章
配置:源数据库100.100.100.21         实例名dbsid2.             目标数据库100.100.100.41       实例名db1. 实验目标:源数据库中的 scott用户 emp表同步到目标数据库 scott中的test表. 安装包为文件ogg112101_fbo_ggs_Linux_x64_ora10g_64bit.zip.

OGG双向复制oracle数据库配置

- -
Oracle GoldenGate supports an active-active bi-directional configuration, where there are two systems with identical sets of data that can be changed by application users on either system.

基于OGG的Oracle与Hadoop集群准实时同步介绍 - 偶素浅小浅 - 博客园

- -

[Oracle] 利用自治事务实现审计功能

- - CSDN博客数据库推荐文章
在一个事务(外层事务)中可以定义一个或几个自治事务. 自治事务可以独立commit,不对外层事务产生影响,同样外层事务的 rollback 也对自治事务没有影响. 假设现在有这样一个需求,不能对某个表进行某些操作(如更新),一旦有这类操作,就强制整个回滚,并且在审计日志中记录该次违规操作. 现在的问题是当事务回滚之后,日志中的记录也会跟着被回滚,这时就可以利用自治事务来防止这个问题.

简析将shp导入Oracle并利用geoserver将导入的数据发布 - Naa

- - 博客园_首页
首先要安装有支持空间数据的Oracle,其次有安装版或免安装版的geoserver,两者都部署好后,我们将开始讨论进一步需要配置的环境. 这里我使用由Oracle提供的shp2sdo来进行shp导入,同时使用由geoserver提供的可以连接Oracle数据源的插件来进行数据的发布. Shp2sdo的下载地址为: http://down.51cto.com/data/223757.

Oracle 收购 Ksplice

- feng823 - LinuxTOY
实现无需重启即可为 Linux 内核打安全补丁的 Ksplice 被 Oracle 收购. 在被收购前, Ksplice 为 Fedora, Ubuntu 免费提供该功能,对于 RHEL 和 CentOS 则需要订阅其产品. Oracle 表示将把 Ksplice 带来的零宕机安全更新功能添加到 Oracle 产品订阅服务中,同时停止对其他企业级 Linux 发行版的支持,将 Oracle Unbreakable Linux 打造成唯一具备零宕机安全更新功能的企业级 Linux 发行版.

Linux Ksplice,MySQL and Oracle

- Syn - DBA Notes
Oracle 在 7 月份收购了 Ksplice. 使用了 Ksplice 的 Linux 系统,为 Kernel 打补丁无需重启动,做系统维护的朋友应该明白这是一个杀手级特性. 现在该产品已经合并到 Oracle Linux 中. 目前已经有超过 700 家客户,超过 10 万套系统使用了 Ksplice (不知道国内是否已经有用户了.

oracle license计算

- Fenng - eagle's home
Oracle license的计算是基于CPU core的. 用core的数目乘以一个系数core factor就可以得到所需的oracle license的数目. 对于不同的CPU,core factor是不一样的,可以从oracle提供的这张列表中查到 Oracle Processor Core Factor Table.

Oracle Exadata初探

- - 技术改变世界 创新驱动中国 - 《程序员》官网
在我们看来,它是一个把硬件和软件根据合理的配置整合在一起的 Oracle数据库(在本文编写时是11gR2版本)平台. Exadata数据库机器包含了存储子系统,在存储层上运行着研发的新软件,这使得研发人员可以做一些在其他平台上无法完成的事情. 实际上,Exadata一开始是以一个存储系统形式诞生的,如果你跟参与研发此产品的人交谈,你经常会听到他们称存储组件为Exadata或者是SAGE (Storage Appliance for Grid Environments,网格环境存储设备),这是该产品研发项目的代码名称.