5
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 1 year has passed since last update.

AirbyteのChange Data Captureを試す

Posted at

この記事では、Airbyte の Change Data Capture(CDC)の機能を用い、MySQL から PostgreSQL にデータをレプリケーションしてみます。また、裏で DB に流れているコマンドについても簡単に確認してみます。

1. はじめに

ある DB(ソース DB)から別の DB(ターゲットDB)にデータを継続的に移動するというのは様々なユースケースで必要とされ、現時点では ETL 処理などのバッチ処理で実装されることが多いです。ただし、バッチ処理に関しては、以下のような問題もあります。

  • 対象テーブル数が多い、処理内容が複雑などの理由で、開発に時間と手間が掛かる。
  • 例えば ETL 処理を1日に1回実行する場合、ターゲット DB への最新データの反映は最大1日遅れる。

バッチ処理以外のデータ移動のアプローチとして Change Data Capture(CDC)があります。ソース DB のデータ変更内容を抽出し、ターゲットDBにその変更内容を反映させるという処理を自動的に行うことで、ソース DB 上のデータと同じ内容をターゲット DB 上に容易に実現する手法・ツールになります。

Change Data Capture の定義には本来、ターゲット DB にデータ変更内容を反映させるところまでは含まれず、変更内容に基づいて何かしらの処理を実行することを指しますが、本記事ではターゲット DB にデータを反映させるところまでを行うレプリケーションツールとして扱います。
レプリケーションは MySQL や PostgreSQL が標準で対応している機能ですが、これらは基本同じ種類の DBMS 間のデータ移動になります。CDC の場合はソース DB とターゲット DB が異なる種類の DBMS である場合にも対応しています。

Change Data Capture を実現するツールは昔から商用ツールを中心にいくつかあったのですが、昨今はオープンソースのツールでもこの機能を持つものが増えてきました。本記事ではそれらの1つである Airbyte を使って CDC をセットアップし、ソース DB(MySQL)からターゲット DB(PostgreSQL)への変更データの反映の結果を確認した結果を共有したいと思います。

また、主にターゲット DB 側で変更データ反映を実行しているクエリについても確認してみます。

2. セットアップ

今回は以下の環境でセットアップを行いました。

  • 実行サーバー:EC2 インスタンス(Amazon Linux 2, m6i.large)
  • Airbyte Open Source
  • ソース DB:MySQL 8.0.29
  • ターゲット DB:PostgreSQL 14.4

2-1. Airbyte のインストール

Docker Compose を利用します。以下の通り実施しています。Airbyte 自身の可用性とかを気にしなければ、コマンド3発なので簡単ですね。

2-2. MySQL のセットアップ

Airbyte から MySQL に接続する DB ユーザー(source_user)と、今回対象とするテーブルを含む DB(source_db)を作成します(rootユーザーで実施)。

-- Airbyte用のDBユーザー作成
create user source_user identified by '************';

-- DBを作成し、上で作成したユーザーに全権限を付与
create database source_db;
grant all privileges on source_db.* to source_user;

-- Airbyteはバイナリログから変更データを抽出するため、以下の権限付与が必要
grant reload, show databases, replication slave, replication client on *.* to source_user;

-- Airbyteから実行される処理を確認するために一般ログを有効化
set global general_log = on;

その後、source_user で source_db に接続し、簡単なテーブル(test_table)を作成しておきます。ちなみに、Airbyte の CDC は主キーが存在するテーブルのレプリケーションしかサポートしていません。

create table test_table (
    id int primary key,
    name varchar(10),
    flag char(1)
);

以上で MySQL のセットアップは完了です(本来はバイナリログを有効にする必要がありますが、公式 RPM パッケージからインストールした場合はデフォルトで有効になっています)。

2-3. PostgreSQL のセットアップ

Airbyte から PostgreSQL に接続するDBユーザー(target_user)と、今回対象とするテーブルを含む DB(target_db)を作成します(postgresユーザーで実施)。

-- DBユーザーの作成
create role target_user with login password '************';

-- DBを作成し、上で作成したDBユーザーに全権限を付与
create database target_db;
grant all privileges on database target_db to target_user;

その他、AirByte から実行されるコマンドを記録するために postgesql.conf に log_statement を all に設定しています。

以上で PostgreSQL のセットアップは完了です。

2-4. Airbyte の CDC セットアップ

以下の流れでセットアップします。

  • Source の設定
  • Destination の設定
  • Connection の設定

2-4-1. Source の設定

まずはソース DB を Airbyte に登録します。

