はじめに
Azure Data Lake Storageに保存された大量のデータを変換しつつ、アプリで利用するためにCosmos DBにデータをロードする方法を模索していたところ、Cosmos DB Spark connectorを発見したので、これを試してみた。
Cosmos DB Spark connectorとは
SparkからCosmos DBのデータを簡単に並列で読み書きするためのライブラリーである。バッチとストリーム処理の両方に対して使用可能なツールである。AzureのPaaSとしてはDatabricksやHDInsightと一緒に使用可能。
https://docs.microsoft.com/ja-jp/azure/cosmos-db/spark-connector
https://docs.microsoft.com/ja-jp/azure/databricks/data/data-sources/azure/cosmosdb-connector
コンポーネント | バージョン |
---|---|
Apache Spark | 2.4.x, 2.3.x, 2.2.x, 2.1.x |
Scala | 2.11 |
使用にあたって以下に注意する必要がある。
- このコネクタはCosmos DB Core (SQL) APIのみをサポートしている。その他コネクタとしては MongoDB Connector for Spark、Spark Cassandra Connector がある。
- 現在のところ利用できる最新版がSpark2.4.xのため、Databricks 7.0以降のSpark 3.0.xではまだ動作させることはできない。
使用方法
ライブラリのインストール
以下からSpark/Scalaバージョンに合わせたコネクタをダウンロードしてDatabricksにインストールする。
https://docs.microsoft.com/ja-jp/azure/cosmos-db/spark-connector#bk_working_with_connector
クラスターの Library
タブからJARファイルをアップロードしてインストールする。インストール後は以下のようにライブラリーを確認することができる。
データの書き込み方法
Storageアクセス
今回はData Lake Storageに保存されたデータをCosmos DBに書き込むことを想定しているため、まずはStorageへのアクセスを行う。
またジョブクラスターで実行することも考慮し、パススルー認証ではなくService PrincipalによるOAuth 2.0認証を使用する。OAuth 2.0認証を使用する方法は過去の [この記事[(https://qiita.com/whata/items/47827586fc45b247a0d1) にまとめてある。
spark.conf.set("fs.azure.account.auth.type.<storage-account-name>.dfs.core.windows.net", "OAuth")
spark.conf.set("fs.azure.account.oauth.provider.type.<storage-account-name>.dfs.core.windows.net", "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider")
spark.conf.set("fs.azure.account.oauth2.client.id.<storage-account-name>.dfs.core.windows.net", "<application-id>")
spark.conf.set("fs.azure.account.oauth2.client.secret.<storage-account-name>.dfs.core.windows.net", dbutils.secrets.get(scope="<scope-name>",key="<service-credential-key-name>"))
spark.conf.set("fs.azure.account.oauth2.client.endpoint.<storage-account-name>.dfs.core.windows.net", "https://login.microsoftonline.com/<directory-id>/oauth2/token")
データをADLSから読み取ってDataFrameを作成する
ここではParquetファイルが格納されていることとしてデータをロードしている。
val load_path = "abfss://<container-name>@<storage-account-name>.dfs.core.windows.net/path/to/*.parquet"
val df = spark.read.format("parquet").option("header", "true").option("inferSchema", "true").load(load_path)
Cosmos DBへの書き込み
書き込み方法は非常にシンプルで、Cosmos DBへの接続情報と書き込み先のDatabase、Collectionを指定して、そのConfigとともに書き込みを実行する。
// Write configuration
val writeConfig = Config(Map(
"Endpoint" -> "https://<cosmos-db-account-name>.documents.azure.com:443/",
"Masterkey" -> "<comsmos-db-master-key>",
"Database" -> "<database-name>",
"Collection" -> "<collection-name>",
"Upsert" -> "true"
))
// Write to Cosmos DB from the DataFrame
import org.apache.spark.sql.SaveMode
df.write.mode(SaveMode.Overwrite).cosmosDB(writeConfig)
まとめ
非常に簡単にDatabricksとCosmos DBを接続することができた。数千万件のレコードを8 workerノードくらいで400 RUに設定したCosmos DBに書き込みしたら、一瞬でRUの上限に達して動作しなくなってしまった。非常に高速に実行できるが、書き込みリクエストの加減を調整することができないので、Cosmos DB側のRUを見てどのような単位で (小分けにするなどして) 実行するか等を考える必要がある。