1. はじめに
1-1 ご挨拶
初めまして、井村と申します。
Microsoft Fabricは、データの統合、エンジニアリング、分析、ビジネスインテリジェンスなどの機能を統合したSaaS型データ分析ツールです。
そしてMicrosoft Fabricは60日間のフリートライアル期間があります。
さらに、Microsoftが提供する無料のオンライン学習プラットフォームであるMicrosoft Learn(MSLearn)にはたくさんのMicrosoft Fabricに関する記事があります。
本記事はフリートライアル期間を利用してMSLearnの演習を行っていきます。
演習を通して気づいた点やTipsを、なるべく多くのスクリーンショットとともに備忘録として残します。
1-2 MSLearn
今回はMicrosoft Fabric を使用してプロセスとデータ移動を調整するを行います。
この演習の学習の目的は以下の通りです。
- Microsoft Fabric でのパイプライン機能について説明する。
- パイプラインのデータ コピー アクティビティを使用する。
- 定義済みのテンプレートに基づいてパイプラインを作成する。
- パイプラインを実行して監視する。
1-3 Get started with Microsoft Fabric
以下からMicrosoft Fabricのフリートライアルを開始できます。
Get started with Microsoft Fabric
2. パイプラインについて
パイプラインについての概要になります。
Microsoft Fabric のパイプラインには、データの転送と処理のタスクを実行する一連の "アクティビティ" がカプセル化されています。
データの転送と処理のアクティビティを定義し、分岐、ループ、その他の一般的な処理ロジックを管理する制御フロー アクティビティを通してこれらのアクティビティを調整できます。
Fabric ユーザー インターフェイスのグラフィカル パイプライン "キャンバス" を使用すると、最小限のコーディングまたはコーディングなしで複雑なパイプラインを構築できます。
アクティビティ
パイプライン内のアクティビティには、大きく 2 つのカテゴリがあります。
- データ変換アクティビティ:データ転送および操作をカプセル化するアクティビティ。
- データコピーアクティビティ:ソースからデータを抽出して変換先に読み込むアクティビティ。
- データフロー (Gen2):転送時にデータに変換を適用する複雑なデータフローアクティビティ。
- Notebookアクティビティ:Sparkノートブックを実行するアクティビティ。
- ストアドプロシージャアクティビティ:SQLコードを実行するアクティビティ。
- データ削除アクティビティ:既存のデータを削除するアクティビティ。
OneLakeは、レイクハウス、ウェアハウス、SQLデータベースなど、さまざまな宛先にデータを保存できます。
- 制御フローアクティビティ:ループの実装、条件分岐、変数とパラメーター値の管理に使用できるアクティビティ。 さまざまな制御フロー アクティビティを使用して複雑なパイプライン ロジックを実装し、データ インジェストと変換フローを調整できます。
パラメーター
パイプラインはパラメーター化できるため、パイプラインを実行するたびに使用する特定の値を指定できます。 たとえば、パイプラインを使用して、取り込まれたデータをフォルダーに保存できますが、パイプラインを実行するたびにフォルダー名を柔軟に指定できます。
パイプライン実行
パイプラインが実行されるたびに、"データ パイプライン実行" が開始されます。 実行は、Fabric ユーザー インターフェイスでオンデマンドで開始することも、特定の頻度で開始するようにスケジュールすることもできます。
3. 演習スタート
上記URLから演習を開始できます。実際のMicrosoft Fabricを使うため、とても勉強になります。
3-1 ワークスペースの作成
1 . 【Azure】Microsoft Fabric レイクハウス内にあるファイルとテーブルにデータを取り込む。(3-1 レイクハウスを作成する)をご参照ください。
3-2 レイクハウスを作成する
1 . レイクハウスの作成は上記リンクをご参照ください。
2 . 今回作成する [新しいサブフォルダー]名 は [new_data] になります。
検証用のcsvファイルはパイプラインを用いてレイクハウスに転送します。
3-3 パイプラインを作成する
データを簡単に取り込むには、パイプラインの [データのコピー] アクティビティを使用して、データをソースから抽出し、レイクハウス内のファイルにコピーします。
1 . レイクハウスの上側[ホーム] - [データの取得] - [新しいデータパイプライン]を押下します。
2 . [新しいパイプライン] ダイアログボックスにて [Ingest Sales Data] という名前の入力し、[作成]を押下します。
しばらくすると[レイクハウスにデータをコピーする] ウィザードが表示されます。
3 . [データ ソースの選択] ページで、検索バーに「HTTP」と入力し、[新しいソース] セクションで [HTTP] を選択します。
4 . [データ ソースへの接続] ウィンドウで、データ ソースへの接続に関する以下の設定を入力します。入力後、[次へ]を押下します。
接続設定
項目 | 値 |
---|---|
URL | https://raw.githubusercontent.com/MicrosoftLearning/dp-data/main/sales.csv |
接続の資格情報
項目 | 値 |
---|---|
接続 | 新しい接続の作成 |
接続名 | sales.csv |
データ ゲートウェイ | (なし) |
認証の種類 | 匿名 |
プライバシーレベル | なし |
5 . [要求メソッド]の値は[GET]を選択し[次へ]を押下します。
6 . ファイルに関する以下の設定を入力します。入力後、[データのプレビュー]を押下します。
項目 | 値 |
---|---|
ファイル形式 | DelimitedText |
列区切り記号 | Comma (,) |
行区切り記号 | Line feed (\n) |
先頭行をヘッダーとして | ✓ |
圧縮の種類 | 圧縮なし |
5 . [データのプレビュー]を確認し[次へ]を押下します。
6 . [データ変換先に接続] ウィンドウで、データ コピーオプションの設定を入力します。入力後、[次へ]を押下します。
項目 | 値 |
---|---|
ルートフォルダー | ファイル |
フォルダーのパス | new_data/ |
ファイル名 | sales.csv |
コピー動作 | なし |
7 . ファイルに関する以下の設定を入力します。入力後、[次へ]を押下します。
項目 | 値 |
---|---|
ファイル形式 | DelimitedText |
列区切り記号 | Comma (,) |
行区切り記号 | Line feed (\n) |
先頭行をヘッダーとして | ✓ |
圧縮の種類 | 圧縮なし |
8 . [レビューと保存] ウィンドウにて内容を確認後、[保存と実行]を押下します。
9 . [データのコピー] アクティビティを含む新しいパイプラインが作成されます。また、パイプライン デザイナーの [出力]ペインで [アクティビティの状態]が成功になることを確認します。
左側ペインの作成したレイクハウスを押下します。
10 . [Files] を展開し、 [new_data] フォルダーを選択して、 [sales.csv] ファイルがコピーされていることを確認します。
3-4 ノートブックを作成する
1 . レイクハウスの [ホーム] ページの [ノートブックを開く] メニューで、 [新しいノートブック] を選択します。
2 . ノートブック内の既存のセルを選択します。既定のコードを次の変数宣言に置き換えます。
table_name = "sales"
3 . セルの […] メニュー (右上にあります) で、 パラメーター セルの切り替え (Toggle parameter cell)] を選択します。 これにより、セルは、パイプラインからノートブックを実行するときに、そのセル内で宣言された変数をパラメーターとして扱うように構成されます。
[ ▷ ] (実行)を押下します。
[ parameters ]セルとなります。
4 . セルすぐ下の [+コード](コード セルの追加)を押下します。
5 . 次にそのセルに次のコードを追加・実行します。
from pyspark.sql.functions import *
# Read the new sales data
df = spark.read.format("csv").option("header","true").load("Files/new_data/*.csv")
## Add month and year columns
df = df.withColumn("Year", year(col("OrderDate"))).withColumn("Month", month(col("OrderDate")))
# Derive FirstName and LastName columns
df = df.withColumn("FirstName", split(col("CustomerName"), " ").getItem(0)).withColumn("LastName", split(col("CustomerName"), " ").getItem(1))
# Filter and reorder columns
df = df["SalesOrderNumber", "SalesOrderLineNumber", "OrderDate", "Year", "Month", "FirstName", "LastName", "EmailAddress", "Item", "Quantity", "UnitPrice", "TaxAmount"]
# Load the data into a table
df.write.format("delta").mode("append").saveAsTable(table_name)
このコードは、 [データのコピー] アクティビティによって取り込まれた [sales.csv] ファイルからデータを読み込み、いくつかの変換ロジックを適用し、変換されたデータをテーブルとして保存します。テーブルが既に存在する場合は、データを追加します。
6 . ノートブックの実行が完了したら、 [Tables] の […] メニューにある [最新の情報に更新] を選択し、[sales] テーブルが作成されていることを確認します。
3-5 パイプラインを変更する
データを変換してテーブルに読み込むためのノートブックを実装したので、そのノートブックをパイプラインに組み込んで、再利用可能な ETL プロセスを作成できます。
1 . 左側のハブ メニュー バーで、先ほど作成した [Ingest Sales Data] パイプラインを選択します。
2 . [アクティビティ] タブの [すべてのアクティビティ] 一覧で、[データの削除] を選択します。
3 . 次に、次に示すように、新しいデータの削除アクティビティをデータのコピー アクティビティの左側に配置し、その [完了時] 出力をデータのコピー アクティビティに接続します。
4 . データの削除アクティビティを選択し、デザイン キャンバスの下のペインで、次のプロパティを設定します。
全般
項目 | 値 |
---|---|
名前 | Delete old files |
ソース
項目 | 値 |
---|---|
接続 | 自分が作成したレイクハウス |
ファイル パスの種類 | ワイルドカード ファイル パス |
フォルダーのパス | Files / new_data |
Wildcard file name | *.csv |
Recursively | ✓ |
ログの設定
項目 | 値 |
---|---|
ログを有効にする |
これらの設定により、sales.csv ファイルをコピーする前に、既存の .csv ファイルが確実に削除されます。
5 . [ノートブック] アクティビティを選択します。[ノートブック] アクティビティを[データのコピー] アクティビティの右側に移動させます。[データのコピー] アクティビティの [完了時] 出力を [ノートブック] アクティビティに接続します。
6 . [ノートブック] アクティビティを選択し、デザイン キャンバスの下のペインで、次のプロパティを設定します。
全般
項目 | 値 |
---|---|
名前 | Load Sales notebook |
設定
項目 | 値 |
---|---|
ワークスペース | 自分が作成したワークスペース |
ノートブック | Load Sales |
ベース パラメーター
名前 | 種類 | 種類 |
---|---|---|
table_name | String | new_sales |
table_name パラメーターはノートブックに渡され、[parameters] セル内の table_name 変数に割り当てられている既定値がオーバーライドされます。
7 . [ホーム] タブで [保存] を使用してパイプラインを保存します。
8 . 次に、[ ▷ ] [実行] を使用してパイプラインを実行し、すべてのアクティビティが完了するのを待ちます。
9 . [出力] ペインの [アクティビティの状態] がすべて成功であることを確認します。確認後、自身が作成した [レイクハウス]を押下します。
10 . [Tables] を展開し、 [new_sales] テーブルを選択して、それに含まれるデータのプレビューを表示します。このテーブルは、パイプラインによって実行された際の [ノートブック] アクティビティにより作成されました。
以上で演習は終了になります。
この演習では、Microsoft Fabric でパイプラインを実装する方法を学習しました。
3-6 リソースをクリーンアップする
1 . 【Azure】Microsoft Fabric レイクハウス内にあるファイルとテーブルにデータを取り込む。(3-7 リソースをクリーンアップする)をご参照ください。
以上でワークスペースが削除されます。お疲れ様でした!
本演習を通じて Microsoft Fabric の演習 が一覧化されていることを初めて知りましたので共有致します。