Airbyte の管理画面(http://localhost:8000)を開き、左側メニューの [Sources] を選択後、画面右上の [New Source] を押します。その後、以下のように入力します。入力後に画面一番下の [Set up source] を押せば、接続テストが実行され、テストが通ればソースDBが登録されます。

項目 入力値 説明
Source Type MySQL ソース DB の DBMS を指定する。
Name soruce_db (MySQL) 作成する Source の名前。分かり易いものをつけておく。
Host 172.16.1.226 ソース DB のホスト名 or IP アドレスを指定する。(左の値は EC2 インスタンスの IP アドレス)
Port 3306 ソース DB のポート番号を指定する。(今回はデフォルトポート番号を利用)
Database source_db 2-2.で作成した DB を指定する。
Username source_user 2-2.で作成した DB ユーザー名を指定する。
Password ******* 2-2.で作成した DB ユーザーのパスワードを指定する。
Replication Method CDC CDC をするためにはこの設定が必要。

01_ソース設定.png

2-4-2. Destination の設定

次にターゲット DB を Airbyte に登録します。(Airbyte ではターゲットを Destinationと言います)

Airbyte の管理画面の左側メニューから [Destinations] を選択後、画面右上の [New destination] を押します。その後、以下のように入力します。入力後に画面一番下の [Set up destination] を押せば、接続テストが実行され、テストが通ればターゲット DB が登録されます。

項目 入力値 説明
Destination Type Postgres ターゲット DB の DBMS を指定する。
Name target_db (PostgreSQL) 作成する Destination の名前。分かり易いものをつけておく。
Host 172.16.1.226 ターゲットDB のホスト名 or IP アドレスを指定する。(左の値は EC2 インスタンスの IP アドレス)
Port 5432 ターゲット DB のポート番号を指定する。(今回はデフォルトポート番号を利用)
DB Name target_db 2-3.で作成した DB を指定する。
Default Schema public 指定必須だが、今回は利用しない。
User target_user 2-3.で作成した DB ユーザー名を指定する。
Password ******* 2-3.で作成した DB ユーザーのパスワードを指定する。

02_ターゲット設定.png

2-4-3. Connection の設定

CDC の設定を行います。Airbyte ではデータ移動の処理のことを Connection と呼びます。

Airbyte の管理画面の左側メニューから [Connections] を選択後、画面右上の [New Connection] を押します。その後、[Select an existing source] において 2-4-1. で作成した Source を選択し、[Use existing source] を押します。

03_CDC設定(ソース).png

次の画面では [Select an existing destination] において 2-4-2. で作成した Destination を選択し、[Use existing destination] を押します。

04_CDC設定(ターゲット).png

次の画面では以下の入力をして、[Set up connection] を押します。(デフォルトで選択されていると思いますが、対象テーブルのチェックがオンになっていること、[Sync mode] が [Incremental Deduped + History] になっていることを確認しておいた方が良いかもしれません)

項目 入力値 説明
Name source_db to target_db Connection の名前。分かり易い名前を指定する。
Transfer - Replication frequency Every 5 minutes 今回は頻度高くデータ反映するので、GUI 上で指定できる最短の時間間隔を指定

05_CDC設定(コネクション).png

以上で Connection の設定は終了し、Change Data Capture が動き始めます。設定はとても簡単ですね。

3. 変更データの反映の確認

Airbyte の CDC ではターゲット DB 側にソース DB の DB 名(今回は source_db)と同じスキーマが自動で作成され、そのスキーマの中に対象テーブルと同じ名前のテーブル(今回は test_table)が作成され、そちらに変更データが反映されます。また、変更の履歴は <テーブル名>_scd という名前のテーブル(今回は test_table_scd)に保存されます。ちなみに、scd は Slowly Chanaging Demension の略です。

以下の操作をソース DB 側で実施し、ターゲット DB にどう反映されるか確認してみます。

  • レコードの INSERT
  • 主キー指定によるレコードの UPDATE
  • 主キー指定によるレコードの DELETE

それ以外にも、主キー列以外を WHERE に指定した UPDATE/DELETE 文を試していますが、挙動は想像通りなので割愛します。

3-1. レコードのINSERT

ソース DB(MySQL)側で以下の INSERT 文を実行します。

insert into test_table values 
    (1, 'AAAAA', 'Y'),
    (2, 'BBBBB', 'Y');

しばらくすると、ターゲット DB 側の test_table / test_table_scd で INSERT したデータが確認できます。(id name flag のカラム順番が変わっているのは気になりますが)

target_db=> select * from source_db.test_table;
       _airbyte_unique_key        | id | flag | name  | _ab_cdc_log_pos | _ab_cdc_log_file | _ab_cdc_deleted_at |  _ab_cdc_updated_at  |            _airbyte_ab_id            |    _airbyte_
emitted_at     |    _airbyte_normalized_at     |    _airbyte_test_table_hashid    
----------------------------------+----+------+-------+-----------------+------------------+--------------------+----------------------+--------------------------------------+-------------
---------------+-------------------------------+----------------------------------
 c4ca4238a0b923820dcc509a6f75849b |  1 | Y    | AAAAA |            1872 | binlog.000005    |                    | 2022-07-17T08:36:32Z | bdf2334f-a58c-46ef-9868-2791c39d32a0 | 2022-07-17 0
8:34:12.415+00 | 2022-07-17 08:36:40.787686+00 | 68a503bb2f7a322abf64b118e81bf581
 c81e728d9d4c2f636f067f89cc14862c |  2 | Y    | BBBBB |            1872 | binlog.000005    |                    | 2022-07-17T08:36:32Z | aacca064-a711-4368-bc16-1649b861a388 | 2022-07-17 0
8:34:12.415+00 | 2022-07-17 08:36:40.787686+00 | 07e5d2eeb4f4a6c0a78536fe0a5a4689
(2 rows)

target_db=> select * from source_db.test_table_scd
target_db-> ;
       _airbyte_unique_key        |     _airbyte_unique_key_scd      | id | flag | name  | _ab_cdc_log_pos | _ab_cdc_log_file | _ab_cdc_deleted_at |  _ab_cdc_updated_at  |  _airbyte_start_
at   | _airbyte_end_at | _airbyte_active_row |            _airbyte_ab_id            |    _airbyte_emitted_at     |    _airbyte_normalized_at     |    _airbyte_test_table_hashid    
----------------------------------+----------------------------------+----+------+-------+-----------------+------------------+--------------------+----------------------+-----------------
-----+-----------------+---------------------+--------------------------------------+----------------------------+-------------------------------+----------------------------------
 c4ca4238a0b923820dcc509a6f75849b | 4ebb0ca68102de10f265559739637a66 |  1 | Y    | AAAAA |            1872 | binlog.000005    |                    | 2022-07-17T08:36:32Z | 2022-07-17T08:36
:32Z |                 |                   1 | bdf2334f-a58c-46ef-9868-2791c39d32a0 | 2022-07-17 08:34:12.415+00 | 2022-07-17 08:36:40.737628+00 | 68a503bb2f7a322abf64b118e81bf581
 c81e728d9d4c2f636f067f89cc14862c | ab25f5fe8d4608d536f04553d6b2102d |  2 | Y    | BBBBB |            1872 | binlog.000005    |                    | 2022-07-17T08:36:32Z | 2022-07-17T08:36
:32Z |                 |                   1 | aacca064-a711-4368-bc16-1649b861a388 | 2022-07-17 08:34:12.415+00 | 2022-07-17 08:36:40.737628+00 | 07e5d2eeb4f4a6c0a78536fe0a5a4689
(2 rows)

Airbyte の管理画面からは、2行ソース DB から emitted され(抽出され)、ターゲット DB に2行 committed されたことが分かります。(ちなみに、2 m 38 s 時間が掛っているように見えるのは、Sync の処理自体はソースでの INSERT 文実行の前から開始されているため、少し長めに出ています)

06_INSERT.png

3-2. 主キー指定によるレコードの UPDATE

次に、ソース DB(MySQL)側で以下の UPDATE 文を実行します。

update test_table set name = 'bbbbb' where id = 2;

しばらくすると、ターゲット DB 側にもデータが反映されます。test_table テーブルは実際に値が更新されていますが、test_table_scd テーブルは変更データが追記されています。古いデータ(name = 'BBBBB' のレコード)は _airbyte_active_row が 0 になっています。

一見想定通りな気がしますが、

  • test_table テーブルの id = 1 のレコード(未更新レコード)の _airbyte_normalized_at が更新されている。
  • test_table_scd テーブルの id = 1 のレコードの _airbyte_normalized_at も更新されている。

というのは気になりますので、後でも少し触れます。

target_db=> select * from source_db.test_table;
       _airbyte_unique_key        | id | flag | name  | _ab_cdc_log_pos | _ab_cdc_log_file | _ab_cdc_deleted_at |  _ab_cdc_updated_at  |            _airbyte_ab_id            |    _airbyte_
emitted_at     |    _airbyte_normalized_at     |    _airbyte_test_table_hashid    
----------------------------------+----+------+-------+-----------------+------------------+--------------------+----------------------+--------------------------------------+-------------
---------------+-------------------------------+----------------------------------
 c4ca4238a0b923820dcc509a6f75849b |  1 | Y    | AAAAA |            1872 | binlog.000005    |                    | 2022-07-17T08:36:32Z | bdf2334f-a58c-46ef-9868-2791c39d32a0 | 2022-07-17 0
8:34:12.415+00 | 2022-07-17 08:43:25.959628+00 | 68a503bb2f7a322abf64b118e81bf581
 c81e728d9d4c2f636f067f89cc14862c |  2 | Y    | bbbbb |            2204 | binlog.000005    |                    | 2022-07-17T08:43:18Z | 828d0f8d-5cd9-4627-8542-38bfc5848118 | 2022-07-17 0
8:39:11.725+00 | 2022-07-17 08:43:25.959628+00 | 9758d4a2d9c58ad80bcd0f88e1e1e195
(2 rows)

target_db=> select * from source_db.test_table_scd;
       _airbyte_unique_key        |     _airbyte_unique_key_scd      | id | flag | name  | _ab_cdc_log_pos | _ab_cdc_log_file | _ab_cdc_deleted_at |  _ab_cdc_updated_at  |  _airbyte_start_
at   |   _airbyte_end_at    | _airbyte_active_row |            _airbyte_ab_id            |    _airbyte_emitted_at     |    _airbyte_normalized_at     |    _airbyte_test_table_hashid    
----------------------------------+----------------------------------+----+------+-------+-----------------+------------------+--------------------+----------------------+-----------------
-----+----------------------+---------------------+--------------------------------------+----------------------------+-------------------------------+----------------------------------
 c4ca4238a0b923820dcc509a6f75849b | 4ebb0ca68102de10f265559739637a66 |  1 | Y    | AAAAA |            1872 | binlog.000005    |                    | 2022-07-17T08:36:32Z | 2022-07-17T08:36
:32Z |                      |                   1 | bdf2334f-a58c-46ef-9868-2791c39d32a0 | 2022-07-17 08:34:12.415+00 | 2022-07-17 08:43:25.798248+00 | 68a503bb2f7a322abf64b118e81bf581
 c81e728d9d4c2f636f067f89cc14862c | ab25f5fe8d4608d536f04553d6b2102d |  2 | Y    | BBBBB |            1872 | binlog.000005    |                    | 2022-07-17T08:36:32Z | 2022-07-17T08:36
:32Z | 2022-07-17T08:43:18Z |                   0 | aacca064-a711-4368-bc16-1649b861a388 | 2022-07-17 08:34:12.415+00 | 2022-07-17 08:43:25.798248+00 | 07e5d2eeb4f4a6c0a78536fe0a5a4689
 c81e728d9d4c2f636f067f89cc14862c | 33d0bfeebc74af1f66c53a0b2651d347 |  2 | Y    | bbbbb |            2204 | binlog.000005    |                    | 2022-07-17T08:43:18Z | 2022-07-17T08:43
:18Z |                      |                   1 | 828d0f8d-5cd9-4627-8542-38bfc5848118 | 2022-07-17 08:39:11.725+00 | 2022-07-17 08:43:25.798248+00 | 9758d4a2d9c58ad80bcd0f88e1e1e195
(3 rows)

Airbyte の管理画面からは、ソース DB から1行変更が抽出され、ターゲット DB に1行反映されたことが分かります。(4 m 23 s 掛かっているように見えるのは、3-1. と同じ理由です)

06_UPDATE.png

3-3. 主キー指定によるレコードの DELETE

次に、ソース DB(MySQL)側で以下の DELETE 文を実行します。

delete from test_table where id = 1;

しばらくすると、ターゲット DB 側にもデータが反映されます。test_table テーブルは該当レコードが削除されていますが、test_table_scd テーブルは削除情報が一番下に追記され、2行目の削除前のデータは _airbyte_active_row が 0 に更新されています。(すいません、test_table のデータを取り損ねたので、以下は test_table_scd のみです)

target_db=> select * from source_db.test_table_scd order by _airbyte_emitted_at;
       _airbyte_unique_key        |     _airbyte_unique_key_scd      | id | flag | name  | _ab_cdc_log_pos | _ab_cdc_log_file |  _ab_cdc_deleted_at  |  _ab_cdc_updated_at  |  _airbyte_star
t_at   |   _airbyte_end_at    | _airbyte_active_row |            _airbyte_ab_id            |    _airbyte_emitted_at     |    _airbyte_normalized_at     |    _airbyte_test_table_hashid    
----------------------------------+----------------------------------+----+------+-------+-----------------+------------------+----------------------+----------------------+---------------
-------+----------------------+---------------------+--------------------------------------+----------------------------+-------------------------------+----------------------------------
 c81e728d9d4c2f636f067f89cc14862c | ab25f5fe8d4608d536f04553d6b2102d |  2 | Y    | BBBBB |            1872 | binlog.000005    |                      | 2022-07-17T08:36:32Z | 2022-07-17T08:
36:32Z | 2022-07-17T08:43:18Z |                   0 | aacca064-a711-4368-bc16-1649b861a388 | 2022-07-17 08:34:12.415+00 | 2022-07-17 08:43:25.798248+00 | 07e5d2eeb4f4a6c0a78536fe0a5a4689
 c4ca4238a0b923820dcc509a6f75849b | 4ebb0ca68102de10f265559739637a66 |  1 | Y    | AAAAA |            1872 | binlog.000005    |                      | 2022-07-17T08:36:32Z | 2022-07-17T08:
36:32Z | 2022-07-17T08:45:25Z |                   0 | bdf2334f-a58c-46ef-9868-2791c39d32a0 | 2022-07-17 08:34:12.415+00 | 2022-07-17 08:45:33.593568+00 | 68a503bb2f7a322abf64b118e81bf581
 c81e728d9d4c2f636f067f89cc14862c | 33d0bfeebc74af1f66c53a0b2651d347 |  2 | Y    | bbbbb |            2204 | binlog.000005    |                      | 2022-07-17T08:43:18Z | 2022-07-17T08:
43:18Z |                      |                   1 | 828d0f8d-5cd9-4627-8542-38bfc5848118 | 2022-07-17 08:39:11.725+00 | 2022-07-17 08:45:33.593568+00 | 9758d4a2d9c58ad80bcd0f88e1e1e195
 c4ca4238a0b923820dcc509a6f75849b | 5eafa570326946a66b534b916815a9db |  1 | Y    | AAAAA |            2528 | binlog.000005    | 2022-07-17T08:45:25Z | 2022-07-17T08:45:25Z | 2022-07-17T08:
45:25Z |                      |                   0 | 9c4983a7-a0f9-4947-bd7a-ee718565be5f | 2022-07-17 08:44:12.231+00 | 2022-07-17 08:45:33.593568+00 | 07bd0b71b94017f4bc27e04cd560f218
(4 rows)

4. Airbyte から実行されるコマンド

4-1. ターゲット DB(PostgreSQL)に実行されるコマンド

直前の 3. で示した通り Airbyte の CDC は test_table と test_table_scd のデータを作ることが最終ゴールなのですが、その過程で以下の順で中間テーブルが生成されます。テーブルの名前を見ると分かりますが、内部では dbt を使っているようですね。

  • _airbyte_tmp_<ランダム3文字>_test_table
  • _airbyte_raw_test_table
  • test_table_stg__dbt_tmp<ランダム12桁数字>
  • test_table_stg
  • test_table_scd__dbt_tmp<ランダム12桁数字>
  • test_table_scd
  • test_table__dbt_tmp<ランダム12桁数字>
  • test_table

順にデータを生成しているコマンドを PostgreSQL のクエリログから抽出すると以下のようになります(メタデータ取得のクエリは省略しています)。

_airbyte_tmp_<ランダム3文字>_test_table

こちらは、変更差分データ(JSON形式)が単純にロードされるテーブルです。(JSONデータを大量に RDBMS に入れるのは若干気になると言えば気になるのですが。できれば Airbyte 側で JSON ⇒ 表形式に直してほしいなぁ。)

CREATE TABLE IF NOT EXISTS source_db._airbyte_tmp_pjk_test_table ( 
	_airbyte_ab_id VARCHAR PRIMARY KEY,
	_airbyte_data JSONB,
	_airbyte_emitted_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP
	);

COPY source_db._airbyte_tmp_pjk_test_table FROM stdin DELIMITER ',' CSV;

_airbyte_raw_test_table

ロードしたデータを INSERT .. SELECT .. で移し替えているようですね。_airbyte_tmp_<ランダム3文字>_test_table はこの処理が終わるとテーブル削除されますが、こちらのテーブルはテーブル削除/データ削除はなく、ずっと蓄積されるデータのようです。(別にワンクッション挟まなくても良くないか?とも思わなくはないですが)

CREATE TABLE IF NOT EXISTS source_db._airbyte_raw_test_table ( 
	_airbyte_ab_id VARCHAR PRIMARY KEY,
	_airbyte_data JSONB,
	_airbyte_emitted_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP
	);

INSERT INTO source_db._airbyte_raw_test_table SELECT * FROM source_db._airbyte_tmp_pjk_test_table;

test_table_stg__dbt_tmp<ランダム12桁数字>

こちらのテーブルの処理は長いですが、やっていることは単純に以下です。

  • JSONデータを表形式に変換
  • データ型を変換
  • 主キーの値から md5 チェックサムを計算してサロゲートキーを生成(_airbyte_test_table_hashid)
  • test_table_stg に取り込み済みのデータより古いデータは除去

最後の条件が >= になっているため、1回前の差分データも抽出しているような気がするのですが、何故こういう実装になってるんですかね?(これが未更新レコードの _airbyte_normalized_at も更新されていた理由だと思います)

	  create temporary table "test_table_stg__dbt_tmp084325661583"
	  as (
	    
	with __dbt__cte__test_table_ab1 as (
	
	-- SQL model to parse JSON blob stored in a single column and extract into separated field columns as described by the JSON Schema
	-- depends_on: "target_db".source_db._airbyte_raw_test_table
	select
	    jsonb_extract_path_text(_airbyte_data, 'id') as "id",
	    jsonb_extract_path_text(_airbyte_data, 'flag') as "flag",
	    jsonb_extract_path_text(_airbyte_data, 'name') as "name",
	    jsonb_extract_path_text(_airbyte_data, '_ab_cdc_log_pos') as _ab_cdc_log_pos,
	    jsonb_extract_path_text(_airbyte_data, '_ab_cdc_log_file') as _ab_cdc_log_file,
	    jsonb_extract_path_text(_airbyte_data, '_ab_cdc_deleted_at') as _ab_cdc_deleted_at,
	    jsonb_extract_path_text(_airbyte_data, '_ab_cdc_updated_at') as _ab_cdc_updated_at,
	    _airbyte_ab_id,
	    _airbyte_emitted_at,
	    now() as _airbyte_normalized_at
	from "target_db".source_db._airbyte_raw_test_table as table_alias
	-- test_table
	where 1 = 1
	
	),  __dbt__cte__test_table_ab2 as (
	
	-- SQL model to cast each column to its adequate SQL type converted from the JSON schema type
	-- depends_on: __dbt__cte__test_table_ab1
	select
	    cast("id" as 
	    float
	) as "id",
	    cast("flag" as text) as "flag",
	    cast("name" as text) as "name",
	    cast(_ab_cdc_log_pos as 
	    float
	) as _ab_cdc_log_pos,
	    cast(_ab_cdc_log_file as text) as _ab_cdc_log_file,
	    cast(_ab_cdc_deleted_at as text) as _ab_cdc_deleted_at,
	    cast(_ab_cdc_updated_at as text) as _ab_cdc_updated_at,
	    _airbyte_ab_id,
	    _airbyte_emitted_at,
	    now() as _airbyte_normalized_at
	from __dbt__cte__test_table_ab1
	-- test_table
	where 1 = 1
	
	)-- SQL model to build a hash column based on the values of this record
	-- depends_on: __dbt__cte__test_table_ab2
	select
	    md5(cast(coalesce(cast("id" as text), '') || '-' || coalesce(cast("flag" as text), '') || '-' || coalesce(cast("name" as text), '') || '-' || coalesce(cast(_ab_cdc_log_pos as text), '') || '-' || coalesce(cast(_ab_cdc_log_file as text), '') || '-' || coalesce(cast(_ab_cdc_deleted_at as text), '') || '-' || coalesce(cast(_ab_cdc_updated_at as text), '') as text)) as _airbyte_test_table_hashid,
	    tmp.*
	from __dbt__cte__test_table_ab2 tmp
	-- test_table
	where 1 = 1
	
	and coalesce(
	    cast(_airbyte_emitted_at as 
	    timestamp with time zone
	) >= (select max(cast(_airbyte_emitted_at as 
	    timestamp with time zone
	)) from "target_db"._airbyte_source_db."test_table_stg"),
	    
	    
	    true)
	
	  );

test_table_stg

先ほどのテーブルから _airbyte_ab_id をキーにマージを実施しています。(_airbyte_ab_id は差分変更データ1行ごとに振られる ID だと思ってください)

	    delete from "target_db"._airbyte_source_db."test_table_stg"
	    where (_airbyte_ab_id) in (
	        select (_airbyte_ab_id)
	        from "test_table_stg__dbt_tmp084325661583"
	    );
	    
	
	    insert into "target_db"._airbyte_source_db."test_table_stg" ("_airbyte_test_table_hashid", "id", "flag", "name", "_ab_cdc_log_pos", "_ab_cdc_log_file", "_ab_cdc_deleted_at", "_ab_cdc_updated_at", "_airbyte_ab_id", "_airbyte_emitted_at", "_airbyte_normalized_at")
	    (
	        select "_airbyte_test_table_hashid", "id", "flag", "name", "_ab_cdc_log_pos", "_ab_cdc_log_file", "_ab_cdc_deleted_at", "_ab_cdc_updated_at", "_airbyte_ab_id", "_airbyte_emitted_at", "_airbyte_normalized_at"
	        from "test_table_stg__dbt_tmp084325661583"
	    )

test_table_scd__dbt_tmp<ランダム12桁数字>

このテーブルの作成処理を説明するのは大変なので省略しますが、いわゆる Slowly Changing Demension Type 2 の処理になります。

	  create temporary table "test_table_scd__dbt_tmp084325868950"
	  as (
	    
	-- depends_on: ref('test_table_stg')
	with
	
	new_data as (
	    -- retrieve incremental "new" data
	    select
	        *
	    from "target_db"._airbyte_source_db."test_table_stg"
	    -- test_table from "target_db".source_db._airbyte_raw_test_table
	    where 1 = 1
	    
	and coalesce(
	    cast(_airbyte_emitted_at as 
	    timestamp with time zone
	) >= (select max(cast(_airbyte_emitted_at as 
	    timestamp with time zone
	)) from "target_db".source_db."test_table_scd"),
	    
	    
	    true)
	
	),
	new_data_ids as (
	    -- build a subset of _airbyte_unique_key from rows that are new
	    select distinct
	        md5(cast(coalesce(cast("id" as text), '') as text)) as _airbyte_unique_key
	    from new_data
	),
	empty_new_data as (
	    -- build an empty table to only keep the table's column types
	    select * from new_data where 1 = 0
	),
	previous_active_scd_data as (
	    -- retrieve "incomplete old" data that needs to be updated with an end date because of new changes
	    select
	        this_data."_airbyte_test_table_hashid",
	  this_data."id",
	  this_data."flag",
	  this_data."name",
	  this_data."_ab_cdc_log_pos",
	  this_data."_ab_cdc_log_file",
	  this_data."_ab_cdc_deleted_at",
	  this_data."_ab_cdc_updated_at",
	  this_data."_airbyte_ab_id",
	  this_data."_airbyte_emitted_at",
	  this_data."_airbyte_normalized_at"
	    from "target_db".source_db."test_table_scd" as this_data
	    -- make a join with new_data using primary key to filter active data that need to be updated only
	    join new_data_ids on this_data._airbyte_unique_key = new_data_ids._airbyte_unique_key
	    -- force left join to NULL values (we just need to transfer column types only for the star_intersect macro on schema changes)
	    left join empty_new_data as inc_data on this_data._airbyte_ab_id = inc_data._airbyte_ab_id
	    where _airbyte_active_row = 1
	),
	input_data as (
	    select "_airbyte_test_table_hashid",
	  "id",
	  "flag",
	  "name",
	  "_ab_cdc_log_pos",
	  "_ab_cdc_log_file",
	  "_ab_cdc_deleted_at",
	  "_ab_cdc_updated_at",
	  "_airbyte_ab_id",
	  "_airbyte_emitted_at",
	  "_airbyte_normalized_at" from new_data
	    union all
	    select "_airbyte_test_table_hashid",
	  "id",
	  "flag",
	  "name",
	  "_ab_cdc_log_pos",
	  "_ab_cdc_log_file",
	  "_ab_cdc_deleted_at",
	  "_ab_cdc_updated_at",
	  "_airbyte_ab_id",
	  "_airbyte_emitted_at",
	  "_airbyte_normalized_at" from previous_active_scd_data
	),
	
	scd_data as (
	    -- SQL model to build a Type 2 Slowly Changing Dimension (SCD) table for each record identified by their primary key
	    select
	      md5(cast(coalesce(cast("id" as text), '') as text)) as _airbyte_unique_key,
	      "id",
	      "flag",
	      "name",
	      _ab_cdc_log_pos,
	      _ab_cdc_log_file,
	      _ab_cdc_deleted_at,
	      _ab_cdc_updated_at,
	      _ab_cdc_updated_at as _airbyte_start_at,
	      lag(_ab_cdc_updated_at) over (
	        partition by cast("id" as text)
	        order by
	            _ab_cdc_updated_at is null asc,
	            _ab_cdc_updated_at desc,
	            _ab_cdc_updated_at desc,
	            _ab_cdc_log_pos desc,
	            _airbyte_emitted_at desc
	      ) as _airbyte_end_at,
	      case when row_number() over (
	        partition by cast("id" as text)
	        order by
	            _ab_cdc_updated_at is null asc,
	            _ab_cdc_updated_at desc,
	            _ab_cdc_updated_at desc,
	            _ab_cdc_log_pos desc,
	            _airbyte_emitted_at desc
	      ) = 1 and _ab_cdc_deleted_at is null then 1 else 0 end as _airbyte_active_row,
	      _airbyte_ab_id,
	      _airbyte_emitted_at,
	      _airbyte_test_table_hashid
	    from input_data
	),
	dedup_data as (
	    select
	        -- we need to ensure de-duplicated rows for merge/update queries
	        -- additionally, we generate a unique key for the scd table
	        row_number() over (
	            partition by
	                _airbyte_unique_key,
	                _airbyte_start_at,
	                _airbyte_emitted_at, cast(_ab_cdc_deleted_at as text), cast(_ab_cdc_updated_at as text), cast(_ab_cdc_log_pos as text)
	            order by _airbyte_active_row desc, _airbyte_ab_id
	        ) as _airbyte_row_num,
	        md5(cast(coalesce(cast(_airbyte_unique_key as text), '') || '-' || coalesce(cast(_airbyte_start_at as text), '') || '-' || coalesce(cast(_airbyte_emitted_at as text), '') || '-' || coalesce(cast(_ab_cdc_deleted_at as text), '') || '-' || coalesce(cast(_ab_cdc_updated_at as text), '') || '-' || coalesce(cast(_ab_cdc_log_pos as text), '') as text)) as _airbyte_unique_key_scd,
	        scd_data.*
	    from scd_data
	)
	select
	    _airbyte_unique_key,
	    _airbyte_unique_key_scd,
	    "id",
	    "flag",
	    "name",
	    _ab_cdc_log_pos,
	    _ab_cdc_log_file,
	    _ab_cdc_deleted_at,
	    _ab_cdc_updated_at,
	    _airbyte_start_at,
	    _airbyte_end_at,
	    _airbyte_active_row,
	    _airbyte_ab_id,
	    _airbyte_emitted_at,
	    now() as _airbyte_normalized_at,
	    _airbyte_test_table_hashid
	from dedup_data where _airbyte_row_num = 1
	  );

test_table_scd

こちらは _airbyte_unique_key_scd をキーにした単純なデータマージです。

	    delete from "target_db".source_db."test_table_scd"
	    where (_airbyte_unique_key_scd) in (
	        select (_airbyte_unique_key_scd)
	        from "test_table_scd__dbt_tmp084325868950"
	    );
	    
	
	    insert into "target_db".source_db."test_table_scd" ("_airbyte_unique_key", "_airbyte_unique_key_scd", "id", "flag", "name", "_ab_cdc_log_pos", "_ab_cdc_log_file", "_ab_cdc_deleted_at", "_ab_cdc_updated_at", "_airbyte_start_at", "_airbyte_end_at", "_airbyte_active_row", "_airbyte_ab_id", "_airbyte_emitted_at", "_airbyte_normalized_at", "_airbyte_test_table_hashid")
	    (
	        select "_airbyte_unique_key", "_airbyte_unique_key_scd", "id", "flag", "name", "_ab_cdc_log_pos", "_ab_cdc_log_file", "_ab_cdc_deleted_at", "_ab_cdc_updated_at", "_airbyte_start_at", "_airbyte_end_at", "_airbyte_active_row", "_airbyte_ab_id", "_airbyte_emitted_at", "_airbyte_normalized_at", "_airbyte_test_table_hashid"
	        from "test_table_scd__dbt_tmp084325868950"
	    )

test_table__dbt_tmp<ランダム12桁数字>

(省略)

test_table

(省略)

4-2. ソース DB(MySQL)に実行されるコマンド

こちらは簡単にですが、MySQL の一般ログを確認すると、Airbyte からバイナリログのダンプを実行しているようです。

2022-07-17T08:39:13.872714Z	   25 Binlog Dump	Log: 'binlog.000005'  Pos: 1720

ちなみに、ターゲット DB が停止している場合はバイナリログの取得はスキップし、ターゲット DB が復活してから過去分含めての変更データを取得するようです。(Airbyte の内部に変更データを蓄積しておいて、ターゲット DB が復帰したのちに適用するというわけではなさそう)。

5. まとめ

本記事は Airbyte で MySQL から PostgreSQL への Change Data Capture を試してみて、背後で実行されるコマンドも確認してみました。

感想ですが、

  • 動かすだけなら GUI でパパっと設定できるので簡単。
  • カラムの順番が変わるのは気に入らない。(id name flag ⇒ id flag name)
  • データ適用は中間テーブルを挟みながらも複数行の変更データを一括で適用しており、処理方式としては Qlik Replicate に近いかな(MySQL の row mode レプリケーションや Oracle GoldenGateとは違う)。大量の変更データの適用には有利っぽいけど、ラグ(適用遅延)が気になるかも(今回だと最小でも1分ぐらいは掛かっている)。
  • _airbyte_normalized_at の更新範囲が広い。かつ内部では UPDATE ではなく DELETE/INSERT なので、性能や容量管理には注意する必要がありそう(VACUUM のある PostgreSQL やタイムトラベルを持つ Snowflake などは特に)。
  • ターゲット DB が停止している間はソース DB から変更データを抽出しないので、ターゲット DB の長期計画停止などの場合には、ソース側でバイナリログの保存期間など気にする必要がありそう。(逆に Airbyte で変更データをため込まないならそれはそれで管理しやすいですが)

本番利用するにはまだまだ検証が必要ですが、OSS で Chanage Data Capture が簡単にできるようになるというのは、今後に期待したいと思います。

5
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
5
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?