LoginSignup
2
1

More than 3 years have passed since last update.

Oracle input plugin for Embulkの増分ローディングを試す

Last updated at Posted at 2019-09-01

はじめに

Oracle input plugin for Embulkは、EmbulkでOracleをインプットとして扱うためのプラグインです。

Oracle input pluginの増分ローディング(Incremental loading)を試してみます。
増分ローディングにより、前回、Embulk runコマンドでインプットしたときの増分レコードのみがアウトプットの対象になります。

Oracle input plugin for Embulkの使用方法は以下のサイトを参照してください。

環境

使用する環境は以下のとおりです。

  • CentOS 7.5(JDK1.8インストール済み)
  • Oracle 12cR2
  • Embulk 0.9.18

以下で構築した環境を利用しています。

テスト用のテーブルの作成

最初にテスト用のテーブル(emtest)とデータを作成します。

CREATE TABLE emtest
 (
 id NUMBER(3,0),
 name VARCHAR2(50),
 update_at TIMESTAMP(0)
 );

insert into emtest values(1, 'AAAA', current_timestamp);
insert into emtest values(2, 'BBBB', current_timestamp);

commit;

Oracle input pluginを使用する

最初は増分ローディングを使用せずに、対象テーブルの全データをCSVに出力してみます。
作成した設定ファイルは以下のとおりです。

in:
  type: oracle
  driver_path: ./ojdbc7.jar
  host: 192.168.10.232
  user: hr
  password: "hr"
  database: testdb
  table: emtest
  select: "*"

out:
  type: file
  path_prefix: "./file/emtest"
  sequence_format: "."
  file_ext: csv
  formatter:
    type: csv
    delimiter: ","
    newline: CRLF
    newline_in_field: LF
    charset: UTF-8
    quote_policy: MINIMAL
    quote: '"'
    escape: "\\"
    null_string: "\\N"
    default_timezone: 'Asia/Tokyo'

embulk runコマンドを実行すると、emtest.csvファイルに以下のように出力されます。

# embulk run config_ora_csv.yml
# cat file/emtest.csv
NO,NAME,UPDATE_AT
1.0,AAAA,2019-09-01 16:43:18.000000 +0900
2.0,BBBB,2019-09-01 16:43:18.000000 +0900

次に1行削除、1行追加して同様にembulk runコマンドを実行します。

delete from emtest where no = 1;
insert into emtest values(3, 'CCCC', current_timestamp);
commit;

embulk runコマンドを実行するとemtest.csvファイルは以下のように上書きされます。

NO,NAME,UPDATE_AT
2.0,BBBB,2019-09-01 16:43:18.000000 +0900
3.0,CCCC,2019-09-01 16:50:33.000000 +0900

増分ローディングを試す

増分ローディングを試すため、以下の4行を設定ファイルに追加しています。

  incremental: true
  incremental_columns: [update_at, id]
  column_options:
    no: { value_type: long }

incrementalで増分ローディングを有効にします。
incremental_columnsには増分を判断するカラムを指定します。デフォルトでは主キーが設定されます。
今回は[update_at, id]と設定しており、order by update_at, idの最後のレコードがconfig_diffとして保存され、次回実行時にこの情報を利用して増分が判断されます。

また、増分ローディングで使用するカラムに対しては以下のようにインデックスを作成しておきます。

CREATE INDEX emtest_incremental_loading_index ON emtest(update_at, no);

増分ローディングではembulk runコマンドを実行するときに-cオプションを使用します。
-c config_diff.ymlとすると最後に出力したupdate_at, idがconfig_diff.ymlに保存されます。

# embulk run config_ora_csv_increment.yml -c config_diff.yml
# cat file/emtest.csv
NO,NAME,UPDATE_AT
2,BBBB,2019-09-02 00:14:40.000000 +0900
3,CCCC,2019-09-02 00:14:46.000000 +0900
# cat config_diff.yml
in:
  last_record: ['2019-09-01T17:14:46.000000', 3]
out: {}

増分ローディングを試すため、1行追加します。

insert into emtest values(4, 'DDDD', current_timestamp);
commit;

再度、embulk runコマンドを実行すると以下のようにno=4のレコード1行だけが出力されていることが分かります。

# embulk run config_ora_csv_increment.yml -c config_diff.yml
# cat file/emtest.csv
NO,NAME,UPDATE_AT
4,CCCC,2019-09-02 00:16:52.000000 +0900

# cat config_diff.yml
in:
  last_record: ['2019-09-01T17:16:52.000000', 4]
out: {}

実行時のログには、以下のように実行したSQLが出力されます。

2019-09-04 11:40:27.062 +0200 [INFO] (0014:task-0000): SQL: SELECT * FROM "EMTEST" WHERE (("UPDATE_AT" > ?) OR ("UPDATE_AT" = ? AND "NO" > ?)) ORDER BY "UPDATE_AT", "NO"

参考

2
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
2
1