Supershipグループ Advent Calendar 2024の14日目の記事になります。
概要
業務で、GCPで取り扱っているデータのDataLineageの実装のPoCを行っていました。
日本語文献も少なくなく、分かりづらい点も多く四苦八苦したので書き記します!
DataLineageとは?
データの由来(ソース)や、どのように変換・加工されて現在の状態に至ったのかを追跡して、データの履歴を追う為の可視化を行うものです。
下記の図のように、「データ1とデータ2をdataprocジョブで加工して、データ3に変換した」ことを可視化を行います。
この可視化により特にデータマネジメントにおけるメリットが多々あります。
- 工程の可視化による、データの不整合の原因の特定が容易
- データの透明性が得られ、分析結果の信頼性の向上
- データの参照先や参照元の追跡や確認が容易になるため、データガバナンスの向上
GCPにおけるDataLineage
GCPでは、Dataplex(旧Data Catalog)の機能でDataLineageを行うことができます。
Datalineage APIを有効化することで、BigQueryをはじめとする様々なGCPサービスは自動的に Datalineageを行い、自動化出来ないサービスもDataLineageのAPIを叩く事でリネージを行うことができます。
DataLineage API
DataLineag APIで3つのコンポーネントを操作することで、DataLineageを行うことができます。GCPサービスの自動化も裏でこのAPIを叩いてます。
1. Processes
Processesはデータ処理の定義を指し、特定のデータ処理ワークフローの全体像を把握するための単位として利用されています。
リネージ画面のオレンジ色のマークの部分であり、未定義のプロセス
として表示されている部分
2. Runs
Runはデータ処理の実行インスタンス(実態)を指します。Processesで定義したデータ処理の状態や実行時間などの情報を管理し、一つのProcessesに複数のRunを関連付けることができます。
リネージ画面の実行
タブの部分で表示される情報
3. Events
Eventsはデータ処理を行なった結果のデータの流れを示し、データ処理全体の実行時間とデータのソースとターゲットのペアを管理しています。
リネージ画面の矢印の部分
注意点
リネージ画面に表示するには、Dataplex(Data Catalog)で追跡可能なリソース同士をEventsで関連付けた時だけです。
GCSのデータをLineageしたいならDataLake等でGCSのオブジェクトを追跡できるようにしてください。
APIの仕様と3要素の関係
基本的なDataLineageEventは下記のURLにリクエストを投げることでProcesses
、Runs
、Events
を操作できます。
https://datalineage.googleapis.com/v1/{parent}/{API Event}
3要素についての段で述べたように、Processes→Runs→Events
のような親子関係になっており{parent}の中身はこの親子関係を指し示す必要があります。
# 各リソースのparent
# Processesの場合
projects/your-project/locations/your-location
# Runsの場合
projects/your-project/locations/your-location/processes/xxxx
# Eventsの場合
projects/your-project/locations/your-locationprocesses/xxxx/runs/yyyy
APIを実際に叩いてみる
APIはCLIで直接実行できないため、PythonやJavaのSDKを使用することをお勧めします。
ドキュメントを参考に実装していきます。
- Processの作成
- Runの作成
- Eventの作成
を行います
基本的にBigquery内で行う処理は自動でLineage取ってくれるのですが、手動でAPIを叩いてLineageを記録します。
準備
APIを叩く
from google.cloud import datacatalog_lineage_v1
from logging import getLogger, StreamHandler, INFO, Formatter
import sys
import traceback
logger = getLogger(__name__)
logger.setLevel(INFO)
console_handler = StreamHandler(sys.stdout)
logger.addHandler(console_handler)
## Create a Process
def create_process(client: datacatalog_lineage_v1.LineageClient, parent_value: str, process_name: str) -> datacatalog_lineage_v1.Process:
process = datacatalog_lineage_v1.Process(
name = parent_value+"/processes/"+process_name
)
process_request = datacatalog_lineage_v1.CreateProcessRequest(
parent = parent_value,
process = process
)
# Make the request
return client.create_process(request=process_request)
## Create a Run
def create_run(client: datacatalog_lineage_v1.LineageClient, parent_value: str, run_name: str, start_time: str, state: int) -> datacatalog_lineage_v1.Run:
run = datacatalog_lineage_v1.Run(
name = parent_value+"/runs/"+run_name,
start_time = start_time,
state = datacatalog_lineage_v1.Run.State(state)
)
run_request = datacatalog_lineage_v1.CreateRunRequest(
parent = parent_value,
run = run
)
# Make the request
return client.create_run(request=run_request)
## Create a EventLink
def create_lineage_events_link(source_fqdn: str, target_fqdn: str) -> datacatalog_lineage_v1.EventLink:
if source_fqdn is None or target_fqdn is None:
logger.error("Source and Target FQDN are required")
raise ValueError("Source and Target FQDN are required")
source = datacatalog_lineage_v1.EntityReference(
fully_qualified_name = source_fqdn
)
target = datacatalog_lineage_v1.EntityReference(
fully_qualified_name = target_fqdn
)
link = datacatalog_lineage_v1.EventLink(
source = source,
target = target
)
return link
## Create a Lineage-Event
def create_lineage_events(client: datacatalog_lineage_v1.LineageClient ,parent_value: str ,lineage_event_name: str,start_time,links) -> datacatalog_lineage_v1.LineageEvent:
lineage_event = datacatalog_lineage_v1.LineageEvent(
name = parent_value+"/lineageEvents/"+lineage_event_name,
start_time = start_time,
links = links
)
lineage_event_request = datacatalog_lineage_v1.CreateLineageEventRequest(
parent=parent_value,
lineage_event=lineage_event
)
# Make the request
return client.create_lineage_event(request=lineage_event_request)
# 以下は削除したい時に使う
# nameの値は、コンソールlogに表示されるLineage Event created: の後の値を使う
def delete_lineage_event(client: datacatalog_lineage_v1.LineageClient, name: str):
request = datacatalog_lineage_v1.DeleteLineageEventRequest(name=name)
client.delete_lineage_event(request=request)
logger.info("Lineage Event deleted: %s", name)
def main():
logger.info("===== Start =====")
# 4つの値を設定する
parent_value = "projects/your-project/locations/your-location"
start_time = "2024-12-12T15:39:54.704678+09:00" # ISO 8601 format
source_fqdn = "bigquery:your-project.your-BQ-dataset.your-BQ-table"
target_fqdn = "bigquery:your-project.your-BQ-dataset.your-BQ-table"
client = datacatalog_lineage_v1.LineageClient()
try:
logger.info("===== Creating a Process =====")
process_response = create_process(client, parent_value, "process_name7")
parent_value = process_response.name
logger.info("Process created: %s", process_response.name)
logger.info("===== Creating a Run =====")
run_response = create_run(client, parent_value, "run_name7", start_time, 2)
parent_value = run_response.name
logger.info("Run created: %s", run_response.name)
logger.info("===== Creating a Lineage Event =====")
link = create_lineage_events_link(source_fqdn, target_fqdn)
lineage_event_response = create_lineage_events(client, parent_value, "lineage_event_name7", start_time, [link])
logger.info("Lineage Event created: %s", lineage_event_response.name)
return lineage_event_response.name
except Exception as e:
logger.error("Error: %s", e)
logger.error(traceback.format_exc())
return None
if __name__ == "__main__":
main()
main関数にある4つの値を各々の環境に合わせて設定して下さい!
- parent_value = "projects/your-project/locations/your-location"
- start_time = "2024-12-12T15:39:54.704678+09:00" # ISO 8601 format
- source_fqdn = srcにしたいテーブルの完全修飾名
- target_fqdn = targetにしたいテーブルの完全修飾名
実行するとこのようになります。
$ python3 datalineage_api.py
===== Start =====
===== Creating a Process =====
Process created: projects/xxxxx/locations/asia-northeast1/processes/process_test
===== Creating a Run =====
Run created: projects/xxxxx/locations/asia-northeast1/processes/process_test/runs/run_test
===== Creating a Lineage Event =====
Lineage Event created: projects/xxxx/locations/asia-northeast1/processes/process_test/runs/run_test/lineageEvents/lineage_event_test
Lineage画面を確認する
srcとtargetの二つのテーブルがLineage画面上で追えることが出来るようになります。
いっぱい出来るとこのように、複数のデータから作成したことが分かるようになります。
おわりに
Supershipではプロダクト開発やサービス開発に関わる人を絶賛募集しております。
ご興味がある方は以下リンクよりご確認ください。
是非ともよろしくお願いします。