LoginSignup
2
3

More than 3 years have passed since last update.

Azure Synapse AnalyticsのパイプラインからDatabricksでETL処理を行う

Last updated at Posted at 2020-07-08

Azure Synapse Analyticsとは

Azure Synapse は、Azure SQL Data Warehouse の進化版です
Azure Synapse は、エンタープライズ データ ウェアハウスとビッグ データ分析がまとめられた無制限の分析サービスです。サーバーレスまたはプロビジョニング済みリソースのいずれかを使用して、自分の条件でデータを自在かつ大規模にクエリすることができます。Azure Synapse では、これら 2 つの機能が 1 つのエクスペリエンスに統合され、データの取り込み、準備、管理、提供を行い、BI や機械学習でのニーズにすばやく対応することができます。

とあるように、分析のあらゆるニーズにこたえる統合的なプラットフォームです。もちろんAzure Data Factoryの機能も統合されているためパイプラインを記述することが可能です。今回行うことは基本的にADFで行うことと同じになります。

流れ

  • データを所定の場所にコピー
  • Databricksノートブックの実装
  • 動作の確認

Step1:パイプラインへのコンポーネントの配置

image.png

今回はCopy dataとDatabricsk notebookのコンポーネントのみを用います。
Copy dataに関しては、データソースとデータのシンク先を設定します。外部のデータソースからCSVを引っ張ってきてサブスクリプション内のStorageに保存するようなイメージです。

Databricksに関しては、以下のようなパラメータを設定します。このようにすることで、DatabricksのノートブックからStorageにアクセスするための情報をコード内に記述することなく環境変数的に実行することが可能になります。
image.png

Step2:Databricks側の設定

DatabricksのNotebook内で行うことは主に4つあります。

  • Storageへのアクセス情報が記載されたパラメータの取得
  • Storageへの接続
  • データの加工
  • データの保存

パラメータの取得はdbutilsライブラリを用いることで取得が可能です。

# widgets.text("参照名","初期値","表示名")
dbutils.widgets.text("accountName","","Account Name")
dbutils.widgets.text("accountKey","","Account Key")
dbutils.widgets.text("containerName","","Container Name")

accountName = dbutils.widgets.get("accountName")
accountKey = dbutils.widgets.get("accountKey")
containerName = dbutils.widgets.get("containerName")

取得した情報からStorageへの接続に必要な情報を構成し、接続します。今回はAzure Data Lake Storage Gen2を用いるため以下のように記載します。接続先によって若干異なるのでこちらを参照ください。

# 接続文字列を作成
# locals()はローカル変数の辞書を返すため、%locals()として変数で渡すことで%(containerName)等が%locals()の辞書からcontainerNameを呼び出せる
connectionString = "abfss://%(containerName)s@%(accountName)s.dfs.core.windows.net/" % locals()
# connectionString = "abfss://%(containerName)s@%(accountName)s.dfs.core.windows.net/"
print(connectionString)
# Storage Accountへの接続
spark.conf.set("fs.azure.account.key." + accountName + ".dfs.core.windows.net", accountKey)

#ファイルの読み込み
df = spark.read.option("header",True).csv("%(connectionString)s/databricksdata/data.csv" %locals())

適当にデータを加工します。

etldf = df.select(df["Year"],df["Month"],df["CRSDepTime"],df["DepDelay"],df["CRSArrTime"],df["ArrDelay"],df["OriginAirportName"],df["OriginLatitude"],df["OriginLongitude"],df["DestAirportName"])

加工したデータを保存します。

etldf.write.mode("overwrite").saveAsTable("airport")

保存するとDatabricksのDataタブからTableが確認できます。
image.png

最後に、Notebookを終了してレスポンスを返す部分をdbutilsで記述します。

import json
dbutils.notebook.exit(json.dumps({
  "status": "OK",
  "message": "Cleaned data and created persistent table",
  "tables": ["airport"]
}))

これでパイプラインの準備は整いました。

Step3:パイプラインの実行と監視

パイプラインのTrigger nowをクリックして実行します。
image.png
監視についてはSynapseは基本Monitorハブから行います。
image.png
詳細を確認したい実行に入ると以下のようなActivity毎のステータスが表示されます。
image.png
眼鏡のマークをクリックすると詳細が確認できます。
image.png
Databricksの場合はDatabrickのモニターへのリンクが提供され、どこまで実行されているか等コードベースで確認することができます。
image.png

作業は以上です。実際は手動でのTriggerではなく変更フィードやバッチ処理などが考えられると思います。Azure Synapse Studioは絶賛パブリックプレビュー中ですので遊んでみるのをお勧めします。私も分からないなりに遊んでいます笑

2
3
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
2
3