Fivetranは、様々なデータソース(各種SaaSアプリやDBなど)から、クラウド型DWHを中心としたデータ分析基盤へ、データ同期を自動化させることができるツールです。
最短5分のセットアップでデータパイプラインを構築することができ、それ以降のデータ同期を自動で実行してくれるとのこと。
今回は、このFivetranを使用して(RDS)PostgreSQLから(AWS)Databricksへのパイプラインを構築し、データ同期を行ってみました。
全体の流れ
- 同期先((AWS)Databricks)を登録します
- データソース((RDS)PostgreSQL)のコネクターを作成します
- データの自動同期(増分同期)を確認します ※この記事です
データの自動同期を確認します
ソース側のデータベースに変更を加えて、自動同期(増分同期)される様子を見ていきたいと思います。
検証をスムーズにするため同期間隔をデフォルトの6時間から15分に変更しました。
1. データ追加
追加されたデータは自動同期にて反映されます。
↓ 自動同期(増分同期)
↓ 完了
↓ 結果
データ追加が反映されていることを確認しました。
また、追加分以外のレコードで「_fivetran_synced」カラムが変わっていないことから、増分同期はちゃんと必要な分のみ同期してくれることを確認。
もしこれが毎回全レコード作り直しとかになっていたら、支払料金が変わってきてしまいます。
2. データ更新
データ更新も自動同期で反映されますが、主キー無しテーブルについては以下の問題が発生しました。
更新できないエラー
- レプリカアイデンティティを設定しないとデータの更新ができない
- 主キーありテーブルと異なる形で反映される
↓ まず、主キー無しテーブルに更新をかけようとするとエラーになってしまいました。
↓ 主キーありテーブルは問題なし。
■ 更新できないエラー について
論理レプリケーション設定による影響でした。
https://www.postgresql.jp/document/14/html/logical-replication-publication.html
- パブリケーションに指定したテーブルは、「レプリカアイデンティティ」の設定をしなければ、その後UPDATE、DELETEの操作ができなくなってしまう
- 主キーがあればそれがデフォルトでレプリカアイデンティティになるが、
主キーが無い場合はレプリカアイデンティティが設定されないため、追加でレプリカアイデンティティを設定しないとエラーになってしまう - 追加で設定する設定する際には、下記のいずれかを選択する
- ユニークキー:一意であり、かつ「NOT NULL」が指定されたカラムでなければならない(それがあればそもそも主キーになっていそう…)
- full:行全体をキーとするので、主キー/ユニークキーが無いテーブルでも使用できるが、上記ドキュメントによれば『非常に非効率なので、他の解決方法がない場合のみの代替手段』とのこと
今回は検証なので full の方で追加の設定をすることにしますが、非常に非効率だそうなので、もし現実に主キー無しのテーブルを同期する場合があれば、注意が必要そうです。
改めてデータ更新を行い、無事準備完了しました。
↓ 自動同期後
主キーありテーブルについては想定通り、データ更新が反映されていることを確認しました。
■ 主キーありテーブルと異なる形で反映される について
主キー無しテーブルもは反映されていますが、主キーありテーブルとは異なる形をとっています。
主キー無しテーブルの方は、『古い方のレコードが論理削除+更新後のレコードが新規追加』という形での反映となりました。
やはり、もし現実に主キー無しのテーブルを同期する必要がある場合は注意が必要…
3. データ削除
物理削除ではなく、「_fivetran_deleted」カラムの論理削除で反映されます。
データ更新と同様、主キー無しテーブルではレプリカアイデンティティの設定が無いとDELETEがエラーになりますが、設定してあれば同期自体は問題なく反映されました。
既に レプリカアイデンティティを full で設定しているため、DELETEできています
↓ 自動同期後
「_fivetran_deleted」カラムの論理削除で反映されています。
まず主キーありテーブルについて、想定通りの反映を確認…
主キー無しテーブルについても、同じように反映されることを確認できました。
4. カラム追加
カラム追加はそれのみでは反映されませんが、レコードに変更が入ると一緒に反映されます。
↓ 自動同期後
カラム追加だけでは反映されない。
↓
レコードに変更を入れる
↓ 自動同期後
反映されました(追加したカラムはnullでもOK)
もちろん値が入っていてもOKです。
5. カラム変更
カラム変更はそれのみでは反映されませんが、レコードに変更が入ると一緒に反映されます。
ただし、拡張方向のカラム変更のみ反映対象なので、反映されないパターンもありました。
数値型 → 文字列型 の変更
変更前
文字列型へとカラム変更します。
↓ 自動同期後
これだけでは反映されない。
↓
レコードに変更を入れます。
↓ 自動同期後
数値型 → 文字列型 の変更が反映されました。
文字列型 → 数値型 の変更
変更前
数値型へとカラム変更します。
↓
レコードに変更を入れます。
↓ 自動同期後
文字列型 → 数値型 の変更は同期されません。
6. カラム削除
自動同期では反映されませんが、手動で履歴データの再同期というものを行うと、登録されていたデータがnullに更新されるという形で反映されました。
カラム自体は残ったままとなりました。
↓
レコードにも変更を入れてみます。
↓ 自動同期後
反映されない。
↓
画面から『すべての履歴データを再同期する』を実行
↓
↓
↓
カラム自体は削除されないが、データがnullに更新される
7. テーブルの追加
テーブルを作成してパブリケーションに追加すると、自動同期にて反映されます。
下記のテーブルを新規に作成します
↓ 自動同期後
初期レコードあり/無しパターンどちらも反映されました。
8. テーブルの削除
テーブルの削除は同期されません。
↓ 自動同期後
削除されない。
データもそのまま、selectもエラーにならない。
↓
手動の履歴再同期も実施してみる。
↓
削除されない。
データもそのまま、selectもエラーにならない。
ソース側でオブジェクトを削除しても、宛先側では削除されずに最終更新時の状態のまま残ります。
まとめ・感想
一通り終わりました。
テーブル削除、カラム削除は必要なら宛先側で削除するしかなさそうです。
要注意だと思ったのはレプリカアイデンティティのところ、勉強になりました。
主キー無しテーブルがあって、かつそれを同期対象としたい…という状況があるかは分かりませんが、気を付けなければいけないケースだと思います。
あとはコネクション作成の手順が思ったより大変だった印象です。
社内で話を聞くと、FivetranはSaaSサービスとの接続がより得意なので、SaaSサービスとのコネクション作成ならここまで大変ではないようでした。
機会があればSaaSサービスとの接続も試してみたいです。