5
3

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 3 years have passed since last update.

AzureAdvent Calendar 2021

Day 13

Azure Purview を遊びながら開発しよう(応用編)

Last updated at Posted at 2021-12-13

#はじめに
この記事は、Azure Purview を遊びながら開発しよう(基本編)の第二弾となります。

#データ変換
Azure Purview では、データの変換操作をエンティティとして保持することができます。そのためのエンティティとして Process 型が用意されています。

ひとまず以下のコードを実行します。

from pyapacheatlas.core import AtlasProcess

input01 = AtlasEntity("inputprocess01.csv","DataSet", "pyapacheatlas://inputprocess01", guid=-100)
output01= AtlasEntity("outputprocess01.csv","DataSet", "pyapacheatlas://outputprocess01", guid=-101)

process = AtlasProcess(
    name="データ変換処理1",
    typeName="Process",
    qualified_name="pyapacheatlas://hanaprocess1",
    inputs=[input01],
    outputs=[output01],
    guid=-102
)

results = client.upload_entities(
    batch=[input01, output01, process]
)

print(json.dumps(results, indent=2))

そして、"データ変換" などのキーワードでカタログを検索すると、Process 型のエンティティがヒットします。

011.png

pyapacheatlas.core の AtlasProcess クラスに inputs と outputs となるエンティティを指定してあげることで、データの処理過程を系列(Lineage)として表現することができます。Azure Purview では、Azure Data FactoryAzure Synapse のコピーアクティビティやデータフローアクティビティなどのデータ処理が系列として表示できます。これを自動的に連携するには事前にそれぞれのサービス側から Azure Purview へ接続しておく必要があります。

それでは、Azure Purview で定義されている Process 型についてもう少し詳しく調べてみましょう。

typedef = client.get_typedef(name="Process")
print(json.dumps(typedef, indent=2))

出力結果から、Asset 型を継承していることが分かります。つまり、Process 型は DataSet型と同階層に属する型ということです。

Referenceable
  └ Asset
    ├ DataSet
    └ Process
       ├ azure_synapse_activity
       ├ azure_synapse_pipeline
       ├ adf_process
       ├ azure_sql_stored_procedure
       ├ azure_sql_query_run
        ...46 items

ただし、それぞれの型の下にもサブタイプが存在し、結構複雑です。しっかり型定義を見ていく必要があります。例えば Azure Data Factory で Copy アクティビティを実行した場合、adf_copy_activity が作成されます。この型には、adf_pipeline 型のエンティティを必要としますし、adf_copy_operation 型のエンティティをオプションで受け入れます。

複数データソースを系列に追加する

ETL/ELT 処理では複数のデータソースを入力し、一つの出力を得ることもあるかと思いますがそれも簡単にできます。AtlasEntity を作成して、AtlasProcess の inputs 配列に追加するだけです。

from pyapacheatlas.core import AtlasProcess

input01 = AtlasEntity("inputprocess01.csv","DataSet", "pyapacheatlas://inputprocess01", guid=-100)
input02 = AtlasEntity("inputprocess02.csv","DataSet", "pyapacheatlas://inputprocess02", guid=-101)
output01= AtlasEntity("outputprocess01.csv","DataSet", "pyapacheatlas://outputprocess01", guid=-102)

process = AtlasProcess(
    name="データ変換処理1",
    typeName="Process",
    qualified_name="pyapacheatlas://hanaprocess1",
    inputs=[input01, input02],
    outputs=[output01],
    guid=-103
)

results = client.upload_entities(
    batch=[input01, input02, output01, process]
)

print(json.dumps(results, indent=2))

012.png

はい、ファイルが追加されましたね。もし、typeNameazure_synapse_pipeline に変えて実行すると…

013.png

こんな感じに変わります。ただ、属性や関連に何も値を指定していないので、クリックしても Azure Synapse の該当ページへのリンクも表示されません。

Azure Data Factory の Copy アクティビティの作成

通常、Azure Purview と正しく接続できていれば Copy アクティビティ実行後、Azure Purview 側で Scan することで系列が自動的に連携されます。でも、内部はどうなっているのか知りたいですよね… この Copy アクティビティは Atlas API から作成しようとするとかなり複雑ですが、以下のコードを実行すれば一発で作成することができます。

