はじめに
Oracle GoldenGate for DAA(Distributed Applications and Analytics) 23ai の Kafka ハンドラの環境構築から、Kafka トピックをTarget としたレプリケーションを試します。DAAは、以前 for Big Data というくくりでリリースされていましたが、対象製品が拡充され、Stream Analytics などが含まれるなどで名称が変更されています。
Database 以外の連携先としては、Cloud Storageや Big Data製品に並び Kafkaの引き合いも増えつつある様です
環境について
今回の環境は以下の図のようになります。

Source Database : Oracle 23ai RU23.6
PDB : ORCL236PDB1: SCOTT EMP2KAFA表を作成し、ソースとする
Target Stream : Kafka 3.6.2
Kafka トピック : EMP2KAFKAを作成し、ターゲットとする
GoldenGate HUB :
- GoldenGate for Oracle 23ai (23.4.1.24.05) デプロイメントに Extract を実装
- GoldenGate for DAA(Kafka) 23ai (23.4.0.24.06) デプロイメントに Replicat を実装
※ Trail ファイルは両デプロイメントからアクセス可能なディレクトリに配置する
実装の手順および実行
- [Source] Database 環境の準備と確認
- [Target] Kafka 環境の準備と確認
- GG HUB 環境の準備 (GG for Oracle / GG for DAA – Kafka) デプロイメント作成
- Oracle ⇨ Kafka のレプリケーション実施と監視・管理
- コンソールからレプリケーションの確認
1. [Source] Database 環境の準備と確認

