9
3

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 1 year has passed since last update.

Streamlit in SnowflakeとACCESS_HISTORYビューでリネージュ グラフを作成してみた

Last updated at Posted at 2023-11-16

実はSnowflakeではCTASなどでテーブルを更新すると、ACCESS_HISTORYビューにカラムリネージュの情報が記録されています。
この記事ではStreamlit in SnowflakeとACCESS_HISTORYビューでリネージュ グラフを作成してみます。

本記事の内容は2023年11月16日時点の情報に基づきます。

ACCESS_HISTORYビューのカラムリネージュ

概要

ACCOUNT_USAGEスキーマのACCESS_HISTORYビューにはオブジェクトへの操作記録が保存されています。
さらにテーブルを更新したとき、参照先テーブルのカラムが更新先テーブルのどのカラムに反映されているか(カラムリネージュ)についても記録されています。

カラムリネージュが記録される操作は以下のとおりです。

CREATE TABLE ... AS SELECT (CTAS)
CREATE TABLE ... CLONE
INSERT ... SELECT ...
MERGE
UPDATE

テーブルをCTASで作成して、ACCESS_HISTORYビューを確認してみます。

以下のように元テーブル t1 とその派生先テーブル ct1 を作成します。

create or replace table t1 (c1 int, c2 int);
create or replace table ct1 as select c1 as cc1, c2 as cc2, c1 + c2 as cc3 from t1;

ACCESS_HISTORYビューの OBJECTS_MODIFIED 列を参照します。
※1 ACCOUNT_USAGEへの反映は遅いため、少し待ちます。(最大3時間)
※2 履歴の量が多いためCTASで作成したテーブル名で絞り込みます。

select *
from snowflake.account_usage.access_history
where objects_modified[0].objectName = 'LINEAGE.PUBLIC.CT1'
order by query_start_time desc;

OBJECTS_MODIFIED 列の出力結果

[
  {
    "columns": [
      {
        "baseSources": [
          {
            "columnName": "C2",
            "objectDomain": "Table",
            "objectId": 10,
            "objectName": "LINEAGE.PUBLIC.T1"
          },
          {
            "columnName": "C1",
            "objectDomain": "Table",
            "objectId": 10,
            "objectName": "LINEAGE.PUBLIC.T1"
          }
        ],
        "columnId": 1034,
        "columnName": "CC3",
        "directSources": [
          {
            "columnName": "C2",
            "objectDomain": "Table",
            "objectId": 10,
            "objectName": "LINEAGE.PUBLIC.T1"
          },
          {
            "columnName": "C1",
            "objectDomain": "Table",
            "objectId": 10,
            "objectName": "LINEAGE.PUBLIC.T1"
          }
        ]
      },
      {
        "baseSources": [
          {
            "columnName": "C2",
            "objectDomain": "Table",
            "objectId": 10,
            "objectName": "LINEAGE.PUBLIC.T1"
          }
        ],
        "columnId": 1033,
        "columnName": "CC2",
        "directSources": [
          {
            "columnName": "C2",
            "objectDomain": "Table",
            "objectId": 10,
            "objectName": "LINEAGE.PUBLIC.T1"
          }
        ]
      },
      {
        "baseSources": [
          {
            "columnName": "C1",
            "objectDomain": "Table",
            "objectId": 10,
            "objectName": "LINEAGE.PUBLIC.T1"
          }
        ],
        "columnId": 1032,
        "columnName": "CC1",
        "directSources": [
          {
            "columnName": "C1",
            "objectDomain": "Table",
            "objectId": 10,
            "objectName": "LINEAGE.PUBLIC.T1"
          }
        ]
      }
    ],
    "objectDomain": "Table",
    "objectId": 1040,
    "objectName": "LINEAGE.PUBLIC.CT1"
  }
]

