22
12

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 5 years have passed since last update.

アイスタイルAdvent Calendar 2016

Day 16

Embulkで簡易異機種DBレプリケーション

Last updated at Posted at 2016-12-15

SQL ServerとMySQLの間でテーブル単位でレプリケーションやりたいなぁって事、まれにありますよね?
年の瀬の今、まさにそんな気分です。

なので、Embulkで簡易的な異機種データベースレプリケーションに挑戦してみました。

前提条件

紹介する設定例は以下の条件じゃないと上手く行きません。

  • テーブルのスキーマは同一
  • 論理削除
  • オートインクリメントするサロゲートキーあり(idとかね)
  • 監査列あり(created_at、updated_at、deleted_atなど)

制約があるから簡易なのです。簡易なんてそんなもんです。

まずは一回だけ同期してみる

同じレイアウトのテーブル間の同期なので、テーブル名の指定だけでOKです。
今回使う出力モードはtruncate_insertです。

config.yml

in:
  type: sqlserver
  host: msssql_host
  user: embulk
  password: ''
  database: db1
  table: table1
out:
  type: mysql
  host: mysql_host
  user: embulk
  password: ''
  database: db1
  table: table1
  mode: truncate_insert

実行してみます。

$ embulk run config.yml

上手く行きました!
でも、この設定だと、SQL Server側の日付のミリ秒が落ちてしまいました。

ログを見ていると、一時テーブルを作成して、いったんそこにデータを入れた後に、出力先に入れてるんですが、一時テーブル作成時のデータ型が正しくありません。

出力にcolumn_optionsを指定すればデータ型を指定できるのですが、全カラム設定を書くのは面倒です。
試しに以下のように設定してみました。

config.yml
---
in:
  type: sqlserver
  host: msssql_host
  user: embulk
  password: ''
  database: db1
  table: table1
out:
  type: mysql
  host: mysql_host
  user: embulk
  password: ''
  database: db1
  table: table1
  mode: truncate_insert
  column_options:
   created_at: {type: 'DATETIME(6) NULL'}
   updated_at: {type: 'DATETIME(6) NULL'}
   deleted_at: {type: 'DATETIME(6) NULL'}

これで上手く行きました!
必要なところだけオプションを記述すれば良いところがステキです。

次は変数に挑戦してみる

Embulkの設定は、inとoutのペアを定義する関係上、一つの設定ファイルに一つのペアが書かれます。
でも、これだと、テーブルの数だけ設定が必要になってしまいます。

EmbulkのRUNコマンドに引数でテーブル名を渡したかったんですが、ドキュメントを見ても、現状そのようなオプションは見当たりませんでした。
その代わり環境変数を取得する機能がありました。

config.yml.liquid
---
in: 
  type: sqlserver
  host: msssql_host
  user: embulk
  password: ''
  database: db1
  table: {{ env.embulk_table_name }} # ここで環境変数を取得
out: 
  type: mysql
  host: mysql_host
  user: embulk
  password: ''
  database: db1
  table: {{ env.embulk_table_name }} # ここで環境変数を取得
  mode: truncate_insert
  column_options:
   created_at: {type: 'DATETIME(6) NULL'}
   updated_at: {type: 'DATETIME(6) NULL'}
   deleted_at: {type: 'DATETIME(6) NULL'}

変数を使う場合は、ファイル名の拡張子が.liquidに変わります。

次は実行用のシェルスクリプトを用意します。

run.sh
#!/bin/sh
run() {
  export embulk_table_name=$1 # ここで環境変数を設定
  embulk run config.yml.liquid
}

run 'table1'
run 'table2'

これで上手く行きました!
2つのテーブルの同期を1つの設定で行う事が出来るようになりました。

次は増分レコードの同期に挑戦してみる

同期したいテーブルが複数あっても、1つの設定ファイルを共有できるようになったので、次は、初期同期後、増えたレコードを同期できる様に設定してみます。
今回使う出力モードはinsertです。

config.yml.liquid
in:
  type: sqlserver
  host: msssql_host
  user: embulk
  password: ''
  database: db1
  table: {{ env.embulk_table_name }}
  incremental: true         # ここを追加指定
  incremental_columns: [id] # 自動的に値が増える列を指定
out:
  type: mysql
  host: mysql_host
  user: embulk
  password: ''
  database: db1
  table: {{ env.embulk_table_name }}
  mode: insert # insertに変更
  column_options:
   created_at: {type: 'DATETIME(6) NULL'}
   updated_at: {type: 'DATETIME(6) NULL'}
   deleted_at: {type: 'DATETIME(6) NULL'}
run.sh
run() {
  export embulk_table_name=$1
  embulk run config.yml.liquid -c $1-diff.yml | tee -a run.log
}

: > run.log

run 'table1'
run 'table2'

設定の変更点

in側に、この二行を追加します。

incremental: true
incremental_columns: [id]

out側はmodeを変更します。
truncate_insertだと、毎回out側のデータを消しちゃうのでinsertに変更します。

mode: insert

シェルスクリプトの変更点