ORCL236PDB1を作成し、SCOTTユーザにEMP2KAFKA表を作成する。
GG for Oracle の Extractはこの表を対象にDML処理を抽出する。
2. [Target] Kafka 環境の準備と確認
⇨ [記事] GoldenGate DAA 23ai 向けKafka環境の準備 をご参照下さい
Link:
3. GG HUB 環境の準備 (GG for Oracle / GG for DAA – Kafka) デプロイメント作成
⇨ [記事] GoldenGate DAA 23ai 向けGG HUB環境の準備 をご参照下さい
4. Oracle ⇨ Kafka のレプリケーション実施と監視・管理
A) Kafka 依存性ファイルのインストール
DAAに付属の依存性ダウンローダ・ユーティリティを使用してダウンロードする必要がある
Oracle GoldenGate for Distributed Applications and AnalyticsによるApache Kafkaへのリアルタイム・メッセージ取込み
ただし、事前に依存性ダウンローダの設定が必要となる
1.3.1 依存性ダウンローダの設定
export PROXY_SERVER_HOST=www-proxy-hqdc.us.oracle.com
export PROXY_SERVER_PORT=80
/u01/app/ogg/opt/DependencyDownloader へ 移動
[oracle@gghub DependencyDownloader]$ ./kafka.sh 3.6.2 (⇦ version指定)
openjdk version "1.8.0_345"
Java is installed.
Apache Maven 3.9.6 (bc0240f3c744dd6b6ec2920b3cd08dcc295161ae)
Maven is accessible.
Root Configuration Script
INFO: This is the Maven binary [../../ggjava/maven-3.9.6/bin/mvn].
INFO: This is the location of the settings.xml file [./docs/settings_np.xml].
INFO: This is the location of the toolchains.xml file [./docs/toolchains.xml].
INFO: The dependencies will be written to the following directory[../dependencies/kafka_3.6.2].
INFO: The Maven coordinates are the following:
INFO: Dependency 1
(省略) ⇨ 依存性ファイルのインストール完了
/u01/app/oracle/product/kafkaconf/producer.properties ファイルを作成し以下をセットします。
bootstrap.servers=<Kafkaブローカ-ノード※>:9092
compression.type=none
value.serializer=org.apache.kafka.common.serialization.ByteArraySerializer
key.serializer=org.apache.kafka.common.serialization.ByteArraySerializer
# 100KB per partition
batch.size=16384
linger.ms=0
※今回の環境では serverb1 としました。
B) Kafka用 Replicatを構成する
B-1. Replicaの追加 ~ クラシックを選択し、プロセス名を入力
B-2. Replicaオプションの設定
- Replicat トレイル名 : ok : Oracle用のExtractが生成する名称に合わせる
- Replicat トレイル・サブディレクトリ : Oracle用のExtractが生成するディレクトリに合わせる
- Replicat ターゲット : 今回は Kafka
B-3 管理対象オプション ~ 全てデフォルト
B-4 パラメータ・ファイル ~ 今回は特に変更しない
B-5. プロパティ・ファイル ~ デフォルトでは最小限のプロパティがセットされる
ここで設定されるプロパティは以下です。
gg.handlerlist=kafkahandler
gg.handler.kafkahandler.type=kafka
gg.handler.kafkahandler.kafkaProducerConfigFile=/u01/app/oracle/product/kafkaconf/producer.properties
gg.handler.kafkahandler.topicMappingTemplate=EMP2KAFKA
gg.handler.kafkahandler.mode=op
gg.handler.kafkahandler.format=json
gg.hander.name.logSuccessfullySentMessages=true
gg.classpath=/u01/app/oracle/product/goldengate_daa/opt/DependencyDownloader/dependencies/kafka_3.6.2/*
jvm.bootoptions=-Xmx512m -Xms32m
Source (Oracle Database)からの抽出環境=Extractも起動します。(手順は省略)
ここでは、以下の3つのパターンでレプリケーション動作を確認する
a. 限りなくデフォルトに近い設定でDMLを発行する
b. プロパティを区切りテキストに変更してDMLを発行する
c. プラガブル・フォーマッタを利用したプロパティでDMLを発行する
a. 限りなくデフォルトに近い設定でDMLを発行する
source table scott.emp2kafka に1件挿入 & commit
INSERT INTO EMP2KAFKA VALUES (7369,'SMITH','CLERK',7902,to_date('17-12-1980','dd-mm-yyyy'),800,NULL,20);
INSERT INTO EMP2KAFKA VALUES(7499,'ALLEN','SALESMAN',7698,to_date('20-2-1981','dd-mm-yyyy'),1600,300,30);
COMMIT;
Kafka Broker から Topicの内容を確認する
[oracle@serverb1 bin]$ ./kafka-console-consumer.sh --bootstrap-server serverb1:9092 --topic EMP2KAFKA --from-beginning
{"after":{"EMPNO":7369,"ENAME":"SMITH","JOB":"CLERK","MGR":7902,"HIREDATE":"1980-12-17 00:00:00","SAL":800.00,"COMM":null,"DEPTNO":20}}
{"after":{"EMPNO":7499,"ENAME":"ALLEN","JOB":"SALESMAN","MGR":7698,"HIREDATE":"1981-02-20 00:00:00","SAL":1600.00,"COMM":300.00,"DEPTNO":30}}
プロパティ設定どおり、JSONフォーマットによるメッセージ送信が確認できます。
9.2.34.4.6.2 操作データのフォーマットの詳細
上記から、操作モードでのモデル化により、挿入処理が “after” として JSON化されていますが、Update については、処理対象カラムとデータの”before” / “after” が出力されます。
SQL> update emp2kafka set sal=1800 where empno=7369;
以下がKafka トピックへ
{"before":{"EMPNO":7369,"SAL":800.00},"after":{"EMPNO":7369,"SAL":1800.00}}
b. プロパティを区切りテキストに変更してDMLを発行する
gg.handler.kafkahandler.format=json を以下に書き換える
(カンマ区切りのテキストデータとして扱う)
gg.handler.kafkahandler.format=delimitedtext
gg.handler.kafkahandler.format.fieldDelimiter=CDATA[,]
以下のSQL (DML)を実行します。
SQL>INSERT INTO EMP VALUES
(7566,'JONES','MANAGER',7839,to_date('2-4-1981','dd-mm-yyyy'),2975,NULL,20);
以下がKafka トピックへ
7566,JONES,MANAGER,7839,1981-04-02 00:00:00,2975.00,NULL,20
※ Delete 処理でも同様に、Delete 対象のレコードデータが出力されますが、プロパティの設定に依存します。
c. プラガブル・フォーマッタを利用したプロパティでDMLを発行する
マニュアルを参照し、JSONベースのプラガブル・フォーマッタを利用してみます。
gg.handlerlist=kafkahandler
gg.handler.kafkahandler.type=kafka
gg.handler.kafkahandler.kafkaProducerConfigFile=/u01/app/oracle/product/kafkaconf/producer.properties
gg.handler.kafkahandler.topicMappingTemplate=EMP2KAFKA
gg.handler.kafkahandler.mode=op
# gg.handler.kafkahandler.mode=tx
# gg.handler.kafkahandler.format=json
# gg.handler.kafkahandler.format=delimitedtext
# gg.handler.kafkahandler.format.fieldDelimiter=CDATA[,]
gg.hander.name.logSuccessfullySentMessages=true
gg.handler.kafkahandler.format=json
gg.handler.kafkahandler.format.insertOpKey=I
gg.handler.kafkahandler.format.updateOpKey=U
gg.handler.kafkahandler.format.deleteOpKey=D
gg.handler.kafkahandler.format.truncateOpKey=T
gg.handler.kafkahandler.format.prettyPrint=false
gg.handler.kafkahandler.format.jsonDelimiter=CDATA[]
gg.handler.kafkahandler.format.generateSchema=true
gg.handler.kafkahandler.format.schemaDirectory=dirdef
gg.handler.kafkahandler.format.treatAllColumnsAsStrings=false
gg.handler.kafkahandler.format.metaColumnsTemplate=${objectname[table]},${optype[op_type]},${timestamp[op_ts]},${currenttimestamp[current_ts]},${position[pos]},${primarykeycolumns[primary_keys]},${alltokens[tokens]}
gg.classpath=/u01/app/oracle/product/goldengate_daa/opt/DependencyDownloader/dependencies/kafka_3.6.2/*
jvm.bootoptions=-Xmx512m -Xms32m
上記のプロパティ設定により以下のメッセージ出力となります。
INSERT INTO EMP2KAFKA VALUES (7788,'SCOTT','ANALYST',7566,to_date('13-6-1987','dd-mm-yyyy'),3000,NULL,20);
update emp2kafka set sal=9999 where empno=7788;
delete from emp2kafka where empno='7788';
commit;
以下がKafka トピックへ
{"table":"SCOTT.EMP2KAFKA","op_type":"I","op_ts":"2025-05-14 21:17:07.000000","current_ts":"2025-05-13 22:52:21.041000","pos":"00000000020000004541","primary_keys":["EMPNO"],"tokens":{},"after":{"EMPNO":7788,"ENAME":"SCOTT","JOB":"ANALYST","MGR":7566,"HIREDATE":"1987-06-13 00:00:00","SAL":3000.00,"COMM":null,"DEPTNO":20}}
{"table":"SCOTT.EMP2KAFKA","op_type":"U","op_ts":"2025-05-14 21:17:17.000000","current_ts":"2025-05-13 22:52:31.053000","pos":"00000000020000004920","primary_keys":["EMPNO"],"tokens":{},"before":{"EMPNO":7788,"SAL":3000.00},"after":{"EMPNO":7788,"SAL":9999.00}}
{"table":"SCOTT.EMP2KAFKA","op_type":"D","op_ts":"2025-05-14 21:16:47.000000","current_ts":"2025-05-13 22:52:00.926000","pos":"00000000020000004161","primary_keys":["EMPNO"],"tokens":{},"before":{"EMPNO":7788,"ENAME":"SCOTT","JOB":"ANALYST","MGR":7566,"HIREDATE":"1987-06-13 00:00:00","SAL":9999.00,"COMM":null,"DEPTNO":20}}
5.コンソールからレプリケーションの確認
ここは、Extract / Replicat に関するレポートファイルおよび統計情報を参照する事で、詳細な情報が得られます。ただし、統計については各プロセスを一時的に停止⇨開始すると過去データをGUI上に表示する事が出来ないのでご注意下さい。
***********************************************************************
* ** Run Time Statistics ** *
***********************************************************************
最後にコミットされたトランザクションの最後のレコードは次のとおりです:
___________________________________________________________________
Trail name : /u01/app/oracle/product/goldengate/deployment/dep01/var/lib/data/ok000000003
Hdr-Ind : E (x45) Partition : . (x0c)
UndoFlag : . (x00) BeforeAfter: A (x41)
RecLength : 123 (x007b) IO時間 : 2025-05-15 18:58:07.000000
IOType : 5 (x05) OrigNode : 255 (xff)
TransInd : . (x03) FormatType : R (x52)
SyskeyLen : 0 (x00) Incomplete : . (x00)
AuditRBA : 339 AuditPos : 51246720
Continued : N (x00) RecCount : 1 (x01)
2025-05-15 18:58:07.000000 Insert Len 123 RBA 3469
TDR Index: 1
___________________________________________________________________
2025-05-14 20:34:02 INFO OGG-30478 Reading /u01/app/oracle/product/goldengate/deployment/dep01/var/lib/data/ok000000003, current SEQNO 3, RBA 3,846, 1 records.
Report at 2025-05-14 20:34:02 (activity since 2025-05-14 20:33:20)
表SCOTT.EMP2KAFKAからSCOTT.EMP2KAFKAまで:
# inserts: 1
# updates: 0
# deletes: 0
# upserts: 0
# discards: 0
Last log location read:
FILE: /u01/app/oracle/product/goldengate/deployment/dep01/var/lib/data/ok000000003
SEQNO: 3
RBA: 3846
TIMESTAMP: Not Available
EOF: YES
READERR: 400
2025-05-14 20:34:02 INFO OGG-25701 ファイル・キャッシング・スレッドが停止されました。スレッドID: 139979252168448。.
CACHE OBJECT MANAGER統計
CACHE MANAGER VM USAGE
vm current = 0 vm anon queues = 0
vm anon in use = 0 vm file = 0
vm used max = 0 cachesize = 16G
vm current copy on write = 0 vm used max copy on write = 0
vm current inactive objs = 0
CACHE CONFIGURATION
cache size = 16G cache force paging = 24G
pageout eligible size = 8M cache vm pageout max = 64M
================================================================================
RUNTIME STATS FOR SUPERPOOL
ノート: キャッシュ・オブジェクトは、必ずではありませんが、トランザクションのプロキシであることが多いです
CACHEオブジェクト統計
アクティブ・オブジェクト = 0 最大同時実行数 = 0
ゼロ以外の合計l = 0 合計オブジェクト = 0
削除のためにキューされているオブジェクト = 0 削除のためにキューされる最大数 = 0
非アクティブ・オブジェクト = 0
CACHEファイル・キャッシング
ファイルキャッシュ・リクエスト = 0 ディスクへのバイト = 0
ファイル取得 = 0 ファイルキャッシュされたオブジェクト = 0
キュー・エントリ = 0 処理されたキュー = 1
不要なキュー・エントリ = 0 通知されていないキュー = 0
fc requesting obj = 0
CACHE MANAGEMENT
バッファ・リンク = 0 anon取得 = 0
強制マップ解除 = 0 cnnbl試行 = 0
キャッシュ・アウト = 0
Note: For statistics tables below each specification represents a power of
two bucket.
For example: '|4K: 1234 |8K:' represents 1234 items between 4K and 8K-1
割当てリクエスト分散
< 128B: 0
128B: 0 | 256B: 0 | 512B: 0 | 1K: 0
2K: 0 | 4K: 0 | 8K: 0 | 16K: 0
32K: 0 | 64K: 0 | 128K: 0 | 256K: 0
512K: 0 | 1M: 0 | 2M: 0 | 4M: 0
8M: 0 | 16M: 0 | 32M: 0 | 64M: 0
128M: 0 | 256M: 0 | 512M: 0 | 1G: 0
2G: 0
キャッシュされたオブジェクト・サイズの分散
0: 0
< 4K: 0
4K: 0 | 8K: 0 | 16K: 0 | 32K: 0
64K: 0 | 128K: 0 | 256K: 0 | 512K: 0
1M: 0 | 2M: 0 | 4M: 0 | 8M: 0
16M: 0 | 32M: 0 | 64M: 0 | 128M: 0
256M: 0 | 512M: 0 | 1G: 0 | 2G: 0
4G: 0 | 8G: 0 | 16G: 0 | 32G: 0
64G: 0 | 128G: 0 | 256G: 0 | 512G: 0
1T: 0 | 2T: 0 | 4T: 0 | 8T: 0
16T: 0 | 32T: 0 | 64T: 0 | 128T: 0
256T: 0 | 512T: 0 | 1P: 0 | 2P: 0
================================================================================
CUMULATIVE STATS FOR SUPERPOOL (PREVIOUS RUNS ONLY)
ノート: キャッシュ・オブジェクトは、必ずではありませんが、トランザクションのプロキシであることが多いです
CACHEオブジェクト統計
アクティブ・オブジェクト = 0 最大同時実行数 = 0
ゼロ以外の合計l = 0 合計オブジェクト = 0
削除のためにキューされているオブジェクト = 0 削除のためにキューされる最大数 = 0
非アクティブ・オブジェクト = 0
CACHEファイル・キャッシング
ファイルキャッシュ・リクエスト = 0 ディスクへのバイト = 0
ファイル取得 = 0 ファイルキャッシュされたオブジェクト = 0
キュー・エントリ = 0 処理されたキュー = 0
不要なキュー・エントリ = 0 通知されていないキュー = 0
fc requesting obj = 0
CACHE MANAGEMENT
バッファ・リンク = 0 anon取得 = 0
強制マップ解除 = 0 cnnbl試行 = 0
キャッシュ・アウト = 0
割当てリクエスト分散
< 128B: 0
128B: 0 | 256B: 0 | 512B: 0 | 1K: 0
2K: 0 | 4K: 0 | 8K: 0 | 16K: 0
32K: 0 | 64K: 0 | 128K: 0 | 256K: 0
512K: 0 | 1M: 0 | 2M: 0 | 4M: 0
8M: 0 | 16M: 0 | 32M: 0 | 64M: 0
128M: 0 | 256M: 0 | 512M: 0 | 1G: 0
2G: 0
キャッシュされたオブジェクト・サイズの分散
0: 0
< 4K: 0
4K: 0 | 8K: 0 | 16K: 0 | 32K: 0
64K: 0 | 128K: 0 | 256K: 0 | 512K: 0
1M: 0 | 2M: 0 | 4M: 0 | 8M: 0
16M: 0 | 32M: 0 | 64M: 0 | 128M: 0
256M: 0 | 512M: 0 | 1G: 0 | 2G: 0
4G: 0 | 8G: 0 | 16G: 0 | 32G: 0
64G: 0 | 128G: 0 | 256G: 0 | 512G: 0
1T: 0 | 2T: 0 | 4T: 0 | 8T: 0
16T: 0 | 32T: 0 | 64T: 0 | 128T: 0
256T: 0 | 512T: 0 | 1P: 0 | 2P: 0
キュー統計
キューの数 = 13 キュー・ヒット = 0 キュー・ミス = 0
queue size q hits q misses curlen maxlen cannibalized
0 64K 0 0 0 0 0
1 128K 0 0 0 0 0
2 256K 0 0 0 0 0
3 512K 0 0 0 0 0
4 1M 0 0 0 0 0
5 2M 0 0 0 0 0
6 4M 0 0 0 0 0
7 8M 0 0 0 0 0
8 16M 0 0 0 0 0
9 32M 0 0 0 0 0
10 64M 0 0 0 0 0
11 128M 0 0 0 0 0
12 256M 0 0 0 0 0
================================================================================
RUNTIME STATS FOR CACHE POOL #0
POOL INFO group: KFREP02 id: p1018496_BLOB instance: 0 tid: (nil)
objects active = 0 objects concurrent (max) = 0
objects inactive = 0 objects total = 0 (0 )
flag = 0x000000030
last error = (0=<none>)
割当てリクエスト分散
< 128B: 0
128B: 0 | 256B: 0 | 512B: 0 | 1K: 0
2K: 0 | 4K: 0 | 8K: 0 | 16K: 0
32K: 0 | 64K: 0 | 128K: 0 | 256K: 0
512K: 0 | 1M: 0 | 2M: 0 | 4M: 0
8M: 0 | 16M: 0 | 32M: 0 | 64M: 0
128M: 0 | 256M: 0 | 512M: 0 | 1G: 0
2G: 0
================================================================================
CUMULATIVE STATS FOR CACHE POOL #0 (PREVIOUS RUNS ONLY)
POOL INFO group: KFREP02 id: p1018496_BLOB instance: 0 tid: (nil)
objects active = 0 objects concurrent (max) = 0
objects inactive = 0 objects total = 0 (0 )
flag = 0x000000030
last error = (0=<none>)
割当てリクエスト分散
< 128B: 0
128B: 0 | 256B: 0 | 512B: 0 | 1K: 0
2K: 0 | 4K: 0 | 8K: 0 | 16K: 0
32K: 0 | 64K: 0 | 128K: 0 | 256K: 0
512K: 0 | 1M: 0 | 2M: 0 | 4M: 0
8M: 0 | 16M: 0 | 32M: 0 | 64M: 0
128M: 0 | 256M: 0 | 512M: 0 | 1G: 0
2G: 0
----
最後に
今回はOracle GoldenGate DAA 23ai の Kafka ハンドラを用いて、Oracle Database への DMLを伝播し、Kafka Topic へその処理内容を送信する内容について触れました。送信先がデータベースではない場合、その送信されたデータをどのように扱うかによって、その設定(プロパティ)は大きく変わると思いますが環境構築から処理のイメージは伝わったのではないでしょうか。
GoldenGate DAAは様々なデータ・ソース連携が可能で、かつ拡張され続けてています。他のデータ・ソース連携も念頭に追求していく予定です。








