0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

Dataplex でカスタムエントリとリネージを手動で登録する

Last updated at Posted at 2024-05-17

はじめに

本記事では、Pythonの標準SDKを利用して、Dataplexのカスタムエントリとリネージを作成してみます。カスタムエントリは、Dataplexがメタデータの自動収集の対象としているプロダクト以外のメタデータ情報(例えばオンプレのDBのメタデータなど)をDataplexで管理する場合に用います。また、それらのカスタムエントリ感の依存関係を管理するリネージも同様に作成してみます。

カスタムエントリとは

Dataplexの機能であるメタデータ管理のData Catalogでは、BigQueryなどのデータストアのメタデータ管理ができます。BigQueryなどGoogle Cloudファースト製品に関しては、自動的にメタデータのエントリが作られ、例えばBigQueryテーブルのエントリの場合、テーブル間の依存関係も自動的にリネージとして登録され参照することができます。次のような感じです。

カスタムエントリグループの作成

BigQueryのテーブルのメタデータ情報などを管理するエントリは、上位にエントリグループを持ちます。エントリグループには複数のエントリを関連付けられます。エントリグループの作成は以下のコードを実行することでできます。(project_idは、適切に書き換えてください)

create_custom_entry_group.py
from google.cloud import datacatalog_v1

project_id = "my-project"
entry_group_id = "sample_custom_entry"
location = "us-central1"

datacatalog = datacatalog_v1.DataCatalogClient()

# Create an Entry Group.
entry_group_obj = datacatalog_v1.types.EntryGroup()
entry_group_obj.display_name = "Sample entry group"
entry_group_obj.description = "This Entry Group represents an external system"

entry_group = datacatalog.create_entry_group(
    parent=datacatalog_v1.DataCatalogClient.common_location_path(
        project_id, location
    ),
    entry_group_id=entry_group_id,
    entry_group=entry_group_obj,
)
entry_group_name = entry_group.name
print("Created entry group: {}".format(entry_group_name))

カスタムエントリの作成

次にカスタムエントリの作成です。ちなみに、公式サイトのサンプルコードをほぼそのまま利用しています。

create_custom_entry.py
from google.cloud import datacatalog_v1

project_id = "my-project"
entry_group_id = "sample_custom_entry"
entry_id = "source_entry"
location = "us-central1"

datacatalog = datacatalog_v1.DataCatalogClient()


# Create an Entry.
entry = datacatalog_v1.types.Entry()
entry.user_specified_system = "onprem_data_system"
entry.user_specified_type = "onprem_data_asset"
entry.display_name = "source"
entry.description = "This is the source for lineage."
entry.linked_resource = "//my-onprem-server.com/dataAssets/Source"
entry.fully_qualified_name = "custom:sample-lineage.source"

# Create the Schema, this is optional.
entry.schema.columns.append(
    datacatalog_v1.types.ColumnSchema(
        column="first_column",
        type_="STRING",
        description="This columns consists of ....",
        mode=None,
    )
)

entry.schema.columns.append(
    datacatalog_v1.types.ColumnSchema(
        column="second_column",
        type_="DOUBLE",
        description="This columns consists of ....",
        mode=None,
    )
)

entry = datacatalog.create_entry(
    parent=datacatalog.entry_group_path(project_id, location, entry_group_id),
    entry_id=entry_id,
    entry=entry
)
print("Created entry: {}".format(entry.name))

次のリネージ作成用にもう一つカスタムエントリを fully_qualified_name = "custom:sample-lineage.target" で作っておきます。他のエントリの中身は適当に入れてください。

リネージの作成

先ほど作った以下のfully_qualified_nameを持ったエントリをつなぐリネージを登録したいと思います。

  • custom:sample-lineage.source
  • custom:sample-lineage.target
create_lineageEvent.py
from google.cloud import datacatalog_lineage_v1
from google.protobuf.timestamp_pb2 import Timestamp

project_id = "my-project"

# Create a client
client = datacatalog_lineage_v1.LineageClient()

# Process and Run
process = {
    "name": f"projects/{project_id}/locations/us/processes/sample_proecess",
    "display_name": "My Data Transformation Process",
}

start_timestamp = Timestamp()
start_timestamp.GetCurrentTime()

end_timestamp = Timestamp()
end_timestamp.FromSeconds(start_timestamp.ToSeconds() + 3600)

run = {
    "name": process["name"] + "/runs/sample_run",
    "display_name": "Run 1",
    "start_time": start_timestamp,  # Set start time
    "end_time": end_timestamp,    # Set end time
}

# Creating Lineage Event
source = {
    "fully_qualified_name": "custom:sample-lineage.source"
}
target = {
    "fully_qualified_name": "custom:sample-lineage.target"
}

lineage_event = {
    "links": [{
        "source": source,
        "target": target,
    }],
    "start_time": start_timestamp,
    "end_time": end_timestamp
}

process_result = client.create_process(request={"parent": f"projects/{project_id}/locations/us", "process": process})
print(f"Created process: {process_result.name}")

run_result = client.create_run(request={"parent": process["name"], "run": run})
print(f"Created run: {run_result.name}")

# Create the lineage event
event_result = client.create_lineage_event(request={"parent": run["name"], "lineage_event": lineage_event})
print(f"Created lineage event: {event_result.name}")

すると、こんなふうにリネージが可視化される

Screenshot 2024-05-17 at 16.30.35.png

以上。

おまけ:Geminiでコード生成してみた

ちなみに、リネージ作るところはサンプルコードがちょっとシンプル過ぎたのと、リファレンス見るのがかったるかったのでGeminiにコード生成依頼してみたら、結構いい感じで帰ってきたので、細かい修正は入りましたがこれベースで書きました。
Screenshot 2024-05-17 at 15.49.12.png

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?