7
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

Supershipグループ Advent Calendar 2024Advent Calendar 2024

Day 14

GCPのDataLineageAPI使ってみよう!

Last updated at Posted at 2024-12-13

Supershipグループ Advent Calendar 2024の14日目の記事になります。

概要

業務で、GCPで取り扱っているデータのDataLineageの実装のPoCを行っていました。
日本語文献も少なくなく、分かりづらい点も多く四苦八苦したので書き記します!

DataLineageとは?

データの由来(ソース)や、どのように変換・加工されて現在の状態に至ったのかを追跡して、データの履歴を追う為の可視化を行うものです。

下記の図のように、「データ1とデータ2をdataprocジョブで加工して、データ3に変換した」ことを可視化を行います。

この可視化により特にデータマネジメントにおけるメリットが多々あります。

  • 工程の可視化による、データの不整合の原因の特定が容易
  • データの透明性が得られ、分析結果の信頼性の向上
  • データの参照先や参照元の追跡や確認が容易になるため、データガバナンスの向上

GCPにおけるDataLineage

GCPでは、Dataplex(旧Data Catalog)の機能でDataLineageを行うことができます。

Datalineage APIを有効化することで、BigQueryをはじめとする様々なGCPサービスは自動的に Datalineageを行い、自動化出来ないサービスもDataLineageのAPIを叩く事でリネージを行うことができます。

DataLineage API

DataLineag APIで3つのコンポーネントを操作することで、DataLineageを行うことができます。GCPサービスの自動化も裏でこのAPIを叩いてます。

1. Processes

Processesはデータ処理の定義を指し、特定のデータ処理ワークフローの全体像を把握するための単位として利用されています。

リネージ画面のオレンジ色のマークの部分であり、未定義のプロセスとして表示されている部分

image.png

2. Runs

Runはデータ処理の実行インスタンス(実態)を指します。Processesで定義したデータ処理の状態や実行時間などの情報を管理し、一つのProcessesに複数のRunを関連付けることができます。

リネージ画面の実行 タブの部分で表示される情報

image.png

3. Events

Eventsはデータ処理を行なった結果のデータの流れを示し、データ処理全体の実行時間とデータのソースとターゲットのペアを管理しています。

リネージ画面の矢印の部分

image.png

注意点

リネージ画面に表示するには、Dataplex(Data Catalog)で追跡可能なリソース同士をEventsで関連付けた時だけです。

GCSのデータをLineageしたいならDataLake等でGCSのオブジェクトを追跡できるようにしてください。

APIの仕様と3要素の関係

基本的なDataLineageEventは下記のURLにリクエストを投げることでProcessesRunsEvents を操作できます。

https://datalineage.googleapis.com/v1/{parent}/{API Event}

3要素についての段で述べたように、Processes→Runs→Events のような親子関係になっており{parent}の中身はこの親子関係を指し示す必要があります。

# 各リソースのparent

# Processesの場合
projects/your-project/locations/your-location

# Runsの場合
projects/your-project/locations/your-location/processes/xxxx

# Eventsの場合
projects/your-project/locations/your-locationprocesses/xxxx/runs/yyyy

APIを実際に叩いてみる

APIはCLIで直接実行できないため、PythonやJavaのSDKを使用することをお勧めします。

ドキュメントを参考に実装していきます。

  • Processの作成
  • Runの作成
  • Eventの作成

を行います

基本的にBigquery内で行う処理は自動でLineage取ってくれるのですが、手動でAPIを叩いてLineageを記録します。

準備

  1. DataLineageAPIを有効化します

  2. BigQueryで二つのテーブルを作成します(スキーマは要らないです)

    image.png

  3. リネージの画面を表示して確認します

    image.png

APIを叩く

from google.cloud import datacatalog_lineage_v1
from logging import getLogger, StreamHandler, INFO, Formatter
import sys
import traceback

logger = getLogger(__name__)
logger.setLevel(INFO)
console_handler = StreamHandler(sys.stdout)
logger.addHandler(console_handler)

## Create a Process
def create_process(client: datacatalog_lineage_v1.LineageClient, parent_value: str, process_name: str) -> datacatalog_lineage_v1.Process:
    process = datacatalog_lineage_v1.Process(
        name = parent_value+"/processes/"+process_name
    )
    process_request = datacatalog_lineage_v1.CreateProcessRequest(
        parent = parent_value,
        process = process
    )
    # Make the request
    return client.create_process(request=process_request)

