1. はじめに
本記事では、オープンソースのデータカタログツールである OpenMetadata を利用して、以下のようなデータパイプラインの End-to-End のデータリネージの取得を試してみた内容を紹介します。
- 一番左側の業務システムの販売管理 DB(sales_db/MySQL)に格納されている2つのテーブル(sales_detail と item)を Airbyte を利用して、データ分析用の DWH(dwh/PostgreSQL)に複製する。
- DWH 内に複製された sales_detail テーブルと item テーブルから dbt を利用してデータマート用のテーブル sales_summary を作成する。
- データマートテーブル sales_summary を BI ツール Metabase から参照する。
結果としては、以下のように何とかデータリネージを取ることはできましたが、結構問題点や不満な点が多いので記事タイトルは「~してみたかった」にしています(なにが問題点や不満な点なのかは後述します)。
2. 利用ソフトウェア
- データカタログ:OpenMetadata 0.13.1
- データベース:MySQL 8.0.31(ソースDB)、PostgreSQL 15.1(DWH)
- データパイプライン:Airbytes 0.40.26、dbt 1.3.1
- BI ツール:Metabase 0.45.1
今回はすべて Oracle Linux 8.7 (x86_64) 上で動作させています。
3. パイプラインの作成
まずは、データパイプラインをどう作ったか示します。データリネージを取得できるようにするためにはこの時点から幾つか気を付ける点があります。
3-1. sales_db でのテーブル/データ作成
まずは、一番左の sales_db(MySQL)に sales_detail テーブルと item テーブルを作成します。
create table item (
item_cd varchar(5) primary key,
item_category varchar(30) not null
);
create table sales_detail (
sales_id int primary key,
item_cd varchar(5) not null,
sales_quantity int not null,
foreign key item_cd_fk (item_cd) references item (item_cd)
);
insert into item values
('i0001', 'C001'),
('i0002', 'C002'),
('i0003', 'C002')
;
insert into sales_detail values
(1, 'i0001', 1),
(2, 'i0001', 3),
(3, 'i0002', 2),
(4, 'i0003', 1)
;
3-2. Airbyte による sales_db ⇒ dwh へのデータ複製
Airbyte で sales_db から dwh に2つのテーブルを複製する設定をしていきます。
まず、複製元の DB(sales_db/MySQL)を Source として Airbytes に登録します。
- 左のメニューから [Sources] を選択する。
- [Source type] で MySQL を選択する。
- [Source name] に
sales_db
と入力する。(ここでの名前は Airbyte 上の表示名だが、実際の DB 名と一致させておいた方が良い) - [Host] ~ [Password] に DB 接続情報を入力する。
- [SSL Connection] は無効にする。(今回の環境は SSL 接続の設定をしていないため)
- 一番下の [Set up source] ボタンを押して登録する。
次に、複製先の DB(dwh/PostgreSQL)を Destination として Airbyte に登録します。
- 左のメニューから [Destinations] を選択する。
- [Destination type] で Postgres を選択する。
- [Destination name] に
dwh
と入力する。(Source と同様、実際の DB 名と一致させておいた方が良い) - [Host] ~ [Password] に DB 接続情報を入力する。([Default Schema] は今回の Airbyte では使わないが
public
で OK) - 一番下の [Set up destination] ボタンを押して登録する。
以降では、Source ⇒ Destination テーブル/データ複製の設定を行います。
左のメニューで [Connections] を選択し、[Create your first connecition] ボタンを押します。すると以下のような画面になるので、先ほど登録した複製元 DB を選択し [Use existing source] ボタンを押します。
次に複製先 DB を選択し [Use existing destination] ボタンを押します。
しばらくすると以下のような画面になるため、以下の設定をして一番下の [Set up connection] ボタンを押して設定は完了です。
- [Replication frequency] を [Manual] に変更(今回は複製を手動実行します)
- [Destination Namespace] が [Mirror source structure] になっていることを確認([Destination default] の設定ではデータリネージは取得できなかった)
設定後は以下のような画面になるので、[Sync now] ボタンをクリックして複製を実行します。
実行が成功すると、データベース dwh の sales_db スキーマに以下のようにテーブルとデータが複製されます。
dwh=> \dt sales_db.*
リレーション一覧
スキーマ | 名前 | タイプ | 所有者
----------+---------------------------+----------+-------------
sales_db | _airbyte_raw_item | テーブル | test_user01
sales_db | _airbyte_raw_sales_detail | テーブル | test_user01
sales_db | item | テーブル | test_user01
sales_db | sales_detail | テーブル | test_user01
(4 行)
dwh=> select * from sales_db.item;
item_cd | item_category | _airbyte_ab_id | _airbyte_emitted_at | _airbyte_normalized_at | _airbyte_item_hashid
---------+---------------+--------------------------------------+----------------------------+-------------------------------+----------------------------------
i0001 | C001 | 41125a8a-7f77-4357-970a-1b7affc29f9f | 2023-01-11 23:53:34.755+09 | 2023-01-11 23:53:45.566484+09 | 1bf22c33e00bb9a0d1d81adc5efcc87e
i0002 | C002 | 6da1da7d-4dec-42a6-b542-3840d60f94b7 | 2023-01-11 23:53:34.755+09 | 2023-01-11 23:53:45.566484+09 | b977cd76d1af376e16ed3c43efc63b2c
i0003 | C002 | 2aa754e2-7ef1-4a7e-9c51-b7e854a03db4 | 2023-01-11 23:53:34.755+09 | 2023-01-11 23:53:45.566484+09 | 3eae70311aed412f890e0ce10bbd8151
(3 行)
dwh=> select * from sales_db.sales_detail;
item_cd | sales_id | sales_quantity | _airbyte_ab_id | _airbyte_emitted_at | _airbyte_normalized_at | _airbyte_sales_detail_hashid
---------+----------+----------------+--------------------------------------+----------------------------+-------------------------------+----------------------------------
i0001 | 1 | 1 | d3c3f7da-acda-430a-b6aa-763efc50d7bc | 2023-01-11 23:53:34.755+09 | 2023-01-11 23:53:45.562124+09 | 6f9a8123c8fee40c2ea42b7eef11c86b
i0001 | 2 | 3 | ff34a427-e2b5-4f1f-8fd2-49958677ff0b | 2023-01-11 23:53:34.755+09 | 2023-01-11 23:53:45.562124+09 | 29cbc74b633abacd0cfb6f59f800061c
i0002 | 3 | 2 | 16c52b48-d57a-4989-93de-7d184420eeb6 | 2023-01-11 23:53:34.755+09 | 2023-01-11 23:53:45.562124+09 | 15d56b6e9d693b62f744098f5bb8e7b7
i0003 | 4 | 1 | fce10bb2-28e2-45d8-8e7b-982bc64a7bb1 | 2023-01-11 23:53:34.755+09 | 2023-01-11 23:53:45.562124+09 | d06b125c234a4df2190876b9193a6f67
(4 行)
3-3. dbt によるデータマートの作成
dwh データベースの sales_db スキーマ内に複製された sales_detail テーブルと item テーブルから sales_summary テーブルを dbt で作成していきます。
dbt のプロジェクト初期設定や DB 接続情報登録は割愛します。
まず、元となる2つのテーブルを dbt の sources に登録します。
version: 2
sources:
- name: sales_db
tables:
- name: item
- name: sales_detail
次に sales_summary テーブルを作成するモデルを登録します。
{{
config(materialized='table')
}}
select
d.sales_id sales_id,
i.item_cd item_cd,
i.item_category item_category,
d.sales_quantity sales_quantity
from
{{ source('sales_db', 'sales_detail') }} d
inner join
{{ source('sales_db', 'item') }} i
on
d.item_cd = i.item_cd
これで設定は完了なので実行します。
$ dbt run
01:40:58 Running with dbt=1.3.1
01:40:58 Found 1 model, 0 tests, 0 snapshots, 0 analyses, 289 macros, 0 operations, 0 seed files, 2 sources, 0 exposures, 0 metrics
01:40:58
01:40:58 Concurrency: 1 threads (target='dev')
01:40:58
01:40:58 1 of 1 START sql table model public.sales_summary .............................. [RUN]
01:40:59 1 of 1 OK created sql table model public.sales_summary ......................... [SELECT 4 in 0.22s]
01:40:59
01:40:59 Finished running 1 table model in 0 hours 0 minutes and 0.61 seconds (0.61s).
01:40:59
01:40:59 Completed successfully
01:40:59
01:40:59 Done. PASS=1 WARN=0 ERROR=0 SKIP=0 TOTAL=1
成功すると、データマート sales_summary テーブルが作成されます。
dwh=> \d
リレーション一覧
スキーマ | 名前 | タイプ | 所有者
----------+---------------+----------+-------------
public | sales_summary | テーブル | test_user01
(1 行)
dwh=> select * from public.sales_summary;
sales_id | item_cd | item_category | sales_quantity
----------+---------+---------------+----------------
1 | i0001 | C001 | 1
2 | i0001 | C001 | 3
3 | i0002 | C002 | 2
4 | i0003 | C002 | 1
(4 行)
3-4. Metadata による BI ダッシュボードの作成
データベース dwh に作成したテーブル sales_summary を参照する簡単なダッシュボードを Metabase で作成します。
まず、はじめにデータリネージを取得するための重要な設定をします。この設定をしないと、sales_summary というテーブル名を Metabase に取り込んだ場合、Metabase 上の表示名が Sales Summary と Pascal Case + 空白アリに変換され、OpenMetadata がメタデータを取り込んだ際にデータリネージが繋がらなくなります。
- Metabaseにログイン後、画面右上の歯車アイコンから [管理者設定] を選択する。
- 画面上部のメニューの [設定] ⇒ 左メニューの [一般] を選択する。
- [わかりやすいテーブル名とフィールド名] の設定を無効にする。
- [管理者設定] 画面に移動する。
- 画面上のメニュー [データベース] ⇒ 画面右上の [データベースを追加] を選択する。
- [データベースのタイプ] で [PostgreSQL] を選択する。
- [表示名] に
dwh
と入力する。(データベース名と一致させる) - [ホスト名] ~ [パスワード] に DB 接続情報を入力する。
- [Schema] は [全て] を選択する。(デフォルト、変更 NG)
- [保存] ボタンを押す。
最後に、ダッシュボードを作成します。Metabase におけるダッシュボード作成手順は割愛しますが、今回は以下のようなダッシュボードを作成しました。
- X 軸が item_category、Y 軸が sales_quantity の合計を表す縦棒グラフを Sales Chart という名前で作成する。
- Sales Chart を1つ含むダッシュボード Sales Dashboard を作成する。
以上でデータパイプラインの作成は完了です。
4. OpenMetadata へのメタデータ取り込み
DB、Aribyte、dbt、Metabase からメタデータを取り込みます。
4-1. DB からのメタデータ取り込み
まずデータベース sales_db(MySQL)から取り込みます。
画面右上の [Settings] ⇒ 左メニューから [Database] と選択し、[Add new] ボタンを押します。すると以下のような DB の種類の選択画面になるので、[Mysql] を選択します。
次の画面で sales_db のOpenMetadata 上での表示名を入力しますが、sales_db
を DB 名と一致させます(厳密には Airbyte における DB 登録名と一致させる必要があります)。
次の画面で sales_db への接続情報を求められるので入力し、[Test Connection] を押してテスト接続に問題がないことを確認後、[Save] を押します。
次の画面で [Add Ingestion] を押すと以下のような画面になるので、メタデータを取り込むスキーマを設定します(スクリーンショットでは間違えて public を含めていますが、sales_db のみで OK です)。加えて [Enable Debug Log] を有効にして [Next] を押します(デバッグログはデータリネージが取得できなかった際の原因調査に役に立ちます)。
次の画面で、メタデータを取り込むスケジュール実行の設定が可能ですが、今回は手動で実行するので [none] を選択します。
これで設定は完了なので、実際に取り込み処理を実行します。以下の画面で [Run] を押し、[Recent Runs] が [Success] になれば成功です(ステータス更新にはブラウザリロードが必要)。
画面上メニューの [Explore] をクリックすると、sales_db(MySQL)のテーブル2つが表示されていることが確認できます。
以上で sales_db(MySQL)からのメタデータ取り込みは完了です。dwh(PostgreSQL)からも同様にメタデータを取り込みます。
4-2. Airbyte からのメタデータ取り込み
Airbyte は Pipeline Service として取り込みます。
画面右上の [Setting] ⇒ 左メニューの [Pipelines] ⇒ [Add new] と選択し、次の画面で [Airbyte] を選択、さらに次の画面で名前(なんでもよい)を入力すると、Airbyte の接続情報を入力する画面になるので、必要情報を入力します。
あとはデータベースの時と同様に Ingestion の作成と実行を行えば、Airbyte からのメタデータ取り込みは完了です。
4-3. dbt からのメタデータ取り込み
OpenMetadata が dbt からメタデータを取り込む際は dbt catalog を利用するので、以下のように dbt catalog の生成とサーバー起動を行います。
$ dbt docs generate
02:52:41 Running with dbt=1.3.1
02:52:41 Found 1 model, 0 tests, 0 snapshots, 0 analyses, 289 macros, 0 operations, 0 seed files, 2 sources, 0 exposures, 0 metrics
02:52:41
02:52:41 Concurrency: 1 threads (target='dev')
02:52:41
02:52:41 Done.
02:52:41 Building catalog
02:52:41 Catalog written to /home/mabe/OpenMetadata/target/catalog.json
$ dbt docs serve --port 8001
02:53:04 Running with dbt=1.3.1
02:53:04 Serving docs at 0.0.0.0:8001
02:53:04 To access from your browser, navigate to: http://localhost:8001
02:53:04
02:53:04
02:53:04 Press Ctrl+C to exit.
その後、OpenMetadata に戻って、[Settings] ⇒ [Database Services] ⇒ [dwh] ⇒ [Ingestions] と移動し、[Add Ingestion] から [Add dbt Ingestion] を選択します。
その後、dbt catalog への接続情報を聞かれるので、以下のように入力します。
あとは、Ingestion のスケジュール設定、実行を DB, Airbyte と同様に行えば、メタデータ取り込みは完了です。
4-3. Metabase からのメタデータ取り込み
最後に Metabase からダッシュボードのメタデータを取り込みます。
OpenMetadata の画面で、画面右上の [Settings] ⇒ 左メニューの [Dashboards] ⇒ [Add new] を選択し、次の画面で [Metabase] を選択します。さらに次の画面で名前(なんでもよい)を入力すると、Metabase の接続情報を入力する画面になるので、必要情報を入力します([Host And Port] では最後に /
を入れるとテスト接続がエラーになります)。
その後の Ingestion の画面では、ダッシュボードが参照している DB のサービス名(OpenMetadata 上での名前)を入力します。これがデータリネージを取得するためには必須になります。([Enable Debug Log] は不要ですが有効にすることをお勧めします)
後の Ingestion の実行は他のサービスと同じです。
以上で、OpenMetadata へのメタデータの取り込みは完了です。
5. データリネージの確認
メタデータ取り込み後、例えば [Explore] ⇒ [Dashboards] ⇒ [Sales Dashboard] ⇒ [Lineage] と選択していくと、ダッシュボードのデータがどこから来たかが大元の sales_db から辿ることができます。
OpenMetadata で特徴的なのは dbt による sales_detail / item テーブルから sales_summary テーブルの作成処理で列レベルリネージが取得できていることです。dbt catalog では列レベルリネージは取得できないのですが、OpenMetadata は dbt のコンパイル済み SQL 文をパーズして列レベルリネージの情報を取得しています(内部では SQLLineage という Python ライブラリを使っていたと記憶しています)。
一方で、以下の点は不満です。
- Airbyte による処理は表レベルリネージにとどまっており、列レベルリネージは取れていない。
- DB テーブル ⇒ Metabase 間のリネージがテーブル-ダッシュボード間のリネージになっている。
- 列レベルリネージはあきらめるとしても、少なくともダッシュボードではなくチャートとテーブルのリネージが取れてほしい(ダッシュボードには複数のチャートが含まれるケースもあるので)が、それは実現できていない。
また、データリネージを取得するためには、以下のような制約がありました。
- Airbyte における Source/Destination の名前と、OpenMetadata 上の DB のサービス名が一致する必要がある。
- テーブル名にはスペースを含めてはいけない。(最初、"Sales Summary" というテーブル名を用いていましたが、これだとデータリネージは取れないです)
- Metabase でテーブル/カラム名を分かり易くする変換を無効化する必要がある。
- 本質は DB の物理名と Metabase の論理名を一致させる必要があるということですが、DB 上では英数字で、BI ツール上ではユーザーに分かり易くするために日本語名でというケースは多いので、この制約はちょっと苦しい。
これらは OpenMetadata の実装がまだこなれてない部分なので、今後に期待したいと思います。個人的には現時点ではちょっと運用は辛いかなと思います。