LoginSignup
2
0

More than 3 years have passed since last update.

【備忘録】Azure Databricksを使ってDataWarehouseにデータを書き出してみる

Last updated at Posted at 2020-06-11

Azureを触る機会があり、分析基盤についていろいろと学ぶ必要が出てきたので、Databricksの使い方について備忘録を残しておきます。

前提

下記リソースを作成しておきます。

  • Azure Data Lake Storage Gen2
  • Azure Blob Storage
  • Azure Synapse Analytics

サービスプリシンパルの作成

無人操作を実行する目的でテナント内で作成するAzure Active Directoryアプリのリソースです。
アプリID、パスワード、証明書が付与された独自のユーザーIDで、ロール割り当てやアクセス許可で定義されているタスクの実行権限が与えられます。
ここではアプリ登録し、Gen2にBLOB共同作成者のロールを割り当てていきます。

ハンバーガーアイコンをクリックし、Azure Active Directoryを選択します。
左メニューから「アプリの登録」を選択、任意の名前を入力して登録します。
※アプリケーションIDと、ついでにテナントIDを控えておきます。

Gen2リソースを開き、左メニューから「アクセス制御(IAM)」を選択します。
「ロールの割り当てを追加する」から下記を入力し保存します。

  • 役割:ストレージBLOB共同作成者
  • 選択:AADでアプリ登録したもの

作成したアプリを選択して、新しくクライアントシークレットを作成します。
AADから作成したアプリを開きます。
左メニューから「証明書とシークレット」を選択して追加します。
※有効期限は「1年」にしました。

忘れずに、クライアントシークレットの値を控えておきます。

Databricksリソースを作成

AzurePortalにサインインし、「リソースの作成」から「Azure Databricks」を選択します。
リソース作成.png

対象のサブスクリプション、リソースグループを選択します。
ワークスペース名を任意で入力します。
価格レベルは、ここでは「試用版」を選択しました。
ネットワークタブの「自分の仮想ネットワーク (VNet) に Azure Databricks ワークスペースをデプロイします」は「いいえ」にしました。

bricks構成.png

作成ボタンを押下しデプロイが終わるまで多少時間かかります。
※なお、Azure無料アカウントだと「従量課金制」にアップグレードしないとデプロイエラーになります。

クラスターの作成

Databricksのデプロイが終わったらリソースを開き、ワークスペースを起動します。
↓この画面です。
ワークスペース.png

Common Tasksの「New Cluster」からクラスターを作成します。
赤枠以外は既定値で作成しました。
「Terminate after 60 minutes of inactivity」は既定では「120」ですが「60」に変更しました。
※60分間動作しなかったらクラスターを落とすようです。無駄なリソースコストの削減にもつながります。
ワークスペース構成.png

作成が終わると起動処理が始まりますので、ワークスペース名の左側が「緑色の●」なるとOKです。
クラスタ2.png

Notebookを作成してデータ読込や編集、書出しをおこないます

左側のメニューから「Workspace」→「Workspace」→「Create」→「Notebook」を選択して新しいNotebookを作成します。

対話式のエディタが起動します。

このエディタで、Gen2からCSVファイルを読込んでデータフレームに格納します。
その後、必要な列の絞り込みや列名の変更をおこないます。
最後にDataWarehouseに新しいテーブルとして書き出します。

Gen2やテナント情報を変数に格納して設定します。

val storageAccountName = "Gen2のストレージアカウント名"
val appID = "アプリケーションID"
val password = "クライアントシークレット"
val fileSystemName = "Gen2のコンテナ名"
val tenantID = "テナントID"