## Create a Run
def create_run(client: datacatalog_lineage_v1.LineageClient, parent_value: str, run_name: str, start_time: str, state: int) -> datacatalog_lineage_v1.Run:
    run = datacatalog_lineage_v1.Run(
        name = parent_value+"/runs/"+run_name,
        start_time = start_time,
        state = datacatalog_lineage_v1.Run.State(state)
    )
    run_request = datacatalog_lineage_v1.CreateRunRequest(
        parent = parent_value,
        run = run
    )
    # Make the request
    return client.create_run(request=run_request)

## Create a EventLink
def create_lineage_events_link(source_fqdn: str, target_fqdn: str) -> datacatalog_lineage_v1.EventLink:
    if source_fqdn is None or target_fqdn is None:
        logger.error("Source and Target FQDN are required")
        raise ValueError("Source and Target FQDN are required")
    source = datacatalog_lineage_v1.EntityReference(
        fully_qualified_name = source_fqdn
    )
    target = datacatalog_lineage_v1.EntityReference(
        fully_qualified_name = target_fqdn
    )
    link = datacatalog_lineage_v1.EventLink(
        source = source,
        target = target
    )
    return link

## Create a Lineage-Event
def create_lineage_events(client: datacatalog_lineage_v1.LineageClient ,parent_value: str ,lineage_event_name: str,start_time,links) -> datacatalog_lineage_v1.LineageEvent:
    lineage_event = datacatalog_lineage_v1.LineageEvent(
        name = parent_value+"/lineageEvents/"+lineage_event_name,
        start_time = start_time,
        links = links
    )
    lineage_event_request = datacatalog_lineage_v1.CreateLineageEventRequest(
        parent=parent_value,
        lineage_event=lineage_event
    )
    # Make the request
    return client.create_lineage_event(request=lineage_event_request)

# 以下は削除したい時に使う
# nameの値は、コンソールlogに表示されるLineage Event created: の後の値を使う
def delete_lineage_event(client: datacatalog_lineage_v1.LineageClient, name: str):
    request = datacatalog_lineage_v1.DeleteLineageEventRequest(name=name)
    client.delete_lineage_event(request=request)
    logger.info("Lineage Event deleted: %s", name)

def main():
    logger.info("===== Start =====")

    # 4つの値を設定する
    parent_value = "projects/your-project/locations/your-location"
    start_time = "2024-12-12T15:39:54.704678+09:00" # ISO 8601 format
    source_fqdn = "bigquery:your-project.your-BQ-dataset.your-BQ-table"
    target_fqdn = "bigquery:your-project.your-BQ-dataset.your-BQ-table"

    
    client = datacatalog_lineage_v1.LineageClient()
    try:
        logger.info("===== Creating a Process =====")
        process_response = create_process(client, parent_value, "process_name7")
        parent_value = process_response.name
        logger.info("Process created: %s", process_response.name)

        logger.info("===== Creating a Run =====")
        run_response = create_run(client, parent_value, "run_name7", start_time, 2)
        parent_value = run_response.name
        logger.info("Run created: %s", run_response.name)

        logger.info("===== Creating a Lineage Event =====")
        link = create_lineage_events_link(source_fqdn, target_fqdn)
        lineage_event_response = create_lineage_events(client, parent_value, "lineage_event_name7", start_time, [link])
        logger.info("Lineage Event created: %s", lineage_event_response.name)
        return lineage_event_response.name
    
    except Exception as e:
        logger.error("Error: %s", e)
        logger.error(traceback.format_exc())
        return None

if __name__ == "__main__":
    main()

main関数にある4つの値を各々の環境に合わせて設定して下さい!

  • parent_value = "projects/your-project/locations/your-location"
  • start_time = "2024-12-12T15:39:54.704678+09:00" # ISO 8601 format
  • source_fqdn = srcにしたいテーブルの完全修飾名
  • target_fqdn = targetにしたいテーブルの完全修飾名

実行するとこのようになります。

$ python3 datalineage_api.py
===== Start =====
===== Creating a Process =====
Process created: projects/xxxxx/locations/asia-northeast1/processes/process_test
===== Creating a Run =====
Run created: projects/xxxxx/locations/asia-northeast1/processes/process_test/runs/run_test
===== Creating a Lineage Event =====
Lineage Event created: projects/xxxx/locations/asia-northeast1/processes/process_test/runs/run_test/lineageEvents/lineage_event_test

Lineage画面を確認する

image.png

srcとtargetの二つのテーブルがLineage画面上で追えることが出来るようになります。

いっぱい出来るとこのように、複数のデータから作成したことが分かるようになります。
image.png

おわりに

Supershipではプロダクト開発やサービス開発に関わる人を絶賛募集しております。
ご興味がある方は以下リンクよりご確認ください。

Supership 採用サイト

是非ともよろしくお願いします。

7
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
7
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?