この記事では、Airbyte の Change Data Capture(CDC)の機能を用い、MySQL から PostgreSQL にデータをレプリケーションしてみます。また、裏で DB に流れているコマンドについても簡単に確認してみます。
1. はじめに
ある DB(ソース DB)から別の DB(ターゲットDB)にデータを継続的に移動するというのは様々なユースケースで必要とされ、現時点では ETL 処理などのバッチ処理で実装されることが多いです。ただし、バッチ処理に関しては、以下のような問題もあります。
- 対象テーブル数が多い、処理内容が複雑などの理由で、開発に時間と手間が掛かる。
- 例えば ETL 処理を1日に1回実行する場合、ターゲット DB への最新データの反映は最大1日遅れる。
バッチ処理以外のデータ移動のアプローチとして Change Data Capture(CDC)があります。ソース DB のデータ変更内容を抽出し、ターゲットDBにその変更内容を反映させるという処理を自動的に行うことで、ソース DB 上のデータと同じ内容をターゲット DB 上に容易に実現する手法・ツールになります。
Change Data Capture の定義には本来、ターゲット DB にデータ変更内容を反映させるところまでは含まれず、変更内容に基づいて何かしらの処理を実行することを指しますが、本記事ではターゲット DB にデータを反映させるところまでを行うレプリケーションツールとして扱います。
レプリケーションは MySQL や PostgreSQL が標準で対応している機能ですが、これらは基本同じ種類の DBMS 間のデータ移動になります。CDC の場合はソース DB とターゲット DB が異なる種類の DBMS である場合にも対応しています。
Change Data Capture を実現するツールは昔から商用ツールを中心にいくつかあったのですが、昨今はオープンソースのツールでもこの機能を持つものが増えてきました。本記事ではそれらの1つである Airbyte を使って CDC をセットアップし、ソース DB(MySQL)からターゲット DB(PostgreSQL)への変更データの反映の結果を確認した結果を共有したいと思います。
また、主にターゲット DB 側で変更データ反映を実行しているクエリについても確認してみます。
2. セットアップ
今回は以下の環境でセットアップを行いました。
- 実行サーバー:EC2 インスタンス(Amazon Linux 2, m6i.large)
- Airbyte Open Source
- ソース DB:MySQL 8.0.29
- ターゲット DB:PostgreSQL 14.4
2-1. Airbyte のインストール
Docker Compose を利用します。以下の通り実施しています。Airbyte 自身の可用性とかを気にしなければ、コマンド3発なので簡単ですね。
2-2. MySQL のセットアップ
Airbyte から MySQL に接続する DB ユーザー(source_user)と、今回対象とするテーブルを含む DB(source_db)を作成します(rootユーザーで実施)。
-- Airbyte用のDBユーザー作成
create user source_user identified by '************';
-- DBを作成し、上で作成したユーザーに全権限を付与
create database source_db;
grant all privileges on source_db.* to source_user;
-- Airbyteはバイナリログから変更データを抽出するため、以下の権限付与が必要
grant reload, show databases, replication slave, replication client on *.* to source_user;
-- Airbyteから実行される処理を確認するために一般ログを有効化
set global general_log = on;
その後、source_user で source_db に接続し、簡単なテーブル(test_table)を作成しておきます。ちなみに、Airbyte の CDC は主キーが存在するテーブルのレプリケーションしかサポートしていません。
create table test_table (
id int primary key,
name varchar(10),
flag char(1)
);
以上で MySQL のセットアップは完了です(本来はバイナリログを有効にする必要がありますが、公式 RPM パッケージからインストールした場合はデフォルトで有効になっています)。
2-3. PostgreSQL のセットアップ
Airbyte から PostgreSQL に接続するDBユーザー(target_user)と、今回対象とするテーブルを含む DB(target_db)を作成します(postgresユーザーで実施)。
-- DBユーザーの作成
create role target_user with login password '************';
-- DBを作成し、上で作成したDBユーザーに全権限を付与
create database target_db;
grant all privileges on database target_db to target_user;
その他、AirByte から実行されるコマンドを記録するために postgesql.conf に log_statement を all に設定しています。
以上で PostgreSQL のセットアップは完了です。
2-4. Airbyte の CDC セットアップ
以下の流れでセットアップします。
- Source の設定
- Destination の設定
- Connection の設定
2-4-1. Source の設定
まずはソース DB を Airbyte に登録します。
Airbyte の管理画面(http://localhost:8000)を開き、左側メニューの [Sources] を選択後、画面右上の [New Source] を押します。その後、以下のように入力します。入力後に画面一番下の [Set up source] を押せば、接続テストが実行され、テストが通ればソースDBが登録されます。
項目 | 入力値 | 説明 |
---|---|---|
Source Type | MySQL | ソース DB の DBMS を指定する。 |
Name | soruce_db (MySQL) | 作成する Source の名前。分かり易いものをつけておく。 |
Host | 172.16.1.226 | ソース DB のホスト名 or IP アドレスを指定する。(左の値は EC2 インスタンスの IP アドレス) |
Port | 3306 | ソース DB のポート番号を指定する。(今回はデフォルトポート番号を利用) |
Database | source_db | 2-2.で作成した DB を指定する。 |
Username | source_user | 2-2.で作成した DB ユーザー名を指定する。 |
Password | ******* | 2-2.で作成した DB ユーザーのパスワードを指定する。 |
Replication Method | CDC | CDC をするためにはこの設定が必要。 |
2-4-2. Destination の設定
次にターゲット DB を Airbyte に登録します。(Airbyte ではターゲットを Destinationと言います)
Airbyte の管理画面の左側メニューから [Destinations] を選択後、画面右上の [New destination] を押します。その後、以下のように入力します。入力後に画面一番下の [Set up destination] を押せば、接続テストが実行され、テストが通ればターゲット DB が登録されます。
項目 | 入力値 | 説明 |
---|---|---|
Destination Type | Postgres | ターゲット DB の DBMS を指定する。 |
Name | target_db (PostgreSQL) | 作成する Destination の名前。分かり易いものをつけておく。 |
Host | 172.16.1.226 | ターゲットDB のホスト名 or IP アドレスを指定する。(左の値は EC2 インスタンスの IP アドレス) |
Port | 5432 | ターゲット DB のポート番号を指定する。(今回はデフォルトポート番号を利用) |
DB Name | target_db | 2-3.で作成した DB を指定する。 |
Default Schema | public | 指定必須だが、今回は利用しない。 |
User | target_user | 2-3.で作成した DB ユーザー名を指定する。 |
Password | ******* | 2-3.で作成した DB ユーザーのパスワードを指定する。 |
2-4-3. Connection の設定
CDC の設定を行います。Airbyte ではデータ移動の処理のことを Connection と呼びます。
Airbyte の管理画面の左側メニューから [Connections] を選択後、画面右上の [New Connection] を押します。その後、[Select an existing source] において 2-4-1. で作成した Source を選択し、[Use existing source] を押します。
次の画面では [Select an existing destination] において 2-4-2. で作成した Destination を選択し、[Use existing destination] を押します。
次の画面では以下の入力をして、[Set up connection] を押します。(デフォルトで選択されていると思いますが、対象テーブルのチェックがオンになっていること、[Sync mode] が [Incremental Deduped + History] になっていることを確認しておいた方が良いかもしれません)
項目 | 入力値 | 説明 |
---|---|---|
Name | source_db to target_db | Connection の名前。分かり易い名前を指定する。 |
Transfer - Replication frequency | Every 5 minutes | 今回は頻度高くデータ反映するので、GUI 上で指定できる最短の時間間隔を指定 |
以上で Connection の設定は終了し、Change Data Capture が動き始めます。設定はとても簡単ですね。
3. 変更データの反映の確認
Airbyte の CDC ではターゲット DB 側にソース DB の DB 名(今回は source_db)と同じスキーマが自動で作成され、そのスキーマの中に対象テーブルと同じ名前のテーブル(今回は test_table)が作成され、そちらに変更データが反映されます。また、変更の履歴は <テーブル名>_scd という名前のテーブル(今回は test_table_scd)に保存されます。ちなみに、scd は Slowly Chanaging Demension の略です。
以下の操作をソース DB 側で実施し、ターゲット DB にどう反映されるか確認してみます。
- レコードの INSERT
- 主キー指定によるレコードの UPDATE
- 主キー指定によるレコードの DELETE
それ以外にも、主キー列以外を WHERE に指定した UPDATE/DELETE 文を試していますが、挙動は想像通りなので割愛します。
3-1. レコードのINSERT
ソース DB(MySQL)側で以下の INSERT 文を実行します。
insert into test_table values
(1, 'AAAAA', 'Y'),
(2, 'BBBBB', 'Y');
しばらくすると、ターゲット DB 側の test_table / test_table_scd で INSERT したデータが確認できます。(id name flag のカラム順番が変わっているのは気になりますが)
target_db=> select * from source_db.test_table;
_airbyte_unique_key | id | flag | name | _ab_cdc_log_pos | _ab_cdc_log_file | _ab_cdc_deleted_at | _ab_cdc_updated_at | _airbyte_ab_id | _airbyte_
emitted_at | _airbyte_normalized_at | _airbyte_test_table_hashid
----------------------------------+----+------+-------+-----------------+------------------+--------------------+----------------------+--------------------------------------+-------------
---------------+-------------------------------+----------------------------------
c4ca4238a0b923820dcc509a6f75849b | 1 | Y | AAAAA | 1872 | binlog.000005 | | 2022-07-17T08:36:32Z | bdf2334f-a58c-46ef-9868-2791c39d32a0 | 2022-07-17 0
8:34:12.415+00 | 2022-07-17 08:36:40.787686+00 | 68a503bb2f7a322abf64b118e81bf581
c81e728d9d4c2f636f067f89cc14862c | 2 | Y | BBBBB | 1872 | binlog.000005 | | 2022-07-17T08:36:32Z | aacca064-a711-4368-bc16-1649b861a388 | 2022-07-17 0
8:34:12.415+00 | 2022-07-17 08:36:40.787686+00 | 07e5d2eeb4f4a6c0a78536fe0a5a4689
(2 rows)
target_db=> select * from source_db.test_table_scd
target_db-> ;
_airbyte_unique_key | _airbyte_unique_key_scd | id | flag | name | _ab_cdc_log_pos | _ab_cdc_log_file | _ab_cdc_deleted_at | _ab_cdc_updated_at | _airbyte_start_
at | _airbyte_end_at | _airbyte_active_row | _airbyte_ab_id | _airbyte_emitted_at | _airbyte_normalized_at | _airbyte_test_table_hashid
----------------------------------+----------------------------------+----+------+-------+-----------------+------------------+--------------------+----------------------+-----------------
-----+-----------------+---------------------+--------------------------------------+----------------------------+-------------------------------+----------------------------------
c4ca4238a0b923820dcc509a6f75849b | 4ebb0ca68102de10f265559739637a66 | 1 | Y | AAAAA | 1872 | binlog.000005 | | 2022-07-17T08:36:32Z | 2022-07-17T08:36
:32Z | | 1 | bdf2334f-a58c-46ef-9868-2791c39d32a0 | 2022-07-17 08:34:12.415+00 | 2022-07-17 08:36:40.737628+00 | 68a503bb2f7a322abf64b118e81bf581
c81e728d9d4c2f636f067f89cc14862c | ab25f5fe8d4608d536f04553d6b2102d | 2 | Y | BBBBB | 1872 | binlog.000005 | | 2022-07-17T08:36:32Z | 2022-07-17T08:36
:32Z | | 1 | aacca064-a711-4368-bc16-1649b861a388 | 2022-07-17 08:34:12.415+00 | 2022-07-17 08:36:40.737628+00 | 07e5d2eeb4f4a6c0a78536fe0a5a4689
(2 rows)
Airbyte の管理画面からは、2行ソース DB から emitted され(抽出され)、ターゲット DB に2行 committed されたことが分かります。(ちなみに、2 m 38 s 時間が掛っているように見えるのは、Sync の処理自体はソースでの INSERT 文実行の前から開始されているため、少し長めに出ています)
3-2. 主キー指定によるレコードの UPDATE
次に、ソース DB(MySQL)側で以下の UPDATE 文を実行します。
update test_table set name = 'bbbbb' where id = 2;
しばらくすると、ターゲット DB 側にもデータが反映されます。test_table テーブルは実際に値が更新されていますが、test_table_scd テーブルは変更データが追記されています。古いデータ(name = 'BBBBB' のレコード)は _airbyte_active_row が 0 になっています。
一見想定通りな気がしますが、
- test_table テーブルの id = 1 のレコード(未更新レコード)の _airbyte_normalized_at が更新されている。
- test_table_scd テーブルの id = 1 のレコードの _airbyte_normalized_at も更新されている。
というのは気になりますので、後でも少し触れます。
target_db=> select * from source_db.test_table;
_airbyte_unique_key | id | flag | name | _ab_cdc_log_pos | _ab_cdc_log_file | _ab_cdc_deleted_at | _ab_cdc_updated_at | _airbyte_ab_id | _airbyte_
emitted_at | _airbyte_normalized_at | _airbyte_test_table_hashid
----------------------------------+----+------+-------+-----------------+------------------+--------------------+----------------------+--------------------------------------+-------------
---------------+-------------------------------+----------------------------------
c4ca4238a0b923820dcc509a6f75849b | 1 | Y | AAAAA | 1872 | binlog.000005 | | 2022-07-17T08:36:32Z | bdf2334f-a58c-46ef-9868-2791c39d32a0 | 2022-07-17 0
8:34:12.415+00 | 2022-07-17 08:43:25.959628+00 | 68a503bb2f7a322abf64b118e81bf581
c81e728d9d4c2f636f067f89cc14862c | 2 | Y | bbbbb | 2204 | binlog.000005 | | 2022-07-17T08:43:18Z | 828d0f8d-5cd9-4627-8542-38bfc5848118 | 2022-07-17 0
8:39:11.725+00 | 2022-07-17 08:43:25.959628+00 | 9758d4a2d9c58ad80bcd0f88e1e1e195
(2 rows)
target_db=> select * from source_db.test_table_scd;
_airbyte_unique_key | _airbyte_unique_key_scd | id | flag | name | _ab_cdc_log_pos | _ab_cdc_log_file | _ab_cdc_deleted_at | _ab_cdc_updated_at | _airbyte_start_
at | _airbyte_end_at | _airbyte_active_row | _airbyte_ab_id | _airbyte_emitted_at | _airbyte_normalized_at | _airbyte_test_table_hashid
----------------------------------+----------------------------------+----+------+-------+-----------------+------------------+--------------------+----------------------+-----------------
-----+----------------------+---------------------+--------------------------------------+----------------------------+-------------------------------+----------------------------------
c4ca4238a0b923820dcc509a6f75849b | 4ebb0ca68102de10f265559739637a66 | 1 | Y | AAAAA | 1872 | binlog.000005 | | 2022-07-17T08:36:32Z | 2022-07-17T08:36
:32Z | | 1 | bdf2334f-a58c-46ef-9868-2791c39d32a0 | 2022-07-17 08:34:12.415+00 | 2022-07-17 08:43:25.798248+00 | 68a503bb2f7a322abf64b118e81bf581
c81e728d9d4c2f636f067f89cc14862c | ab25f5fe8d4608d536f04553d6b2102d | 2 | Y | BBBBB | 1872 | binlog.000005 | | 2022-07-17T08:36:32Z | 2022-07-17T08:36
:32Z | 2022-07-17T08:43:18Z | 0 | aacca064-a711-4368-bc16-1649b861a388 | 2022-07-17 08:34:12.415+00 | 2022-07-17 08:43:25.798248+00 | 07e5d2eeb4f4a6c0a78536fe0a5a4689
c81e728d9d4c2f636f067f89cc14862c | 33d0bfeebc74af1f66c53a0b2651d347 | 2 | Y | bbbbb | 2204 | binlog.000005 | | 2022-07-17T08:43:18Z | 2022-07-17T08:43
:18Z | | 1 | 828d0f8d-5cd9-4627-8542-38bfc5848118 | 2022-07-17 08:39:11.725+00 | 2022-07-17 08:43:25.798248+00 | 9758d4a2d9c58ad80bcd0f88e1e1e195
(3 rows)
Airbyte の管理画面からは、ソース DB から1行変更が抽出され、ターゲット DB に1行反映されたことが分かります。(4 m 23 s 掛かっているように見えるのは、3-1. と同じ理由です)
3-3. 主キー指定によるレコードの DELETE
次に、ソース DB(MySQL)側で以下の DELETE 文を実行します。
delete from test_table where id = 1;
しばらくすると、ターゲット DB 側にもデータが反映されます。test_table テーブルは該当レコードが削除されていますが、test_table_scd テーブルは削除情報が一番下に追記され、2行目の削除前のデータは _airbyte_active_row が 0 に更新されています。(すいません、test_table のデータを取り損ねたので、以下は test_table_scd のみです)
target_db=> select * from source_db.test_table_scd order by _airbyte_emitted_at;
_airbyte_unique_key | _airbyte_unique_key_scd | id | flag | name | _ab_cdc_log_pos | _ab_cdc_log_file | _ab_cdc_deleted_at | _ab_cdc_updated_at | _airbyte_star
t_at | _airbyte_end_at | _airbyte_active_row | _airbyte_ab_id | _airbyte_emitted_at | _airbyte_normalized_at | _airbyte_test_table_hashid
----------------------------------+----------------------------------+----+------+-------+-----------------+------------------+----------------------+----------------------+---------------
-------+----------------------+---------------------+--------------------------------------+----------------------------+-------------------------------+----------------------------------
c81e728d9d4c2f636f067f89cc14862c | ab25f5fe8d4608d536f04553d6b2102d | 2 | Y | BBBBB | 1872 | binlog.000005 | | 2022-07-17T08:36:32Z | 2022-07-17T08:
36:32Z | 2022-07-17T08:43:18Z | 0 | aacca064-a711-4368-bc16-1649b861a388 | 2022-07-17 08:34:12.415+00 | 2022-07-17 08:43:25.798248+00 | 07e5d2eeb4f4a6c0a78536fe0a5a4689
c4ca4238a0b923820dcc509a6f75849b | 4ebb0ca68102de10f265559739637a66 | 1 | Y | AAAAA | 1872 | binlog.000005 | | 2022-07-17T08:36:32Z | 2022-07-17T08:
36:32Z | 2022-07-17T08:45:25Z | 0 | bdf2334f-a58c-46ef-9868-2791c39d32a0 | 2022-07-17 08:34:12.415+00 | 2022-07-17 08:45:33.593568+00 | 68a503bb2f7a322abf64b118e81bf581
c81e728d9d4c2f636f067f89cc14862c | 33d0bfeebc74af1f66c53a0b2651d347 | 2 | Y | bbbbb | 2204 | binlog.000005 | | 2022-07-17T08:43:18Z | 2022-07-17T08:
43:18Z | | 1 | 828d0f8d-5cd9-4627-8542-38bfc5848118 | 2022-07-17 08:39:11.725+00 | 2022-07-17 08:45:33.593568+00 | 9758d4a2d9c58ad80bcd0f88e1e1e195
c4ca4238a0b923820dcc509a6f75849b | 5eafa570326946a66b534b916815a9db | 1 | Y | AAAAA | 2528 | binlog.000005 | 2022-07-17T08:45:25Z | 2022-07-17T08:45:25Z | 2022-07-17T08:
45:25Z | | 0 | 9c4983a7-a0f9-4947-bd7a-ee718565be5f | 2022-07-17 08:44:12.231+00 | 2022-07-17 08:45:33.593568+00 | 07bd0b71b94017f4bc27e04cd560f218
(4 rows)
4. Airbyte から実行されるコマンド
4-1. ターゲット DB(PostgreSQL)に実行されるコマンド
直前の 3. で示した通り Airbyte の CDC は test_table と test_table_scd のデータを作ることが最終ゴールなのですが、その過程で以下の順で中間テーブルが生成されます。テーブルの名前を見ると分かりますが、内部では dbt を使っているようですね。
- _airbyte_tmp_<ランダム3文字>_test_table
- _airbyte_raw_test_table
- test_table_stg__dbt_tmp<ランダム12桁数字>
- test_table_stg
- test_table_scd__dbt_tmp<ランダム12桁数字>
- test_table_scd
- test_table__dbt_tmp<ランダム12桁数字>
- test_table
順にデータを生成しているコマンドを PostgreSQL のクエリログから抽出すると以下のようになります(メタデータ取得のクエリは省略しています)。
_airbyte_tmp_<ランダム3文字>_test_table
こちらは、変更差分データ(JSON形式)が単純にロードされるテーブルです。(JSONデータを大量に RDBMS に入れるのは若干気になると言えば気になるのですが。できれば Airbyte 側で JSON ⇒ 表形式に直してほしいなぁ。)
CREATE TABLE IF NOT EXISTS source_db._airbyte_tmp_pjk_test_table (
_airbyte_ab_id VARCHAR PRIMARY KEY,
_airbyte_data JSONB,
_airbyte_emitted_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP
);
COPY source_db._airbyte_tmp_pjk_test_table FROM stdin DELIMITER ',' CSV;
_airbyte_raw_test_table
ロードしたデータを INSERT .. SELECT .. で移し替えているようですね。_airbyte_tmp_<ランダム3文字>_test_table はこの処理が終わるとテーブル削除されますが、こちらのテーブルはテーブル削除/データ削除はなく、ずっと蓄積されるデータのようです。(別にワンクッション挟まなくても良くないか?とも思わなくはないですが)
CREATE TABLE IF NOT EXISTS source_db._airbyte_raw_test_table (
_airbyte_ab_id VARCHAR PRIMARY KEY,
_airbyte_data JSONB,
_airbyte_emitted_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP
);
INSERT INTO source_db._airbyte_raw_test_table SELECT * FROM source_db._airbyte_tmp_pjk_test_table;
test_table_stg__dbt_tmp<ランダム12桁数字>
こちらのテーブルの処理は長いですが、やっていることは単純に以下です。
- JSONデータを表形式に変換
- データ型を変換
- 主キーの値から md5 チェックサムを計算してサロゲートキーを生成(_airbyte_test_table_hashid)
- test_table_stg に取り込み済みのデータより古いデータは除去
最後の条件が >=
になっているため、1回前の差分データも抽出しているような気がするのですが、何故こういう実装になってるんですかね?(これが未更新レコードの _airbyte_normalized_at も更新されていた理由だと思います)
create temporary table "test_table_stg__dbt_tmp084325661583"
as (
with __dbt__cte__test_table_ab1 as (
-- SQL model to parse JSON blob stored in a single column and extract into separated field columns as described by the JSON Schema
-- depends_on: "target_db".source_db._airbyte_raw_test_table
select
jsonb_extract_path_text(_airbyte_data, 'id') as "id",
jsonb_extract_path_text(_airbyte_data, 'flag') as "flag",
jsonb_extract_path_text(_airbyte_data, 'name') as "name",
jsonb_extract_path_text(_airbyte_data, '_ab_cdc_log_pos') as _ab_cdc_log_pos,
jsonb_extract_path_text(_airbyte_data, '_ab_cdc_log_file') as _ab_cdc_log_file,
jsonb_extract_path_text(_airbyte_data, '_ab_cdc_deleted_at') as _ab_cdc_deleted_at,
jsonb_extract_path_text(_airbyte_data, '_ab_cdc_updated_at') as _ab_cdc_updated_at,
_airbyte_ab_id,
_airbyte_emitted_at,
now() as _airbyte_normalized_at
from "target_db".source_db._airbyte_raw_test_table as table_alias
-- test_table
where 1 = 1
), __dbt__cte__test_table_ab2 as (
-- SQL model to cast each column to its adequate SQL type converted from the JSON schema type
-- depends_on: __dbt__cte__test_table_ab1
select
cast("id" as
float
) as "id",
cast("flag" as text) as "flag",
cast("name" as text) as "name",
cast(_ab_cdc_log_pos as
float
) as _ab_cdc_log_pos,
cast(_ab_cdc_log_file as text) as _ab_cdc_log_file,
cast(_ab_cdc_deleted_at as text) as _ab_cdc_deleted_at,
cast(_ab_cdc_updated_at as text) as _ab_cdc_updated_at,
_airbyte_ab_id,
_airbyte_emitted_at,
now() as _airbyte_normalized_at
from __dbt__cte__test_table_ab1
-- test_table
where 1 = 1
)-- SQL model to build a hash column based on the values of this record
-- depends_on: __dbt__cte__test_table_ab2
select
md5(cast(coalesce(cast("id" as text), '') || '-' || coalesce(cast("flag" as text), '') || '-' || coalesce(cast("name" as text), '') || '-' || coalesce(cast(_ab_cdc_log_pos as text), '') || '-' || coalesce(cast(_ab_cdc_log_file as text), '') || '-' || coalesce(cast(_ab_cdc_deleted_at as text), '') || '-' || coalesce(cast(_ab_cdc_updated_at as text), '') as text)) as _airbyte_test_table_hashid,
tmp.*
from __dbt__cte__test_table_ab2 tmp
-- test_table
where 1 = 1
and coalesce(
cast(_airbyte_emitted_at as
timestamp with time zone
) >= (select max(cast(_airbyte_emitted_at as
timestamp with time zone
)) from "target_db"._airbyte_source_db."test_table_stg"),
true)
);
test_table_stg
先ほどのテーブルから _airbyte_ab_id をキーにマージを実施しています。(_airbyte_ab_id は差分変更データ1行ごとに振られる ID だと思ってください)
delete from "target_db"._airbyte_source_db."test_table_stg"
where (_airbyte_ab_id) in (
select (_airbyte_ab_id)
from "test_table_stg__dbt_tmp084325661583"
);
insert into "target_db"._airbyte_source_db."test_table_stg" ("_airbyte_test_table_hashid", "id", "flag", "name", "_ab_cdc_log_pos", "_ab_cdc_log_file", "_ab_cdc_deleted_at", "_ab_cdc_updated_at", "_airbyte_ab_id", "_airbyte_emitted_at", "_airbyte_normalized_at")
(
select "_airbyte_test_table_hashid", "id", "flag", "name", "_ab_cdc_log_pos", "_ab_cdc_log_file", "_ab_cdc_deleted_at", "_ab_cdc_updated_at", "_airbyte_ab_id", "_airbyte_emitted_at", "_airbyte_normalized_at"
from "test_table_stg__dbt_tmp084325661583"
)
test_table_scd__dbt_tmp<ランダム12桁数字>
このテーブルの作成処理を説明するのは大変なので省略しますが、いわゆる Slowly Changing Demension Type 2 の処理になります。
create temporary table "test_table_scd__dbt_tmp084325868950"
as (
-- depends_on: ref('test_table_stg')
with
new_data as (
-- retrieve incremental "new" data
select
*
from "target_db"._airbyte_source_db."test_table_stg"
-- test_table from "target_db".source_db._airbyte_raw_test_table
where 1 = 1
and coalesce(
cast(_airbyte_emitted_at as
timestamp with time zone
) >= (select max(cast(_airbyte_emitted_at as
timestamp with time zone
)) from "target_db".source_db."test_table_scd"),
true)
),
new_data_ids as (
-- build a subset of _airbyte_unique_key from rows that are new
select distinct
md5(cast(coalesce(cast("id" as text), '') as text)) as _airbyte_unique_key
from new_data
),
empty_new_data as (
-- build an empty table to only keep the table's column types
select * from new_data where 1 = 0
),
previous_active_scd_data as (
-- retrieve "incomplete old" data that needs to be updated with an end date because of new changes
select
this_data."_airbyte_test_table_hashid",
this_data."id",
this_data."flag",
this_data."name",
this_data."_ab_cdc_log_pos",
this_data."_ab_cdc_log_file",
this_data."_ab_cdc_deleted_at",
this_data."_ab_cdc_updated_at",
this_data."_airbyte_ab_id",
this_data."_airbyte_emitted_at",
this_data."_airbyte_normalized_at"
from "target_db".source_db."test_table_scd" as this_data
-- make a join with new_data using primary key to filter active data that need to be updated only
join new_data_ids on this_data._airbyte_unique_key = new_data_ids._airbyte_unique_key
-- force left join to NULL values (we just need to transfer column types only for the star_intersect macro on schema changes)
left join empty_new_data as inc_data on this_data._airbyte_ab_id = inc_data._airbyte_ab_id
where _airbyte_active_row = 1
),
input_data as (
select "_airbyte_test_table_hashid",
"id",
"flag",
"name",
"_ab_cdc_log_pos",
"_ab_cdc_log_file",
"_ab_cdc_deleted_at",
"_ab_cdc_updated_at",
"_airbyte_ab_id",
"_airbyte_emitted_at",
"_airbyte_normalized_at" from new_data
union all
select "_airbyte_test_table_hashid",
"id",
"flag",
"name",
"_ab_cdc_log_pos",
"_ab_cdc_log_file",
"_ab_cdc_deleted_at",
"_ab_cdc_updated_at",
"_airbyte_ab_id",
"_airbyte_emitted_at",
"_airbyte_normalized_at" from previous_active_scd_data
),
scd_data as (
-- SQL model to build a Type 2 Slowly Changing Dimension (SCD) table for each record identified by their primary key
select
md5(cast(coalesce(cast("id" as text), '') as text)) as _airbyte_unique_key,
"id",
"flag",
"name",
_ab_cdc_log_pos,
_ab_cdc_log_file,
_ab_cdc_deleted_at,
_ab_cdc_updated_at,
_ab_cdc_updated_at as _airbyte_start_at,
lag(_ab_cdc_updated_at) over (
partition by cast("id" as text)
order by
_ab_cdc_updated_at is null asc,
_ab_cdc_updated_at desc,
_ab_cdc_updated_at desc,
_ab_cdc_log_pos desc,
_airbyte_emitted_at desc
) as _airbyte_end_at,
case when row_number() over (
partition by cast("id" as text)
order by
_ab_cdc_updated_at is null asc,
_ab_cdc_updated_at desc,
_ab_cdc_updated_at desc,
_ab_cdc_log_pos desc,
_airbyte_emitted_at desc
) = 1 and _ab_cdc_deleted_at is null then 1 else 0 end as _airbyte_active_row,
_airbyte_ab_id,
_airbyte_emitted_at,
_airbyte_test_table_hashid
from input_data
),
dedup_data as (
select
-- we need to ensure de-duplicated rows for merge/update queries
-- additionally, we generate a unique key for the scd table
row_number() over (
partition by
_airbyte_unique_key,
_airbyte_start_at,
_airbyte_emitted_at, cast(_ab_cdc_deleted_at as text), cast(_ab_cdc_updated_at as text), cast(_ab_cdc_log_pos as text)
order by _airbyte_active_row desc, _airbyte_ab_id
) as _airbyte_row_num,
md5(cast(coalesce(cast(_airbyte_unique_key as text), '') || '-' || coalesce(cast(_airbyte_start_at as text), '') || '-' || coalesce(cast(_airbyte_emitted_at as text), '') || '-' || coalesce(cast(_ab_cdc_deleted_at as text), '') || '-' || coalesce(cast(_ab_cdc_updated_at as text), '') || '-' || coalesce(cast(_ab_cdc_log_pos as text), '') as text)) as _airbyte_unique_key_scd,
scd_data.*
from scd_data
)
select
_airbyte_unique_key,
_airbyte_unique_key_scd,
"id",
"flag",
"name",
_ab_cdc_log_pos,
_ab_cdc_log_file,
_ab_cdc_deleted_at,
_ab_cdc_updated_at,
_airbyte_start_at,
_airbyte_end_at,
_airbyte_active_row,
_airbyte_ab_id,
_airbyte_emitted_at,
now() as _airbyte_normalized_at,
_airbyte_test_table_hashid
from dedup_data where _airbyte_row_num = 1
);
test_table_scd
こちらは _airbyte_unique_key_scd をキーにした単純なデータマージです。
delete from "target_db".source_db."test_table_scd"
where (_airbyte_unique_key_scd) in (
select (_airbyte_unique_key_scd)
from "test_table_scd__dbt_tmp084325868950"
);
insert into "target_db".source_db."test_table_scd" ("_airbyte_unique_key", "_airbyte_unique_key_scd", "id", "flag", "name", "_ab_cdc_log_pos", "_ab_cdc_log_file", "_ab_cdc_deleted_at", "_ab_cdc_updated_at", "_airbyte_start_at", "_airbyte_end_at", "_airbyte_active_row", "_airbyte_ab_id", "_airbyte_emitted_at", "_airbyte_normalized_at", "_airbyte_test_table_hashid")
(
select "_airbyte_unique_key", "_airbyte_unique_key_scd", "id", "flag", "name", "_ab_cdc_log_pos", "_ab_cdc_log_file", "_ab_cdc_deleted_at", "_ab_cdc_updated_at", "_airbyte_start_at", "_airbyte_end_at", "_airbyte_active_row", "_airbyte_ab_id", "_airbyte_emitted_at", "_airbyte_normalized_at", "_airbyte_test_table_hashid"
from "test_table_scd__dbt_tmp084325868950"
)
test_table__dbt_tmp<ランダム12桁数字>
(省略)
test_table
(省略)
4-2. ソース DB(MySQL)に実行されるコマンド
こちらは簡単にですが、MySQL の一般ログを確認すると、Airbyte からバイナリログのダンプを実行しているようです。
2022-07-17T08:39:13.872714Z 25 Binlog Dump Log: 'binlog.000005' Pos: 1720
ちなみに、ターゲット DB が停止している場合はバイナリログの取得はスキップし、ターゲット DB が復活してから過去分含めての変更データを取得するようです。(Airbyte の内部に変更データを蓄積しておいて、ターゲット DB が復帰したのちに適用するというわけではなさそう)。
5. まとめ
本記事は Airbyte で MySQL から PostgreSQL への Change Data Capture を試してみて、背後で実行されるコマンドも確認してみました。
感想ですが、
- 動かすだけなら GUI でパパっと設定できるので簡単。
- カラムの順番が変わるのは気に入らない。(id name flag ⇒ id flag name)
- データ適用は中間テーブルを挟みながらも複数行の変更データを一括で適用しており、処理方式としては Qlik Replicate に近いかな(MySQL の row mode レプリケーションや Oracle GoldenGateとは違う)。大量の変更データの適用には有利っぽいけど、ラグ(適用遅延)が気になるかも(今回だと最小でも1分ぐらいは掛かっている)。
- _airbyte_normalized_at の更新範囲が広い。かつ内部では UPDATE ではなく DELETE/INSERT なので、性能や容量管理には注意する必要がありそう(VACUUM のある PostgreSQL やタイムトラベルを持つ Snowflake などは特に)。
- ターゲット DB が停止している間はソース DB から変更データを抽出しないので、ターゲット DB の長期計画停止などの場合には、ソース側でバイナリログの保存期間など気にする必要がありそう。(逆に Airbyte で変更データをため込まないならそれはそれで管理しやすいですが)
本番利用するにはまだまだ検証が必要ですが、OSS で Chanage Data Capture が簡単にできるようになるというのは、今後に期待したいと思います。