多層構造になっているため、2層に分けて中身の解説をします。

  1. root
    変更のあったテーブルがリストされます。テーブル内のカラムの情報は colums に格納されます。

    [
      {
        "columns": []
        "objectDomain": "Table",
        "objectId": 1040,
        "objectName": "LINEAGE.PUBLIC.CT1"
      }
    ]
    
  2. columns の中身
    更新先テーブル( CT1 )のカラムがリストされます。下のJSONはリストされたカラムのうちの一つです。
    baseSources 内のカラムをソースとして columnName のカラムが作成されたことが記録されます。今回のリネージュグラフには baseSources の情報を使用します。

    ビューを経由してテーブルにアクセスした場合、 directSources に参照したビュー、 baseSources にビューが参照しているテーブルの情報が記録されます。 ビューがネストされている場合、直接参照したビューとビューが参照しているテーブルの情報以外は記録されません。ビューの関係がすべて記録されないため、今回のリネージュグラフに directSources は使いません。

      {
        "baseSources": [
          {
            "columnName": "C2",
            "objectDomain": "Table",
            "objectId": 10,
            "objectName": "LINEAGE.PUBLIC.T1"
          },
          {
            "columnName": "C1",
            "objectDomain": "Table",
            "objectId": 10,
            "objectName": "LINEAGE.PUBLIC.T1"
          }
        ],
        "columnId": 1034,
        "columnName": "CC3",
        "directSources": [
          {
            "columnName": "C2",
            "objectDomain": "Table",
            "objectId": 10,
            "objectName": "LINEAGE.PUBLIC.T1"
          },
          {
            "columnName": "C1",
            "objectDomain": "Table",
            "objectId": 10,
            "objectName": "LINEAGE.PUBLIC.T1"
          }
        ]
      },

リネージュ情報の出力

リネージュに必要な情報をSQLで出力します。

select
    query_id,
    om.value:objectDomain::string as om_object_domain,
    om.value:objectName::string as om_object_name,
    col.value:columnName::string as om_column_name,
    bs.value:columnName::string as bs_column_name,
    bs.value:objectName::string as bs_object_name
from 
    snowflake.account_usage.access_history as ah, 
    lateral flatten(input => ah.objects_modified) as om,
    lateral flatten(input => om.value:columns) as col,
    lateral flatten(input => col.value:baseSources) as bs
;

出力例

QUERY_ID OM_OBJECT_DOMAIN OM_OBJECT_NAME OM_COLUMN_NAME BS_COLUMN_NAME BS_OBJECT_NAME
01afc0b1-3202-0f64-0002-4542000141d2 Table LINEAGE.PUBLIC.CT1 CC2 C2 LINEAGE.PUBLIC.T1
01afc0b1-3202-0f64-0002-4542000141d2 Table LINEAGE.PUBLIC.CT1 CC3 C1 LINEAGE.PUBLIC.T1
01afc0b1-3202-0f64-0002-4542000141d2 Table LINEAGE.PUBLIC.CT1 CC3 C2 LINEAGE.PUBLIC.T1
01afc0b1-3202-0f64-0002-4542000141d2 Table LINEAGE.PUBLIC.CT1 CC1 C1 LINEAGE.PUBLIC.T1

Graphviz

リネージュグラフの作成にGraphvizを使用します。

概要

GraphvizはOSSのグラフ可視化ツールです。dot言語という独自言語で図形を作成できます。

Streamlit in Snowflakeでの利用

Streamlit in Snowflakeではstreamlitオブジェクトの graphviz_chart 関数にdot言語を直接入力することで、Graphvizの図を作成することができます。Streamlit in SnowflakeではPythonのgraphvizライブラリを使用できない点に注意してください。

表の表示

Graphvizでは表とグラフを組み合わせた図を作成可能です。今回のリネージュグラフにはこの機能を使います。

公式ドキュメント

dot言語でリネージュを表現してみます。

リネージュで表現したいSQL(再掲)

create or replace table t1 (c1 int, c2 int);
create or replace table ct1 as select c1 as cc1, c2 as cc2, c1 + c2 as cc3 from t1;

dot言語による表記

digraph {
graph [pad="0.5", nodesep="0.5", ranksep="2"];
node [shape=plain]
rankdir=LR;

"LINEAGE.PUBLIC.T1" [
    label=<
        <table border="0" cellborder="1" cellspacing="0">
        <tr><td><i>LINEAGE.PUBLIC.T1</i></td></tr>
        <tr><td port="C2">C2</td></tr>
        <tr><td port="C1">C1</td></tr>
        </table>
    >
];

"LINEAGE.PUBLIC.CT1" [
    label=<
        <table border="0" cellborder="1" cellspacing="0">
        <tr><td><i>LINEAGE.PUBLIC.CT1</i></td></tr>
        <tr><td port="CC2">CC2</td></tr>
        <tr><td port="CC1">CC1</td></tr>
        <tr><td port="CC3">CC3</td></tr>
        </table>
    >
];

"LINEAGE.PUBLIC.T1":"C2" -> "LINEAGE.PUBLIC.CT1":"CC2";
"LINEAGE.PUBLIC.T1":"C1" -> "LINEAGE.PUBLIC.CT1":"CC1";
"LINEAGE.PUBLIC.T1":"C1" -> "LINEAGE.PUBLIC.CT1":"CC3";
"LINEAGE.PUBLIC.T1":"C2" -> "LINEAGE.PUBLIC.CT1":"CC3";
}