embulk run config.yml.liquid -c $1-diff.yml | tee -a run.log

runコマンドに-cオプションを追加します。
このオプションをつけると、読み取った最終行のincremental_columnsの値をファイルに出力するようになります。
実行すると、diff.ymlにはこんな内容が出力されます。

table1-diff.yml
in:
  last_record: [63475]
out: {}

ちなみに、パイプでteeにつないでるのは、単に画面でも見たいしログも残したいからです。

これで上手く行きました!
初回はdiffファイルが存在しないので、全件ロードを行います。
二回目以降はdiffファイルを読み取り、そのincremental_columns値以上のレコードを検索してくれます。

最後に更新差分の同期に挑戦します

変更、削除されたレコードが同期されないと、レプリケーションとは言えません。

でも、更新はともかく、削除されたレコードってどうやって同期するのか?
そうです、ムリです。
なので、前提条件が必要になってきます。

論理削除で監査列ありの場合しか、このレプリケーションはできません。
まあ仕方ないです。簡易なので。

今回使う出力モードはmergeです。

更新、削除の定義を考える

まずは、前回の同期以降に、更新、論理削除(実際のところは、どちらも更新です)されたデータを抽出するクエリを考えます。

select
	*
from
	table1
where
        /* 更新された行 */
	(updated_at > created_at and deleted_flag = 0 and updated_at > '2016-12-01') 
        or 
        /* 削除された行 */
        (deleted_flag = 1 and deleted_at > '2016-12-01')

これで良さそうです。

入力するレコードが異なるため、増分同期と更新差分同期は、同じ設定ファイルに書くことが出来ません。
なので、config.yml.liquidをコピーして、config_merge.yml.liquidを作成します。
シェルスクリプトもrun.shをコピーして、run_merge.shを作成します。

config_merge.yml.liquid
in: 
  type: sqlserver 
  host: msssql_host 
  user: embulk 
  password: '' 
  database: db1 
  table: {{env.embulk_table_name}}
  where: "(updated_at > created_at and deleted_flag = 0 and updated_at > '{{env.embulk_last_date}}') 
       or (deleted_flag = 1 and deleted_at > '{{env.embulk_last_date}}')"
out: 
  type: mysql 
  host: mysql_host
  user: embulk 
  password: ''
  database: db1
  table: {{env.embulk_table_name}}
  mode: merge
  column_options:
   created_at: {type: 'DATETIME(6) NULL'}
   updated_at: {type: 'DATETIME(6) NULL'}
   deleted_at: {type: 'DATETIME(6) NULL'}
run_merge.sh
#!/bin/sh
pre() {

  if [ -e embulk_last_date.txt ]; then
     LAST_DATE=`cat embulk_last_date.txt`
  else
     LAST_DATE=` date '+%Y-%m-%d'`
  fi

  export embulk_last_date=$LAST_DATE
}

run() {
  export embulk_table_name=$1
  embulk run config_merge.yml.liquid | tee -a run_merge.log
}

: > run_merge.log

pre

run 'table1'
run 'table2'

date '+%Y-%m-%d' > embulk_last_date.txt

設定の変更点

in側には、先ほど考えた抽出条件をwhereに指定します。
ただし、日付は可変にしたいので、変数にしておきます。

where: "(updated_at > created_at and deleted_flag = 0 and updated_at > '{{env.embulk_last_date}}') 
     or (deleted_flag = 1 and deleted_at > '{{env.embulk_last_date}}')"

out側はmodemergeに変更します。

mode: merge

シェルスクリプトの変更点

whereには実行日付を渡さなければなりません。

  • 初回は今日の日付を渡す
  • 二回目以降は前回実行した日付を渡す

こんな感じでどうでしょう?

では、実行時の日付を取得して、環境変数に設定する関数を追加します。

pre() {

  if [ -e embulk_last_date.txt ]; then
     LAST_DATE=`cat embulk_last_date.txt`
  else
     LAST_DATE=` date '+%Y-%m-%d'`
  fi

  export embulk_last_date=$LAST_DATE
}

最終行に実行日付をファイルに保存する処理を追加します。

date '+%Y-%m-%d' > embulk_last_date.txt

これでひとまず完成です!

in側で実行されるSELECT文には、WHEREに指定した条件が付けられて実行されます。
out側では、INSERT INTO ~ ON DUPLICATE KEY UPDATEが実行されるため、無ければINSERT、あればUPDATEと言う動作になります。

残作業

エラー処理、ログのローテーション、定期実行の方法を考えれば、簡易的なレプリケーションが実現できそうです。

まとめ

異機種データベース同期ツールは高価な商用プロダクトがいくつか存在します。
安定性は高いだろうし、削除も同期できるでしょう。
でも、Embulkとシェルスクリプトの組合せで、簡易的ではありますが、近いことが実現できそうです。

条件さえ合えば、これで十分な場合もあるでしょう。

明日はいよいよ@yamamotojuntaの登場です!
今年も本当にアイカツ!なのか?

22
12
1

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
22
12

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?