はじめに
Oracle input plugin for Embulkは、EmbulkでOracleをインプットとして扱うためのプラグインです。
Oracle input pluginの増分ローディング(Incremental loading)を試してみます。
増分ローディングにより、前回、Embulk runコマンドでインプットしたときの増分レコードのみがアウトプットの対象になります。
Oracle input plugin for Embulkの使用方法は以下のサイトを参照してください。
環境
使用する環境は以下のとおりです。
- CentOS 7.5(JDK1.8インストール済み)
- Oracle 12cR2
- Embulk 0.9.18
以下で構築した環境を利用しています。
テスト用のテーブルの作成
最初にテスト用のテーブル(emtest)とデータを作成します。
CREATE TABLE emtest
(
id NUMBER(3,0),
name VARCHAR2(50),
update_at TIMESTAMP(0)
);
insert into emtest values(1, 'AAAA', current_timestamp);
insert into emtest values(2, 'BBBB', current_timestamp);
commit;
Oracle input pluginを使用する
最初は増分ローディングを使用せずに、対象テーブルの全データをCSVに出力してみます。
作成した設定ファイルは以下のとおりです。
in:
type: oracle
driver_path: ./ojdbc7.jar
host: 192.168.10.232
user: hr
password: "hr"
database: testdb
table: emtest
select: "*"
out:
type: file
path_prefix: "./file/emtest"
sequence_format: "."
file_ext: csv
formatter:
type: csv
delimiter: ","
newline: CRLF
newline_in_field: LF
charset: UTF-8
quote_policy: MINIMAL
quote: '"'
escape: "\\"
null_string: "\\N"
default_timezone: 'Asia/Tokyo'
embulk runコマンドを実行すると、emtest.csvファイルに以下のように出力されます。
# embulk run config_ora_csv.yml
# cat file/emtest.csv
NO,NAME,UPDATE_AT
1.0,AAAA,2019-09-01 16:43:18.000000 +0900
2.0,BBBB,2019-09-01 16:43:18.000000 +0900
次に1行削除、1行追加して同様にembulk runコマンドを実行します。
delete from emtest where no = 1;
insert into emtest values(3, 'CCCC', current_timestamp);
commit;
embulk runコマンドを実行するとemtest.csvファイルは以下のように上書きされます。
NO,NAME,UPDATE_AT
2.0,BBBB,2019-09-01 16:43:18.000000 +0900
3.0,CCCC,2019-09-01 16:50:33.000000 +0900
増分ローディングを試す
増分ローディングを試すため、以下の4行を設定ファイルに追加しています。
incremental: true
incremental_columns: [update_at, id]
column_options:
no: { value_type: long }
incrementalで増分ローディングを有効にします。
incremental_columnsには増分を判断するカラムを指定します。デフォルトでは主キーが設定されます。
今回は[update_at, id]と設定しており、order by update_at, idの最後のレコードがconfig_diffとして保存され、次回実行時にこの情報を利用して増分が判断されます。
また、増分ローディングで使用するカラムに対しては以下のようにインデックスを作成しておきます。
CREATE INDEX emtest_incremental_loading_index ON emtest(update_at, no);
増分ローディングではembulk runコマンドを実行するときに-cオプションを使用します。
-c config_diff.ymlとすると最後に出力したupdate_at, idがconfig_diff.ymlに保存されます。
# embulk run config_ora_csv_increment.yml -c config_diff.yml
# cat file/emtest.csv
NO,NAME,UPDATE_AT
2,BBBB,2019-09-02 00:14:40.000000 +0900
3,CCCC,2019-09-02 00:14:46.000000 +0900
# cat config_diff.yml
in:
last_record: ['2019-09-01T17:14:46.000000', 3]
out: {}
増分ローディングを試すため、1行追加します。
insert into emtest values(4, 'DDDD', current_timestamp);
commit;
再度、embulk runコマンドを実行すると以下のようにno=4のレコード1行だけが出力されていることが分かります。
# embulk run config_ora_csv_increment.yml -c config_diff.yml
# cat file/emtest.csv
NO,NAME,UPDATE_AT
4,CCCC,2019-09-02 00:16:52.000000 +0900
# cat config_diff.yml
in:
last_record: ['2019-09-01T17:16:52.000000', 4]
out: {}
実行時のログには、以下のように実行したSQLが出力されます。
2019-09-04 11:40:27.062 +0200 [INFO] (0014:task-0000): SQL: SELECT * FROM "EMTEST" WHERE (("UPDATE_AT" > ?) OR ("UPDATE_AT" = ? AND "NO" > ?)) ORDER BY "UPDATE_AT", "NO"