Graphvizによる画像出力

image.png

コード

以上を踏まえて、Streamlitのコードに落とし込むと以下のようになります。
テーブル数が多いと全く表示されないため、 start_table と関係のあるテーブルだけ表示しています。

import json

import pandas as pd

from snowflake.snowpark import Session
from snowflake.snowpark.context import get_active_session
from snowflake.snowpark.exceptions import SnowparkSessionException
import streamlit as st


# Sessionの取得
try:
    # Streamlit in Snowflakeで実行する場合
    session = get_active_session()
except SnowparkSessionException:
    # ローカル環境で実行する場合
    with open('./snowflake_connection_parameters.json') as f:
        connection_parameters = json.load(f)
    session = Session.builder.configs(connection_parameters).create() 

def query_lineage_df(session):
    # Snowflakeからリネージュの情報を取得
    query_text = '''
select
    query_id,
    om.value:objectDomain::string as om_object_domain,
    om.value:objectName::string as om_object_name,
    col.value:columnName::string as om_column_name,
    bs.value:columnName::string as bs_column_name,
    bs.value:objectName::string as bs_object_name
from 
    snowflake.account_usage.access_history as ah, 
    lateral flatten(input => ah.objects_modified) as om,
    lateral flatten(input => om.value:columns) as col,
    lateral flatten(input => col.value:baseSources) as bs
;
'''
    return session.sql(query_text).to_pandas()

def query_lineage_parent_df(session, start_table):
    # Snowflakeから親側のリネージュの情報を取得
    query_text = f'''
with lineage as (
    select
        query_id,
        om.value:objectDomain::string as om_object_domain,
        om.value:objectName::string as om_object_name,
        col.value:columnName::string as om_column_name,
        bs.value:columnName::string as bs_column_name,
        bs.value:objectName::string as bs_object_name
    from 
        snowflake.account_usage.access_history as ah, 
        lateral flatten(input => ah.objects_modified) as om,
        lateral flatten(input => om.value:columns) as col,
        lateral flatten(input => col.value:baseSources) as bs
)
select
    om_object_name,
    om_column_name,
    bs_column_name,
    bs_object_name
from lineage
    start with om_object_name = '{start_table}'
    connect by om_object_name = prior bs_object_name
;
'''
    return session.sql(query_text).to_pandas()

def query_lineage_child_df(session, start_table):
    # Snowflakeから子側のリネージュの情報を取得
    query_text = f'''
with lineage as (
    select
        query_id,
        om.value:objectDomain::string as om_object_domain,
        om.value:objectName::string as om_object_name,
        col.value:columnName::string as om_column_name,
        bs.value:columnName::string as bs_column_name,
        bs.value:objectName::string as bs_object_name
    from 
        snowflake.account_usage.access_history as ah, 
        lateral flatten(input => ah.objects_modified) as om,
        lateral flatten(input => om.value:columns) as col,
        lateral flatten(input => col.value:baseSources) as bs
)
select
    om_object_name,
    om_column_name,
    bs_column_name,
    bs_object_name
from lineage
    start with bs_object_name = '{start_table}'
    connect by bs_object_name = prior om_object_name
;
'''
    return session.sql(query_text).to_pandas()

def query_column_df(session):
    # テーブルとカラムの情報を取得
    # データベースによらず情報を取得するためaccount_usageから取得する。
    query_text = """
select
    column_name,
    table_catalog || '.' || table_schema || '.' || table_name as table_name
from snowflake.account_usage.columns;
"""
    return session.sql(query_text).to_pandas()

def query_column_df(session):
    # テーブルとカラムの情報を取得
    # データベースによらず情報を取得するためaccount_usageから取得する。
    query_text = """
select
    column_name,
    table_catalog || '.' || table_schema || '.' || table_name as table_name
from snowflake.account_usage.columns;
"""
    return session.sql(query_text).to_pandas()

class Table:
    def __init__(self, name, columns):
        self.name = name
        self.columns = columns

def generate_table(table_name, column_names):
    column_table_str = ''
    for column_name in column_names:
        column_table_str += f'        <tr><td port="{column_name}">{column_name}</td></tr>\n'
    return f'''
"{table_name}" [
    label=<
        <table border="0" cellborder="1" cellspacing="0">
        <tr><td><i>{table_name}</i></td></tr>
{column_table_str}
        </table>
    >
];
'''