spark.conf.set("fs.azure.account.auth.type." + storageAccountName + ".dfs.core.windows.net", "OAuth")
spark.conf.set("fs.azure.account.oauth.provider.type." + storageAccountName + ".dfs.core.windows.net", "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider")
spark.conf.set("fs.azure.account.oauth2.client.id." + storageAccountName + ".dfs.core.windows.net", "" + appID + "")
spark.conf.set("fs.azure.account.oauth2.client.secret." + storageAccountName + ".dfs.core.windows.net", "" + password + "")
spark.conf.set("fs.azure.account.oauth2.client.endpoint." + storageAccountName + ".dfs.core.windows.net", "https://login.microsoftonline.com/" + tenantID + "/oauth2/token")
spark.conf.set("fs.azure.createRemoteFileSystemDuringInitialization", "true")
dbutils.fs.ls("abfss://" + fileSystemName  + "@" + storageAccountName + ".dfs.core.windows.net/")
spark.conf.set("fs.azure.createRemoteFileSystemDuringInitialization", "false")

githubからcsvデータを一時取込み

%sh wget -P /tmp https://raw.githubusercontent.com/■■■■■/validation/master/datarank.csv

一時取込みしたcsvデータをGen2へコピー

dbutils.fs.cp("file:///tmp/datarank.csv", "abfss://" + fileSystemName + "@" + storageAccountName + ".dfs.core.windows.net/")

データフレームに読込み

val df2 = spark.read.csv("abfss://" + fileSystemName + "@" + storageAccountName + ".dfs.core.windows.net/datarank.csv")

中身を確認

df2.show()

こんな感じで中身が見れます。
データフレーム.png

必要な列だけに絞り込み

カラム「_C0」~「_C5」までに絞り込みます。

val specificColumnsDf2 = df2.select("_C0", "_C1", "_C2", "_C3", "_C4", "_C5")
specificColumnsDf2.show()

列名の変更

ここでは列名が分からないので、分かりやすい列名にします。
DataWarehouseに書き出す際に列名を合わせるために、、とかもあります。

val renamedColumnsDFtmp1 = specificColumnsDf2.withColumnRenamed("_C0", "SEQID")
specificColumnsDf2.show()

こんな感じで加工が終わりました。
加工済み.png

DataWarehouseに書き出し

Blobにテンポラリデータとして格納後、DataWarehouseに書き出す準備をします。

val blobStorage = "Blobストレージアカウント名.blob.core.windows.net"
val blobContainer = "Blobコンテナー名"
val blobAccessKey =  "Blobストレージのアクセスキー"

val tempDir = "wasbs://" + blobContainer + "@" + blobStorage +"/tempDirs"

val acntInfo = "fs.azure.account.key."+ blobStorage
sc.hadoopConfiguration.set(acntInfo, blobAccessKey)

DataWarehouse情報の変数宣言と設定

val dwDatabase = "データベース名"
val dwServer = "SQLサーバー名.database.windows.net"
val dwUser = "SQLサーバー作成時のユーザー名"
val dwPass = "そのパスワード"
val dwJdbcPort =  "1433"
val dwJdbcExtraOptions = "encrypt=true;trustServerCertificate=true;hostNameInCertificate=*.database.windows.net;loginTimeout=30;"
val sqlDwUrl = "jdbc:sqlserver://" + dwServer + ":" + dwJdbcPort + ";database=" + dwDatabase + ";user=" + dwUser+";password=" + dwPass + ";$dwJdbcExtraOptions"
val sqlDwUrlSmall = "jdbc:sqlserver://" + dwServer + ":" + dwJdbcPort + ";database=" + dwDatabase + ";user=" + dwUser+";password=" + dwPass

SampleTableというテーブルを作成して、データフレームのデータを格納

spark.conf.set(
    "spark.sql.parquet.writeLegacyFormat",
    "true")

renamedColumnsDF2.write.format("com.databricks.spark.sqldw").option("url", sqlDwUrlSmall).option("dbtable", "SampleTable")       .option( "forward_spark_azure_storage_credentials","True").option("tempdir", tempDir).mode("overwrite").save()

確認はDataWarehouseのクエリエディタからSSMSでできます。
とりあえず以上です。

2
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
2
0