はじめに
本記事では、Pythonの標準SDKを利用して、Dataplexのカスタムエントリとリネージを作成してみます。カスタムエントリは、Dataplexがメタデータの自動収集の対象としているプロダクト以外のメタデータ情報(例えばオンプレのDBのメタデータなど)をDataplexで管理する場合に用います。また、それらのカスタムエントリ感の依存関係を管理するリネージも同様に作成してみます。
カスタムエントリとは
Dataplexの機能であるメタデータ管理のData Catalogでは、BigQueryなどのデータストアのメタデータ管理ができます。BigQueryなどGoogle Cloudファースト製品に関しては、自動的にメタデータのエントリが作られ、例えばBigQueryテーブルのエントリの場合、テーブル間の依存関係も自動的にリネージとして登録され参照することができます。次のような感じです。
カスタムエントリグループの作成
BigQueryのテーブルのメタデータ情報などを管理するエントリは、上位にエントリグループを持ちます。エントリグループには複数のエントリを関連付けられます。エントリグループの作成は以下のコードを実行することでできます。(project_idは、適切に書き換えてください)
from google.cloud import datacatalog_v1
project_id = "my-project"
entry_group_id = "sample_custom_entry"
location = "us-central1"
datacatalog = datacatalog_v1.DataCatalogClient()
# Create an Entry Group.
entry_group_obj = datacatalog_v1.types.EntryGroup()
entry_group_obj.display_name = "Sample entry group"
entry_group_obj.description = "This Entry Group represents an external system"
entry_group = datacatalog.create_entry_group(
parent=datacatalog_v1.DataCatalogClient.common_location_path(
project_id, location
),
entry_group_id=entry_group_id,
entry_group=entry_group_obj,
)
entry_group_name = entry_group.name
print("Created entry group: {}".format(entry_group_name))
カスタムエントリの作成
次にカスタムエントリの作成です。ちなみに、公式サイトのサンプルコードをほぼそのまま利用しています。
from google.cloud import datacatalog_v1
project_id = "my-project"
entry_group_id = "sample_custom_entry"
entry_id = "source_entry"
location = "us-central1"
datacatalog = datacatalog_v1.DataCatalogClient()
# Create an Entry.
entry = datacatalog_v1.types.Entry()
entry.user_specified_system = "onprem_data_system"
entry.user_specified_type = "onprem_data_asset"
entry.display_name = "source"
entry.description = "This is the source for lineage."
entry.linked_resource = "//my-onprem-server.com/dataAssets/Source"
entry.fully_qualified_name = "custom:sample-lineage.source"
# Create the Schema, this is optional.
entry.schema.columns.append(
datacatalog_v1.types.ColumnSchema(
column="first_column",
type_="STRING",
description="This columns consists of ....",
mode=None,
)
)
entry.schema.columns.append(
datacatalog_v1.types.ColumnSchema(
column="second_column",
type_="DOUBLE",
description="This columns consists of ....",
mode=None,
)
)
entry = datacatalog.create_entry(
parent=datacatalog.entry_group_path(project_id, location, entry_group_id),
entry_id=entry_id,
entry=entry
)
print("Created entry: {}".format(entry.name))
次のリネージ作成用にもう一つカスタムエントリを fully_qualified_name = "custom:sample-lineage.target"
で作っておきます。他のエントリの中身は適当に入れてください。
リネージの作成
先ほど作った以下のfully_qualified_nameを持ったエントリをつなぐリネージを登録したいと思います。
custom:sample-lineage.source
custom:sample-lineage.target
from google.cloud import datacatalog_lineage_v1
from google.protobuf.timestamp_pb2 import Timestamp
project_id = "my-project"
# Create a client
client = datacatalog_lineage_v1.LineageClient()
# Process and Run
process = {
"name": f"projects/{project_id}/locations/us/processes/sample_proecess",
"display_name": "My Data Transformation Process",
}
start_timestamp = Timestamp()
start_timestamp.GetCurrentTime()
end_timestamp = Timestamp()
end_timestamp.FromSeconds(start_timestamp.ToSeconds() + 3600)
run = {
"name": process["name"] + "/runs/sample_run",
"display_name": "Run 1",
"start_time": start_timestamp, # Set start time
"end_time": end_timestamp, # Set end time
}
# Creating Lineage Event
source = {
"fully_qualified_name": "custom:sample-lineage.source"
}
target = {
"fully_qualified_name": "custom:sample-lineage.target"
}
lineage_event = {
"links": [{
"source": source,
"target": target,
}],
"start_time": start_timestamp,
"end_time": end_timestamp
}
process_result = client.create_process(request={"parent": f"projects/{project_id}/locations/us", "process": process})
print(f"Created process: {process_result.name}")
run_result = client.create_run(request={"parent": process["name"], "run": run})
print(f"Created run: {run_result.name}")
# Create the lineage event
event_result = client.create_lineage_event(request={"parent": run["name"], "lineage_event": lineage_event})
print(f"Created lineage event: {event_result.name}")
すると、こんなふうにリネージが可視化される
以上。
おまけ:Geminiでコード生成してみた
ちなみに、リネージ作るところはサンプルコードがちょっとシンプル過ぎたのと、リファレンス見るのがかったるかったのでGeminiにコード生成依頼してみたら、結構いい感じで帰ってきたので、細かい修正は入りましたがこれベースで書きました。