def genetate_node(from_table_names, from_column_names, to_table_names, to_column_names):
    result = ''
    for from_table_name, from_column_name, to_table_name, to_column_name in zip(from_table_names, from_column_names, to_table_names, to_column_names):
        result += f'''"{from_table_name}":"{from_column_name}" -> "{to_table_name}":"{to_column_name}";\n'''
    return result

def generate_graph(graph_parts):
    return f'''digraph {{
graph [pad="0.5", nodesep="0.5", ranksep="2"];
node [shape=plain]
rankdir=LR;
{graph_parts}
}}'''

def groupby_table(df, table_column_name, column_column_name):
    result = []
    for name, group in df.groupby(table_column_name):
        columns = group[column_column_name].values.tolist()
        result.append(Table(name, columns))
    return result

# Streamlitの画面表示
st.title("Lineage generator")

start_table = st.text_input('Input table name', 'DATABASE.SCHEMA.TABLE')
# Snowflakeからデータの取得
# リネージュ情報を取得
# lineage_df = query_lineage_df(session)
lineage_child_df = query_lineage_child_df(session, start_table) 
lineage_parent_df = query_lineage_parent_df(session, start_table)
# リネージュに関係のあるテーブルを抽出
lineage_table_df = pd.Series()
lineage_table_df = pd.concat([lineage_table_df, lineage_child_df['BS_OBJECT_NAME'], lineage_child_df['OM_OBJECT_NAME']])
lineage_table_df = pd.concat([lineage_table_df, lineage_parent_df['BS_OBJECT_NAME'], lineage_parent_df['OM_OBJECT_NAME']])
# 各テーブルとカラムを取得
column_df = query_column_df(session)
column_df = column_df[column_df['TABLE_NAME'].isin(lineage_table_df)]

# Graphvizで表示するためにdot言語の文を生成
graph_parts = ''
table_list = groupby_table(column_df, 'TABLE_NAME', 'COLUMN_NAME')
for table in table_list:
    graph_parts += generate_table(table.name, table.columns)
# graph_parts += genetate_node(lineage_df['BS_OBJECT_NAME'], lineage_df['BS_COLUMN_NAME'], lineage_df['OM_OBJECT_NAME'], lineage_df['OM_COLUMN_NAME'])
graph_parts += genetate_node(lineage_child_df['BS_OBJECT_NAME'], lineage_child_df['BS_COLUMN_NAME'], lineage_child_df['OM_OBJECT_NAME'], lineage_child_df['OM_COLUMN_NAME'])
graph_parts += genetate_node(lineage_parent_df['BS_OBJECT_NAME'], lineage_parent_df['BS_COLUMN_NAME'], lineage_parent_df['OM_OBJECT_NAME'], lineage_parent_df['OM_COLUMN_NAME'])
graph = generate_graph(graph_parts)

# Graphvizのグラフを表示
st.graphviz_chart(graph)

アプリ画面

image.png

注意点

2023年10月1日現在、Streamlit in Snowflakeで動作するPythonのバージョンは 3.8 です。そのため dataclass でlistを指定できなかったりと、意外とはまるポイントがあります。

image.png

仲間募集

NTTデータ テクノロジーコンサルティング事業本部 では、以下の職種を募集しています。

1. クラウド技術を活用したデータ分析プラットフォームの開発・構築(ITアーキテクト/クラウドエンジニア)

クラウド/プラットフォーム技術の知見に基づき、DWH、BI、ETL領域におけるソリューション開発を推進します。
https://enterprise-aiiot.nttdata.com/recruitment/career_sp/cloud_engineer

2. データサイエンス領域(データサイエンティスト/データアナリスト)

データ活用/情報処理/AI/BI/統計学などの情報科学を活用し、よりデータサイエンスの観点から、データ分析プロジェクトのリーダーとしてお客様のDX/デジタルサクセスを推進します。
https://enterprise-aiiot.nttdata.com/recruitment/career_sp/datascientist

3.お客様のAI活用の成功を推進するAIサクセスマネージャー

DataRobotをはじめとしたAIソリューションやサービスを使って、
お客様のAIプロジェクトを成功させ、ビジネス価値を創出するための活動を実施し、
お客様内でのAI活用を拡大、NTTデータが提供するAIソリューションの利用継続を推進していただく人材を募集しています。
https://nttdata.jposting.net/u/job.phtml?job_code=804

4.DX/デジタルサクセスを推進するデータサイエンティスト《管理職/管理職候補》 データ分析プロジェクトのリーダとして、正確な課題の把握、適切な評価指標の設定、分析計画策定や適切な分析手法や技術の評価・選定といったデータ活用の具現化、高度化を行い分析結果の見える化・お客様の納得感醸成を行うことで、ビジネス成果・価値を出すアクションへとつなげることができるデータサイエンティスト人材を募集しています。

