1
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

【Azure】Microsoft Fabric でパイプラインを使用してデータを取り込む

Posted at

1. はじめに

1-1 ご挨拶

初めまして、井村と申します。

Microsoft Fabricは、データの統合、エンジニアリング、分析、ビジネスインテリジェンスなどの機能を統合したSaaS型データ分析ツールです。
そしてMicrosoft Fabricは60日間のフリートライアル期間があります。

さらに、Microsoftが提供する無料のオンライン学習プラットフォームであるMicrosoft Learn(MSLearn)にはたくさんのMicrosoft Fabricに関する記事があります。

本記事はフリートライアル期間を利用してMSLearnの演習を行っていきます。
演習を通して気づいた点やTipsを、なるべく多くのスクリーンショットとともに備忘録として残します。

1-2 MSLearn

今回はMicrosoft Fabric を使用してプロセスとデータ移動を調整するを行います。

この演習の学習の目的は以下の通りです。

  • Microsoft Fabric でのパイプライン機能について説明する。
  • パイプラインのデータ コピー アクティビティを使用する。
  • 定義済みのテンプレートに基づいてパイプラインを作成する。
  • パイプラインを実行して監視する。

1-3 Get started with Microsoft Fabric

以下からMicrosoft Fabricのフリートライアルを開始できます。

Get started with Microsoft Fabric

2. パイプラインについて

パイプラインについての概要になります。

Microsoft Fabric のパイプラインには、データの転送と処理のタスクを実行する一連の "アクティビティ" がカプセル化されています。

データの転送と処理のアクティビティを定義し、分岐、ループ、その他の一般的な処理ロジックを管理する制御フロー アクティビティを通してこれらのアクティビティを調整できます。

Fabric ユーザー インターフェイスのグラフィカル パイプライン "キャンバス" を使用すると、最小限のコーディングまたはコーディングなしで複雑なパイプラインを構築できます。

アクティビティ

パイプライン内のアクティビティには、大きく 2 つのカテゴリがあります。

  • データ変換アクティビティ:データ転送および操作をカプセル化するアクティビティ。
    • データコピーアクティビティ:ソースからデータを抽出して変換先に読み込むアクティビティ。
    • データフロー (Gen2):転送時にデータに変換を適用する複雑なデータフローアクティビティ。
    • Notebookアクティビティ:Sparkノートブックを実行するアクティビティ。
    • ストアドプロシージャアクティビティ:SQLコードを実行するアクティビティ。
    • データ削除アクティビティ:既存のデータを削除するアクティビティ。

OneLakeは、レイクハウス、ウェアハウス、SQLデータベースなど、さまざまな宛先にデータを保存できます。

  • 制御フローアクティビティ:ループの実装、条件分岐、変数とパラメーター値の管理に使用できるアクティビティ。 さまざまな制御フロー アクティビティを使用して複雑なパイプライン ロジックを実装し、データ インジェストと変換フローを調整できます。

パラメーター

パイプラインはパラメーター化できるため、パイプラインを実行するたびに使用する特定の値を指定できます。 たとえば、パイプラインを使用して、取り込まれたデータをフォルダーに保存できますが、パイプラインを実行するたびにフォルダー名を柔軟に指定できます。

パイプライン実行

パイプラインが実行されるたびに、"データ パイプライン実行" が開始されます。 実行は、Fabric ユーザー インターフェイスでオンデマンドで開始することも、特定の頻度で開始するようにスケジュールすることもできます。

3. 演習スタート

演習 - パイプラインを使用してデータを取り込む

上記URLから演習を開始できます。実際のMicrosoft Fabricを使うため、とても勉強になります。

3-1 ワークスペースの作成

1 . 【Azure】Microsoft Fabric レイクハウス内にあるファイルとテーブルにデータを取り込む。(3-1 レイクハウスを作成する)をご参照ください。

3-2 レイクハウスを作成する

1 . レイクハウスの作成は上記リンクをご参照ください。

