1
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

Microsoft FabricAdvent Calendar 2024

Day 15

イベントハウスを使用したデータパイプラインの結果の保管・分析

Last updated at Posted at 2024-12-17

はじめに

イベントハウスにパイプラインからアクティビティの出力を記録する方法を記載します。

準備

  1. Microsoft Fabric でパイプラインを使用してデータを取り込む を参考にデータを取り込むパイプラインを作成します。
    image.png

  2. ノートブックの最後には 終了で結果を返すようにしておきます。

    pyspark
    
    import json
    
    count = df.count()
    
    # ノートブックの実行結果を返す
    result = {
        "df_count" : count,
    }
    result_str=json.dumps(result)
    mssparkutils.notebook.exit(result_str)
    
    

    image.png

  3. 実行を確認して、出力値を確認しておきます。これらが KQLに入力したい値です。
    image.png

手順

  1. イベントハウスを作成します。
    image.png

  2. KQL データベース上で、テーブルを作成します。

    KQL
    
    // Create table command
    ////////////////////////////////////////////////////////////
    .create table ['pipeline']  (['result']:dynamic)
    
    

    image.png

  3. パイプラインに戻って、KQL スクリプトアクティビティを追加します。
    image.png

    エラー時の情報を取得する場合は、以下のようにエラー分岐ごとに KQL を設置するほかないようです。

    image.png
    :

  4. 設定で、あてさきのKQLデータベースを選択し、動的なコンテンツを開きます。
    image.png

  5. 動的なコンテンツに以下を入力します。

    pipeline式
    
    @concat('
    .ingest inline into table pipeline with (format = "json") <|
    {"result":{'
    ,' "workspaceId":"',pipeline().DataFactory,'"'
    ,',"pipelineId":"',pipeline().Pipeline,'"'
    ,',"pipelineRunId":"',pipeline().RunId,'"'
    ,',"pipelineTriggerTime":"',pipeline().TriggerTime,'"'
    ,',"pipelineEndTime":"',utcNow(),'"'
    ,',"status":"success"'
    ,',"activities":['
    ,' {"name":"<アクティビティ名を入れる>"'
    ,',"result":',,'}'
    ,']}}'
    )
    
    
  6. 入力したい出力をもつアクティビティを追加します。

    image.png

  7. 複数入れる場合は、カンマの数などに注意して追加していきます。以下のようになりました。(activities 先頭のオブジェクトだけカンマ無し、後はあり。)
    image.png

  8. パラメータなどほかの値を入れる場合はこのようにします。
    image.png

  9. 実行し、成功すると、KQL データベース上のテーブルに結果が格納されます。

    kql
    
    // Use 'take' to view a sample number of records in the table and check the data.
    pipeline
    | take 100
    
    
    

    image.png

    KQL でエラーがある場合、HasErrorsでのみわかる点に注意
    image.png
    :

  10. KQL例

    kql
    
    
    pipeline
    | project 
        pipelineTriggerTime = todatetime(result.pipelineTriggerTime)
        ,pipelineRunId = tostring(result.pipelineRunId)
        ,duration =  bin(todatetime(result.pipelineEndTime)-todatetime(result.pipelineTriggerTime),1s) 
        ,status = tostring(result.status)
        ,delete_result = result.activities[0].result 
        ,copy_result = result.activities[1].result 
        ,notebook_result = result.activities[2].result 
        ,url = strcat('https://app.powerbi.com/workloads/data-pipeline/monitoring/workspaces/',result.workspaceId,'/pipelines/',result.pipelineId,'/',result.pipelineRunId)
    | order by ingestion_time()desc 
    
    

    delete result
    image.png
    copy_result
    image.png
    notebook_result
    image.png

  11. KQL を基にダッシュボード化することも可能です。この例では URL を作りこんでいるので、直接監視ハブに遷移可能です。

    image.png

1
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?