Azureを触る機会があり、分析基盤についていろいろと学ぶ必要が出てきたので、Databricksの使い方について備忘録を残しておきます。
前提
下記リソースを作成しておきます。
- Azure Data Lake Storage Gen2
- Azure Blob Storage
- Azure Synapse Analytics
サービスプリシンパルの作成
無人操作を実行する目的でテナント内で作成するAzure Active Directoryアプリのリソースです。
アプリID、パスワード、証明書が付与された独自のユーザーIDで、ロール割り当てやアクセス許可で定義されているタスクの実行権限が与えられます。
ここではアプリ登録し、Gen2にBLOB共同作成者のロールを割り当てていきます。
ハンバーガーアイコンをクリックし、Azure Active Directoryを選択します。
左メニューから「アプリの登録」を選択、任意の名前を入力して登録します。
※アプリケーションIDと、ついでにテナントIDを控えておきます。
Gen2リソースを開き、左メニューから「アクセス制御(IAM)」を選択します。
「ロールの割り当てを追加する」から下記を入力し保存します。
- 役割:ストレージBLOB共同作成者
- 選択:AADでアプリ登録したもの
作成したアプリを選択して、新しくクライアントシークレットを作成します。
AADから作成したアプリを開きます。
左メニューから「証明書とシークレット」を選択して追加します。
※有効期限は「1年」にしました。
忘れずに、クライアントシークレットの値を控えておきます。
Databricksリソースを作成
AzurePortalにサインインし、「リソースの作成」から「Azure Databricks」を選択します。
対象のサブスクリプション、リソースグループを選択します。
ワークスペース名を任意で入力します。
価格レベルは、ここでは「試用版」を選択しました。
ネットワークタブの「自分の仮想ネットワーク (VNet) に Azure Databricks ワークスペースをデプロイします」は「いいえ」にしました。
作成ボタンを押下しデプロイが終わるまで多少時間かかります。
※なお、Azure無料アカウントだと「従量課金制」にアップグレードしないとデプロイエラーになります。
クラスターの作成
Databricksのデプロイが終わったらリソースを開き、ワークスペースを起動します。
↓この画面です。
Common Tasksの「New Cluster」からクラスターを作成します。
赤枠以外は既定値で作成しました。
「Terminate after 60 minutes of inactivity」は既定では「120」ですが「60」に変更しました。
※60分間動作しなかったらクラスターを落とすようです。無駄なリソースコストの削減にもつながります。
作成が終わると起動処理が始まりますので、ワークスペース名の左側が「緑色の●」なるとOKです。
Notebookを作成してデータ読込や編集、書出しをおこないます
左側のメニューから「Workspace」→「Workspace」→「Create」→「Notebook」を選択して新しいNotebookを作成します。
対話式のエディタが起動します。
このエディタで、Gen2からCSVファイルを読込んでデータフレームに格納します。
その後、必要な列の絞り込みや列名の変更をおこないます。
最後にDataWarehouseに新しいテーブルとして書き出します。
Gen2やテナント情報を変数に格納して設定します。
val storageAccountName = "Gen2のストレージアカウント名"
val appID = "アプリケーションID"
val password = "クライアントシークレット"
val fileSystemName = "Gen2のコンテナ名"
val tenantID = "テナントID"
spark.conf.set("fs.azure.account.auth.type." + storageAccountName + ".dfs.core.windows.net", "OAuth")
spark.conf.set("fs.azure.account.oauth.provider.type." + storageAccountName + ".dfs.core.windows.net", "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider")
spark.conf.set("fs.azure.account.oauth2.client.id." + storageAccountName + ".dfs.core.windows.net", "" + appID + "")
spark.conf.set("fs.azure.account.oauth2.client.secret." + storageAccountName + ".dfs.core.windows.net", "" + password + "")
spark.conf.set("fs.azure.account.oauth2.client.endpoint." + storageAccountName + ".dfs.core.windows.net", "https://login.microsoftonline.com/" + tenantID + "/oauth2/token")
spark.conf.set("fs.azure.createRemoteFileSystemDuringInitialization", "true")
dbutils.fs.ls("abfss://" + fileSystemName + "@" + storageAccountName + ".dfs.core.windows.net/")
spark.conf.set("fs.azure.createRemoteFileSystemDuringInitialization", "false")
githubからcsvデータを一時取込み
%sh wget -P /tmp https://raw.githubusercontent.com/■■■■■/validation/master/datarank.csv
一時取込みしたcsvデータをGen2へコピー
dbutils.fs.cp("file:///tmp/datarank.csv", "abfss://" + fileSystemName + "@" + storageAccountName + ".dfs.core.windows.net/")
データフレームに読込み
val df2 = spark.read.csv("abfss://" + fileSystemName + "@" + storageAccountName + ".dfs.core.windows.net/datarank.csv")
中身を確認
df2.show()
必要な列だけに絞り込み
カラム「_C0」~「_C5」までに絞り込みます。
val specificColumnsDf2 = df2.select("_C0", "_C1", "_C2", "_C3", "_C4", "_C5")
specificColumnsDf2.show()
列名の変更
ここでは列名が分からないので、分かりやすい列名にします。
DataWarehouseに書き出す際に列名を合わせるために、、とかもあります。
val renamedColumnsDFtmp1 = specificColumnsDf2.withColumnRenamed("_C0", "SEQID")
specificColumnsDf2.show()
DataWarehouseに書き出し
Blobにテンポラリデータとして格納後、DataWarehouseに書き出す準備をします。
val blobStorage = "Blobストレージアカウント名.blob.core.windows.net"
val blobContainer = "Blobコンテナー名"
val blobAccessKey = "Blobストレージのアクセスキー"
val tempDir = "wasbs://" + blobContainer + "@" + blobStorage +"/tempDirs"
val acntInfo = "fs.azure.account.key."+ blobStorage
sc.hadoopConfiguration.set(acntInfo, blobAccessKey)
DataWarehouse情報の変数宣言と設定
val dwDatabase = "データベース名"
val dwServer = "SQLサーバー名.database.windows.net"
val dwUser = "SQLサーバー作成時のユーザー名"
val dwPass = "そのパスワード"
val dwJdbcPort = "1433"
val dwJdbcExtraOptions = "encrypt=true;trustServerCertificate=true;hostNameInCertificate=*.database.windows.net;loginTimeout=30;"
val sqlDwUrl = "jdbc:sqlserver://" + dwServer + ":" + dwJdbcPort + ";database=" + dwDatabase + ";user=" + dwUser+";password=" + dwPass + ";$dwJdbcExtraOptions"
val sqlDwUrlSmall = "jdbc:sqlserver://" + dwServer + ":" + dwJdbcPort + ";database=" + dwDatabase + ";user=" + dwUser+";password=" + dwPass
SampleTableというテーブルを作成して、データフレームのデータを格納
spark.conf.set(
"spark.sql.parquet.writeLegacyFormat",
"true")
renamedColumnsDF2.write.format("com.databricks.spark.sqldw").option("url", sqlDwUrlSmall).option("dbtable", "SampleTable") .option( "forward_spark_azure_storage_credentials","True").option("tempdir", tempDir).mode("overwrite").save()
確認はDataWarehouseのクエリエディタからSSMSでできます。
とりあえず以上です。