2 . 今回作成する [新しいサブフォルダー]名 は [new_data] になります。

008.png

検証用のcsvファイルはパイプラインを用いてレイクハウスに転送します。

3-3 パイプラインを作成する

データを簡単に取り込むには、パイプラインの [データのコピー] アクティビティを使用して、データをソースから抽出し、レイクハウス内のファイルにコピーします。

1 . レイクハウスの上側[ホーム] - [データの取得] - [新しいデータパイプライン]を押下します。

009.png

2 . [新しいパイプライン] ダイアログボックスにて [Ingest Sales Data] という名前の入力し、[作成]を押下します。

010.png

しばらくすると[レイクハウスにデータをコピーする] ウィザードが表示されます。

011.png

3 . [データ ソースの選択] ページで、検索バーに「HTTP」と入力し、[新しいソース] セクションで [HTTP] を選択します。

012.png

4 . [データ ソースへの接続] ウィンドウで、データ ソースへの接続に関する以下の設定を入力します。入力後、[次へ]を押下します。

013.png

接続設定

項目
URL https://raw.githubusercontent.com/MicrosoftLearning/dp-data/main/sales.csv

接続の資格情報

項目
接続 新しい接続の作成
接続名 sales.csv
データ ゲートウェイ (なし)
認証の種類 匿名
プライバシーレベル なし

5 . [要求メソッド]の値は[GET]を選択し[次へ]を押下します。

014.png

6 . ファイルに関する以下の設定を入力します。入力後、[データのプレビュー]を押下します。

項目
ファイル形式 DelimitedText
列区切り記号 Comma (,)
行区切り記号 Line feed (\n)
先頭行をヘッダーとして
圧縮の種類 圧縮なし

015.png

5 . [データのプレビュー]を確認し[次へ]を押下します。

016.png

6 . [データ変換先に接続] ウィンドウで、データ コピーオプションの設定を入力します。入力後、[次へ]を押下します。

項目
ルートフォルダー ファイル
フォルダーのパス new_data/
ファイル名 sales.csv
コピー動作 なし

017.png

7 . ファイルに関する以下の設定を入力します。入力後、[次へ]を押下します。

項目
ファイル形式 DelimitedText
列区切り記号 Comma (,)
行区切り記号 Line feed (\n)
先頭行をヘッダーとして
圧縮の種類 圧縮なし

018.png

8 . [レビューと保存] ウィンドウにて内容を確認後、[保存と実行]を押下します。

019.png

9 . [データのコピー] アクティビティを含む新しいパイプラインが作成されます。また、パイプライン デザイナーの [出力]ペインで [アクティビティの状態]が成功になることを確認します。

020.png

左側ペインの作成したレイクハウスを押下します。

10 . [Files] を展開し、 [new_data] フォルダーを選択して、 [sales.csv] ファイルがコピーされていることを確認します。

021.png

3-4 ノートブックを作成する

1 . レイクハウスの [ホーム] ページの [ノートブックを開く] メニューで、 [新しいノートブック] を選択します。

022.png

2 . ノートブック内の既存のセルを選択します。既定のコードを次の変数宣言に置き換えます。

023.png

table_name = "sales"

3 . セルの […] メニュー (右上にあります) で、 パラメーター セルの切り替え (Toggle parameter cell)] を選択します。 これにより、セルは、パイプラインからノートブックを実行するときに、そのセル内で宣言された変数をパラメーターとして扱うように構成されます。
[ ▷ ] (実行)を押下します。

024.png

[ parameters ]セルとなります。

4 . セルすぐ下の [+コード](コード セルの追加)を押下します。

025.png

5 . 次にそのセルに次のコードを追加・実行します。

python
 from pyspark.sql.functions import *

# Read the new sales data
df = spark.read.format("csv").option("header","true").load("Files/new_data/*.csv")

## Add month and year columns
df = df.withColumn("Year", year(col("OrderDate"))).withColumn("Month", month(col("OrderDate")))