https://nttdata.jposting.net/u/job.phtml?job_code=898

ソリューション紹介

Trusted Data Foundationについて

~データ資産を分析活用するための環境をオールインワンで提供するソリューション~
https://enterprise-aiiot.nttdata.com/tdf/
最新のクラウド技術を採用して弊社が独自に設計したリファレンスアーキテクチャ(Datalake+DWH+AI/BI)を顧客要件に合わせてカスタマイズして提供します。
可視化、機械学習、DeepLearningなどデータ資産を分析活用するための環境がオールインワンで用意されており、これまでとは別次元の量と質のデータを用いてアジリティ高くDX推進を実現できます。

TDFⓇ-AM(Trusted Data Foundation - Analytics Managed Service)について

~データ活用基盤の段階的な拡張支援(Quick Start) と保守運用のマネジメント(Analytics Managed)をご提供することでお客様のDXを成功に導く、データ活用プラットフォームサービス~
https://enterprise-aiiot.nttdata.com/service/tdf/tdf_am
TDFⓇ-AMは、データ活用をQuickに始めることができ、データ活用の成熟度に応じて段階的に環境を拡張します。プラットフォームの保守運用はNTTデータが一括で実施し、お客様は成果創出に専念することが可能です。また、日々最新のテクノロジーをキャッチアップし、常に活用しやすい環境を提供します。なお、ご要望に応じて上流のコンサルティングフェーズからAI/BIなどのデータ活用支援に至るまで、End to Endで課題解決に向けて伴走することも可能です。

NTTデータとTableauについて

ビジュアル分析プラットフォームのTableauと2014年にパートナー契約を締結し、自社の経営ダッシュボード基盤への採用や独自のコンピテンシーセンターの設置などの取り組みを進めてきました。さらに2019年度にはSalesforceとワンストップでのサービスを提供開始するなど、積極的にビジネスを展開しています。

これまでPartner of the Year, Japanを4年連続で受賞しており、2021年にはアジア太平洋地域で最もビジネスに貢献したパートナーとして表彰されました。
また、2020年度からは、Tableauを活用したデータ活用促進のコンサルティングや導入サービスの他、AI活用やデータマネジメント整備など、お客さまの企業全体のデータ活用民主化を成功させるためのノウハウ・方法論を体系化した「デジタルサクセス」プログラムを提供開始しています。
https://enterprise-aiiot.nttdata.com/service/tableau

NTTデータとAlteryxについて
Alteryxは、業務ユーザーからIT部門まで誰でも使えるセルフサービス分析プラットフォームです。

Alteryx導入の豊富な実績を持つNTTデータは、最高位にあたるAlteryx Premiumパートナーとしてお客さまをご支援します。

導入時のプロフェッショナル支援など独自メニューを整備し、特定の業種によらない多くのお客さまに、Alteryxを活用したサービスの強化・拡充を提供します。

https://enterprise-aiiot.nttdata.com/service/alteryx

NTTデータとDataRobotについて
DataRobotは、包括的なAIライフサイクルプラットフォームです。

NTTデータはDataRobot社と戦略的資本業務提携を行い、経験豊富なデータサイエンティストがAI・データ活用を起点にお客様のビジネスにおける価値創出をご支援します。

https://enterprise-aiiot.nttdata.com/service/datarobot

NTTデータとInformaticaについて

データ連携や処理方式を専門領域として10年以上取り組んできたプロ集団であるNTTデータは、データマネジメント領域でグローバルでの高い評価を得ているInformatica社とパートナーシップを結び、サービス強化を推進しています。
https://enterprise-aiiot.nttdata.com/service/informatica

NTTデータとSnowflakeについて
NTTデータでは、Snowflake Inc.とソリューションパートナー契約を締結し、クラウド・データプラットフォーム「Snowflake」の導入・構築、および活用支援を開始しています。

NTTデータではこれまでも、独自ノウハウに基づき、ビッグデータ・AIなど領域に係る市場競争力のあるさまざまなソリューションパートナーとともにエコシステムを形成し、お客さまのビジネス変革を導いてきました。
Snowflakeは、これら先端テクノロジーとのエコシステムの形成に強みがあり、NTTデータはこれらを組み合わせることでお客さまに最適なインテグレーションをご提供いたします。

https://enterprise-aiiot.nttdata.com/service/snowflake

9
3
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
9
3

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?