# qualified_name のパスを指定しておく
adf_qualifiedPath = "/subscriptions/12345/resourceGroups/purviewhanatest1/providers/Microsoft.DataFactory/factories"
adf_qName = adf_qualifiedPath + "/adf01"
adf_pipeline_qName = adf_qName + "/pipelines/hanaadf_pipeline1"
adf_activity_qName = adf_pipeline_qName + "/activities/hanaadf_copy_activity1"
adf_operation_qName = adf_activity_qName + "#hanaadf_copy_operation1"
# 最上位は azure_data_factory 型
adf = AtlasEntity("adf01","azure_data_factory", adf_qName, guid=-100)
# Process 型への input と output
input01 = AtlasEntity("sql_input01","azure_sql_dw_table", "pyapacheatlas://sql_input01", guid=-101)
output01= AtlasEntity("sql_output01","azure_sql_table", "pyapacheatlas://sql_output01", guid=-102)
# Pipeline の作成
proc_pipeline = AtlasProcess(
    name="データ変換処理_パイプライン",
    typeName="adf_pipeline",
    qualified_name=adf_pipeline_qName,
    guid=-103,
    inputs=[],
    outputs=[]
)
# Activity の作成
proc_activity = AtlasProcess(
    name="データ変換処理_Copyアクティビティ",
    typeName="adf_copy_activity",
    qualified_name=adf_activity_qName,
    inputs=[],
    outputs=[],
    guid=-104,
    attributes = {"status": "Completed"}
)
# Operation の作成
proc_operation = AtlasProcess(
    name="データ変換処理_オペレーション",
    typeName="adf_copy_operation",
    qualified_name=adf_operation_qName,
    guid=-105,
    inputs=[input01],
    outputs=[output01],
)
# Relationship の作成
proc_activity.addRelationship(subProcesses=[proc_operation.to_json(minimum=True)])
proc_pipeline.addRelationship(subProcesses=[proc_activity.to_json(minimum=True)])
adf.addRelationship(pipelines=[proc_pipeline.to_json(minimum=True)])

results = client.upload_entities(
    batch=[adf, proc_pipeline, proc_activity, proc_operation, input01, output01]
)
print(json.dumps(results, indent=2))

015.png

各 Process 型に特有の設定がありますが、Azure Data Factory の Copy アクティビティの例を知っておけば他にも応用が利くと思います。単に adf_copy_activity 型のエンティティに input/output を指定するだけでは正しく系列が表示されません。この場合は、adf_copy_activity の下に adf_copy_operation 型のエンティティを作成し、そちらに指定してあげる必要があります。そして、上位のエンティティとの間に関係を作成します。

さらに、正しい Copy アクティビティはこのように階層が表示されます。

016.png

この階層表示および Related タブの階層は、関係を作成しただけでは表示されません。既定のパス形式に沿って設定された qualified_name が必要です。adf_copy_activity の場合は上記コードのようなパスが必要ですが、カスタムする場合でも qualified_name の命名規則はしっかり決めておいた方がいいです。

ちなみに、系列ボックス上の「Open in Azure Data Factory」ボタンで開かれるリンク先 URL の生成に qualified_name が使用されます。

系列のカラムマッピング作成

これまではデータセット粒度での系列でしたが、Azure Purview ではさらにカラム粒度での系列を表示し、データ変換処理を経て、どのようにカラムが伝播していったのかを追跡することができます。

from pyapacheatlas.core import AtlasProcess
from pyapacheatlas.core.typedef import EntityTypeDef, AtlasAttributeDef

# columnMapping 属性の定義を含むカスタムプロセスエンティティ型を定義
procType = EntityTypeDef(
    "ProcessWithColumnMapping",
    superTypes=["Process"],
    attributeDefs = [
        AtlasAttributeDef("columnMapping")
    ]
)
# 型定義のアップロード
type_results = client.upload_typedefs(entityDefs=[procType], force_update=True)
print(json.dumps(type_results,indent=2))

input01 = AtlasEntity("sql_col_input01","azure_sql_dw_table", "pyapacheatlas://sql_col_input01", guid=-100)
output01= AtlasEntity("sql_col_output01","azure_sql_table", "pyapacheatlas://sql_col_output01", guid=-101)
# カラムマッピングの定義
column_mapping = [
    {"ColumnMapping": [
        {"Source": "In01_ID", "Sink": "Out01_ID"},
        {"Source": "In01_Name", "Sink": "Out01_Name"}],
        "DatasetMapping": {
        "Source": input01.qualifiedName, "Sink": output01.qualifiedName}
     }
]

process = AtlasProcess(
    name="データ変換処理_カラムマッピング",
    typeName="ProcessWithColumnMapping",
    qualified_name="pyapacheatlas://hana_col_process1",
    inputs=[input01],
    outputs=[output01],
    guid=-102,
    attributes={"columnMapping": json.dumps(column_mapping)}
)

results = client.upload_entities(
    batch=[input01, output01, process]
)

print(json.dumps(results, indent=2))

017.png

このように入力テーブルのカラムがデータ変換処理の後、どこへ伝播したかを示す系列を作成できました。この時、EntityTypeDef クラスを使って columnMapping 属性を持つ新たな Process 型のエンティティ ProcessWithColumnMapping を定義しました。これは既定でカラムマッピング属性を保持できるシンプルな Process 型が無かったためです。azure_synapse_operation 型や adf_copy_activity 型、ssis_package_process 型などは既定でカラムマッピング属性を保持できますが、前述のとおり作るのがめんどくさいです。