# Derive FirstName and LastName columns
df = df.withColumn("FirstName", split(col("CustomerName"), " ").getItem(0)).withColumn("LastName", split(col("CustomerName"), " ").getItem(1))

# Filter and reorder columns
df = df["SalesOrderNumber", "SalesOrderLineNumber", "OrderDate", "Year", "Month", "FirstName", "LastName", "EmailAddress", "Item", "Quantity", "UnitPrice", "TaxAmount"]

# Load the data into a table
df.write.format("delta").mode("append").saveAsTable(table_name)

このコードは、 [データのコピー] アクティビティによって取り込まれた [sales.csv] ファイルからデータを読み込み、いくつかの変換ロジックを適用し、変換されたデータをテーブルとして保存します。テーブルが既に存在する場合は、データを追加します。

026.png

6 . ノートブックの実行が完了したら、 [Tables] の […] メニューにある [最新の情報に更新] を選択し、[sales] テーブルが作成されていることを確認します。

028.png

3-5 パイプラインを変更する

データを変換してテーブルに読み込むためのノートブックを実装したので、そのノートブックをパイプラインに組み込んで、再利用可能な ETL プロセスを作成できます。

1 . 左側のハブ メニュー バーで、先ほど作成した [Ingest Sales Data] パイプラインを選択します。

2 . [アクティビティ] タブの [すべてのアクティビティ] 一覧で、[データの削除] を選択します。

029.png

3 . 次に、次に示すように、新しいデータの削除アクティビティをデータのコピー アクティビティの左側に配置し、その [完了時] 出力をデータのコピー アクティビティに接続します。

030.png

031.png

4 . データの削除アクティビティを選択し、デザイン キャンバスの下のペインで、次のプロパティを設定します。

032.png

全般

項目
名前 Delete old files

033.png

ソース

項目
接続 自分が作成したレイクハウス
ファイル パスの種類 ワイルドカード ファイル パス
フォルダーのパス Files / new_data
Wildcard file name *.csv
Recursively

ログの設定

項目
ログを有効にする

034.png

これらの設定により、sales.csv ファイルをコピーする前に、既存の .csv ファイルが確実に削除されます。

5 . [ノートブック] アクティビティを選択します。[ノートブック] アクティビティを[データのコピー] アクティビティの右側に移動させます。[データのコピー] アクティビティの [完了時] 出力を [ノートブック] アクティビティに接続します。

035.png

6 . [ノートブック] アクティビティを選択し、デザイン キャンバスの下のペインで、次のプロパティを設定します。

全般

項目
名前 Load Sales notebook

036.png

設定

項目
ワークスペース 自分が作成したワークスペース
ノートブック Load Sales

ベース パラメーター

名前 種類 種類
table_name String new_sales

table_name パラメーターはノートブックに渡され、[parameters] セル内の table_name 変数に割り当てられている既定値がオーバーライドされます。

7 . [ホーム] タブで [保存] を使用してパイプラインを保存します。

038.png

8 . 次に、[ ▷ ] [実行] を使用してパイプラインを実行し、すべてのアクティビティが完了するのを待ちます。

039.png

9 . [出力] ペインの [アクティビティの状態] がすべて成功であることを確認します。確認後、自身が作成した [レイクハウス]を押下します。

041.png

10 . [Tables] を展開し、 [new_sales] テーブルを選択して、それに含まれるデータのプレビューを表示します。このテーブルは、パイプラインによって実行された際の [ノートブック] アクティビティにより作成されました。

042.png

以上で演習は終了になります。
この演習では、Microsoft Fabric でパイプラインを実装する方法を学習しました。

3-6 リソースをクリーンアップする

1 . 【Azure】Microsoft Fabric レイクハウス内にあるファイルとテーブルにデータを取り込む。(3-7 リソースをクリーンアップする)をご参照ください。

以上でワークスペースが削除されます。お疲れ様でした!

本演習を通じて Microsoft Fabric の演習 が一覧化されていることを初めて知りましたので共有致します。

1
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?