この記事は 気にはなってるけど触ってないビッグデータ系のツール・サービスを触る Advent Calendar 2022 の20日目の記事です。
OpenMetadata とは
OpenMetadata は OSS のデータカタログツールです。最近、SaaS 版の Collate も登場しました。OpenMetadata では、主にメタデータの管理、テーブルレベルのデータリネージやデータプロファイルなどの機能を持っています。
機能の概要については Classmethod さんの記事がわかりやすいです
https://dev.classmethod.jp/articles/introduction-of-open-metadata/
なぜデータカタログが必要か
まず、データカタログは分析基盤の補助システムという位置付けです。
分析基盤の利用者が困ることとして、
- 欲しいデータがどこにあるのかわからない
- データがいつ更新されるのかわからない
- カラムの意味がわからない
- データの分布が知りたい
- データがおかしいときにどこに問い合わせれば良いかわからない
などなど、分析の前に様々なハードルが存在します。
そうしたときに予めドキュメントが整備してあれば、利用者はとても助かると思います。
また、利用者だけでなく運用者にとっても、分析基盤の守りを固める意味でデータカタログを整備していくことがデータの価値を毀損させないためにも重要だと考えます(得てして分析基盤は勝手に壊れることが多いため)。
調査のモチベーション
ずばり、あるテーブルのデータの分布を時系列で知りたい!です。プロダクトのリリース後にデータの分布が変わることはよくあります(想定された変化であったり、バグであったり)。僕の経験から、こういうときは利用者からの問い合わせで発覚することが多かったです。
分析基盤をもっと強固にしたい!利用者から問い合わせを受ける前にデータの分布が変わったことを察知したい!そんな動機から何か使えそうなツールがないかデータカタログの調査を始めました。
そうした時に、OpenMetadata の Profiler が気になったので調べてみることにしました。テーブルのプロファイリングは恐らく様々な SQL が発行されるはずなので、重いテーブルに対して SQL が発行されまくって課金死しないか調べたいと思います(BigQuery を想定しています)。
OpenMetadata の Architecture
https://docs.open-metadata.org/developers/architecture より引用
- データカタログを利用するための Web UI
- API Server(Drop Wizard)
- Web UI からメタデータを検索するための検索エンジン(Elasticsearch)
- 定期的にメタデータを取り込むためのスケジューラー(Airflow)
- メタデータを格納するための DB(MySQL)
触ってみる
やること
- docker-compose.yml を取得する
- OpenMetadata Web UI にアクセスできることを確認する
- Airflow Web UI にアクセスできることを確認する
- OpenMetadata Web UI で BigQuery への接続設定を行う
- BigQuery にサンプルデータを用意する
- OpenMetadata Web UI 上から BigQuery のサンプルデータの Metadata を取り込む
- OpenMetadata Web UI 上から BigQuery のサンプルデータの Profile を行う
- BigQuery で OpenMetadata からどんなクエリが発行されたか確認する
docker-compose.yml を取得する
のページから docker-compose.yml を取得して
$ docker-compose up -d
を実行します(それなりに時間が掛かります)
続いて OpenMetadata の Web UI と Airflow の Web UI を確認してみます
OpenMetadata Web UI にアクセスできることを確認する
http://localhost:8585
admin/admin でログインします
Airflow Web UI にアクセスできることを確認する
http://localhost:8080
admin/admin でログインします
初期状態でいくつか Sample DAG が登録されています
OpenMetadata Web UI で BigQuery への接続設定を行う
Settings を押下します
Databases を押下し、Add new Database Service を押下します
BigQuery を選択して、画面下部の Next を押下します
Service Name を入力します
GCP 上でサービスアカウントを作成、クレデンシャルキーを発行して登録します
接続テストを行い、接続設定を登録します
BigQuery にサンプルデータを用意する
サンプルとして、bigquery-public-data の taxi トリップデータを使うことにします
bigquery-public-data.chicago_taxi_trips.taxi_trips
テーブルを自身のプロジェクトにコピーします
約74GB なので、フルスキャン数回レベルでは課金死には至らなさそうです
OpenMetadata Web UI 上から BigQuery の Metadata を取り込む
さきほどの OpenMetadata の接続設定の画面から Add Ingestion を押下します
取り込むテーブルのフィルタリングを行います(特に設定しない)
dbt の設定ファイルを読み込めるようですが今回はスキップします
取り込み間隔の設定です
今回は手動で実行したいので none を選択して、Add & Deploy を押下します
取り込み設定ができました。Run を押下します
Airflow の Web UI を確認すると、DAG が追加されています
無事、完了したようです
取り込まれたデータを確認します。トップ画面から Tables へ遷移します
taxi_trips が取り込まれています
カラム、型、Description が同期されています
OpenMetadata Web UI 上から BigQuery のサンプルデータの Profile を行う
続いて Profile を行います。Database Services の BigQuery の接続設定から Add Ingestion を行います
ここから Add Profiler Ingestion を押下します
Metadata を取り込んだ時と流れは同じです
Profiler Ingestion のジョブを実行するために、Run を押下します
同じように Airflow に DAG が追加されています
ジョブが完了したら、taxi_trips の Profiler 画面を確認します
Profiler では Null の割合、Unique の割合、Distinct の割合などが確認できます
試しに fare (乗車料金)カラムを見てみます
1回だけの取り込みなのであまり面白くないですが、定点でデータを取り込むとデータの変化を確認することができそうです
(Test を設定できるので、そこから通知ができるのかどうか確認したい。。。時間がなくて確認できず)
BigQuery で OpenMetadata からどんなクエリが発行されたか確認する
下記コマンドで確認すると、Profiling のために発行されたクエリは71本でした
$ bq ls -j -a -n 100
これらのクエリを詳しく見ていきたいと思います
テーブルのレコード数の集計
/* {"app": "OpenMetadata", "version": "0.13.0.1"} */
SELECT count(*) AS `rowCount`, @`param_1` AS `columnCount`, @`param_2` AS `columnNames`
FROM `chicago_taxi_trips`.`taxi_trips`
LIMIT @`param_3`
各カラムに対して、count, min, max, null count などの集計
/* {"app": "OpenMetadata", "version": "0.13.0.1"} */
SELECT avg(LENGTH(`taxi_id`)) AS `mean`, count(`taxi_id`) AS `valuesCount`, count(DISTINCT `taxi_id`) AS `distinctCount`, NULL AS `anon_1`, min(LENGTH(`taxi_id`)) AS `minLength`, NULL AS `anon__1`, max(LENGTH(`taxi_id`)) AS `maxLength`, SUM(CAST(CASE WHEN (`taxi_id` IS NULL) THEN @`param_1` ELSE @`param_2` END AS NUMERIC)) AS `nullCount`, NULL AS `anon__2`, NULL AS `anon__3`
FROM `chicago_taxi_trips`.`taxi_trips`
LIMIT @`param_3`
各カラムに対して、ユニーク数のカウント
/* {"app": "OpenMetadata", "version": "0.13.0.1"} */
WITH `only_once` AS
(SELECT count(`unique_key`) AS `count_1`
FROM `chicago_taxi_trips`.`taxi_trips` GROUP BY `unique_key`
HAVING count(`unique_key`) = @`count_2`)
SELECT count(*) AS `uniqueCount`
FROM `only_once`
LIMIT @`param_1`
何をやっているのか不明です。恐らくカラムの型によって本来発行不要なクエリを発行してそうな感じがします
/* {"app": "OpenMetadata", "version": "0.13.0.1"} */
SELECT NULL AS `anon_1`
FROM `chicago_taxi_trips`.`taxi_trips`
LIMIT @`param_1`
数値型カラムに対して、median の集計
/* {"app": "OpenMetadata", "version": "0.13.0.1"} */
SELECT percentile_cont(`trip_seconds` , 0.5) OVER() AS `median`
FROM `chicago_taxi_trips`.`taxi_trips`
LIMIT @`param_2`
恐らくランダムサンプリング
/* {"app": "OpenMetadata", "version": "0.13.0.1"} */
WITH `taxi_trips_rnd` AS
(SELECT `taxi_trips`.`unique_key` AS `unique_key`, `taxi_trips`.`taxi_id` AS `taxi_id`, `taxi_trips`.`trip_start_timestamp` AS `trip_start_timestamp`, `taxi_trips`.`trip_end_timestamp` AS `trip_end_timestamp`, `taxi_trips`.`trip_seconds` AS `trip_seconds`, `taxi_trips`.`trip_miles` AS `trip_miles`, `taxi_trips`.`pickup_census_tract` AS `pickup_census_tract`, `taxi_trips`.`dropoff_census_tract` AS `dropoff_census_tract`, `taxi_trips`.`pickup_community_area` AS `pickup_community_area`, `taxi_trips`.`dropoff_community_area` AS `dropoff_community_area`, `taxi_trips`.`fare` AS `fare`, `taxi_trips`.`tips` AS `tips`, `taxi_trips`.`tolls` AS `tolls`, `taxi_trips`.`extras` AS `extras`, `taxi_trips`.`trip_total` AS `trip_total`, `taxi_trips`.`payment_type` AS `payment_type`, `taxi_trips`.`company` AS `company`, `taxi_trips`.`pickup_latitude` AS `pickup_latitude`, `taxi_trips`.`pickup_longitude` AS `pickup_longitude`, `taxi_trips`.`pickup_location` AS `pickup_location`, `taxi_trips`.`dropoff_latitude` AS `dropoff_latitude`, `taxi_trips`.`dropoff_longitude` AS `dropoff_longitude`, `taxi_trips`.`dropoff_location` AS `dropoff_location`, MOD(CAST(100*RAND() AS INT64), @`param_1`) AS `random`
FROM `chicago_taxi_trips`.`taxi_trips` )
SELECT `taxi_trips_rnd`.`unique_key` AS `taxi_trips_rnd_unique_key`, `taxi_trips_rnd`.`taxi_id` AS `taxi_trips_rnd_taxi_id`, `taxi_trips_rnd`.`trip_start_timestamp` AS `taxi_trips_rnd_trip_start_timestamp`, `taxi_trips_rnd`.`trip_end_timestamp` AS `taxi_trips_rnd_trip_end_timestamp`, `taxi_trips_rnd`.`trip_seconds` AS `taxi_trips_rnd_trip_seconds`, `taxi_trips_rnd`.`trip_miles` AS `taxi_trips_rnd_trip_miles`, `taxi_trips_rnd`.`pickup_census_tract` AS `taxi_trips_rnd_pickup_census_tract`, `taxi_trips_rnd`.`dropoff_census_tract` AS `taxi_trips_rnd_dropoff_census_tract`, `taxi_trips_rnd`.`pickup_community_area` AS `taxi_trips_rnd_pickup_community_area`, `taxi_trips_rnd`.`dropoff_community_area` AS `taxi_trips_rnd_dropoff_community_area`, `taxi_trips_rnd`.`fare` AS `taxi_trips_rnd_fare`, `taxi_trips_rnd`.`tips` AS `taxi_trips_rnd_tips`, `taxi_trips_rnd`.`tolls` AS `taxi_trips_rnd_tolls`, `taxi_trips_rnd`.`extras` AS `taxi_trips_rnd_extras`, `taxi_trips_rnd`.`trip_total` AS `taxi_trips_rnd_trip_total`, `taxi_trips_rnd`.`payment_type` AS `taxi_trips_rnd_payment_type`, `taxi_trips_rnd`.`company` AS `taxi_trips_rnd_company`, `taxi_trips_rnd`.`pickup_latitude` AS `taxi_trips_rnd_pickup_latitude`, `taxi_trips_rnd`.`pickup_longitude` AS `taxi_trips_rnd_pickup_longitude`, `taxi_trips_rnd`.`pickup_location` AS `taxi_trips_rnd_pickup_location`, `taxi_trips_rnd`.`dropoff_latitude` AS `taxi_trips_rnd_dropoff_latitude`, `taxi_trips_rnd`.`dropoff_longitude` AS `taxi_trips_rnd_dropoff_longitude`, `taxi_trips_rnd`.`dropoff_location` AS `taxi_trips_rnd_dropoff_location`
FROM `taxi_trips_rnd`
LIMIT @`param_2`
以上、Profiling のために発行されたクエリでした
これらのクエリを発行するときにテーブルのパーティションキーを設定する方法があるのではないのかと思うのですが、
(そうでないと課金死の問題が解決できない)今後の課題としたいと思います
まとめ
本記事では OpenMetadata の紹介をさせて頂きました。
当初の目的だった Profilier について残課題はいくつかありますが、
なんとなくどういう動きをするのかイメージがついたのではないかと思います。
docker-compose.yml さえあれば簡単に動かせると思いますので、
皆様も一度 OpenMetadata を触ってみてはいかがでしょうか・