複数のカラムマッピング作成

データ変換処理で 1 つのカラムから複数のカラムを生成するパターンもよくあると思います。これも簡単にできます。

column_mapping = [
    {"ColumnMapping": [
        {"Source": "In01_ID", "Sink": "Out01_ID"},
        {"Source": "In01_Name", "Sink": "Out01_LastName"},
        {"Source": "In01_Name", "Sink": "Out01_FirstName"}],
        "DatasetMapping": {
        "Source": input01.qualifiedName, "Sink": output01.qualifiedName}
     }
]

process = AtlasProcess(
    name="データ変換処理_カラムマッピング",
    typeName="ProcessWithColumnMapping",
    qualified_name="pyapacheatlas://hana_col_process1",
    inputs=[input01],
    outputs=[output01],
    guid=-102,
    attributes={"columnMapping": json.dumps(column_mapping)}
)

results = client.upload_entities(
    batch=[input01, output01, process]
)

print(json.dumps(results, indent=2))

018.png

いいですね!
試しにさっき作った Azure Data Factory の Copy アクティビティにカラムマッピングを追加するにはどうすればいいかというと、

input01 = AtlasEntity("sql_input01","azure_sql_dw_table", "pyapacheatlas://sql_input01", guid=-101)
output01= AtlasEntity("sql_output01","azure_sql_table", "pyapacheatlas://sql_output01", guid=-102)

column_mapping = [
    {"ColumnMapping": [
        {"Source": "In01_ID", "Sink": "Out01_ID"},
        {"Source": "In01_Name", "Sink": "Out01_Name"}],
        "DatasetMapping": {
        "Source": input01.qualifiedName, "Sink": output01.qualifiedName}
     }
]

proc_activity = AtlasProcess(
    name="データ変換処理_Copyアクティビティ",
    typeName="adf_copy_activity",
    qualified_name=ADF_QUALIFIED_NAME + "/adf01/pipelines/hanaadf_pipeline1/activities/hanaadf_copy_activity1",
    inputs=[],
    outputs=[],
    guid=-103,
    attributes = {"status": "Completed", "columnMapping": json.dumps(column_mapping)}
)

results = client.upload_entities(
    batch=[proc_activity]
)
print(json.dumps(results, indent=2))

019.png

サクッと行けましたね!
Process 型に columnMapping という属性が無いと追加できないということです。

さらに次のデータ処理を追加する

前回の出力データに対して、さらに後段のデータ処理を追加するには、以下のように書きます。

from pyapacheatlas.core import AtlasEntity, AtlasProcess

output01= AtlasEntity("sql_output01","azure_sql_table", "pyapacheatlas://sql_output01", guid=-100)
next01  = AtlasEntity("sql_next01","azure_sql_table", "pyapacheatlas://sql_next01", guid=-101)

process = AtlasProcess(
    name="データ変換処理その2",
    typeName="Process",
    qualified_name= "pyapacheatlas://hanaprocess2",
    inputs=[output01],
    outputs=[next01],
    guid=-102
)

results = client.upload_entities(
    batch=[output01, next01, process]
)

print(json.dumps(results, indent=2))

020.png

簡単にできましたね。 AtlasProcess の inputs に前回の AtlasEntity を指定するだけです。

関係を作成する別の方法

すでに作成済みのエンティティ同士を関連付けるには、upload_relationship メソッドを使用することもできます。

relationship01 = client.get_entity(qualifiedName="pyapacheatlas://sql_output01", typeName="azure_sql_table")
relationship02 = client.get_entity(qualifiedName="pyapacheatlas://hanaprocess3", typeName="Process")

relationship = {
    "typeName": "dataset_process_inputs",
    "attributes": {},
    "guid": -100,
    "end1": {
        "guid": relationship01['entities'][0]['guid']
    },
    "end2": {
        "guid": relationship02['entities'][0]['guid']
    }
}

# 新しい関係をアップロードする
results = client.upload_relationship(relationship)
print(json.dumps(results, indent=2))

021.png

既定で作成できる関係は、データセットとデータ変換やテーブルとカラムなどの定義された関係に限られます。この関係の事前定義については、get_all_typedefs()root/relationshipDefs 以下に定義されています。データ変換とインプット側の関係を作成する際は dataset_process_inputs 型を、データ変換とアウトプット側の関係を作成する場合は、process_dataset_outputs 型を使用します。

さいごに

はい、こんな感じで Azure Purview の開発を楽しんでいただければと思います。Azure Purview にはこの他にも色々な機能があるので試してみるのも面白いのではないでしょうか。そして Azure Purview の開発を通して、新たなデータガバナンスエコシステムが活性化されることを楽しみにしています。

5
3
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
5
3

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?